Python?Asyncio庫之a(chǎn)syncio.task常用函數(shù)詳解
前記
Asyncio
在經(jīng)過一段時間的發(fā)展以及獲取Curio
等第三方庫的經(jīng)驗來提供更多的功能,目前高級功能也基本完善,但是相對于其他語言,Python
的Asyncio
高級功能還是不夠的,但好在Asyncio
的低級API也比較完善,開發(fā)者可以通過參考Asyncio
高級API的例子來自己實現(xiàn)一些功能,同時也可以通過這些功能更加了解Asyncio
的原理和避免踩到高級API的坑。
0.基礎(chǔ)
在《Python Asyncio調(diào)度原理》中介紹了Asyncio
的兩種調(diào)度基本單位,Handler
和TimeHandler
,他們只能被loop.call_xx
函數(shù)調(diào)用,開發(fā)者從表面上不知道他們的存在,他們和loop.call_xx
屬于事件循環(huán)的基礎(chǔ)功能,但是這些操作都屬于單一操作,需要開發(fā)者自己編寫代碼把他們的操作給串聯(lián)起來。 而在《Python的可等待對象在Asyncio的作用》中介紹了協(xié)程鏈的發(fā)起者asyncio.Task
能通過loop.call_soon
跟事件循環(huán)進行交互,并串聯(lián)整個協(xié)程鏈中可等待對象以及安排可等待對象的運行。 不過對于loop.call_at
和loop.call_later
仍需要開發(fā)者通過asyncio.Future
來把Timehandler
的執(zhí)行結(jié)果與asyncio.Task
給串聯(lián)起來,比如休眠一秒的代碼實現(xiàn):
import asyncio async def main(): loop = asyncio.get_event_loop() f = asyncio.Future() def _on_complete(): f.set_result(True) loop.call_later(1, _on_complete) return await f if __name__ == "__main__": import time s_t = time.time() asyncio.run(main()) print(time.time() - s_t)
這段代碼中asyncio.Future
執(zhí)行的是類似容器的功能,自己本身會接受各種狀態(tài),并把自己的狀態(tài)同步給管理當(dāng)前協(xié)程鏈的asyncio.Task
,使asyncio.Task
能管理其他類型的操作。 在asyncio.tasks
模塊中的所有功能函數(shù)的原理也差不多,他們接受的參數(shù)基本是都是可等待對象,然后通過asyncio.Futurte
作為容器來同步調(diào)用端和可等待對象間的狀態(tài),也可以通過其他的一些方法把asyncio.Task
的狀態(tài)同步給可等待對象。
1.休眠--asyncio.sleep
asyncio.sleep
是一個常用的方法,開發(fā)者通過它可以很方便的讓協(xié)程休眠設(shè)定的時間,它本身也非常簡單,它的源碼如下:
@types.coroutine def __sleep0(): yield async def sleep(delay, result=None): """Coroutine that completes after a given time (in seconds).""" if delay <= 0: await __sleep0() return result loop = events.get_running_loop() future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result) try: return await future finally: h.cancel()
通過源碼可以發(fā)現(xiàn)當(dāng)設(shè)置的休眠時間等于小于0的時候,sleep
只執(zhí)行了yield
,并不會執(zhí)行其他邏輯,而在值大于0時會創(chuàng)建一個Future
對象,接著就一直等待,直到Future
對象被loop.call_later
控制結(jié)束時才返回結(jié)果值。
需要注意的是,當(dāng)asyncio.sleep
在值為0時,sleep
執(zhí)行yield
可以讓Task.__step
感知而讓出控制權(quán),這是最小的讓出當(dāng)前協(xié)程控制權(quán)的方法,所以我們在編寫涉及到CPU比較多的時候或者消耗時間較長的函數(shù)時可以通過asyncio.sleep(0)
來主動讓出控制權(quán),如下:
import asyncio async def demo() -> None: for index, i in enumerate(range(10000)): if index % 100 == 0: await asyncio.sleep(0) ... # 假設(shè)這里的代碼占用過多的CPU時間
在這個例子中每循環(huán)100次就讓出控制權(quán),以減少對其他協(xié)程的影響。
2.屏蔽取消--asyncio.shield
asyncio.shield
可以保護一個可等待對象被取消,或者說是防止協(xié)程鏈上的取消傳播到被asyncio.shield
托管的可等待對象,但是調(diào)用可等待對象的cancel
方法仍然可以取消可等待對象的運行,如下例子:
import asyncio async def sub(f): await asyncio.shield(f) async def main(): f1 = asyncio.Future() f2 = asyncio.Future() sub1 = asyncio.create_task(sub(f1)) sub2 = asyncio.create_task(sub(f2)) f1.cancel() sub2.cancel() await asyncio.sleep(0) # 確保已經(jīng)取消完成 print("f1 future run success:", f1.done()) print("f2 future run success:", f2.done()) print("sub1 future run result:", sub1.done()) print("sub2 future run result:", sub2.done()) asyncio.run(main()) # >>> future run success: True # >>> future run success: False # >>> sub1 future run result: True # >>> sub2 future run result: True
其中f1
, f2
都在main
函數(shù)中創(chuàng)建, 然后同時被sub
函數(shù)包裹,并通過asyncio.create_task
在后臺異步運行并分別返回sub1
和sub2
兩個Future
對應(yīng)著sub
函數(shù)的執(zhí)行情況。 接著分別取消f1
和sub2
的執(zhí)行,并把f1
,f2
,sub1
,sub2
是否為done
打印出來,可以發(fā)現(xiàn)f1
,sub1
,sub2
的狀態(tài)都為done
(被取消也認為是done),而f2
則還在運行中。
在文章《Python的可等待對象在Asyncio的作用》中說過,一條協(xié)程鏈?zhǔn)怯?code>asyncio.Task牽頭組成的,后續(xù)的所有成功和異常都會在這條鏈上傳播,而取消本質(zhì)上就是一種異常,所以也可以在協(xié)程鏈上傳播。 而shield
為了杜絕運行的可等待對象收到協(xié)程鏈的異常傳播又能讓協(xié)程鏈知道可等待對象的執(zhí)行結(jié)果,會先讓可等待對象在另外一條協(xié)程鏈運行,然后創(chuàng)建一個容器接到原來鏈上,并在可等待對象執(zhí)行完成的時候把結(jié)果告訴容器,由容器把結(jié)果傳播到原有的協(xié)程鏈上,對應(yīng)的源碼如下:
def shield(arg): # 如果是Coro,則需要包裝成future inner = _ensure_future(arg) if inner.done(): # 如果已經(jīng)完成,就不需要被處理了 return inner loop = futures._get_loop(inner) # 創(chuàng)建一個future容器 outer = loop.create_future() def _inner_done_callback(inner): if outer.cancelled(): if not inner.cancelled(): # 如果容器已經(jīng)被取消,而自己沒被取消且已經(jīng)完成,則手動獲取下結(jié)果,方便被回收 inner.exception() return if inner.cancelled(): # 如果自己被取消,則把取消通過容器傳播到協(xié)程鏈上 outer.cancel() else: # 自己已經(jīng)完成且容器未完成,把自己的結(jié)果或者異常通過替身傳播到協(xié)程鏈上 exc = inner.exception() if exc is not None: outer.set_exception(exc) else: outer.set_result(inner.result()) def _outer_done_callback(outer): if not inner.done(): inner.remove_done_callback(_inner_done_callback) # 添加回調(diào),在執(zhí)行成功或被取消時通知對方 inner.add_done_callback(_inner_done_callback) outer.add_done_callback(_outer_done_callback) return outer
通過源碼可以發(fā)現(xiàn)shield
被調(diào)用的時候(假設(shè)驅(qū)動調(diào)用shield
的Task
名為main.Task
),會先通過_ensure_future
輔助函數(shù)創(chuàng)建一個Task
(other.Task
)在后臺異步運行可等待對象,驅(qū)動可等待對象的運行,由于是新的Task
驅(qū)動著可等待對象的執(zhí)行,所以main.Task
的任何狀態(tài)不會傳播到當(dāng)前的可等待對象。 接著創(chuàng)建一個Future
容器,并在other.Task
和Future
容器掛上完成的回調(diào)使他們在完成的時候都能通知到對方,最后返回Future
容器給main.Task
,使main.Task
能夠間接的知道可等待對象的運行結(jié)果,如下圖:
不過Future
容器完成的回調(diào)只是把托管可等待對象的other.Task
回調(diào)給移除了,導(dǎo)致main.Task
的狀態(tài)不會同步到other.Task
中(圖中Future
通知可等待對象aws
的通道是不通的),進而不會影響到托管的可等待對象。 而other.Task
完成的回調(diào)會把任何狀態(tài)同步到Future
中,進而影響到main.Task
。
3.超時--asyncio.wait_for
asyncio.wait_for
可以托管可等待對象,直到可等待對象完成,不過可等待對象在設(shè)定的時間內(nèi)還沒執(zhí)行完成時會被直接取消執(zhí)行并拋出asyncio.TimeoutError
異常。 它的運行原理綜合了上面的asyncio.shield
和asyncio.sleep
,它一樣會為可等待對象創(chuàng)建一個Future
容器,并在容器上掛了一個超時的回調(diào)和可等待對象執(zhí)行結(jié)束的回調(diào),接著就等待容器執(zhí)行結(jié)束。 不過在了解asyncio.wait_for
之前,先了解他用到的兩個輔助函數(shù)_cancel_and_wait
和_release_waiter
,他們的源碼如下:
def _release_waiter(waiter, *args): if not waiter.done(): waiter.set_result(None) async def _cancel_and_wait(fut, loop): waiter = loop.create_future() cb = functools.partial(_release_waiter, waiter) fut.add_done_callback(cb) try: fut.cancel() await waiter finally: fut.remove_done_callback(cb)
可以看出源碼比較簡單,他們的作用都是為了確??傻却龑ο竽芡耆珗?zhí)行結(jié)束才返回,其中_release_waiter
是確??傻却龑ο笠欢ū辉O(shè)置為執(zhí)行結(jié)束,而_cancel_and_wait
是為了確保能等到可等待對象被取消且完整結(jié)束時才返回。
可等待對象的cancel
方法可以認為是異步的,調(diào)用后需要等事件循環(huán)再次調(diào)用可等待對象時,可等待對象才會被取消。而_cancel_and_wait
通過一個容器來規(guī)避這個問題,使取消這個操作變?yōu)橥降模@個方法在某些開發(fā)場景經(jīng)常被使用,如果不是私有API就更好了。
接下來就可以通過wait_for
的源碼了解他的執(zhí)行邏輯了,源碼如下:
async def wait_for(fut, timeout): loop = events.get_running_loop() if timeout is None: return await fut if timeout <= 0: # 當(dāng)超時的值小于等于0時就意味著想馬上得到結(jié)果 fut = ensure_future(fut, loop=loop) if fut.done(): # 如果執(zhí)行完成就返回可等待對象的數(shù)據(jù) return fut.result() # 取消可等待對象并等待 await _cancel_and_wait(fut, loop=loop) # 如果被_cancel_and_wait取消,那么會拋出CancelledError異常,這時候把它轉(zhuǎn)為超時異常 try: return fut.result() except exceptions.CancelledError as exc: raise exceptions.TimeoutError() from exc # 初始化一個Future,只有在超時和完成時才會變?yōu)閐one waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) cb = functools.partial(_release_waiter, waiter) fut = ensure_future(fut, loop=loop) fut.add_done_callback(cb) try: try: await waiter except exceptions.CancelledError: # 此時是asyncio.Task被取消,并把取消傳播到waiter if fut.done(): return fut.result() else: # 如果任務(wù)被取消了,那么需要確保任務(wù)沒有被執(zhí)行才返回 fut.remove_done_callback(cb) await _cancel_and_wait(fut, loop=loop) raise # 計時結(jié)束或者是執(zhí)行完畢的情況 if fut.done(): # 執(zhí)行完畢,返回對應(yīng)的值 return fut.result() else: # 計時結(jié)束,清理資源,并拋出異常 fut.remove_done_callback(cb) # 如果任務(wù)被取消了,那么需要確保任務(wù)沒有被執(zhí)行才返回 await _cancel_and_wait(fut, loop=loop) # 如果被_cancel_and_wait取消,那么會拋出CancelledError異常,這時候把它轉(zhuǎn)為超時異常 try: return fut.result() except exceptions.CancelledError as exc: raise exceptions.TimeoutError() from exc finally: timeout_handle.cancel()
wait_for
的源碼為了兼容各種情況,代碼復(fù)雜度比較高,同時超時參數(shù)小于等于0跟大于0的邏輯是一樣的,分開寫只是為了避免在小于等于0時創(chuàng)建了一些額外的對象,在精簡了一些asyncio.Task
傳播異常給waiter
的邏輯后,wait_for
的執(zhí)行邏輯如下圖:
fut為可等待對象,timeout為超時時間
可以看到wait_for
的主要邏輯是先創(chuàng)建一個名為waiter
的容器,接著通過loop.call_later
指定在多少時間后釋放容器,然后再通過ensure_future
使另一個asyncio.Task
來托管可等待對象,并安排執(zhí)行完畢的時候釋放容器,再等待waiter
容器的執(zhí)行直到被釋放。當(dāng)容器被釋放的時候再判斷可等待對象是否執(zhí)行完畢,如果執(zhí)行完畢了就直接返回,否則拋出超時異常。
4.簡單的等待--wait
asyncio.wait
用于等待一批可等待對象,當(dāng)有一個可等待對象執(zhí)行完成或者出現(xiàn)異常的時候才會返回數(shù)據(jù)(具體還是要看return_when
指定的條件,默認為所有等待對象結(jié)束或取消時才返回),需要注意的是wait
雖然支持timeout
參數(shù),但是在超時的試試不會取消可等待對象,也不會拋出超時的異常,只會把完成的可等待對象放在完成的集合,把未完成的可等待對象放在未完成的集合并返回,如下代碼:
import asyncio async def main(): return await asyncio.wait( {asyncio.create_task(asyncio.sleep(1))}, timeout=0.5 ) if __name__ == "__main__": asyncio.run(main())
這段代碼可以正常的運作,不會拋出超時錯,不過還要注意的是在后續(xù)版本中asyncio.wait
只支持Task
對象,如果想要傳入的是coro
和Future
對象,則需要開發(fā)者自己手動轉(zhuǎn)換。 wait
的邏輯與wait_for
類似,源碼如下:
async def _wait(fs, timeout, return_when, loop): assert fs, 'Set of Futures is empty.' waiter = loop.create_future() timeout_handle = None if timeout is not None: # 定義一個time handler,在timeout秒后通過`_release_waiter`完成. timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) def _on_completion(f): # 每個可等待對象執(zhí)行完成的回調(diào) nonlocal counter counter -= 1 if (counter <= 0 or return_when == FIRST_COMPLETED or return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None) ): # 如果所有任務(wù)執(zhí)行完成,或者是第一個完成或者是第一個拋出異常時, # 意味著執(zhí)行完成,需要取消time handler,并標(biāo)記為完成 if timeout_handle is not None: timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) # 為每個可等待對象添加回調(diào) for f in fs: f.add_done_callback(_on_completion) try: # 等待替身執(zhí)行完成 await waiter finally: # 取消time handler并移除回調(diào)(因為cancel是異步的) if timeout_handle is not None: timeout_handle.cancel() for f in fs: f.remove_done_callback(_on_completion) # 處理并返回done和pending,其中done代表完成,pending代表執(zhí)行中。 done, pending = set(), set() for f in fs: if f.done(): done.add(f) else: pending.add(f) return done, pending
可以看到wait_for
的復(fù)雜度沒有wait
高,而且可以看到asyncio.wait
是等waiter
這個容器執(zhí)行完并移除可等待對象上面的_on_completion
回調(diào)后才把可等待對象按照是否完成區(qū)分到done
和pending
兩個集合,這樣的準(zhǔn)確度比在_on_completion
高一些,但是如果開發(fā)者在處理集合時觸發(fā)一些異步操作也可能導(dǎo)致pending
集合中的部分可等待對象變?yōu)橥瓿傻?,如下代碼:
import asyncio async def main(): f_list = [asyncio.Future() for _ in range(10)] done, pending = await asyncio.wait(f_list, timeout=1) print(len(done), len(pending)) print([i for i in pending if i.done()]) f_list[1].set_result(True) print([i for i in pending if i.done()]) if __name__ == "__main__": asyncio.run(main()) # >>> 0 10 # >>> [] # >>> [<Future finished result=True>]
通過輸出可以發(fā)現(xiàn),在asyncio.wait
執(zhí)行完畢后,pending
中的完成的元素只有0個,而在后續(xù)強制為其中的一個Future
設(shè)置數(shù)據(jù)后,pending
中完成的元素有1個了。
5.迭代可等待對象的完成--asyncio.as_completed
asyncio.wait
的機制是只要被觸發(fā)就會返回,其他尚未完成的可等待對象需要開發(fā)者自己在處理,而asyncio.as_completed
可以確保每個可等待對象完成返回數(shù)據(jù)或者超時時拋出異常,使用方法如下:
import asyncio async def sub(i): await asyncio.sleep(i) return i async def main(): for f in asyncio.as_completed([sub(i) for i in range(5)], timeout=3): print(await f) if __name__ == "__main__": asyncio.run(main()) # >>> 0 # >>> 1 # >>> 2 # >>> Traceback (most recent call last): # File "/home/so1n/github/demo_project/demo.py", line 18, in <module> # asyncio.run(main()) # File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run # return loop.run_until_complete(main) # File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete # return future.result() # File "/home/so1n/github/demo_project/demo.py", line 14, in main # print(await f) # File "/usr/lib/python3.7/asyncio/tasks.py", line 532, in _wait_for_one # raise futures.TimeoutError # concurrent.futures._base.TimeoutError
該程序并發(fā)執(zhí)行5個協(xié)程,其中執(zhí)行最久的時間是5秒,而as_completed
設(shè)置的超時為3秒。通過輸出可以發(fā)現(xiàn),每當(dāng)一個可等待對象執(zhí)行結(jié)束時就會把數(shù)據(jù)拋出來,當(dāng)超時則會拋出超時錯誤。為了能達每有一個可等待對象就返回一次數(shù)據(jù)的效果,as_completed
通過一個隊列來維護數(shù)據(jù)的返回,它的源碼如下:
def as_completed(fs, *, timeout=None): from .queues import Queue # Import here to avoid circular import problem. done = Queue() loop = events._get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None def _on_timeout(): # 超時時調(diào)用,需要注意的是,失敗時結(jié)果為空,所以要推送一個空的數(shù)據(jù)到隊列中 # 在消費者發(fā)現(xiàn)元素為空時拋出錯誤 for f in todo: f.remove_done_callback(_on_completion) done.put_nowait(None) # Queue a dummy value for _wait_for_one(). todo.clear() # Can't do todo.remove(f) in the loop. def _on_completion(f): # 如果成功,就把Future推送到隊列中,消費者可以通過Future獲取到結(jié)果 if not todo: return # _on_timeout() was here first. todo.remove(f) done.put_nowait(f) if not todo and timeout_handle is not None: timeout_handle.cancel() async def _wait_for_one(): f = await done.get() if f is None: # 如果元素為空,則證明已經(jīng)超時了,要拋出異常 raise exceptions.TimeoutError return f.result() for f in todo: f.add_done_callback(_on_completion) if todo and timeout is not None: timeout_handle = loop.call_later(timeout, _on_timeout) # 通過生成器語法返回協(xié)程函數(shù),該協(xié)程函數(shù)可以獲取最近完成的可等待對象的結(jié)果 for _ in range(len(todo)): yield _wait_for_one()
通過源碼可以發(fā)現(xiàn)可等待對象就像生產(chǎn)者一樣,執(zhí)行結(jié)束的時候就會把結(jié)果投遞給隊列,同時as_completed
會迭代跟可等待對象的數(shù)量一樣的_wait_for_one
協(xié)程函數(shù),供開發(fā)者消費數(shù)據(jù)。不過需要注意的是as_completed
在超時的時候,并不會取消尚未完成的可等待對象,他們會變?yōu)椴豢煽氐臓顟B(tài),在某些時候會造成內(nèi)存溢出,如下示例代碼:
import asyncio import random async def sub(): # 一半的幾率會被set一個值并返回,一半的幾率會卡死 f = asyncio.Future() if random.choice([0, 1]) == 0: f.set_result(None) return await f async def main(): try: for f in asyncio.as_completed([sub() for i in range(5)], timeout=1): print(await f) except asyncio.TimeoutError: # 忽略超時 pass # 統(tǒng)計未完成的sub任務(wù) cnt = 0 for i in asyncio.all_tasks(): if i._coro.__name__ == sub.__name__: cnt += 1 print("runing task by name sub:", cnt) if __name__ == "__main__": asyncio.run(main()) # >>> None # >>> None # >>> None # >>> runing task by name sub: 2
通過結(jié)果(由于采用隨機,結(jié)果可能不一樣)可以發(fā)現(xiàn),sub
成功執(zhí)行完成的數(shù)量有3個(輸出None
),而在as_completed
觸發(fā)超時后仍有兩個sub
在執(zhí)行中,這時的兩個sub
成為無人管理的可等待對象,除非開發(fā)者通過asyncio.all_tasks
去找到他并清理掉,否則這幾個可等待對象會一直伴隨著程序運行,這很容易造成內(nèi)存溢出。
以上就是Python Asyncio庫之a(chǎn)syncio.task常用函數(shù)詳解的詳細內(nèi)容,更多關(guān)于Python Asyncio asyncio.task的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python和pygame實現(xiàn)簡單俄羅斯方塊游戲
這篇文章主要為大家詳細介紹了python和pygame實現(xiàn)簡單俄羅斯方塊游戲,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-06-06Python列表reverse()函數(shù)使用方法詳解
這篇文章主要詳細介紹了Python列表reverse()函數(shù)使用方法,文章通過代碼示例講解的非常詳細,對我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-07-07python3 實現(xiàn)函數(shù)寫文件路徑的正確方法
今天小編就為大家分享一篇python3 實現(xiàn)函數(shù)寫文件路徑的正確方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11python實現(xiàn)mp3文件播放的具體實現(xiàn)代碼
前段時間在搞一個基于python的語音助手,其中需要用到python播放音頻的功能,下面這篇文章主要給大家介紹了關(guān)于python實現(xiàn)mp3文件播放的具體實現(xiàn)代碼,需要的朋友可以參考下2023-05-05python 應(yīng)用之Pycharm 新建模板默認添加編碼格式-作者-時間等信息【推薦】
這篇文章主要介紹了Pycharm 新建模板默認添加編碼格式-作者-時間等信息 ,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-06-06關(guān)于Python?中IndexError:list?assignment?index?out?of?rang
這篇文章主要介紹了Python?中IndexError:list?assignment?index?out?of?range?錯誤解決,概述了兩個常見的列表函數(shù),它們可以幫助我們在替換兩個列表時幫助我們處理?Python?中的索引錯誤,需要的朋友可以參考下2023-05-05