Python定義一個Actor任務
問題
你想定義跟actor模式中類似“actors”角色的任務
解決方案
actor模式是一種最古老的也是最簡單的并行和分布式計算解決方案。 事實上,它天生的簡單性是它如此受歡迎的重要原因之一。 簡單來講,一個actor就是一個并發(fā)執(zhí)行的任務,只是簡單的執(zhí)行發(fā)送給它的消息任務。 響應這些消息時,它可能還會給其他actor發(fā)送更進一步的消息。 actor之間的通信是單向和異步的。因此,消息發(fā)送者不知道消息是什么時候被發(fā)送, 也不會接收到一個消息已被處理的回應或通知。
結合使用一個線程和一個隊列可以很容易的定義actor,例如:
from queue import Queue
from threading import Thread, Event
# Sentinel used for shutdown
class ActorExit(Exception):
pass
class Actor:
def __init__(self):
self._mailbox = Queue()
def send(self, msg):
'''
Send a message to the actor
'''
self._mailbox.put(msg)
def recv(self):
'''
Receive an incoming message
'''
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg
def close(self):
'''
Close the actor, thus shutting it down
'''
self.send(ActorExit)
def start(self):
'''
Start concurrent execution
'''
self._terminated = Event()
t = Thread(target=self._bootstrap)
t.daemon = True
t.start()
def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()
def join(self):
self._terminated.wait()
def run(self):
'''
Run method to be implemented by the user
'''
while True:
msg = self.recv()
# Sample ActorTask
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)
# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()
這個例子中,你使用actor實例的 send() 方法發(fā)送消息給它們。 其機制是,這個方法會將消息放入一個隊里中, 然后將其轉交給處理被接受消息的一個內部線程。 close() 方法通過在隊列中放入一個特殊的哨兵值(ActorExit)來關閉這個actor。 用戶可以通過繼承Actor并定義實現(xiàn)自己處理邏輯run()方法來定義新的actor。 ActorExit 異常的使用就是用戶自定義代碼可以在需要的時候來捕獲終止請求 (異常被get()方法拋出并傳播出去)。
如果你放寬對于同步和異步消息發(fā)送的要求, 類actor對象還可以通過生成器來簡化定義。例如:
def print_actor():
while True:
try:
msg = yield # Get a message
print('Got:', msg)
except GeneratorExit:
print('Actor terminating')
# Sample use
p = print_actor()
next(p) # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()
討論
actor模式的魅力就在于它的簡單性。 實際上,這里僅僅只有一個核心操作 send() . 甚至,對于在基于actor系統(tǒng)中的“消息”的泛化概念可以已多種方式被擴展。 例如,你可以以元組形式傳遞標簽消息,讓actor執(zhí)行不同的操作,如下:
class TaggedActor(Actor):
def run(self):
while True:
tag, *payload = self.recv()
getattr(self,'do_'+tag)(*payload)
# Methods correponding to different message tags
def do_A(self, x):
print('Running A', x)
def do_B(self, x, y):
print('Running B', x, y)
# Example
a = TaggedActor()
a.start()
a.send(('A', 1)) # Invokes do_A(1)
a.send(('B', 2, 3)) # Invokes do_B(2,3)
a.close()
a.join()
作為另外一個例子,下面的actor允許在一個工作者中運行任意的函數(shù), 并且通過一個特殊的Result對象返回結果:
from threading import Event
class Result:
def __init__(self):
self._evt = Event()
self._result = None
def set_result(self, value):
self._result = value
self._evt.set()
def result(self):
self._evt.wait()
return self._result
class Worker(Actor):
def submit(self, func, *args, **kwargs):
r = Result()
self.send((func, args, kwargs, r))
return r
def run(self):
while True:
func, args, kwargs, r = self.recv()
r.set_result(func(*args, **kwargs))
# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())
最后,“發(fā)送”一個任務消息的概念可以被擴展到多進程甚至是大型分布式系統(tǒng)中去。 例如,一個類actor對象的 send() 方法可以被編程讓它能在一個套接字連接上傳輸數(shù)據(jù) 或通過某些消息中間件(比如AMQP、ZMQ等)來發(fā)送。
以上就是Python定義一個Actor任務的詳細內容,更多關于Python actor任務的資料請關注腳本之家其它相關文章!
相關文章
Python自動生成代碼 使用tkinter圖形化操作并生成代碼框架
這篇文章主要為大家詳細介紹了Python自動生成代碼,使用tkinter圖形化操作并生成代碼框架,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-09-09
使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問題
這篇文章主要介紹了使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12
Python+Selenium實現(xiàn)短視頻自動上傳與發(fā)布的實踐
本文主要介紹了Python+Selenium實現(xiàn)短視頻自動上傳與發(fā)布的實踐,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04
pycharm中選中一個單詞替換所有重復單詞的實現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個單詞替換所有重復單詞的實現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-11-11

