Python實(shí)現(xiàn)cpu并行運(yùn)算的兩種方式
Python一共有兩種并行方式
1. 使用multiprocessing
第一種方式用于單個(gè)節(jié)點(diǎn)內(nèi)部的并行,也就是說(shuō)同時(shí)發(fā)起的進(jìn)程數(shù)不能超過(guò)你單個(gè)機(jī)器CPU的線程數(shù)。
以下是第一種方式的并行程序:
import multiprocessing import time import os import numpy as np ncore=20 def run(core): Your code reture 0 if __name__ == '__main__': print(time.strftime('%Y-%m-%d %H:%M:%S')) param = np.arange(20) p = multiprocessing.Pool(ncore) p.map(run, param) p.close() p.join() print(time.strftime('%Y-%m-%d %H:%M:%S'))
提交腳本直接:
python your_job_name.py
2. 使用mpi4py
第二種方式用于跨節(jié)點(diǎn)的并行,可以發(fā)起成千上百個(gè)CPU的并行。
以下是第二中方式的并行程序:
from mpi4py import MPI import time import os import numpy as np ncore=20 def run(core): Your code reture 0 if __name__ == '__main__': print(time.strftime('%Y-%m-%d %H:%M:%S')) comm = MPI.COMM_WORLD rank = comm.Get_rank() run(rank) print(time.strftime('%Y-%m-%d %H:%M:%S'))
提交腳本需要用到mpi
mpiexec -n cpu_number python your_job_name.py
知識(shí)拓展:python多進(jìn)程模式實(shí)現(xiàn)多核CPU的并行計(jì)算
Python中的多進(jìn)程模式
在Python中,可以使用multiprocessing模塊來(lái)實(shí)現(xiàn)多進(jìn)程。multiprocessing是Python標(biāo)準(zhǔn)庫(kù)中的一個(gè)模塊,用于管理多進(jìn)程的創(chuàng)建和通信。
在multiprocessing中,可以使用Process類(lèi)來(lái)創(chuàng)建進(jìn)程,Process類(lèi)的構(gòu)造函數(shù)可以接受一個(gè)函數(shù)作為參數(shù)。
該函數(shù)將在子進(jìn)程中執(zhí)行。下面是一個(gè)簡(jiǎn)單的示例:
import multiprocessing def worker(): print("Worker process started") if __name__ == '__main__': p = multiprocessing.Process(target=worker) p.start() p.join()
在上面的示例中,我們首先定義了一個(gè)worker函數(shù),然后使用Process類(lèi)創(chuàng)建了一個(gè)進(jìn)程,并將worker函數(shù)作為參數(shù)傳遞給Process類(lèi)的構(gòu)造函數(shù)。
最后,我們調(diào)用Process類(lèi)的start方法啟動(dòng)進(jìn)程,并調(diào)用Process類(lèi)的join方法等待進(jìn)程結(jié)束。
- 提高程序執(zhí)行效率的方法
在Python中使用多進(jìn)程模式提高程序執(zhí)行效率,可以通過(guò)以下幾種方式來(lái)實(shí)現(xiàn):
- 1 多進(jìn)程并發(fā)執(zhí)行任務(wù)
在多進(jìn)程模式下,可以將任務(wù)分配給多個(gè)進(jìn)程并行執(zhí)行,從而利用多核CPU的優(yōu)勢(shì)。
在Python中,可以使用multiprocessing模塊來(lái)實(shí)現(xiàn)多進(jìn)程并發(fā)執(zhí)行任務(wù)。
下面是一個(gè)簡(jiǎn)單的示例:
import multiprocessing def worker(name): print("Worker %s started" % name) if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) p.start()
在上面的示例中,我們定義了一個(gè)worker函數(shù),該函數(shù)接受一個(gè)參數(shù)name,并在函數(shù)體中打印出Worker name started的信息。
然后我們使用for循環(huán)創(chuàng)建了5個(gè)進(jìn)程,并將worker函數(shù)和對(duì)應(yīng)的參數(shù)傳遞給Process類(lèi)的構(gòu)造函數(shù)。
最后,我們調(diào)用Process類(lèi)的start方法啟動(dòng)進(jìn)程。
- 2 進(jìn)程池
對(duì)于大量重復(fù)的任務(wù),可以使用進(jìn)程池來(lái)維護(hù)一定數(shù)量的進(jìn)程,每個(gè)進(jìn)程執(zhí)行一個(gè)任務(wù)后返回結(jié)果,然后再由進(jìn)程池分配下一個(gè)任務(wù)。
這樣可以避免頻繁地創(chuàng)建和銷(xiāo)毀進(jìn)程,提高效率。在Python中,可以使用multiprocessing模塊的Pool類(lèi)來(lái)實(shí)現(xiàn)進(jìn)程池。
下面是一個(gè)簡(jiǎn)單的示例:
import multiprocessing def worker(name): print("Worker %s started" % name) if __name__ == '__main__': with multiprocessing.Pool(processes=4) as pool: pool.map(worker, range(10))
在上面的示例中,我們定義了一個(gè)worker函數(shù),該函數(shù)接受一個(gè)參數(shù)name,并在函數(shù)體中打印出Worker name started的信息。
然后我們使用with語(yǔ)句創(chuàng)建了一個(gè)進(jìn)程池,并指定進(jìn)程池中的進(jìn)程數(shù)量為4。
最后,我們使用Pool類(lèi)的map方法將worker函數(shù)和對(duì)應(yīng)的參數(shù)傳遞給進(jìn)程池,進(jìn)程池會(huì)自動(dòng)分配任務(wù)給不同的進(jìn)程執(zhí)行。
- 3 消息隊(duì)列
在多進(jìn)程模式下,不同的進(jìn)程之間需要進(jìn)行通信,可以利用消息隊(duì)列來(lái)實(shí)現(xiàn)進(jìn)程間通信。
Python中可以使用Queue模塊來(lái)實(shí)現(xiàn)消息隊(duì)列。下面是一個(gè)簡(jiǎn)單的示例:
import multiprocessing def producer(queue): for i in range(10): queue.put(i) def consumer(queue): while not queue.empty(): print(queue.get()) if __name__ == '__main__': queue = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()
在上面的示例中,我們定義了一個(gè)producer函數(shù)和一個(gè)consumer函數(shù),producer函數(shù)將0~9的數(shù)字放入消息隊(duì)列,consumer函數(shù)從消息隊(duì)列中取出數(shù)字并打印出來(lái)。
然后我們使用multiprocessing模塊的Queue類(lèi)創(chuàng)建了一個(gè)消息隊(duì)列,并使用Process類(lèi)創(chuàng)建了兩個(gè)進(jìn)程分別執(zhí)行producer函數(shù)和consumer函數(shù)。
- 4 共享內(nèi)存
對(duì)于需要多個(gè)進(jìn)程共享的數(shù)據(jù),可以使用共享內(nèi)存來(lái)避免數(shù)據(jù)拷貝和進(jìn)程間通信的開(kāi)銷(xiāo)。
在Python中,可以使用multiprocessing模塊的Value和Array類(lèi)來(lái)實(shí)現(xiàn)共享內(nèi)存。
下面是一個(gè)簡(jiǎn)單的示例:
import multiprocessing def worker(counter): counter.value += 1 if __name__ == '__main__': counter = multiprocessing.Value('i', 0) processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(counter,)) processes.append(p) p.start() for p in processes: p.join() print(counter.value)
在上面的示例中,我們定義了一個(gè)worker函數(shù),該函數(shù)接受一個(gè)參數(shù)counter,每次執(zhí)行時(shí)將counter的值加1。
然后我們使用multiprocessing模塊的Value類(lèi)創(chuàng)建了一個(gè)整型變量counter,并使用Process類(lèi)創(chuàng)建了5個(gè)進(jìn)程分別執(zhí)行worker函數(shù)。
最后,我們打印出counter的值。
- 5 異步IO
對(duì)于I/O密集型任務(wù),可以使用異步IO來(lái)提高效率。在Python中,可以使用asyncio模塊來(lái)實(shí)現(xiàn)異步IO。
下面是一個(gè)簡(jiǎn)單的示例:
import asyncio async def worker(): await asyncio.sleep(1) print("Worker process started") loop = asyncio.get_event_loop() loop.run_until_complete(worker())
在上面的示例中,我們定義了一個(gè)worker函數(shù),該函數(shù)使用asyncio庫(kù)的異步IO特性。
在函數(shù)體中,使用asyncio.sleep函數(shù)模擬了一個(gè)長(zhǎng)時(shí)間的I/O操作,并在操作完成后打印了一條消息。
然后我們使用asyncio庫(kù)的get_event_loop函數(shù)創(chuàng)建了一個(gè)事件循環(huán),并使用run_until_complete函數(shù)啟動(dòng)worker函數(shù)。在程序執(zhí)行過(guò)程中,事件循環(huán)會(huì)負(fù)責(zé)調(diào)度和執(zhí)行異步IO操作。
- 總結(jié)
在Python中,使用多進(jìn)程模式可以實(shí)現(xiàn)多核CPU的并行計(jì)算,從而提高程序的執(zhí)行效率。
在本文中,我們介紹了如何使用Python的multiprocessing模塊實(shí)現(xiàn)多進(jìn)程并發(fā)執(zhí)行任務(wù)、進(jìn)程池、消息隊(duì)列、共享內(nèi)存、異步IO等方式來(lái)提高程序執(zhí)行效率。
實(shí)際應(yīng)用中,需要根據(jù)具體的場(chǎng)景選擇合適的并行計(jì)算方式,并注意避免死鎖等常見(jiàn)問(wèn)題。
到此這篇關(guān)于Python實(shí)現(xiàn)cpu并行運(yùn)算的兩種方式的文章就介紹到這了,更多相關(guān)Python cpu并行運(yùn)算內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)腳本鎖功能(同時(shí)只能執(zhí)行一個(gè)腳本)
這篇文章主要介紹了Python實(shí)現(xiàn)腳本鎖功能(同時(shí)只能執(zhí)行一個(gè)腳本),本文給大家分享了兩種方法,大家可以根據(jù)個(gè)人所需選擇適合自己的方法2017-05-05Python獲取"3年前的今天"的日期時(shí)間問(wèn)題
在Python中,如何獲取"?3年前的今天"的datetime對(duì)象,本文通過(guò)實(shí)例代碼給大家詳細(xì)講解,代碼簡(jiǎn)單易懂對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-01-01Python中查看變量的類(lèi)型內(nèi)存地址所占字節(jié)的大小
這篇文章主要介紹了Python中查看變量的類(lèi)型,內(nèi)存地址,所占字節(jié)的大小,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-06-06Django中redis的使用方法(包括安裝、配置、啟動(dòng))
下面小編就為大家分享一篇Django中redis的使用方法(包括安裝、配置、啟動(dòng)),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-02-02解決pyinstaller 打包exe文件太大,用pipenv 縮小exe的問(wèn)題
這篇文章主要介紹了解決pyinstaller 打包exe文件太大,用pipenv 縮小exe的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-07-07