python3學(xué)習(xí)筆記之多進(jìn)程分布式小例子
最近一直跟著廖大在學(xué)Python,關(guān)于分布式進(jìn)程的小例子挺有趣的,這里做個(gè)記錄。
分布式進(jìn)程
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。由于managers模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫分布式多進(jìn)程程序。
master服務(wù)端原理:通過managers模塊把Queue通過網(wǎng)絡(luò)暴露出去,其他機(jī)器的進(jìn)程就可以訪問Queue了
服務(wù)進(jìn)程負(fù)責(zé)啟動(dòng)Queue,把Queue注冊到網(wǎng)絡(luò)上,然后往Queue里面寫入任務(wù),代碼如下:
#task_master.py
#coding=utf-8
#多進(jìn)程分布式例子
#服務(wù)器端
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support #server啟動(dòng)報(bào)錯(cuò),提示需要引用此包
import random,time,queue
#發(fā)送任務(wù)的隊(duì)列
task_queue = queue.Queue()
#接收結(jié)果的隊(duì)列
result_queue = queue.Queue()
#從BaseManager繼承的QueueManager
class QueueManager(BaseManager):
pass
#win7 64 貌似不支持callable下調(diào)用匿名函數(shù)lambda,這里封裝一下
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
def test():
#把兩個(gè)Queue注冊到網(wǎng)絡(luò)上,callable參數(shù)關(guān)聯(lián)了Queue對象
#QueueManager.register('get_task_queue',callable=lambda:task_queue)
#QueueManager.register('get_result_queue',callable=lambda:result_queue)
QueueManager.register('get_task_queue',callable=return_task_queue)
QueueManager.register('get_result_queue',callable=return_result_queue)
#綁定端口5000,設(shè)置驗(yàn)證碼‘a(chǎn)bc'
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')#這里必須加上本地默認(rèn)ip地址127.0.0.1
#啟動(dòng)Queue
manager.start()
#server = manager.get_server()
#server.serve_forever()
print('start server master')
#獲得通過網(wǎng)絡(luò)訪問的Queue對象
task = manager.get_task_queue()
result = manager.get_result_queue()
#放幾個(gè)任務(wù)進(jìn)去
for i in range(10):
n = random.randint(0,10000)
print('put task %d...' % n)
task.put(n)
#從result隊(duì)列讀取結(jié)果
print('try get results...')
for i in range(10):
r = result.get(timeout=10)
print('result:%s' % r)
#關(guān)閉
manager.shutdown()
print('master exit')
if __name__ == '__main__':
freeze_support()
test()
運(yùn)行截圖如下:

在分布式多進(jìn)程環(huán)境下,添加任務(wù)到Queue不可以直接對原始的task_queue進(jìn)行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
任務(wù)進(jìn)程,代碼如下:
#task_worker.py
#coding=utf-8
#多進(jìn)程分布式例子
#非服務(wù)端:worker
import time,sys,queue
from multiprocessing.managers import BaseManager
#創(chuàng)建類似的QueueManager
class QueueManager(BaseManager):
pass
#由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue,所以注冊時(shí)只提供名字即可
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器
server_addr = '127.0.0.1'
print('connect to server %s...'% server_addr)
#端口和驗(yàn)證碼注意要保持完全一致
m = QueueManager(address=(server_addr,5000),authkey=b'abc')
#從網(wǎng)絡(luò)連接
m.connect()
#獲取Queue的對象
task = m.get_task_queue()
result = m.get_result_queue()
#從task隊(duì)列獲取任務(wù),并把結(jié)果寫入result隊(duì)列
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...'% (n,n))
r = '%d * %d = %d' % (n,n,n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty')
#處理結(jié)果
print('worker exit')
運(yùn)行截圖如下:

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
讓你的Python代碼實(shí)現(xiàn)類型提示功能
今天小編就為大家分享一篇讓你的Python代碼實(shí)現(xiàn)類型提示功能,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11
Python基礎(chǔ)知識(shí)快速上手入門學(xué)習(xí)
本篇文章使用代碼示例,一看就會(huì),從基礎(chǔ)語法、變量類型、運(yùn)算符和條件語句多個(gè)方面詳細(xì)闡述了Python基礎(chǔ)知識(shí)快速上手入門學(xué)習(xí)的內(nèi)容,希望本文能對Python初學(xué)者有所幫助2023-08-08
Python報(bào)錯(cuò)SyntaxError:unexpected?EOF?while?parsing的解決辦法
在運(yùn)行或編寫一個(gè)程序時(shí)常會(huì)遇到錯(cuò)誤異常,這時(shí)python會(huì)給你一個(gè)錯(cuò)誤提示類名,告訴出現(xiàn)了什么樣的問題,下面這篇文章主要給大家介紹了關(guān)于Python報(bào)錯(cuò)SyntaxError:unexpected?EOF?while?parsing的解決辦法,需要的朋友可以參考下2022-07-07
jupyter notebook遠(yuǎn)程訪問不了的問題解決方法
這篇文章主要介紹了jupyter notebook遠(yuǎn)程訪問不了的問題解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01

