pytorch 數(shù)據(jù)預(yù)加載的實(shí)現(xiàn)示例
1. Abstract
本文介紹一個(gè)工具 PreDataLoader
,它包裝 torch.utils.data.DataLoader
,接收該類的一個(gè)實(shí)例 loader
,啟動(dòng)一個(gè)線程 t
,創(chuàng)建一個(gè)隊(duì)列 q
,t
將 loader
中的數(shù)據(jù)預(yù)加載到隊(duì)列 q
中, 以在模型計(jì)算時(shí)也能啟動(dòng)啟動(dòng)數(shù)據(jù)加載程序, 節(jié)省數(shù)據(jù)加載時(shí)間。代碼:
class PreDataLoader(object): """ @Author: Yuwei from https://www.zhihu.com/people/aewil-zheng, with few changes ** 包裝 torch.utils.data.DataLoader, 接收該類的一個(gè)實(shí)例 loader, 啟動(dòng)一個(gè)線程 t, 創(chuàng)建一個(gè)隊(duì)列 q t 將 loader 中的數(shù)據(jù)預(yù)加載到隊(duì)列 q 中, 以在模型計(jì)算時(shí)也能啟動(dòng)啟動(dòng)數(shù)據(jù)加載程序, 節(jié)省數(shù)據(jù)加載時(shí)間 ** 若提供了 cuda device, 數(shù)據(jù)將直接被加載到 GPU 上 """ def __init__(self, loader, device=None, queue_size=2): """ :param loader: torch.utils.data.DataLoader :param device: torch.device('cuda' or 'cpu'), to use cpu, set None :param queue_size: the number of samples to be preloaded """ self.__loader = loader self.__device = device self.__queue_size = queue_size self.__load_stream = torch.cuda.Stream(device=device) \ if str(device).startswith('cuda') else None # 如果提供了 cuda device, 則創(chuàng)建 cuda 流 self.__queue = Queue(maxsize=self.__queue_size) self.__idx = 0 self.__worker = Thread(target=self._load_loop) self.__worker.setDaemon(True) self.__worker.start() def _load_loop(self): """ 不斷的將數(shù)據(jù)加載到隊(duì)列里 """ if str(self.__device).startswith('cuda'): logging.info(f'>>> data will be preloaded into device \'{self.__device}\'') logging.info(f'>>> this may cost more GPU memory!!!') # The loop that will load into the queue in the background torch.cuda.set_device(self.__device) while True: for sample in self.__loader: self.__queue.put(self._load_instance(sample)) else: while True: for sample in self.__loader: self.__queue.put(sample) def _load_instance(self, sample): """ 將 batch 數(shù)據(jù)從 CPU 加載到 GPU 中 """ if torch.is_tensor(sample): with torch.cuda.stream(self.__load_stream): return sample.to(self.__device, non_blocking=True) elif sample is None or type(sample) == str: return sample elif isinstance(sample, dict): return {k: self._load_instance(v) for k, v in sample.items()} else: return [self._load_instance(s) for s in sample] def __iter__(self): self.__idx = 0 return self def __next__(self): # 加載線程掛了 if not self.__worker.is_alive() and self.__queue.empty(): self.__idx = 0 self.__queue.join() self.__worker.join() raise StopIteration # 一個(gè) epoch 加載完了 elif self.__idx >= len(self.__loader): self.__idx = 0 raise StopIteration # 下一個(gè) batch else: out = self.__queue.get() self.__queue.task_done() self.__idx += 1 return out def next(self): return self.__next__() def __len__(self): return len(self.__loader) @property def sampler(self): return self.__loader.sampler @property def dataset(self): return self.__loader.dataset
如果你對實(shí)現(xiàn)技術(shù)細(xì)節(jié)不感興趣,也可直接拿來用。后面我將對相關(guān)細(xì)節(jié)展開討論,包括:
- python 中的并發(fā)與并行;
- cuda 流:
torch.cuda.Stream(device=device)
;
2. python 中的并發(fā)與并行
總所周知,由于 Global Interpreter Lock (GIL) 的存在,Python 語言中,任何時(shí)間點(diǎn)只有一個(gè)線程在執(zhí)行,即便在多核 CPU 上,Python 的多線程也無法實(shí)現(xiàn)真正的并行計(jì)算。
GIL 的原因在于 Python 的內(nèi)存管理并不是線程安全的。為了防止多個(gè)線程同時(shí)操作一個(gè)對象,造成數(shù)據(jù)混亂的問題,Python 設(shè)定了 GIL 來限制多線程的并發(fā)執(zhí)行。因此,盡管你可以在 Python 中創(chuàng)建多線程,并且看起來他們是同時(shí)運(yùn)行的,但實(shí)質(zhì)上,在任一時(shí)刻,只有一個(gè)線程在執(zhí)行。
既然如此,上面代碼使用多線程是如何提高程序的效率的?再看:
然而,如果你的程序是 IO 密集型的,例如大量的網(wǎng)絡(luò)請求或文件讀寫操作,那么使用多線程還是能顯著提高程序的效率的,因?yàn)樵诘却?IO 的過程中,其他線程還可以繼續(xù)執(zhí)行。
數(shù)據(jù)的預(yù)加載應(yīng)該算是 IO 吧,那模型計(jì)算和數(shù)據(jù)加載能并行嗎?
2.1 Numpy 和 PyTorch 底層計(jì)算是多線程并行的
Numpy 的底層實(shí)現(xiàn)是 C 語言,計(jì)算速度和并發(fā)性遠(yuǎn)勝于 Python,當(dāng)我們使用 numpy 進(jìn)行計(jì)算時(shí),特別是復(fù)雜的矩陣運(yùn)算,Python 程序會(huì)把這個(gè)任務(wù)拋給底層的 C 語言進(jìn)行計(jì)算,從而能夠使用 CPU 多核。驗(yàn)證:
import time import numpy as np def dot(): start = time.time() a = np.random.randn(10000, 10000) b = np.random.randn(10000, 10000) np.dot(a, b) end = time.time() print(end - start) dot()
驗(yàn)證代碼用 numpy.dot()
計(jì)算兩個(gè) 10000 10000 10000 維的矩陣乘法,觀察 CPU 的使用效率(i5-10400,6核心12線程),發(fā)現(xiàn) CPU 使用率很快從不足 20% 提升至 80% 左右。計(jì)算時(shí)間約為 15s
。
為了確定是否真的使用了多核,再設(shè)計(jì)一個(gè) Python 計(jì)算程序:
import time def add(): cnt = 1 start = time.time() for i in range(500000000): # 累加 cnt += 1 end = time.time() print(end - start) add()
五億次加法運(yùn)算,耗時(shí)約 20s
,CPU 使用率全程維持在 20% 以下。如此說來,numpy 確實(shí)是在使用多核并行計(jì)算。
下面看一看 Python 多線程能不能使它們并行計(jì)算:
import threading import time import numpy as np def dot(): start = time.time() a = np.random.randn(10000, 10000) b = np.random.randn(10000, 10000) np.dot(a, b) end = time.time() print(end - start) def add(): cnt = 1 start = time.time() for i in range(500000000): cnt += 1 end = time.time() print(end - start) t = threading.Thread(target=dot) s = time.time() add() t.start() t1.join() e = time.time() print(e - s)
輸出:
15.057043313980103
23.129913806915283
23.13091516494751
如果說整個(gè)程序只能同時(shí)使用一個(gè) CPU 核,那么整體計(jì)算時(shí)間應(yīng)該是兩部分計(jì)算時(shí)間的和 35s
左右,但這里只用了 23s
,可見 numpy 底層并行計(jì)算是實(shí)錘了。而且,這兩個(gè)函數(shù)的計(jì)算是并行的,即 np.dot()
在計(jì)算的時(shí)候,add()
也在計(jì)算。為什么 add
計(jì)算相比其單獨(dú)運(yùn)行時(shí)多了 3s
?而 np.dot()
計(jì)算時(shí)間基本沒變?
可以排除 CPU 資源不夠的可能,否則的話,np.dot()
的計(jì)算時(shí)間也要加長;再者我觀察了 CPU 利用率,全程未達(dá)到 100%。我覺得這是線程切換的開銷,add()
可能不是一直在運(yùn)行的,多個(gè) Python 線程還是只能使用一個(gè) CPU 核,線程之間交替執(zhí)行,只不過 np.dot()
線程在離開后,底層運(yùn)行還在繼續(xù),而 add()
線程離開后,其不再運(yùn)行。即:有那么 3s
時(shí)間,add()
沒運(yùn)行,“單核 CPU” 轉(zhuǎn)向了線程 np.dot()
檢查計(jì)算結(jié)果是否已返回。
再增加一個(gè) numpy 計(jì)算任務(wù)線程:
... t1 = threading.Thread(target=dot) t2 = threading.Thread(target=dot) s = time.time() add() t1.start() t2.start() t1.join() t2.join() e = time.time() print(e - s)
輸出:
25.624603986740112
27.81219220161438
30.751672983169556
30.752644538879395
時(shí)間增加了不少,基本快趕上計(jì)算一次 dot()
時(shí)間的兩倍了。這大概是由于 CPU 的計(jì)算達(dá)到了極限:
CPU 利用率長時(shí)間維持在 100%。
以上驗(yàn)證對于 PyTorch 也是一樣的。
結(jié)論:numpy 和 pytorch 的計(jì)算不受 GIL 的限制,可以使用 CPU 多核;一個(gè)線程中,numpy 和 pytorch 將計(jì)算丟給底層的 C/C++ 語言后,“等待計(jì)算結(jié)果”類似于 IO,會(huì)釋放 GIL 鎖,而計(jì)算還在繼續(xù),其他 python 線程可以得到執(zhí)行。
推論:使用 GPU 計(jì)算是同樣的道理,python 程序?qū)⒂?jì)算丟給 GPU 后,等待計(jì)算結(jié)果,當(dāng)前線程阻塞,釋放 GIL 鎖,其他 python 線程得以執(zhí)行,從而提高計(jì)算效率。
3. torch.cuda.Stream(device=device)
torch.cuda.Stream
是 PyTorch 庫中的一個(gè)類,用于管理 GPU 上的異步操作。
在 GPU 上執(zhí)行計(jì)算任務(wù)時(shí),通??梢允褂枚鄠€(gè)流(stream)來并行執(zhí)行不同的操作。每個(gè)流都有自己的命令隊(duì)列,可以獨(dú)立地執(zhí)行操作,從而提高計(jì)算效率。torch.cuda.Stream
就是用來創(chuàng)建和管理這些流的。
使用 torch.cuda.Stream
,可以將一系列 GPU 操作放入一個(gè)流中,并且可以通過調(diào)用流的 synchronize()
方法來等待流中所有操作完成。這對于需要處理多個(gè) GPU 操作的情況非常有用。
以下是一個(gè)使用 torch.cuda.Stream
的示例代碼:
import torch stream = torch.cuda.Stream() # 創(chuàng)建流對象 with torch.cuda.stream(stream): # 在流中執(zhí)行操作 # 執(zhí)行GPU操作 # ... stream.synchronize() # 等待流中操作完成
在上述示例中,我們首先創(chuàng)建了一個(gè) torch.cuda.Stream
對象 stream
。然后,我們使用 with
語句塊將一些 GPU 操作放入流中執(zhí)行。最后,我們調(diào)用 stream.synchronize()
來等待流中的操作完成。
通過使用 torch.cuda.Stream
,我們可以更靈活地控制 GPU 操作的執(zhí)行順序和并行性,以優(yōu)化計(jì)算性能。
以上是 GPT3.5 給出的關(guān)于 torch.cuda.Stream
的簡介。另外,還可參考教程《如何在 Pytorch 中使用 CUDA 流(CUDA stream)》 講的不錯(cuò)。我現(xiàn)在將其搬過來:
什么是 CUDA 流(CUDA stream)?
CUDA 流是一種在 GPU 上并行執(zhí)行操作的機(jī)制。在默認(rèn)情況下,PyTorch 會(huì)在默認(rèn)的流上執(zhí)行所有的操作,即在主流(default stream)上進(jìn)行。但是,當(dāng)我們有一些可以并行執(zhí)行的操作時(shí),通過將這些操作分配到不同的流上,我們可以在 GPU 上更有效地利用計(jì)算資源。
第一句就強(qiáng)調(diào):并行執(zhí)行操作的機(jī)制。
如何創(chuàng)建 CUDA 流?
可以通過 torch.cuda.Stream()
函數(shù)來創(chuàng)建 CUDA 流:
stream = torch.cuda.Stream()
使用 torch.cuda.Stream()
函數(shù)創(chuàng)建了一個(gè)名為 stream
的 CUDA 流。
如何使用 CUDA 流?
通過 with
上下文管理操作,并使用 stream.synchronize()
方法等待操作完成:
import torch # 創(chuàng)建兩個(gè)CUDA流 stream1 = torch.cuda.Stream() stream2 = torch.cuda.Stream() # 分別將操作記錄到兩個(gè)流上 with torch.cuda.stream(stream1): # 執(zhí)行操作1 # ... with torch.cuda.stream(stream2): # 執(zhí)行操作2 # ... # 等待兩個(gè)流上的操作完成 torch.cuda.synchronize(stream1) torch.cuda.synchronize(stream2)
我們創(chuàng)建了兩個(gè) CUDA 流 stream1
和 stream2
。然后,在兩個(gè)流上分別記錄操作,并使用torch.cuda.synchronize()
方法等待這些操作完成。
如何利用 CUDA 流提高性能?
一種常見的用法是將計(jì)算和數(shù)據(jù)傳輸操作分配到不同的流上,從而實(shí)現(xiàn)計(jì)算和數(shù)據(jù)傳輸?shù)牟⑿袌?zhí)行。
3.1 對 PreDataLoader 中 CUDA 流的解釋
with torch.cuda.stream(self.__load_stream): return sample.to(self.__device, non_blocking=True)
這一句 sample.to(self.__device, non_blocking=True)
算是數(shù)據(jù)傳輸吧,它處在一個(gè)數(shù)據(jù)預(yù)加載線程中,想要與模型計(jì)算并行。那么按照上面的教程:一個(gè) CUDA 流中的操作是順序執(zhí)行的,模型計(jì)算使用的是默認(rèn)流(default stream),平時(shí)我們的代碼 sample.to(device)
也使用了默認(rèn)流,這意味著數(shù)據(jù)的傳輸和模型計(jì)算是串行的。
所以,PreDataLoader
中定義了一個(gè)新的 CUDA 流,把 sample.to(self.__device, non_blocking=True)
放入這個(gè)新 CUDA 流,就可以和模型計(jì)算并行了。
4. @property
@property
是一個(gè)裝飾器,用于將類的方法轉(zhuǎn)換為屬性。通過使用 @property
,您可以定義一個(gè)方法,并將其作為實(shí)例的屬性來訪問,而不需要使用函數(shù)調(diào)用的語法。
下面是一個(gè)示例,說明如何使用 @property 裝飾器:
class Circle: def __init__(self, radius): self.radius = radius @property def diameter(self): return 2 * self.radius @diameter.setter def diameter(self, value): self.radius = value / 2 # 創(chuàng)建 Circle 對象 circle = Circle(5) # 訪問 diameter 屬性(實(shí)際上是調(diào)用了 diameter 方法) print(circle.diameter) # 輸出:10 # 設(shè)置 diameter 屬性(實(shí)際上是調(diào)用了 diameter.setter 方法) circle.diameter = 14 print(circle.radius) # 輸出:7
在上面的示例中,Circle
類定義了一個(gè) radius
實(shí)例變量和一個(gè) diameter
方法(被 @property
裝飾)。當(dāng)我們像訪問屬性一樣訪問 circle.diameter
時(shí),實(shí)際上是調(diào)用了 diameter
方法并返回其結(jié)果。
此外,我們還可以使用 @property
創(chuàng)建一個(gè) setter
方法,用于設(shè)置屬性的值。在示例中,diameter
屬性的 setter
方法名為 diameter.setter
,它接受一個(gè)參數(shù) value
,我們可以在 setter
方法中對 self.radius
進(jìn)行更新。
總結(jié):使用 @property
裝飾器可以將一個(gè)方法定義為屬性,并提供更加方便和易讀的方式來訪問和設(shè)置屬性。
既然擔(dān)心 Python 線程的 GIL 問題,為何不直接用多進(jìn)程?
答:多進(jìn)程沒那么好用,進(jìn)程是重量級的,有獨(dú)立的內(nèi)存管理,共享內(nèi)存是比較麻煩的:
import multiprocessing class Int(object): def __init__(self, i): self.__int = i def add(self): self.__int += 1 def print(self): print(self.__int) def add(integer: Int): integer.add() integer.print() print(id(integer)) if __name__ == '__main__': a_integer = Int(0) p1 = multiprocessing.Process(target=add, args=(a_integer,)) p2 = multiprocessing.Process(target=add, args=(a_integer,)) p3 = multiprocessing.Process(target=add, args=(a_integer,)) p1.start() p2.start() p3.start() add(a_integer) a_integer.print()
輸出:
1
1839132811024
1
1
2091010788944
1
1721319788112
1
2095109213776
可見,各進(jìn)程操作的 Int
對象不是同一個(gè),即,創(chuàng)建子進(jìn)程時(shí)傳入?yún)?shù)會(huì)是參數(shù)的一份拷貝。
如果將 multiprocessing.Process
換成 threading.Thread
,則輸出:
1
2691328945888
2
2691328945888
3
2691328945888
4
2691328945888
4
創(chuàng)建線程時(shí)傳入?yún)?shù)會(huì)是參數(shù)對象本身。
此外,子進(jìn)程不能訪問主線程的變量,如果:
def add(integer: Int): integer.add() integer.print() b_integer.add() # 加一個(gè)主進(jìn)程中的變量 print(id(integer))
則會(huì)報(bào)錯(cuò)。而線程則可以。
可以看到,PreDataLoader
中的線程是訪問了主程序的數(shù)據(jù)了的,如果用進(jìn)程,一是編程比較麻煩,二是效率也未必就高。
到此這篇關(guān)于pytorch 數(shù)據(jù)預(yù)加載的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)pytorch 數(shù)據(jù)預(yù)加載內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python中出現(xiàn)"No?module?named?'requests'"
這篇文章主要給大家介紹了關(guān)于Python中出現(xiàn)"No?module?named?'requests'"的解決辦法,"No?module?named?requests"是Python報(bào)錯(cuò)提示,意味著你在使用某個(gè)Python程序或腳本時(shí),沒有找到名為requests的模塊,需要的朋友可以參考下2023-11-11基于Python實(shí)現(xiàn)拆分和合并GIF動(dòng)態(tài)圖
這篇文章主要介紹了Python拆分和合并GIF動(dòng)態(tài)圖,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10Python的Flask框架中使用Flask-Migrate擴(kuò)展遷移數(shù)據(jù)庫的教程
Flask-Migrate可以幫助Flask應(yīng)用程序通過預(yù)設(shè)的Python腳本完成數(shù)據(jù)庫遷移操作,這里我們就來看一下Python的Flask框架中使用Flask-Migrate擴(kuò)展遷移數(shù)據(jù)庫的教程,需要的朋友可以參考下2016-06-06Pytorch訓(xùn)練模型得到輸出后計(jì)算F1-Score 和AUC的操作
這篇文章主要介紹了Pytorch訓(xùn)練模型得到輸出后計(jì)算F1-Score 和AUC的操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-05-05Python中的getattr、__getattr__、__getattribute__、__get__詳解
這篇文章主要為大家介紹了Python中的getattr,__getattr__,__getattribute__和__get__,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2021-12-12