Python多進程并發(fā)與多線程并發(fā)編程實例總結
本文實例總結了Python多進程并發(fā)與多線程并發(fā)。分享給大家供大家參考,具體如下:
這里對python支持的幾種并發(fā)方式進行簡單的總結。
Python支持的并發(fā)分為多線程并發(fā)與多進程并發(fā)(異步IO本文不涉及)。概念上來說,多進程并發(fā)即運行多個獨立的程序,優(yōu)勢在于并發(fā)處理的任務都由操作系統(tǒng)管理,不足之處在于程序與各進程之間的通信和數(shù)據共享不方便;多線程并發(fā)則由程序員管理并發(fā)處理的任務,這種并發(fā)方式可以方便地在線程間共享數(shù)據(前提是不能互斥)。Python對多線程和多進程的支持都比一般編程語言更高級,最小化了需要我們完成的工作。
一.多進程并發(fā)
Mark Summerfield指出,對于計算密集型程序,多進程并發(fā)優(yōu)于多線程并發(fā)。計算密集型程序指的程序的運行時間大部分消耗在CPU的運算處理過程,而硬盤和內存的讀寫消耗的時間很短;相對地,IO密集型程序指的則是程序的運行時間大部分消耗在硬盤和內存的讀寫上,CPU的運算時間很短。
對于多進程并發(fā),python支持兩種實現(xiàn)方式,一種是采用進程安全的數(shù)據結構:multiprocessing.JoinableQueue,這種數(shù)據結構自己管理“加鎖”的過程,程序員無需擔心“死鎖”的問題;python還提供了一種更為優(yōu)雅而高級的實現(xiàn)方式:采用進程池。下面一一介紹。
1.隊列實現(xiàn)——使用multiprocessing.JoinableQueue
multiprocessing是python標準庫中支持多進程并發(fā)的模塊,我們這里采用multiprocessing中的數(shù)據結構:JoinableQueue,它本質上仍是一個FIFO的隊列,它與一般隊列(如queue中的Queue)的區(qū)別在于它是多進程安全的,這意味著我們不用擔心它的互斥和死鎖問題。JoinableQueue主要可以用來存放執(zhí)行的任務和收集任務的執(zhí)行結果。舉例來看(以下皆省去導入包的過程):
def read(q): while True: try: value = q.get() print('Get %s from queue.' % value) time.sleep(random.random()) finally: q.task_done() def main(): q = multiprocessing.JoinableQueue() pw1 = multiprocessing.Process(target=read, args=(q,)) pw2 = multiprocessing.Process(target=read, args=(q,)) pw1.daemon = True pw2.daemon = True pw1.start() pw2.start() for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
對于windows系統(tǒng)的多進程并發(fā),程序文件里必須含有“入口函數(shù)”(如main函數(shù)),且結尾處必須調用入口點。例如以if __name__ == '__main__': main()
結尾。
在這個最簡單的多進程并發(fā)例子里,我們用多進程實現(xiàn)將26個字母打印出來。首先定義一個存放任務的JoinableQueue對象,然后實例化兩個Process對象(每個對象對應一個子進程),實例化Process對象需要傳送target和args參數(shù),target是實現(xiàn)每個任務工作中的具體函數(shù),args是target函數(shù)的參數(shù)。
pw1.daemon = True pw2.daemon = True
這兩句話將子進程設置為守護進程——主進程結束后隨之結束。
pw1.start() pw2.start()
一旦運行到這兩句話,子進程就開始獨立于父進程運行了,它會在單獨的進程里調用target引用的函數(shù)——在這里即read函數(shù),它是一個死循環(huán),將參數(shù)q中的數(shù)一一讀取并打印出來。
value = q.get()
這是多進程并發(fā)的要點,q是一個JoinableQueue對象,支持get方法讀取第一個元素,如果q中沒有元素,進程就會阻塞,直至q中被存入新元素。
因此執(zhí)行完pw1.start()
pw2.start()
這兩句話后,子進程雖然開始運行了,但很快就堵塞住。
for c in [chr(ord('A')+i) for i in range(26)]: q.put(c)
將26個字母依次放入JoinableQueue對象中,這時候兩個子進程不再阻塞,開始真正地執(zhí)行任務。兩個子進程都用value = q.get()來讀取數(shù)據,它們都在修改q對象,而我們并不用擔心同步問題,這就是multiProcessing.Joinable數(shù)據結構的優(yōu)勢所在——它是多進程安全的,它會自動處理“加鎖”的過程。
try: q.join()
q.join()
方法會查詢q中的數(shù)據是否已讀完——這里指的就是任務是否執(zhí)行完,如果沒有,程序會阻塞住等待q中數(shù)據讀完才開始繼續(xù)執(zhí)行(可以用Ctrl+C強制停止)。
對Windows系統(tǒng),調用任務管理器應該可以看到有多個子進程在運行。
2.進程池實現(xiàn)——使用concurrent.futures.ProcessPoolExecutor
Python還支持一種更為優(yōu)雅的多進程并發(fā)方式,直接看例子:
def read(q): print('Get %s from queue.' % q) time.sleep(random.random()) def main(): futures = set() with concurrent.futures.ProcessPoolExecutor() as executor: for q in (chr(ord('A')+i) for i in range(26)): future = executor.submit(read, q) futures.add(future) try: for future in concurrent.futures.as_completed(futures): err = future.exception() if err is not None: raise err except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
這里我們采用concurrent.futures.ProcessPoolExecutor對象,可以把它想象成一個進程池,子進程往里“填”。我們通過submit方法實例一個Future對象,然后把這里Future對象都填到池——futures里,這里futures是一個set對象。只要進程池里有future,就會開始執(zhí)行任務。這里的read函數(shù)更為簡單——只是把一個字符打印并休眠一會而已。
try: for future in concurrent.futures.as_completed(futures):
這是等待所有子進程都執(zhí)行完畢。子進程執(zhí)行過程中可能拋出異常,err = future.exception()
可以收集這些異常,便于后期處理。
可以看出用Future對象處理多進程并發(fā)更為簡潔,無論是target函數(shù)的編寫、子進程的啟動等等,future對象還可以向使用者匯報其狀態(tài),也可以匯報執(zhí)行結果或執(zhí)行時的異常。
二.多線程并發(fā)
對于IO密集型程序,多線程并發(fā)可能要優(yōu)于多進程并發(fā)。因為對于網絡通信等IO密集型任務來說,決定程序效率的主要是網絡延遲,這時候是使用進程還是線程就沒有太大關系了。
1.隊列實現(xiàn)——使用queue.Queue
程序與多進程基本一致,只是這里我們不必使用multiProcessing.JoinableQueue
對象了,一般的隊列(來自queue.Queue)就可以滿足要求:
def read(q): while True: try: value = q.get() print('Get %s from queue.' % value) time.sleep(random.random()) finally: q.task_done() def main(): q = queue.Queue() pw1 = threading.Thread(target=read, args=(q,)) pw2 = threading.Thread(target=read, args=(q,)) pw1.daemon = True pw2.daemon = True pw1.start() pw2.start() for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
并且這里我們實例化的是Thread對象,而不是Process對象,程序的其余部分看起來與多進程并沒有什么兩樣。
2. 線程池實現(xiàn)——使用concurrent.futures.ThreadPoolExecutor
直接看例子:
def read(q): print('Get %s from queue.' % q) time.sleep(random.random()) def main(): futures = set() with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor: for q in (chr(ord('A')+i) for i in range(26)): future = executor.submit(read, q) futures.add(future) try: for future in concurrent.futures.as_completed(futures): err = future.exception() if err is not None: raise err except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
用ThreadPoolExecutor與用ProcessPoolExecutor看起來沒什么區(qū)別,只是改了一下簽名而已。
不難看出,不管是使用隊列還是使用進/線程池,從多進程轉化到多線程是十分容易的——僅僅是修改了幾個簽名而已。當然內部機制完全不同,只是python的封裝非常好,使我們可以不用關心這些細節(jié),這正是python優(yōu)雅之處。
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python Socket編程技巧總結》、《Python數(shù)據結構與算法教程》、《Python函數(shù)使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
相關文章
大語言模型的開發(fā)利器langchainan安裝使用快速入門學習
這篇文章主要為大家介紹了大語言模型的開發(fā)利器langchain安裝使用快速入門學習,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-07-07