Python使用multiprocessing實(shí)現(xiàn)多進(jìn)程的詳細(xì)步驟記錄
前言
當(dāng)我們工作中涉及到處理大量數(shù)據(jù)、并行計(jì)算或并發(fā)任務(wù)時(shí),Python的multiprocessing
模塊是一個(gè)強(qiáng)大而實(shí)用的工具。通過它,我們可以輕松地利用多核處理器的優(yōu)勢,將任務(wù)分配給多個(gè)進(jìn)程并同時(shí)執(zhí)行,從而提高程序的性能和效率。在本文中,我們將探索如何使用multiprocessing
模塊實(shí)現(xiàn)多進(jìn)程編程。將介紹進(jìn)程池的概念和用法,以及如何使用它來管理和調(diào)度多個(gè)進(jìn)程。我們還將討論并發(fā)任務(wù)的處理、進(jìn)程間通信和結(jié)果獲取等關(guān)鍵問題,希望能給大家的工作帶來一些幫助。
一、介紹
Python多進(jìn)程是一種并行編程模型,允許在Python程序中同時(shí)執(zhí)行多個(gè)進(jìn)程。每個(gè)進(jìn)程都擁有自己的獨(dú)立內(nèi)存空間和執(zhí)行環(huán)境,可以并行地執(zhí)行任務(wù),從而提高程序的性能和效率。
優(yōu)點(diǎn):
并行處理:多進(jìn)程可以同時(shí)執(zhí)行多個(gè)任務(wù),充分利用多核處理器的能力,實(shí)現(xiàn)并行處理。這可以顯著提高程序的性能和效率,特別是在處理密集型任務(wù)或需要大量計(jì)算的場景中。
獨(dú)立性:每個(gè)進(jìn)程都有自己的獨(dú)立地址空間和執(zhí)行環(huán)境,進(jìn)程之間互不干擾。這意味著每個(gè)進(jìn)程都可以獨(dú)立地執(zhí)行任務(wù),不會(huì)受到其他進(jìn)程的影響。這種獨(dú)立性使得多進(jìn)程編程更加健壯和可靠。
內(nèi)存隔離:由于每個(gè)進(jìn)程都擁有自己的地址空間,多進(jìn)程之間的數(shù)據(jù)是相互隔離的。這意味著不同進(jìn)程之間的變量和數(shù)據(jù)不會(huì)相互影響,減少了數(shù)據(jù)共享和同步的復(fù)雜性。
故障隔離:如果一個(gè)進(jìn)程崩潰或出現(xiàn)錯(cuò)誤,不會(huì)影響其他進(jìn)程的執(zhí)行。每個(gè)進(jìn)程是獨(dú)立的實(shí)體,一個(gè)進(jìn)程的故障不會(huì)對(duì)整個(gè)程序產(chǎn)生致命影響,提高了程序的穩(wěn)定性和容錯(cuò)性。
可移植性:多進(jìn)程編程可以在不同的操作系統(tǒng)上運(yùn)行,因?yàn)檫M(jìn)程是操作系統(tǒng)提供的基本概念。這使得多進(jìn)程編程具有很好的可移植性,可以在不同的平臺(tái)上部署和運(yùn)行。
缺點(diǎn):
資源消耗:每個(gè)進(jìn)程都需要獨(dú)立的內(nèi)存空間和系統(tǒng)資源,包括打開的文件、網(wǎng)絡(luò)連接等。多進(jìn)程編程可能會(huì)增加系統(tǒng)的資源消耗,尤其是在創(chuàng)建大量進(jìn)程時(shí)。
上下文切換開銷:在多進(jìn)程編程中,進(jìn)程之間的切換需要保存和恢復(fù)進(jìn)程的執(zhí)行環(huán)境,這涉及到上下文切換的開銷。頻繁的進(jìn)程切換可能會(huì)導(dǎo)致額外的開銷,影響程序的性能。
數(shù)據(jù)共享與同步:由于多進(jìn)程之間的數(shù)據(jù)是相互隔離的,需要通過特定的機(jī)制進(jìn)行數(shù)據(jù)共享和同步。這可能涉及到進(jìn)程間通信(IPC)的復(fù)雜性,如隊(duì)列、管道、共享內(nèi)存等。正確處理數(shù)據(jù)共享和同步是多進(jìn)程編程中的挑戰(zhàn)之一。
編程復(fù)雜性:相比于單線程或多線程編程,多進(jìn)程編程可能更加復(fù)雜。需要考慮進(jìn)程的創(chuàng)建和管理、進(jìn)程間通信、數(shù)據(jù)共享和同步等問題。編寫和調(diào)試多進(jìn)程程序可能需要更多的工作和經(jīng)驗(yàn)。
進(jìn)程與線程:
- 在討論多進(jìn)程之前,需要明確進(jìn)程(Process)和線程(Thread)的概念。
- 進(jìn)程是計(jì)算機(jī)中正在運(yùn)行的程序的實(shí)例。每個(gè)進(jìn)程都有自己的地址空間、數(shù)據(jù)棧和控制信息,可以獨(dú)立執(zhí)行任務(wù)。
- 線程是進(jìn)程中的一個(gè)執(zhí)行單元,可以看作是輕量級(jí)的進(jìn)程。多個(gè)線程共享同一進(jìn)程的資源,包括內(nèi)存空間、文件描述符等。
多進(jìn)程編程在并行處理和資源隔離方面具有明顯的優(yōu)勢,但也涉及到資源消耗、上下文切換開銷、數(shù)據(jù)共享和同步等問題。在實(shí)際應(yīng)用中,開發(fā)者應(yīng)權(quán)衡利弊,根據(jù)具體場景選擇適合的編程模型和工具。
二、創(chuàng)建進(jìn)程
在Python中,可以使用multiprocessing
模塊來創(chuàng)建和管理進(jìn)程。該模塊提供了豐富的類和函數(shù),用于創(chuàng)建、啟動(dòng)和管理進(jìn)程。
1、導(dǎo)入multiprocessing模塊
在使用multiprocessing
模塊之前,需要先導(dǎo)入它:
import multiprocessing
2、創(chuàng)建進(jìn)程
可以使用multiprocessing.Process
類來創(chuàng)建進(jìn)程對(duì)象。需要傳入一個(gè)目標(biāo)函數(shù)作為進(jìn)程的執(zhí)行邏輯。可以通過繼承multiprocessing.Process
類來自定義進(jìn)程類。
import multiprocessing def worker(): # 進(jìn)程執(zhí)行的邏輯 if __name__ == '__main__': process = multiprocessing.Process(target=worker)
在上面的示例中,worker
函數(shù)是進(jìn)程的執(zhí)行邏輯。進(jìn)程對(duì)象創(chuàng)建后,可以通過設(shè)置參數(shù)、調(diào)用方法等來配置進(jìn)程。
3、啟動(dòng)進(jìn)程
通過調(diào)用進(jìn)程對(duì)象的start()
方法,可以啟動(dòng)進(jìn)程。進(jìn)程會(huì)在后臺(tái)開始執(zhí)行。
process.start()
4、進(jìn)程的狀態(tài)
進(jìn)程對(duì)象提供了一些方法來獲取和管理進(jìn)程的狀態(tài):
is_alive()
:檢查進(jìn)程是否正在運(yùn)行。join([timeout])
:等待進(jìn)程結(jié)束??蛇x參數(shù)timeout
指定等待的最長時(shí)間。
if process.is_alive(): print("進(jìn)程正在運(yùn)行") process.join()
5、進(jìn)程的終止
進(jìn)程可以通過調(diào)用進(jìn)程對(duì)象的terminate()
方法來終止。這會(huì)立即停止進(jìn)程的執(zhí)行。
process.terminate()
二、進(jìn)程間通信
進(jìn)程間通信(Inter-Process Communication,IPC)是指不同進(jìn)程之間進(jìn)行數(shù)據(jù)交換和共享信息的機(jī)制。在多進(jìn)程編程中,進(jìn)程之間通常需要進(jìn)行數(shù)據(jù)傳輸、共享狀態(tài)或進(jìn)行同步操作。Python提供了多種進(jìn)程間通信的機(jī)制,包括隊(duì)列(Queue)、管道(Pipe)、共享內(nèi)存(Value、Array)等。
1、隊(duì)列(Queue)
隊(duì)列是一種常用的進(jìn)程間通信方式,通過隊(duì)列可以實(shí)現(xiàn)進(jìn)程之間的數(shù)據(jù)傳輸。Python的multiprocessing
模塊提供了Queue
類來實(shí)現(xiàn)多進(jìn)程之間的隊(duì)列通信。進(jìn)程可以通過put()
方法將數(shù)據(jù)放入隊(duì)列,其他進(jìn)程則可以通過get()
方法從隊(duì)列中獲取數(shù)據(jù)。
from multiprocessing import Queue # 創(chuàng)建隊(duì)列 queue = Queue() # 進(jìn)程1放入數(shù)據(jù) queue.put(data) # 進(jìn)程2獲取數(shù)據(jù) data = queue.get()
2、管道(Pipe)
管道是另一種常用的進(jìn)程間通信方式,通過管道可以實(shí)現(xiàn)進(jìn)程之間的雙向通信。Python的multiprocessing
模塊提供了Pipe
類來創(chuàng)建管道對(duì)象。Pipe()
方法返回兩個(gè)連接的管道端,一個(gè)用于發(fā)送數(shù)據(jù),另一個(gè)用于接收數(shù)據(jù)。
from multiprocessing import Pipe # 創(chuàng)建管道 conn1, conn2 = Pipe() # 進(jìn)程1發(fā)送數(shù)據(jù) conn1.send(data) # 進(jìn)程2接收數(shù)據(jù) data = conn2.recv()
3、共享內(nèi)存(Value、Array)
共享內(nèi)存是一種在多進(jìn)程之間共享數(shù)據(jù)的高效方式。Python的multiprocessing
模塊提供了Value
和Array
類來實(shí)現(xiàn)進(jìn)程間共享數(shù)據(jù)。Value
用于共享單個(gè)值,而Array
用于共享數(shù)組。
from multiprocessing import Value, Array # 創(chuàng)建共享值 shared_value = Value('i', 0) # 創(chuàng)建共享數(shù)組 shared_array = Array('i', [1, 2, 3, 4, 5])
在創(chuàng)建共享值和共享數(shù)組時(shí),需要指定數(shù)據(jù)類型(如整數(shù)、浮點(diǎn)數(shù))和初始值。進(jìn)程可以通過讀寫共享值和共享數(shù)組來進(jìn)行進(jìn)程間的數(shù)據(jù)共享。
4、信號(hào)量(Semaphore)
信號(hào)量是一種用于控制對(duì)共享資源的訪問的機(jī)制。在多進(jìn)程編程中,信號(hào)量可以用于限制同時(shí)訪問某個(gè)共享資源的進(jìn)程數(shù)量。
from multiprocessing import Semaphore, Process import time def worker(semaphore, name): semaphore.acquire() print("Worker", name, "acquired semaphore") time.sleep(2) print("Worker", name, "released semaphore") semaphore.release() semaphore = Semaphore(2) processes = [] for i in range(5): p = Process(target=worker, args=(semaphore, i)) processes.append(p) p.start() for p in processes: p.join()
在上述例子中,創(chuàng)建了一個(gè)信號(hào)量,初始值為2。然后創(chuàng)建了5個(gè)進(jìn)程,每個(gè)進(jìn)程在執(zhí)行前會(huì)嘗試獲取信號(hào)量,如果信號(hào)量的值大于0,則成功獲??;否則,進(jìn)程將被阻塞,直到有進(jìn)程釋放信號(hào)量。每個(gè)進(jìn)程獲取信號(hào)量后,會(huì)執(zhí)行一段任務(wù),并在執(zhí)行完后釋放信號(hào)量。
5、事件(Event)
事件是一種用于多進(jìn)程間通信的同步機(jī)制,它允許一個(gè)或多個(gè)進(jìn)程等待某個(gè)事件的發(fā)生,然后再繼續(xù)執(zhí)行。
from multiprocessing import Event, Process import time def worker(event, name): print("Worker", name, "waiting for event") event.wait() print("Worker", name, "received event") time.sleep(2) print("Worker", name, "completed task") event = Event() processes = [] for i in range(3): p = Process(target=worker, args=(event, i)) processes.append(p) p.start() time.sleep(3) event.set() for p in processes: p.join()
在上述例子中,創(chuàng)建了一個(gè)事件。然后創(chuàng)建了3個(gè)進(jìn)程,每個(gè)進(jìn)程在執(zhí)行前會(huì)等待事件的發(fā)生,即調(diào)用event.wait()
方法。主進(jìn)程休眠3秒后,設(shè)置事件的狀態(tài)為已發(fā)生,即調(diào)用event.set()
方法。此時(shí),所有等待事件的進(jìn)程將被喚醒,并繼續(xù)執(zhí)行任務(wù)。
6、條件變量(Condition)
條件變量是一種用于多進(jìn)程間協(xié)調(diào)和同步的機(jī)制,它可以用于控制多個(gè)進(jìn)程之間的執(zhí)行順序。
from multiprocessing import Condition, Process import time def consumer(condition): with condition: print("Consumer is waiting") condition.wait() print("Consumer is consuming the product") def producer(condition): with condition: time.sleep(2) print("Producer is producing the product") condition.notify() condition = Condition() consumer_process = Process(target=consumer, args=(condition,)) producer_process = Process(target=producer, args=(condition,)) consumer_process.start() producer_process.start() consumer_process.join() producer_process.join()
在上述例子中,創(chuàng)建了一個(gè)條件變量。然后創(chuàng)建了一個(gè)消費(fèi)者進(jìn)程和一個(gè)生產(chǎn)者進(jìn)程。消費(fèi)者進(jìn)程在執(zhí)行前等待條件的滿足,即調(diào)用condition.wait()
方法。生產(chǎn)者進(jìn)程休眠2秒后,生成產(chǎn)品并通過condition.notify()
方法通知消費(fèi)者。消費(fèi)者收到通知后繼續(xù)執(zhí)行任務(wù)。
三、進(jìn)程間同步
進(jìn)程間同步是確保多個(gè)進(jìn)程按照特定順序執(zhí)行或在共享資源上進(jìn)行互斥訪問的一種機(jī)制。進(jìn)程間同步的目的是避免競態(tài)條件(race condition)和數(shù)據(jù)不一致的問題。Python提供了多種機(jī)制來實(shí)現(xiàn)進(jìn)程間的同步,包括鎖(Lock)、信號(hào)量(Semaphore)、事件(Event)、條件變量(Condition)等。
1、鎖(Lock)
鎖是一種最基本的同步機(jī)制,用于保護(hù)共享資源的互斥訪問,確保在任意時(shí)刻只有一個(gè)進(jìn)程可以訪問共享資源。在Python中,可以使用multiprocessing
模塊的Lock
類來實(shí)現(xiàn)鎖。
from multiprocessing import Lock, Process lock = Lock() def worker(lock, data): lock.acquire() try: # 對(duì)共享資源進(jìn)行操作 pass finally: lock.release() processes = [] for i in range(5): p = Process(target=worker, args=(lock, i)) processes.append(p) p.start() for p in processes: p.join()
在上述例子中,每個(gè)進(jìn)程在訪問共享資源之前會(huì)先獲取鎖,然后在完成操作后釋放鎖。這樣可以確保在同一時(shí)刻只有一個(gè)進(jìn)程能夠訪問共享資源,避免數(shù)據(jù)競爭問題。
2、信號(hào)量(Semaphore)
信號(hào)量是一種更為靈活的同步機(jī)制,它允許多個(gè)進(jìn)程同時(shí)訪問某個(gè)資源,但限制同時(shí)訪問的進(jìn)程數(shù)量。在Python中,可以使用multiprocessing
模塊的Semaphore
類來實(shí)現(xiàn)信號(hào)量。
from multiprocessing import Semaphore, Process semaphore = Semaphore(2) def worker(semaphore, data): semaphore.acquire() try: # 對(duì)共享資源進(jìn)行操作 pass finally: semaphore.release() processes = [] for i in range(5): p = Process(target=worker, args=(semaphore, i)) processes.append(p) p.start() for p in processes: p.join()
在上述例子中,創(chuàng)建了一個(gè)初始值為2的信號(hào)量。每個(gè)進(jìn)程在訪問共享資源之前會(huì)嘗試獲取信號(hào)量,只有當(dāng)信號(hào)量的值大于0時(shí)才能獲取成功,否則進(jìn)程將被阻塞。獲取成功后,進(jìn)程可以進(jìn)行操作,并在完成后釋放信號(hào)量。
3、事件(Event)
事件是一種同步機(jī)制,用于實(shí)現(xiàn)進(jìn)程之間的等待和通知機(jī)制。一個(gè)進(jìn)程可以等待事件的發(fā)生,而另一個(gè)進(jìn)程可以觸發(fā)事件的發(fā)生。在Python中,可以使用multiprocessing
模塊的Event
類來實(shí)現(xiàn)事件。
from multiprocessing import Event, Process event = Event() def worker(event, data): event.wait() # 執(zhí)行任務(wù) processes = [] for i in range(5): p = Process(target=worker, args=(event, i)) processes.append(p) p.start() # 觸發(fā)事件的發(fā)生 event.set() for p in processes: p.join()
在上述例子中,多個(gè)進(jìn)程在執(zhí)行任務(wù)前會(huì)等待事件的發(fā)生,即調(diào)用event.wait()
方法。主進(jìn)程通過調(diào)用event.set()
方法來觸發(fā)事件的發(fā)生,進(jìn)而喚醒等待的進(jìn)程繼續(xù)執(zhí)行。
4、條件變量(Condition)
條件變量是一種復(fù)雜的同步機(jī)制,它允許進(jìn)程按照特定的條件等待和通知。在Python中,可以使用multiprocessing
模塊的Condition
類來實(shí)現(xiàn)條件變量。
from multiprocessing import Condition, Process condition = Condition() def consumer(condition(續(xù)): def consumer(condition, data): with condition: while True: # 檢查條件是否滿足 while not condition_is_met(): condition.wait() # 從共享資源中消費(fèi)數(shù)據(jù) def producer(condition, data): with condition: # 生成數(shù)據(jù)并更新共享資源 condition.notify_all() processes = [] for i in range(5): p = Process(target=consumer, args=(condition, i)) processes.append(p) p.start() producer_process = Process(target=producer, args=(condition, data)) producer_process.start() for p in processes: p.join() producer_process.join()
在上述例子中,消費(fèi)者進(jìn)程在執(zhí)行任務(wù)前會(huì)檢查條件是否滿足,如果條件不滿足,則調(diào)用condition.wait()
方法等待條件的滿足。生產(chǎn)者進(jìn)程生成數(shù)據(jù)并更新共享資源后,調(diào)用condition.notify_all()
方法通知所有等待的消費(fèi)者進(jìn)程條件已滿足。被喚醒的消費(fèi)者進(jìn)程會(huì)重新檢查條件并執(zhí)行任務(wù)。
四、進(jìn)程池
進(jìn)程池是一種用于管理和調(diào)度多個(gè)進(jìn)程的機(jī)制,它可以有效地處理并行任務(wù)和提高程序的性能。進(jìn)程池在Python中通常使用multiprocessing
模塊提供的Pool
類來實(shí)現(xiàn)。
進(jìn)程池的工作原理如下:
- 創(chuàng)建進(jìn)程池時(shí),會(huì)啟動(dòng)指定數(shù)量的進(jìn)程,并將它們放入池中。
- 池中的進(jìn)程會(huì)等待主進(jìn)程提交任務(wù)。
- 主進(jìn)程通過提交任務(wù)給進(jìn)程池,將任務(wù)分配給空閑的進(jìn)程。
- 進(jìn)程池中的進(jìn)程執(zhí)行任務(wù),并將結(jié)果返回給主進(jìn)程。
- 主進(jìn)程獲取任務(wù)的結(jié)果,繼續(xù)執(zhí)行其他操作。
- 當(dāng)所有任務(wù)完成后,主進(jìn)程關(guān)閉進(jìn)程池。
1、創(chuàng)建進(jìn)程池
要使用進(jìn)程池,首先需要?jiǎng)?chuàng)建一個(gè)Pool
對(duì)象,可以指定池中的進(jìn)程數(shù)量。通常,可以使用multiprocessing.cpu_count()
函數(shù)來獲取當(dāng)前系統(tǒng)的CPU核心數(shù),然后根據(jù)需要來指定進(jìn)程池的大小。
from multiprocessing import Pool, cpu_count pool = Pool(processes=cpu_count())
在上述例子中,創(chuàng)建了一個(gè)進(jìn)程池,進(jìn)程數(shù)量與系統(tǒng)的CPU核心數(shù)相同。
2、提交任務(wù)
一旦創(chuàng)建了進(jìn)程池,就可以使用apply()
、map()
或imap()
方法來提交任務(wù)給進(jìn)程池。
- apply()方法用于提交單個(gè)任務(wù),并等待任務(wù)完成后返回結(jié)果。
result = pool.apply(function, args=(arg1, arg2))
- map()方法用于提交多個(gè)任務(wù),并按照任務(wù)提交的順序返回結(jié)果列表。
results = pool.map(function, iterable)
- imap()方法也用于提交多個(gè)任務(wù),但可以通過迭代器逐個(gè)獲取結(jié)果,而不需要等待所有任務(wù)完成。
results = pool.imap(function, iterable)
在上述例子中,function
表示要執(zhí)行的函數(shù),args
是函數(shù)的參數(shù),iterable
是一個(gè)可迭代對(duì)象,可以是列表、元組等。
3、獲取結(jié)果
對(duì)于apply()
方法,調(diào)用后會(huì)阻塞主進(jìn)程,直到任務(wù)完成并返回結(jié)果。對(duì)于map()
方法,調(diào)用后會(huì)等待所有任務(wù)完成,并按照任務(wù)提交的順序返回結(jié)果列表。對(duì)于imap()
方法,可以通過迭代器逐個(gè)獲取結(jié)果。
for result in results: print(result)
在上述例子中,使用for
循環(huán)逐個(gè)獲取結(jié)果并進(jìn)行處理。
4、關(guān)閉進(jìn)程池
在所有任務(wù)完成后,需要顯式地關(guān)閉進(jìn)程池,以釋放資源。
pool.close() pool.join()
調(diào)用close()
方法后,進(jìn)程池將不再接受新的任務(wù)。調(diào)用join()
方法會(huì)阻塞主進(jìn)程,直到所有任務(wù)都已完成。
5、使用進(jìn)程池的示例
from multiprocessing import Pool # 定義一個(gè)任務(wù)函數(shù) def square(x): return x ** 2 if __name__ == '__main__': # 創(chuàng)建進(jìn)程池 with Pool(processes=4) as pool: # 提交任務(wù)給進(jìn)程池 results = pool.map(square, range(10)) # 打印結(jié)果 print(results)
在上述示例中,首先定義了一個(gè)任務(wù)函數(shù)square
,它接受一個(gè)數(shù)值作為參數(shù),并返回該數(shù)值的平方。
在if __name__ == '__main__':
中,創(chuàng)建了一個(gè)進(jìn)程池,指定進(jìn)程數(shù)量為4。使用with
語句可以確保進(jìn)程池在使用完畢后被正確關(guān)閉。
然后,通過pool.map(square, range(10))
將任務(wù)提交給進(jìn)程池。map()
方法會(huì)將任務(wù)函數(shù)square
和一個(gè)可迭代對(duì)象range(10)
作為參數(shù),它會(huì)將可迭代對(duì)象中的每個(gè)元素依次傳遞給任務(wù)函數(shù)進(jìn)行處理,并返回結(jié)果列表。最后,打印結(jié)果列表,即每個(gè)數(shù)值的平方。
需要注意的是,在使用進(jìn)程池時(shí),需要將主程序代碼放在if __name__ == '__main__':
中,以確保在子進(jìn)程中不會(huì)重復(fù)執(zhí)行主程序的代碼。
以下是一個(gè)更加復(fù)雜的多進(jìn)程示例,展示了如何使用進(jìn)程池處理多個(gè)任務(wù),并在任務(wù)完成時(shí)獲取結(jié)果。
import time from multiprocessing import Pool # 定義一個(gè)任務(wù)函數(shù) def process_data(data): # 模擬耗時(shí)操作 time.sleep(1) # 返回處理結(jié)果 return data.upper() if __name__ == '__main__': # 創(chuàng)建進(jìn)程池 with Pool(processes=3) as pool: # 準(zhǔn)備數(shù)據(jù) data_list = ['apple', 'banana', 'cherry', 'date', 'elderberry'] # 提交任務(wù)給進(jìn)程池 results = [pool.apply_async(process_data, args=(data,)) for data in data_list] # 等待所有任務(wù)完成并獲取結(jié)果 final_results = [result.get() for result in results] # 打印結(jié)果 for result in final_results: print(result)
在上述示例中,除了使用進(jìn)程池的map()
方法提交任務(wù)之外,還使用了apply_async()
方法來異步提交任務(wù),并通過get()
方法獲取任務(wù)的結(jié)果。
在if __name__ == '__main__':
中,創(chuàng)建了一個(gè)進(jìn)程池,指定進(jìn)程數(shù)量為3。使用with
語句可以確保進(jìn)程池在使用完畢后被正確關(guān)閉。然后,準(zhǔn)備了一個(gè)數(shù)據(jù)列表data_list
,其中包含了需要處理的數(shù)據(jù)。
通過列表推導(dǎo)式,使用pool.apply_async(process_data, args=(data,))
將任務(wù)異步提交給進(jìn)程池。apply_async()
方法會(huì)將任務(wù)函數(shù)process_data
和數(shù)據(jù)data
作為參數(shù),返回一個(gè)AsyncResult
對(duì)象,表示異步任務(wù)的結(jié)果。將這些對(duì)象存儲(chǔ)在results
列表中。
接下來,使用列表推導(dǎo)式,通過result.get()
方法等待所有任務(wù)完成并獲取結(jié)果,將結(jié)果存儲(chǔ)在final_results
列表中。最后,使用for
循環(huán)遍歷final_results
列表,并打印每個(gè)任務(wù)的處理結(jié)果。
進(jìn)程池的優(yōu)點(diǎn)是可以自動(dòng)管理和調(diào)度多個(gè)進(jìn)程,充分利用系統(tǒng)資源,提高程序的并行執(zhí)行能力。通過合理設(shè)置進(jìn)程池的大小,可以在不過度消耗系統(tǒng)資源的情況下,實(shí)現(xiàn)最佳的并發(fā)效果。但需要注意的是,進(jìn)程池適用于那些需要并行執(zhí)行的任務(wù),而不適用于IO密集型任務(wù),因?yàn)檫M(jìn)程池中的進(jìn)程是通過復(fù)制主進(jìn)程來創(chuàng)建的,而IO密集型任務(wù)更適合使用線程池來實(shí)現(xiàn)并發(fā)。
總結(jié)
到此這篇關(guān)于Python使用multiprocessing實(shí)現(xiàn)多進(jìn)程的文章就介紹到這了,更多相關(guān)Python multiprocessing實(shí)現(xiàn)多進(jìn)程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python udp如何實(shí)現(xiàn)同時(shí)收發(fā)信息
這篇文章主要介紹了python udp如何實(shí)現(xiàn)同時(shí)收發(fā)信息,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05Python實(shí)現(xiàn)敲擊木魚積累功德小項(xiàng)目
最近大家都很流行用手機(jī)敲擊電子木魚積累功德,這在很多短視頻中也常常見到。本文將用Python實(shí)現(xiàn)這一效果,感興趣的小伙伴可以了解一下2022-11-11對(duì)pycharm代碼整體左移和右移縮進(jìn)快捷鍵的介紹
今天小編就為大家分享一篇對(duì)pycharm代碼整體左移和右移縮進(jìn)快捷鍵的介紹,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-07-07