Python定義一個(gè)Actor任務(wù)
問(wèn)題
你想定義跟actor模式中類似“actors”角色的任務(wù)
解決方案
actor模式是一種最古老的也是最簡(jiǎn)單的并行和分布式計(jì)算解決方案。 事實(shí)上,它天生的簡(jiǎn)單性是它如此受歡迎的重要原因之一。 簡(jiǎn)單來(lái)講,一個(gè)actor就是一個(gè)并發(fā)執(zhí)行的任務(wù),只是簡(jiǎn)單的執(zhí)行發(fā)送給它的消息任務(wù)。 響應(yīng)這些消息時(shí),它可能還會(huì)給其他actor發(fā)送更進(jìn)一步的消息。 actor之間的通信是單向和異步的。因此,消息發(fā)送者不知道消息是什么時(shí)候被發(fā)送, 也不會(huì)接收到一個(gè)消息已被處理的回應(yīng)或通知。
結(jié)合使用一個(gè)線程和一個(gè)隊(duì)列可以很容易的定義actor,例如:
from queue import Queue from threading import Thread, Event # Sentinel used for shutdown class ActorExit(Exception): pass class Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): ''' Send a message to the actor ''' self._mailbox.put(msg) def recv(self): ''' Receive an incoming message ''' msg = self._mailbox.get() if msg is ActorExit: raise ActorExit() return msg def close(self): ''' Close the actor, thus shutting it down ''' self.send(ActorExit) def start(self): ''' Start concurrent execution ''' self._terminated = Event() t = Thread(target=self._bootstrap) t.daemon = True t.start() def _bootstrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): ''' Run method to be implemented by the user ''' while True: msg = self.recv() # Sample ActorTask class PrintActor(Actor): def run(self): while True: msg = self.recv() print('Got:', msg) # Sample use p = PrintActor() p.start() p.send('Hello') p.send('World') p.close() p.join()
這個(gè)例子中,你使用actor實(shí)例的 send()
方法發(fā)送消息給它們。 其機(jī)制是,這個(gè)方法會(huì)將消息放入一個(gè)隊(duì)里中, 然后將其轉(zhuǎn)交給處理被接受消息的一個(gè)內(nèi)部線程。 close()
方法通過(guò)在隊(duì)列中放入一個(gè)特殊的哨兵值(ActorExit)來(lái)關(guān)閉這個(gè)actor。 用戶可以通過(guò)繼承Actor并定義實(shí)現(xiàn)自己處理邏輯run()方法來(lái)定義新的actor。 ActorExit
異常的使用就是用戶自定義代碼可以在需要的時(shí)候來(lái)捕獲終止請(qǐng)求 (異常被get()方法拋出并傳播出去)。
如果你放寬對(duì)于同步和異步消息發(fā)送的要求, 類actor對(duì)象還可以通過(guò)生成器來(lái)簡(jiǎn)化定義。例如:
def print_actor(): while True: try: msg = yield # Get a message print('Got:', msg) except GeneratorExit: print('Actor terminating') # Sample use p = print_actor() next(p) # Advance to the yield (ready to receive) p.send('Hello') p.send('World') p.close()
討論
actor模式的魅力就在于它的簡(jiǎn)單性。 實(shí)際上,這里僅僅只有一個(gè)核心操作 send()
. 甚至,對(duì)于在基于actor系統(tǒng)中的“消息”的泛化概念可以已多種方式被擴(kuò)展。 例如,你可以以元組形式傳遞標(biāo)簽消息,讓actor執(zhí)行不同的操作,如下:
class TaggedActor(Actor): def run(self): while True: tag, *payload = self.recv() getattr(self,'do_'+tag)(*payload) # Methods correponding to different message tags def do_A(self, x): print('Running A', x) def do_B(self, x, y): print('Running B', x, y) # Example a = TaggedActor() a.start() a.send(('A', 1)) # Invokes do_A(1) a.send(('B', 2, 3)) # Invokes do_B(2,3) a.close() a.join()
作為另外一個(gè)例子,下面的actor允許在一個(gè)工作者中運(yùn)行任意的函數(shù), 并且通過(guò)一個(gè)特殊的Result對(duì)象返回結(jié)果:
from threading import Event class Result: def __init__(self): self._evt = Event() self._result = None def set_result(self, value): self._result = value self._evt.set() def result(self): self._evt.wait() return self._result class Worker(Actor): def submit(self, func, *args, **kwargs): r = Result() self.send((func, args, kwargs, r)) return r def run(self): while True: func, args, kwargs, r = self.recv() r.set_result(func(*args, **kwargs)) # Example use worker = Worker() worker.start() r = worker.submit(pow, 2, 3) worker.close() worker.join() print(r.result())
最后,“發(fā)送”一個(gè)任務(wù)消息的概念可以被擴(kuò)展到多進(jìn)程甚至是大型分布式系統(tǒng)中去。 例如,一個(gè)類actor對(duì)象的 send()
方法可以被編程讓它能在一個(gè)套接字連接上傳輸數(shù)據(jù) 或通過(guò)某些消息中間件(比如AMQP、ZMQ等)來(lái)發(fā)送。
以上就是Python定義一個(gè)Actor任務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Python actor任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas創(chuàng)建series的三種方法小結(jié)
這篇文章主要介紹了pandas創(chuàng)建series的三種方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05Python模塊學(xué)習(xí) datetime介紹
Python提供了多個(gè)內(nèi)置模塊用于操作日期時(shí)間,像calendar,time,datetime。time模塊我在之前的文章已經(jīng)有所介紹,它提供的接口與C標(biāo)準(zhǔn)庫(kù)time.h基本一致2012-08-08Python自動(dòng)生成代碼 使用tkinter圖形化操作并生成代碼框架
這篇文章主要為大家詳細(xì)介紹了Python自動(dòng)生成代碼,使用tkinter圖形化操作并生成代碼框架,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-09-09使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問(wèn)題
這篇文章主要介紹了使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12Python+Selenium實(shí)現(xiàn)短視頻自動(dòng)上傳與發(fā)布的實(shí)踐
本文主要介紹了Python+Selenium實(shí)現(xiàn)短視頻自動(dòng)上傳與發(fā)布的實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04淺談Pycharm調(diào)用同級(jí)目錄下的py腳本bug
今天小編就為大家分享一篇淺談Pycharm調(diào)用同級(jí)目錄下的py腳本bug,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-11-11Python遠(yuǎn)程創(chuàng)建docker容器的方法
這篇文章主要介紹了Python遠(yuǎn)程創(chuàng)建docker容器的方法,如果docker??ps找不到該容器,可以使用?docker?ps?-a查看所有的,然后看剛才創(chuàng)建的容器的STATUS是EXIT0還是EXIT1如果是1,那應(yīng)該是有報(bào)錯(cuò),使用?docker?logs?容器id命令來(lái)查看日志,根據(jù)日志進(jìn)行解決,需要的朋友可以參考下2024-04-04