Python中的Joblib庫使用學(xué)習(xí)總結(jié)
Joblib
實(shí)踐環(huán)境
- python 3.6.2
- Joblib
簡介
Joblib是一組在Python中提供輕量級流水線的工具。特別是:
- 函數(shù)的透明磁盤緩存和延遲重新計(jì)算(記憶模式)
- 簡單易用的并行計(jì)算
Joblib已被優(yōu)化得很快速,很健壯了,特別是在大數(shù)據(jù)上,并對numpy數(shù)組進(jìn)行了特定的優(yōu)化。
主要功能
1.輸出值的透明快速磁盤緩存(Transparent and fast disk-caching of output value):
Python函數(shù)的內(nèi)存化或類似make的功能,適用于任意Python對象,包括非常大的numpy數(shù)組。
通過將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數(shù),將持久性和流執(zhí)行邏輯與域邏輯或算法代碼分離開來。
Joblib可以將其計(jì)算保存到磁盤上,并僅在必要時(shí)重新運(yùn)行:
>>> from joblib import Memory >>> cachedir = 'your_cache_dir_goes_here' >>> mem = Memory(cachedir) >>> import numpy as np >>> a = np.vander(np.arange(3)).astype(float) >>> square = mem.cache(np.square) >>> b = square(a) ______________________________________________________________________... [Memory] Calling square... square(array([[0., 0., 1.], [1., 1., 1.], [4., 2., 1.]])) _________________________________________________...square - ...s, 0.0min >>> c = square(a) # The above call did not trigger an evaluation
2.并行助手(parallel helper):
輕松編寫可讀的并行代碼并快速調(diào)試
>>> from joblib import Parallel, delayed >>> from math import sqrt >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) >>> res [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
3.快速壓縮的持久化(Fast compressed Persistence):
代替pickle在包含大數(shù)據(jù)的Python對象上高效工作( joblib.dump & joblib.load )。
parallel for loops
常見用法
Joblib提供了一個(gè)簡單的助手類,用于使用多進(jìn)程為循環(huán)實(shí)現(xiàn)并行。核心思想是將要執(zhí)行的代碼編寫為生成器表達(dá)式,并將其轉(zhuǎn)換為并行計(jì)算
>>> from math import sqrt >>> [sqrt(i ** 2) for i in range(10)] [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
使用以下代碼,可以分布到2個(gè)CPU上:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
輸出可以是一個(gè)生成器,在可以獲取結(jié)果時(shí)立即返回結(jié)果,即使后續(xù)任務(wù)尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator參數(shù) >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) <class 'generator'> >>> print(next(output_generator)) 0.0 >>> print(next(output_generator)) 1.0 >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
此生成器允許減少joblib.Parallel的內(nèi)存占用調(diào)用
基于線程的并行VS基于進(jìn)程的并行
默認(rèn)情況下, joblib.Parallel 使用 'loky' 后端模塊啟動(dòng)單獨(dú)的Python工作進(jìn)程,以便在分散的CPU上同時(shí)執(zhí)行任務(wù)。
對于一般的Python程序來說,這是一個(gè)合理的默認(rèn)值,但由于輸入和輸出數(shù)據(jù)需要在隊(duì)列中序列化以便同工作進(jìn)程進(jìn)行通信,因此可能會導(dǎo)致大量開銷(請參閱序列化和進(jìn)程)。
當(dāng)你知道你調(diào)用的函數(shù)是基于一個(gè)已編譯的擴(kuò)展,并且該擴(kuò)展在大部分計(jì)算過程中釋放了Python全局解釋器鎖(GIL)時(shí),使用線程而不是Python進(jìn)程作為并發(fā)工作者會更有效。
例如,在Cython函數(shù)的with nogil 塊中編寫CPU密集型代碼。
如果希望代碼有效地使用線程,只需傳遞 preferre='threads' 作為 joblib.Parallel 構(gòu)造函數(shù)的參數(shù)即可。在這種情況下,joblib將自動(dòng)使用 "threading" 后端,而不是默認(rèn)的 "loky" 后端
>>> Parallel(n_jobs=2, prefer=threads')( ... delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
也可以在上下文管理器的幫助下手動(dòng)選擇特定的后端實(shí)現(xiàn):
>>> from joblib import parallel_backend >>> with parallel_backend('threading', n_jobs=2): ... Parallel()(delayed(sqrt)(i ** 2) for i in range(10)) ... [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
后者在調(diào)用內(nèi)部使用 joblib.Parallel 的庫時(shí)特別有用,不會將后端部分作為其公共API的一部分公開。
'loky' 后端可能并不總是可獲取。
一些罕見的系統(tǒng)不支持多處理(例如Pyodide)。在這種情況下,loky后端不可用,使用線程作為默認(rèn)后端。
除了內(nèi)置的joblib后端之外,還可以使用幾個(gè)特定于集群的后端:
- 用于Dask集群的Dask后端
- 用于Ray集群的Ray后端
- 用于Spark集群上分發(fā)joblib任務(wù)的Joblib Apache Spark Backend
序列化與進(jìn)程
要在多個(gè)python進(jìn)程之間共享函數(shù)定義,必須依賴序列化協(xié)議。python中的標(biāo)準(zhǔn)協(xié)議是 pickle ,但它在標(biāo)準(zhǔn)庫中的默認(rèn)實(shí)現(xiàn)有幾個(gè)限制。例如,它不能序列化交互式定義的函數(shù)或在 __main__ 模塊中定義的函數(shù)。
為了避免這種限制, loky 后端現(xiàn)在依賴于 cloudpickle 以序列化python對象。 cloudpickle 是 pickle 協(xié)議的另一種實(shí)現(xiàn)方式,允許序列化更多的對象,特別是交互式定義的函數(shù)。因此,對于大多數(shù)用途, loky 后端應(yīng)該可以完美的工作。
cloudpickle 的主要缺點(diǎn)就是它可能比標(biāo)準(zhǔn)類庫中的 pickle 慢,特別是,對于大型python字典或列表來說,這一點(diǎn)至關(guān)重要,因?yàn)樗鼈兊男蛄谢瘯r(shí)間可能慢100倍。有兩種方法可以更改 joblib 的序列化過程以緩和此問題:
- 如果您在UNIX系統(tǒng)上,則可以切換回舊的 multiprocessing 后端。有了這個(gè)后端,可以使用很快速的 pickle 在工作進(jìn)程中共享交互式定義的函數(shù)。該解決方案的主要問題是,使用 fork 啟動(dòng)進(jìn)程會破壞標(biāo)準(zhǔn)POSIX,并可能與 numpy 和 openblas 等第三方庫進(jìn)行非正常交互。
- 如果希望將 loky 后端與不同的序列化庫一起使用,則可以設(shè)置 LOKY_PICKLER=mod_pickle 環(huán)境變量,以使用 mod_pickle 作為 loky 的序列化庫。作為參數(shù)傳遞的模塊 mod_pickle 應(yīng)按 import mod_picke 導(dǎo)入,并且應(yīng)包含一個(gè) Pickler 對象,該對象將用于序列化為對象。可以設(shè)置 LOKY_PICKLER=pickle 以使用表中類庫中的pickling模塊。 LOKY_PICKLER=pickle 的主要缺點(diǎn)是不能序列化交互式定義的函數(shù)。為了解決該問題,可以將此解決方案與 joblib.wrap_non_picklable_objects() 一起使用, joblib.wrap_non_picklable_objects() 可用作裝飾器以為特定對下本地啟用 cloudpickle 。通過這種方式,可以為所有python對象使用速度快的picking,并在本地為交互式函數(shù)啟用慢速的pickling。查閱loky_wrapper獲取示例。
共享內(nèi)存語義
joblib的默認(rèn)后端將在獨(dú)立的Python進(jìn)程中運(yùn)行每個(gè)函數(shù)調(diào)用,因此它們不能更改主程序中定義的公共Python對象。
然而,如果并行函數(shù)確實(shí)需要依賴于線程的共享內(nèi)存語義,則應(yīng)顯示的使用 require='sharemem' ,例如:
>>> shared_set = set() >>> def collect(x): ... shared_set.add(x) ... >>> Parallel(n_jobs=2, require='sharedmem')( ... delayed(collect)(i) for i in range(5)) [None, None, None, None, None] >>> sorted(shared_set) [0, 1, 2, 3, 4]
請記住,從性能的角度來看,依賴共享內(nèi)存語義可能是次優(yōu)的,因?yàn)閷蚕鞵ython對象的并發(fā)訪問將受到鎖爭用的影響。
注意,不使用共享內(nèi)存的情況下,任務(wù)進(jìn)程之間的內(nèi)存資源是相互獨(dú)立的,舉例說明如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from joblib import Parallel, delayed, parallel_backend from collections import deque GLOBAL_LIST = [] class TestClass(): def __init__(self): self.job_queue = deque() def add_jobs(self): i = 0 while i < 3: time.sleep(1) i += 1 GLOBAL_LIST.append(i) self.job_queue.append(i) print('obj_id:', id(self), 'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST) def get_job_queue_list(obj): i = 0 while not obj.job_queue and i < 3: time.sleep(1) i += 1 print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST) return obj.job_queue if __name__ == "__main__": obj = TestClass() def test_fun(): with parallel_backend("multiprocessing", n_jobs=2): Parallel()(delayed(get_job_queue_list)(obj) for i in range(2)) thread = threading.Thread(target=test_fun, name="parse_log") thread.start() time.sleep(1) obj.add_jobs() print('global_list_len:', len(GLOBAL_LIST))
控制臺輸出:
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3
通過輸出可知,通過joblib.Parallel開啟的進(jìn)程,其占用內(nèi)存和主線程占用的內(nèi)存資源是相互獨(dú)立
復(fù)用worer池
一些算法需要對并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,同時(shí)對中間結(jié)果進(jìn)行處理。在一個(gè)循環(huán)中多次調(diào)用 joblib.Parallel 次優(yōu)的,因?yàn)樗鼤啻蝿?chuàng)建和銷毀一個(gè)workde(線程或進(jìn)程)池,這可能會導(dǎo)致大量開銷。
在這種情況下,使用 joblib.Parallel 類的上下文管理器API更有效,以便對 joblib.Parallel 對象的多次調(diào)用可以復(fù)用同一worker池。
from joblib import Parallel, delayed from math import sqrt with Parallel(n_jobs=2) as parallel: accumulator = 0. n_iter = 0 while accumulator < 1000: results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5)) accumulator += sum(results) # synchronization barrier n_iter += 1 print(accumulator, n_iter) #輸出: 1136.5969161564717 14
請注意,現(xiàn)在基于進(jìn)程的并行默認(rèn)使用 'loky' 后端,該后端會自動(dòng)嘗試自己維護(hù)和重用worker池,即使是在沒有上下文管理器的調(diào)用中也是如此
筆者實(shí)踐發(fā)現(xiàn),即便采用這種實(shí)現(xiàn)方式,其運(yùn)行效率也是非常低下的,應(yīng)該盡量避免這種設(shè)計(jì)(實(shí)踐環(huán)境 Python3.6)
Parallel參考文檔
class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
常用參數(shù)說明
- n_jobs :int, 默認(rèn): None
并發(fā)運(yùn)行作業(yè)的最大數(shù)量,例如當(dāng) backend='multiprocessing' 時(shí)Python工作進(jìn)程的數(shù)量,或者當(dāng) backend='threading' 時(shí)線程池的大小。如果設(shè)置為 -1,則使用所有CPU。如果設(shè)置為1,則根本不使用并行計(jì)算代碼,并且行為相當(dāng)于一個(gè)簡單的python for循環(huán)。此模式與 timeout 不兼容。如果 n_jobs 小于-1,則使用 (n_cpus+1+n_jobs) 。因此,如果 n_jobs=-2 ,將使用除一個(gè)CPU之外的所有CPU。如果為 None ,則默認(rèn) n_jobs=1 ,除非在 parallel_backend() 上下文管理器下執(zhí)行調(diào)用,此時(shí)會為 n_jobs 設(shè)置另一個(gè)值。
- backend : str, ParallelBackendBase 實(shí)例或者 None , 默認(rèn): 'loky'
指定并行化后端實(shí)現(xiàn)。支持的后端有:
- loky 在與工作Python進(jìn)程交換輸入和輸出數(shù)據(jù)時(shí),默認(rèn)使用的 loky 可能會導(dǎo)致一些通信和內(nèi)存開銷。在一些罕見的系統(tǒng)(如Pyiode)上, loky 后端可能不可用。
- multiprocessing 以前基于進(jìn)程的后端,基于 multiprocessing.Pool 。不如loky健壯。
- threading 是一個(gè)開銷很低的后端,但如果被調(diào)用的函數(shù)大量依賴于Python對象,它會受到Python GIL的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴(kuò)展時(shí), threading 最有用(例如, with-nogil 塊中封裝的Cython循環(huán)或?qū)umPy等庫的昂貴調(diào)用)。
- 最后,可以通過調(diào)用 register_pallel_backend() 來注冊后端。
不建議在類庫中調(diào)用 Parallel 時(shí)對 backend 名稱進(jìn)行硬編碼,取而代之,建議設(shè)置軟提示( prefer )或硬約束( require ),以便庫用戶可以使用 parallel_backend() 上下文管理器從外部更改 backend 。
- return_generator : bool
如果為 True ,則對此實(shí)例的調(diào)用將返回一個(gè)生成器,并在結(jié)果可獲取時(shí)立即按原始順序返回結(jié)果。請注意,預(yù)期用途是一次運(yùn)行一個(gè)調(diào)用。對同一個(gè)Parallel對象的多次調(diào)用將導(dǎo)致 RuntimeError
- prefer : str 可選值 ‘processes’ , ‘threads’ , None , 默認(rèn): None
如果使用 parallel_backen() 上下文管理器時(shí)沒有指定特定后端,則選擇默認(rèn) prefer 給定值。默認(rèn)的基于進(jìn)程的后端是 loky ,而默認(rèn)的基于線程的后端則是 threading 。如果指定了 backend 參數(shù),則忽略該參數(shù)。
- require : ‘sharedmem’ 或者 None , 默認(rèn) None
用于選擇后端的硬約束。如果設(shè)置為 'sharedmem' ,則所選后端將是單主機(jī)和基于線程的,即使用戶要求使用具有 parallel_backend 的非基于線程的后端。
到此這篇關(guān)于Python中的Joblib庫使用學(xué)習(xí)總結(jié)的文章就介紹到這了,更多相關(guān)Python中的Joblib庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python基礎(chǔ)學(xué)習(xí)之常見的內(nèi)建函數(shù)整理
所謂的內(nèi)建函數(shù),可以直接使用,而不需要import。下面這篇文章主要給大家整理介紹了關(guān)于Python基礎(chǔ)學(xué)習(xí)之常見的一些內(nèi)建函數(shù),文中通過示例代碼為大家介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09教你用Python+selenium搭建自動(dòng)化測試環(huán)境
今天給大家?guī)淼氖顷P(guān)于Python的相關(guān)知識,文章圍繞著如何用Python+selenium搭建自動(dòng)化測試環(huán)境展開,文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06Python3.4學(xué)習(xí)筆記之 idle 清屏擴(kuò)展插件用法分析
這篇文章主要介紹了Python3.4 idle 清屏擴(kuò)展插件用法,簡單分析了idle清屏的幾種方法及idle清屏插件的相關(guān)使用技巧,需要的朋友可以參考下2019-03-03Python中選擇結(jié)構(gòu)實(shí)例講解
在本篇文章里小編給大家整理了關(guān)于Python選擇結(jié)構(gòu)的基礎(chǔ)知識點(diǎn)及相關(guān)實(shí)例,有需要的朋友們可以學(xué)習(xí)參考下。2022-11-11在Python中實(shí)現(xiàn)決策樹算法的示例代碼
決策樹(Decision Tree)是一種常見的機(jī)器學(xué)習(xí)算法,被廣泛應(yīng)用于分類和回歸任務(wù)中,并且再其之上的隨機(jī)森林和提升樹等算法一直是表格領(lǐng)域的最佳模型,所以本文將介紹理解其數(shù)學(xué)概念,并在Python中動(dòng)手實(shí)現(xiàn),這可以作為了解這類算法的基礎(chǔ)知識2023-08-08