簡單有效上手Python3異步asyncio問題
Python3異步asyncio問題
官方文檔:
https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run
看了一大堆相關(guān)的資料和教程,針對的Python版本不同,寫法也各不一致,翻了翻官方的文檔,發(fā)現(xiàn)其實(shí)越高版本的Python對異步進(jìn)行封裝的越方便,官方說法叫高層級API
,甚至都不用去理解什么Future\task\loop之類的概念了,我現(xiàn)在用的是Python 3.7.5
,可以這樣很簡單的實(shí)現(xiàn)阻塞等待\異步并發(fā):如果沒有復(fù)雜需求的話,用高層級API就可以了。
如果涉及到回調(diào)的話貌似還得用低層級的API,后面單獨(dú)記錄。
import asyncio async def first(): print('first函數(shù)調(diào)用開始,下面將會等待3秒模擬函數(shù)運(yùn)行完畢') await asyncio.sleep(3) print('first函數(shù)執(zhí)行完畢') async def last(): print('last函數(shù)調(diào)用開始') await asyncio.sleep(2) print('last函數(shù)執(zhí)行完畢') async def func(delay): print('開始異步同時(shí)執(zhí)行的函數(shù)+延遲: ' + str(delay)) await asyncio.sleep(delay) print('--異步函數(shù)執(zhí)行完畢+延遲: ' + str(delay)) async def main(): await first() # 這里先調(diào)用first()函數(shù),并且等它執(zhí)行完了才會開始 await asyncio.gather( func(1), func(2), func(3) ) await last() asyncio.run(main())
上面代碼實(shí)際執(zhí)行的過程是:
- 開始執(zhí)行
first()
函數(shù) - 等
first()
執(zhí)行完畢后開始并發(fā)執(zhí)行下面gather
中加入的 - 三個(gè)函數(shù)(任務(wù))三個(gè)函數(shù)全部并發(fā)執(zhí)行完畢后執(zhí)行
last()
官方文檔中給的建議是只創(chuàng)建一個(gè)主入口的main()
函數(shù)(當(dāng)然這個(gè)函數(shù)名可以自定義的),將要調(diào)用的其他函數(shù)都放在這個(gè)函數(shù)中,然后再使用asyncio.run()
啟動(dòng),理想情況下應(yīng)當(dāng)只被調(diào)用一次.
上圖:
更新
上面所謂的高階層API
用法最后一行asyncio.run(main())
和下面使用低階層API
實(shí)現(xiàn)效果是一樣的:
loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task)
下面是學(xué)習(xí)過程中記錄的偏低層實(shí)現(xiàn)的資料
最基本的定義和應(yīng)用
import asyncio # 定義一個(gè)可以異步調(diào)用的函數(shù),其類型為coroutine async def func1(): pass if __name__ == '__main__': loop = asyncio.get_event_loop() # 定義一個(gè)用來循環(huán)異步函數(shù)的loop對象 task = asyncio.ensure_future(func1()) # 創(chuàng)建一個(gè)調(diào)用異步函數(shù)的任務(wù),task類型為future # task = loop.create_task(func1()) # 使用loop的.create_task()創(chuàng)建任務(wù),效果一樣 loop.run_until_complete(task) # 開始在loop循環(huán)中執(zhí)行異步函數(shù),直到該函數(shù)運(yùn)行完畢
asyncio.ensure_future(coroutine)
和 loop.create_task(coroutine)
都可以創(chuàng)建一個(gè)task
,run_until_complete
的參數(shù)是一個(gè)futrue對象。
當(dāng)傳入一個(gè)協(xié)程,其內(nèi)部會自動(dòng)封裝成task,task是Future的子類。
isinstance(task, asyncio.Future)
將會輸出True。
future
類型的任務(wù)可以在loop.run_until_complete
中執(zhí)行,也可以直接用await
+任務(wù)變量名阻塞?調(diào)用
什么時(shí)候使用異步
import asyncio # 這是一個(gè)耗時(shí)很少的異步函數(shù) async def msg(text): await asyncio.sleep(0.1) print(text) # 這是一個(gè)耗時(shí)較長的異步函數(shù) async def long_operation(): print('long_operation started') await asyncio.sleep(3) print('long_operation finished') # 主函數(shù)部分,同樣需要聲明為async異步類型 async def main(): await msg('first') # 現(xiàn)在需要調(diào)用一個(gè)耗時(shí)較長的函數(shù)操作,不希望阻塞的等待它執(zhí)行完畢 # 希望long_operation在開始執(zhí)行后,立即調(diào)用msg,這里就可以將long_operation封裝到task任務(wù)中 task = asyncio.ensure_future(long_operation()) await msg('second') # 開始task中的long_operation函數(shù) await task # task執(zhí)行完畢后會繼續(xù)下面的代碼 print('All done.') if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main())
運(yùn)行結(jié)果:
并發(fā)和并行
并發(fā)和并行一直是容易混淆的概念。并發(fā)通常指有多個(gè)任務(wù)需要同時(shí)進(jìn)行,并行則是同一時(shí)刻有多個(gè)任務(wù)執(zhí)行。
用上課來舉例就是,并發(fā)情況下是一個(gè)老師在同一時(shí)間段輔助不同的人功課。
并行則是好幾個(gè)老師分別同時(shí)輔助多個(gè)學(xué)生功課。
簡而言之就是一個(gè)人同時(shí)吃三個(gè)饅頭還是三個(gè)人同時(shí)分別吃一個(gè)的情況,吃一個(gè)饅頭算一個(gè)任務(wù)。
asyncio實(shí)現(xiàn)并發(fā),就需要多個(gè)協(xié)程來完成任務(wù),每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作。
創(chuàng)建多個(gè)協(xié)程的列表,然后將這些協(xié)程注冊到事件循環(huán)中。
import asyncio import time now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) start = now() coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task ret: ', task.result()) print('TIME: ', now() - start)
結(jié)果如下
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
總時(shí)間為4s左右。4s的阻塞時(shí)間,足夠前面兩個(gè)協(xié)程執(zhí)行完畢。如果是同步順序的任務(wù),那么至少需要7s。此時(shí)我們使用了aysncio實(shí)現(xiàn)了并發(fā)。
asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個(gè)task列表,后者接收一堆task。
異步結(jié)果回調(diào)
找了個(gè)別人寫的例子,大致理解了下實(shí)現(xiàn)過程:
- 定義
async
異步函數(shù) - 定義普通函數(shù)用于處理回調(diào)
- 獲取異步函數(shù)的
coroutine
協(xié)程對象(其實(shí)就是不帶await
修飾直接執(zhí)行異步函數(shù)返回的那個(gè)對象) - 獲取
loop
循環(huán)對象 - 使用低階層API手動(dòng)創(chuàng)建
task
任務(wù) - 為
task
任務(wù)對象注冊回調(diào)函數(shù) - 啟動(dòng)
loop
循環(huán)調(diào)用異步函數(shù)
import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) #task = asyncio.ensure_future(coroutine) # 貌似和上面用loop創(chuàng)建任務(wù)效果一樣 task.add_done_callback(callback) loop.run_until_complete(task) print('TIME: ', now() - start)
這里需要注意,在使用低層級API
手動(dòng)創(chuàng)建異步任務(wù)的時(shí)候,不能同時(shí)使用高層級API
的簡單操作了,比如這里創(chuàng)建task
任務(wù)對象的時(shí)候,就不能用asyncio.create_task()
,否則會找不到loop
對象,返回下面的錯(cuò)誤
RuntimeError: no running event loop
翻了一下asyncio\tasks.py
源代碼,原來所謂的高層級API
就是這么簡單的封裝啊…
調(diào)用的時(shí)候在函數(shù)內(nèi)部又定義了一遍loop
def create_task(coro): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() return loop.create_task(coro)
我自己寫的例子
創(chuàng)建4個(gè)異步任務(wù)同時(shí)開始執(zhí)行,每個(gè)任務(wù)執(zhí)行完成后將結(jié)果追加到result_list
數(shù)組變量中.
import asyncio result_list = [] async def fun(var): return var + 1 def callbackFun(future): result_list.append(future.result()) task_list = [] for i in range(1, 5): cor = fun(i) task = asyncio.ensure_future(cor) task.add_done_callback(callbackFun) task_list.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) print(result_list)
當(dāng)然,如果不需要使用回調(diào)函數(shù),而是等所有提交的異步任務(wù)都執(zhí)行完成后獲取它們的結(jié)果,可以這樣寫:
# 前面省略 loop.run_until_complete(asyncio.wait(task_list)) # task_list中的每個(gè)任務(wù)執(zhí)行完成后可以調(diào)用它的.result()方法來獲取結(jié)果 for task in task_list: print('每個(gè)task執(zhí)行完的結(jié)果:', task.result())
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
python學(xué)習(xí)pymongo模塊的使用方法
這篇文章主要介紹了python學(xué)習(xí)pymongo模塊的使用方法,pymongo模塊是python操作mongo數(shù)據(jù)的第三方模塊,總結(jié)一下常用到的簡單用,需要的小伙伴可以參考一下2022-09-09Centos部署django服務(wù)nginx+uwsgi的方法
這篇文章主要介紹了Centos部署django服務(wù)nginx+uwsgi的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-01-01python matplotlib 注釋文本箭頭簡單代碼示例
這篇文章主要介紹了python matplotlib 注釋文本箭頭簡單代碼示例,具有一定借鑒價(jià)值。2018-01-01在Django的URLconf中使用多個(gè)視圖前綴的方法
這篇文章主要介紹了在Django的URLconf中使用多個(gè)視圖前綴的方法,Django是Python中最為著名的遵循MVC結(jié)構(gòu)的開發(fā)框架,需要的朋友可以參考下2015-07-07python+OpenCV人臉識別考勤系統(tǒng)實(shí)現(xiàn)的詳細(xì)代碼
作為一個(gè)基于人臉識別算法的考勤系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)教程,以下內(nèi)容將提供詳細(xì)的步驟和代碼示例。本教程將使用 Python 語言和 OpenCV 庫進(jìn)行實(shí)現(xiàn),需要的朋友可以參考下2023-05-05