Python并行庫(kù)joblib之delayed函數(shù)與Parallel函數(shù)詳解
Joblib
Joblib就是一個(gè)可以簡(jiǎn)單地將Python代碼轉(zhuǎn)換為并行計(jì)算模式的軟件包,它可非常簡(jiǎn)單并行我們的程序,從而提高計(jì)算速度。
主要提供了以下功能
- 程序并行
- 用于在每次使用相同的輸入?yún)?shù)調(diào)用函數(shù)時(shí)將其返回值緩存
- 數(shù)據(jù)存儲(chǔ)(包括不可哈希的數(shù)據(jù)和大規(guī)模numpy數(shù)組)
程序并行
joblib提供了一個(gè)簡(jiǎn)單地程序并行方案,主要有Parallel函數(shù)實(shí)現(xiàn),并涉及了一個(gè)技巧性的函數(shù)delayed。
delayed函數(shù)
以下為delayed函數(shù)的源碼
def delayed(function): """Decorator used to capture the arguments of a function.""" def delayed_function(*args, **kwargs): return function, args, kwargs try: delayed_function = functools.wraps(function)(delayed_function) except AttributeError: " functools.wraps fails on some callable objects " return delayed_function
*functools.wraps 旨在消除裝飾器對(duì)原函數(shù)造成的影響,即對(duì)原函數(shù)的相關(guān)屬性進(jìn)行拷貝,已達(dá)到裝飾器不修改原函數(shù)的目的。
從功能上來(lái)說(shuō),可以認(rèn)為被wrap修飾后的函數(shù)與原函數(shù)功能完全相同,暫時(shí)忽略不計(jì)
delayed函數(shù)顧名思義就是延遲函數(shù)的執(zhí)行。
根據(jù)源碼來(lái)看,delayed函數(shù)保留被修飾的函數(shù)function和參數(shù)*args, **kwargs,在碰到調(diào)用時(shí),并不直接執(zhí)行函數(shù)function(*args, **kwargs),而是返回返回元組(function,args,kwargs)。
返回的這個(gè)結(jié)果留待其他函數(shù)執(zhí)行,在joblib里具體是與Parallel配合的。
下面我們通過具體例子看一下delayed函數(shù)如何工作的
import functools def delayed(function): """Decorator used to capture the arguments of a function.""" def delayed_function(*args, **kwargs): return function, args, kwargs try: delayed_function = functools.wraps(function)(delayed_function) except AttributeError: " functools.wraps fails on some callable objects " return delayed_function def f(x,y): return x+y res = delayed(f)(1,y=3) print(res)
執(zhí)行結(jié)果為:
(<function f at 0x00000000081C29D8>, (1,), {'y': 3})
返回了原始的函數(shù)f和調(diào)用它是的兩個(gè)參數(shù)。
上面也說(shuō)過delayed函數(shù)其實(shí)是一個(gè)修飾器,因此上面的代碼與下面的寫法等價(jià)
@delayed def f(x,y): return x+y res = f(1,y=3) print(res)
*delayed之后并未得到函數(shù)的執(zhí)行結(jié)果,我們?nèi)绻氲玫筋A(yù)期的執(zhí)行結(jié)果應(yīng)該怎么做呢?其實(shí)delayed函數(shù)主要是與其他函數(shù)配合的,我們可以再寫一個(gè)程序進(jìn)行計(jì)算:
def f(x,y): return x+y res = delayed(f)(1,y=2) print(res) #out: (<function f at 0x00000000081C2510>, (1,), {'y': 2}) foo,args,kwargs = res final_res = foo(*args,**kwargs) print(final_res) #out: 3
我們先解包了delayed之后的結(jié)果res,得到了函數(shù)f,和參數(shù)args,kwargs,然后調(diào)用foo(*args,**kwargs)得到最終得到期待的計(jì)算結(jié)果的final_res。
開始可能很難理解我們?yōu)槭裁匆舆t一個(gè)函數(shù)的調(diào)用呢,調(diào)用函數(shù)不就是為了得到執(zhí)行結(jié)果嗎,這樣延遲之后不就無(wú)法達(dá)到我們的預(yù)期了?
先別急,看看下面這個(gè)例子應(yīng)該就明白了
假設(shè)我們需要對(duì)x_list = [1,2,3], y_list = [ -1,-2,-3] 這兩個(gè)列表進(jìn)行逐個(gè)元素相加。
可能會(huì)覺得這不是很容易實(shí)現(xiàn)嗎,依次遍歷兩個(gè)集合的元素,在循環(huán)過程中調(diào)用函數(shù)f進(jìn)行元素相加就好了,可能寫出的代碼如下:
def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [] for x,y in zip(x_list,y_list): res.append(f(x,y)) print(res)
這樣線性的多次調(diào)用函數(shù)f當(dāng)然沒問題,但是如果我們并行的執(zhí)行f的三次調(diào)用,也就是同時(shí)執(zhí)行f(1,-1),f(2,-2),f(3,-3)時(shí)代碼該怎么寫呢?我們希望使用三個(gè)線程/進(jìn)程,每個(gè)進(jìn)程分別執(zhí)行一個(gè)函數(shù)調(diào)用。這時(shí)就可以看出delayed函數(shù)的用法了。
@delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [] for x,y in zip(x_list,y_list): res.append(f(x,y)) print(res)
執(zhí)行之后的結(jié)果為:
[(<function f at 0x00000000081C2950>, (1, -1), {}),
(<function f at 0x00000000081C2950>, (2, -2), {}),
(<function f at 0x00000000081C2950>, (3, -3), {})]
我們獲得了由(函數(shù),args,kwargs)這樣的元組組成的列表,我們可以為列表中的每一個(gè)元組(f,args,kwargs)分配給不同的線程,在每個(gè)線程里面執(zhí)行一個(gè)f(args,kwargs)。這樣是不是就可以完成并行的目的了呢?如何將元組分配給不同的線程由函數(shù)Parallel實(shí)現(xiàn),后面再講。我們現(xiàn)在只需要知道的delayed函數(shù)只是為了生成(函數(shù),args,kwargs)這樣的元組,暫緩函數(shù)的執(zhí)行,方便將各個(gè)計(jì)算過程分配給不同的線程。
tips:一般我們只想得到一個(gè)這樣的列表,并不想改變?cè)己瘮?shù)f,因此上面的修飾器寫法一般等價(jià)的寫為:
def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(res)
Parallel函數(shù)
看名字就知道Parallel主要的功能是實(shí)現(xiàn)程序并行。
因此在講Parallel之前,先看一下并行的實(shí)際處理流程:
Parallel實(shí)際上就是封裝了這個(gè)任務(wù)拆分,并行和結(jié)果合并的這個(gè)過程。其主要的功能是線程和進(jìn)程的創(chuàng)建,和多任務(wù)執(zhí)行這個(gè)流程。
由于其源碼過于復(fù)雜,這里只看和調(diào)用相關(guān)的兩個(gè)部分:
(1)、初始化部分
Parallel的初始化主要是與程序并行的配置相關(guān),其函數(shù)定義為
class joblib.parallel(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto',temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None)
參數(shù)解釋:(參考:官方文檔)
- n_jobs: int, default: None —— 設(shè)置并行執(zhí)行任務(wù)的最大數(shù)量
當(dāng)backend="multiprocessing"時(shí)指python工作進(jìn)程的數(shù)量,或者backend="threading"時(shí)指線程池大小。
當(dāng)n_jobs=-1時(shí),使用所有的CPU執(zhí)行并行計(jì)算;
當(dāng)n_jobs=1時(shí),就不會(huì)使用并行代碼,即等同于順序執(zhí)行,可以在debug情況下使用;
當(dāng)n_jobs<-1時(shí),將會(huì)使用(n_cpus + 1 + n_jobs)個(gè)CPU,例如n_jobs=-2時(shí),將會(huì)使用n_cpus-1個(gè)CPU核,其中n_cpus為CPU核的數(shù)量;
當(dāng)n_jobs=None的情況等同于n_jobs=1。
- backend: str, default: 'loky' —— 指定并行化后端的實(shí)現(xiàn)方法
backend='loky': 在與Python進(jìn)程交換輸入和輸出數(shù)據(jù)時(shí),可導(dǎo)致一些通信和內(nèi)存開銷。
backend='multiprocessing': 基于multiprocessing.Pool的后端,魯棒性不如loky。
backend='threading': threading是一個(gè)開銷非常低的backend。但是如果被調(diào)用的函數(shù)大量依賴于Python對(duì)象,它就會(huì)受到Python全局解釋器(GIL)鎖的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴(kuò)展時(shí),
“threading”非常有用(例如,封裝在“with nogil”塊中的Cython循環(huán),或者對(duì)庫(kù)(如NumPy)的大量調(diào)用)。
- verbose: int, 可選項(xiàng) —— 執(zhí)行期間的信息顯示(ps:這個(gè)參數(shù)實(shí)際上有點(diǎn)類似log的日志級(jí)別,數(shù)字越大,顯示的程序執(zhí)行過程越詳細(xì))
信息級(jí)別:如果非零,則打印進(jìn)度消息。超過50,輸出被發(fā)送到stdout。消息的頻率隨著信息級(jí)別的增加而增加。如果大于10,則報(bào)告所有迭代。
- timeout: float, 可選項(xiàng) —— 任務(wù)運(yùn)行時(shí)間限制
timeout僅用在n_jobs != 1的情況下,用來(lái)限制每個(gè)任務(wù)完成的時(shí)間,如果任何任務(wù)的執(zhí)行超過這個(gè)限制值,將會(huì)引發(fā)“TimeOutError”錯(cuò)誤。
- pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
預(yù)先分派的(任務(wù)的)批數(shù)(batches)。默認(rèn)設(shè)置是“2 * n_jobs”。
ps: 這個(gè)參數(shù)有點(diǎn)難理解,直觀來(lái)說(shuō)一下吧。pre_dispatch是預(yù)派遣的意思,就是提前先把任務(wù)派遣給各個(gè)處理器。
注意一下,這里的派遣并不是口頭的安排任務(wù),而是把任務(wù)和任務(wù)對(duì)應(yīng)的數(shù)據(jù)(劃重點(diǎn))也發(fā)送給處理器。假設(shè)我們總共有12,3個(gè)處理器,如果不設(shè)置pre_dispatch,那么程序在開始時(shí)會(huì)一次性將所有任務(wù)都分配出去,一個(gè)處理器領(lǐng)到了四個(gè)任務(wù)。但是這個(gè)派遣過程需要為每個(gè)任務(wù)準(zhǔn)備相應(yīng)的數(shù)據(jù),需要處理時(shí)間和占據(jù)內(nèi)存空間。
一次分配任務(wù)過多可能造成的后果就是準(zhǔn)備數(shù)據(jù)的時(shí)間變長(zhǎng)和可能造成內(nèi)存爆炸。所以設(shè)置一下預(yù)派遣任務(wù)的數(shù)量,減小預(yù)處理時(shí)間和內(nèi)存占用,保證處理器不空轉(zhuǎn)就行。
- batch_size: int or 'auto', default: 'auto' —— 一次性分派給每個(gè)worker的原子任務(wù)的數(shù)量
當(dāng)單個(gè)原子任務(wù)執(zhí)行非??鞎r(shí),由于開銷的原因,使用dispatching的worker可能比順序計(jì)算慢。一起進(jìn)行批量快速計(jì)算可以緩解這種情況。
“auto”策略會(huì)跟蹤一個(gè)批處理完成所需的時(shí)間,并動(dòng)態(tài)調(diào)整batch_size大小,使用啟發(fā)式方法將時(shí)間保持在半秒以內(nèi)。初始batch_size為1。
batch_size="auto"且backend="threading時(shí),將一次分派一個(gè)任務(wù)的batches,因?yàn)閠hreading后端有非常小的開銷,使用更大的batch_size在這種情況下沒有證明帶來(lái)任何好處。
這個(gè)可以簡(jiǎn)單的理解為每個(gè)處理器同時(shí)處理的任務(wù)數(shù)量,也就是一次分配每個(gè)處理的任務(wù)數(shù)量。
ps:插一句,嚴(yán)格從執(zhí)行順序來(lái)看實(shí)際上每個(gè)batch內(nèi)的原子任務(wù)還是順序執(zhí)行的(這點(diǎn)與深度學(xué)習(xí)框架中的batch是不同的),當(dāng)每個(gè)任務(wù)執(zhí)行時(shí)間非常短時(shí),可以從邏輯上認(rèn)為每個(gè)bacth內(nèi)的任務(wù)是同時(shí)執(zhí)行的。因此這個(gè)參數(shù)的應(yīng)用場(chǎng)景是每個(gè)任務(wù)單獨(dú)執(zhí)行的時(shí)間非常短,但是又希望使用并行加快速度的場(chǎng)景。
n_jobs,pre_dispatch,batch_size這三個(gè)參數(shù)要綜合理解。舉個(gè)例子,假設(shè)我們有24個(gè)任務(wù),設(shè)定n_jobs = 3(使用3個(gè)處理器),pre_disaptch = 2 * 3,batch_size = 2
那么實(shí)際上預(yù)派遣了2*3*bacth_size=12個(gè)任務(wù),也就是說(shuō)隊(duì)列里有12個(gè)任務(wù)可供隨時(shí)選擇執(zhí)行。然后batch_size = 2,表示每個(gè)處理器需要拿走2個(gè)任務(wù),n_jobs=3的情況下,總共需要拿走n_jobs*batch_szie = 6個(gè)任務(wù)去執(zhí)行。這樣隊(duì)列里還有6個(gè)任務(wù)可供下次提取,保證了處理器不會(huì)空轉(zhuǎn)。
- temp_folder: str, 可選項(xiàng)
這個(gè)看名字就知道是臨時(shí)文件夾,存儲(chǔ)進(jìn)程中的一些臨時(shí)緩存數(shù)據(jù)
- max_nbytes int, str, or None, optional, 1M by default
觸發(fā)緩存機(jī)制的閾值,當(dāng)worker中的numpy數(shù)組超過閾值時(shí)會(huì)觸發(fā)內(nèi)存映射
- mmap_mode: {None, 'r+', 'r', 'w+', 'c'}
映射后的數(shù)據(jù)的打開模式,就是常規(guī)的讀寫權(quán)限控制
- prefer: str in {'processes', 'threads'} or None, default: None
如果使用parallel_backend上下文管理器沒有選擇任何特定backend,則使用軟提示選擇默認(rèn)backend。默認(rèn)的基于進(jìn)程(thread-based)的backend是“loky”,默認(rèn)的基于線程的backend是“threading”。
如果指定了“backend”參數(shù),則忽略。
- require: 'sharedmem' or None, default None
硬約束選擇backend。如果設(shè)置為'sharedmem',即使用戶要求使用parallel_backend實(shí)現(xiàn)非基于線程的后端,所選backend也將是single-host和thread-based的。
使用示例:
from joblib import Parallel #4個(gè)線程 works_4 = Parallel(n_jobs=4,backend = 'threading') #2個(gè)進(jìn)程 works_2 = Parallel(n_jobs=4,backend = 'multiprocessing')
(2)調(diào)用
Parallel實(shí)際上是一個(gè)類,但是實(shí)現(xiàn)了__call__方法,因此可以像函數(shù)一樣進(jìn)行調(diào)用。
但是Parallel的輸入需要可迭代的對(duì)象,也就是一些任務(wù)的集合。這些任務(wù)由(function, args,kwargs)這種函數(shù)名和參數(shù)的方式表示。
from joblib import Parallel def f(x,y): return x+y works_2 = Parallel(n_jobs=2,backend = 'threading') task1 = [f, [2,3],{}] task2 = [f,[4],{'y':5}] res = works_2([task1,task2]) print(res) #out: res = [5,9]
上面的代碼我們先定義了一個(gè)2線程的并行處理器works_2,然后構(gòu)造了兩個(gè)任務(wù)task1和task2。[task1,task2]合并成一個(gè)任務(wù)集合作為works_2的參數(shù)進(jìn)行執(zhí)行,最終我們也得到了2+3和4+5的正確結(jié)果。
我們可能會(huì)想任務(wù)集合中各個(gè)任務(wù)的函數(shù)可以不同嗎?答案當(dāng)然是可以的
from joblib import Parallel def f1(x,y): return x+y def f2(x,y): return y-x task1 = [f1, [4,10],{}] task2 = [f2,[4,10],{}] works_2 = Parallel(n_jobs=2,backend = 'threading') res = works_2([task1,task2]) print(res #out: res = [14,6]
講到這兒就可以基本寫出多任務(wù)并行的程序了,但是是不是覺得每次構(gòu)造一個(gè)任務(wù)集合非常的麻煩,需要指定函數(shù)名,參數(shù)啥的。回想一下之前說(shuō)的Dealyed函數(shù)不就是幫我們做這件事的嗎,自動(dòng)創(chuàng)建一個(gè)由(函數(shù),args,kwargs)這樣的原子組成的列表。
from joblib import delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(tasks)
結(jié)果如下:
[(<function f at 0x00000000081C2950>, (1, -1), {}),
(<function f at 0x00000000081C2950>, (2, -2), {}),
(<function f at 0x00000000081C2950>, (3, -3), {})]
這下我們結(jié)合Parallel和delayed更為方便的創(chuàng)建并行程序
from joblib import Parallel,delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(tasks) works_2 = Parallel(n_jobs=2,backend = 'threading') res = works_2(tasks) print(res)
為了程序簡(jiǎn)潔,我們可以省略一些中間過程,上面的代碼也可以寫為:
from joblib import Parallel,delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = Parallel(n_jobs=2,backend = 'threading')([delayed(f)(x,y) for x,y in zip(x_list,y_list)] ) print(res)
這就是我們一般看到的示例了。
到此這篇關(guān)于Python并行庫(kù)joblib之delayed函數(shù)與Parallel函數(shù)詳解的文章就介紹到這了,更多相關(guān)python并行庫(kù)joblib內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python上傳時(shí)包含boundary時(shí)的解決方法
這篇文章主要介紹了python上傳時(shí)包含boundary時(shí)的解決方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2020-04-04基于python for in if 連著寫與分開寫的區(qū)別說(shuō)明
這篇文章主要介紹了基于python for in if 連著寫與分開寫的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2021-03-03scratch3.0二次開發(fā)之用blocks生成python代碼
python是blockl.generator的一個(gè)實(shí)例,會(huì)調(diào)用generator里的方法,這篇文章主要介紹了scratch3.0二次開發(fā)之用blocks生成python代碼,需要的朋友可以參考下2021-08-08在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器
這篇文章主要介紹了在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器的方法,同時(shí)文中還介紹了使用Python的Django框架時(shí)mod_wsgi連接方式下可能遇到的問題的一般解決方法,需要的朋友可以參考下2015-12-12flask的orm框架SQLAlchemy查詢實(shí)現(xiàn)解析
這篇文章主要介紹了flask的orm框架SQLAlchemy查詢實(shí)現(xiàn)解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12