python中如何使用分步式進(jìn)程計算詳解
前言
在python中使用多進(jìn)程和多線程都能達(dá)到同時運行多個任務(wù),和多進(jìn)程和多線程的選擇上,應(yīng)該優(yōu)先選擇多進(jìn)程的方式,因為多進(jìn)程更加穩(wěn)定,且對于進(jìn)程的操作管理也更加方便,但有一點是多進(jìn)程獨有的殺手锏,多進(jìn)程可以將進(jìn)程分步到多臺機(jī)器上跑,假如有很多個任務(wù),一臺機(jī)器即使開了多進(jìn)程或者多進(jìn)程跑起來還是要耗很多時間,那么這時就要想一下可否將任務(wù)分配到多臺機(jī)器上跑,這樣可以更快的完成任務(wù)。
在分步式進(jìn)程運算中,進(jìn)程之前的通信還是依賴于Queue,但此時的隊列不能直接使用,需要使用multiprocessing.managers.BaseManager
進(jìn)行包裝,通過回調(diào)以后才能使用,既然是分步式的調(diào)用,那么應(yīng)該有一個服務(wù)端和一個客戶端,服務(wù)端通過網(wǎng)絡(luò)協(xié)議將隊列中的信息給各個客戶端進(jìn)行調(diào)用,客戶端也可以通過隊列將結(jié)果返回,然后服務(wù)端進(jìn)行結(jié)果的收集展示,流程如下
分步式流程
服務(wù)端將任務(wù)放到 task_queue 中,然后四個客戶端通過網(wǎng)絡(luò)端口從task_queue中獲取到任務(wù),然后進(jìn)行計算,再將結(jié)果放到result_queue中,最后服務(wù)端統(tǒng)一處理結(jié)果。整體的流程比較清晰,只是需要強(qiáng)調(diào),這里的隊列不能是原始的隊列,需要使用BaseManager 進(jìn)行包裝。
先看一下服務(wù)端的代碼
#coding:gbk import time, queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任務(wù)個數(shù) task_number = 10 # 定義收發(fā)隊列 task_queue = queue.Queue(task_number) result_queue = queue.Queue(task_number) def gettask(): return task_queue def getresult(): return result_queue def test(): # windows下綁定調(diào)用接口不能使用lambda,所以只能先定義函數(shù)再綁定 BaseManager.register('get_task', callable=gettask) BaseManager.register('get_result', callable=getresult) # 綁定端口并設(shè)置驗證碼,windows下需要填寫ip地址,linux下不填默認(rèn)為本地 manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123') # 啟動 manager.start() try: # 通過網(wǎng)絡(luò)獲取任務(wù)隊列和結(jié)果隊列 task = manager.get_task() result = manager.get_result() # 添加任務(wù) for i in range(task_number): print('Put task %d...' % i) task.put(i) # 每秒檢測一次是否所有任務(wù)都被執(zhí)行完 while not result.full(): print(task.qsize()) time.sleep(1) for i in range(result.qsize()): ans = result.get() print('task %d is finish , runtime:%d s' % ans) except: print('Manager error') finally: manager.shutdown() if __name__ == '__main__': # windows下多進(jìn)程可能會炸,添加這句可以緩解 freeze_support() test()
這里重點說一下 BaseManager.register('get_task', callable=gettask)
這行代碼,它的意思是注冊一個get_task的操作,執(zhí)行的操作是gettask()
函數(shù),上面定義了gettask()
函數(shù),返回的是task_queue,這也是之前說的不能直接使用queue.Queue
,必須要使用通過BaseManager的register接口封裝過的的隊列,下面使用task = manager.get_task()
來獲取到這個隊列。
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
這行代碼初始了一個manager,它綁定了本機(jī)的5002端口,并且在客戶端連接的時候需要一個密碼:123。
接下來看一下客戶端代碼。
#coding:gbk import time, sys, queue, random from multiprocessing.managers import BaseManager BaseManager.register('get_task') BaseManager.register('get_result') conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123') try: conn.connect() except: print('連接失敗') sys.exit() task = conn.get_task() result = conn.get_result() while not task.empty(): print(task.qsize()) n = task.get(timeout = 1) print('run task %d' % n) sleeptime = random.randint(0,3) time.sleep(sleeptime) rt = (n, sleeptime) result.put(rt) if __name__ == '__main__': pass;
這里主要看以下的代碼
BaseManager.register('get_task') BaseManager.register('get_result')
這兩個是注冊函數(shù),和之前的服務(wù)端所對應(yīng),之前服務(wù)端注冊了這兩個函數(shù),這里才能注冊使用,注意這里不能注冊服務(wù)端沒有注冊的函數(shù)
運行一下,先運行服務(wù)端,然后再啟兩個cmd運行客戶端,也可以在局域網(wǎng)中的另外的機(jī)器上運行,但是要修改服務(wù)端的ip地址
服務(wù)端的結(jié)果如下
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
兩個客戶端的結(jié)果分別如下
客戶端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客戶端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起運行的截圖如下
結(jié)果
由于隊列是線程安全的,所以這里不用加鎖,在客戶端中打印print(task.qsize()) 當(dāng)前的隊列大小,可以看到隊列的信息中同步到各個客戶端的。
最后還是要多說一句,分步式多進(jìn)程雖然可以把任務(wù)分散到不同的機(jī)器上運行,可以處理多任務(wù),但是如果此時服務(wù)端掛掉的話,任務(wù)就全丟掉了,所以在生產(chǎn)環(huán)境下還是考慮使用消息中間件如kafka等。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。
相關(guān)文章
Python調(diào)用Zoomeye搜索接口的實現(xiàn)
本文主要介紹了Python調(diào)用Zoomeye搜索接口的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01python 的 scapy庫,實現(xiàn)網(wǎng)卡收發(fā)包的例子
今天小編就為大家分享一篇python 的 scapy庫,實現(xiàn)網(wǎng)卡收發(fā)包的例子,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07pytest解讀fixtures中yield與addfinalizer區(qū)別
這篇文章主要為大家介紹了pytest官方解讀fixtures中yield與addfinalizer區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06python實現(xiàn)字符串連接的三種方法及其效率、適用場景詳解
本篇文章主要介紹了python實現(xiàn)字符串連接的三種方法及其效率、適用場景詳解,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-01-01