Python混合使用同步和異步函數(shù)的方法
前言
異步編程可以提高應(yīng)用程序的性能和吞吐量,因?yàn)樗梢猿浞掷?CPU 和 I/O 資源。當(dāng)某個(gè)任務(wù)被阻塞時(shí),事件循環(huán)可以切換到另一個(gè)任務(wù),從而避免浪費(fèi) CPU 時(shí)間。此外,異步編程還可以簡(jiǎn)化代碼,使其更易于維護(hù)和調(diào)試。
我們最常用的是同步編程,在同步場(chǎng)景中,某個(gè)任務(wù)被阻塞時(shí),整個(gè)線程都會(huì)被掛起,直到該任務(wù)完成,所以為了避免整個(gè)程序被阻塞的情況,又引入了多線程和鎖。同步編程通常需要使用鎖和其他同步原語(yǔ)來(lái)確保線程安全。
混合編寫的場(chǎng)景
在實(shí)際開(kāi)發(fā)過(guò)程中,通常會(huì)遇到同時(shí)進(jìn)行異步和同步操作的場(chǎng)景。例如,在使用異步 Web 框架 FastAPI 時(shí),對(duì)于實(shí)際的一些業(yè)務(wù)需求將不得不采用同步編程。例如,可能需要使用同步數(shù)據(jù)庫(kù)驅(qū)動(dòng)程序或同步文件系統(tǒng)接口,又或者調(diào)用同步接口等等。
在協(xié)程函數(shù)中調(diào)用同步函數(shù)
在協(xié)程函數(shù)中直接調(diào)用同步函數(shù)會(huì)阻塞事件循環(huán),從而影響整個(gè)程序的性能。我們先來(lái)看一個(gè)例子:
以下是使用異步 Web 框架 FastAPI 寫的一個(gè)例子,F(xiàn)astAPI 是比較快,但不正確的操作將會(huì)變得很慢。
import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): time.sleep(10) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
上面我們寫了兩個(gè)接口,假設(shè) root
接口函數(shù)耗時(shí) 10 秒,在這 10 秒內(nèi)訪問(wèn) health
接口,想一想會(huì)發(fā)生什么?
訪問(wèn) root
接口(左),立即訪問(wèn) health
接口(右),health
接口被阻塞,直至 root
接口返回后,health
接口才成功響應(yīng)。
time.sleep
就是一個(gè)「同步」函數(shù),它會(huì)阻塞整個(gè)事件循環(huán)。
如何解決呢?想一想以前的處理方法,如果一個(gè)函數(shù)會(huì)阻塞主線程,那么就再開(kāi)一個(gè)線程讓這個(gè)阻塞函數(shù)單獨(dú)運(yùn)行。所以,這里也是同理,開(kāi)一個(gè)線程單獨(dú)去運(yùn)行那些阻塞式操作,比如讀取文件等。
loop.run_in_executor
方法將同步函數(shù)轉(zhuǎn)換為異步非阻塞方式進(jìn)行處理。具體來(lái)說(shuō),loop.run_in_executor()
可以將同步函數(shù)創(chuàng)建為一個(gè)線程或進(jìn)程,并在其中執(zhí)行該函數(shù),從而避免阻塞事件循環(huán)。
那么,我們使用 loop.run_in_executor
改寫上面例子,如下:
import asyncio import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): loop = asyncio.get_event_loop() def do_blocking_work(): time.sleep(10) print("Done blocking work!!") await loop.run_in_executor(None, do_blocking_work) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
效果如下:
root
接口被阻塞期間,health
依然正常訪問(wèn)互不影響。
注意: 這里都是為了演示,實(shí)際在使用 FastAPI 開(kāi)發(fā)時(shí),你可以直接將
async def root
更換成def root
,也就是將其換成同步接口函數(shù),F(xiàn)astAPI 內(nèi)部會(huì)自動(dòng)創(chuàng)建線程處理這個(gè)同步接口函數(shù)??偟膩?lái)說(shuō),F(xiàn)astAPI 內(nèi)部也是依靠線程去處理同步函數(shù)從而避免阻塞主線程(或主線程中的事件循環(huán))。
在同步函數(shù)中調(diào)用異步函數(shù)
協(xié)程只能在「事件循環(huán)」內(nèi)被執(zhí)行,且同一時(shí)刻只能有一個(gè)協(xié)程被執(zhí)行。
所以,在同步函數(shù)中調(diào)用異步函數(shù),其本質(zhì)就是將協(xié)程「扔進(jìn)」事件循環(huán)中,等待該協(xié)程執(zhí)行完獲取結(jié)果即可。
以下這些函數(shù),都可以實(shí)現(xiàn)這個(gè)效果:
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
接下來(lái),我們將一一講解這些方法并舉例說(shuō)明。
asyncio.run
這個(gè)方法使用起來(lái)最簡(jiǎn)單,先看下如何使用,然后緊跟著講一下哪些場(chǎng)景不能直接使用 asyncio.run
import asyncio async def do_work(): return 1 def main(): result = asyncio.run(do_work()) print(result) # 1 if __name__ == "__main__": main()
直接 run
就完事了,然后接受返回值即可。
但是需要,注意的是 asyncio.run
每次調(diào)用都會(huì)新開(kāi)一個(gè)事件循環(huán),當(dāng)結(jié)束時(shí)自動(dòng)關(guān)閉該事件循環(huán)。
一個(gè)線程內(nèi)只存在一個(gè)事件循環(huán),所以如果當(dāng)前線程已經(jīng)有存在的事件循環(huán)了,就不應(yīng)該使用 asyncio.run
了,否則就會(huì)拋出如下異常:
RuntimeError: asyncio.run() cannot be called from a running event loop
因此,asyncio.run
用作新開(kāi)一個(gè)事件循環(huán)時(shí)使用。
asyncio.run_coroutine_threadsafe
文檔: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe
向指定事件循環(huán)提交一個(gè)協(xié)程。(線程安全)
返回一個(gè)concurrent.futures.Future
以等待來(lái)自其他 OS 線程的結(jié)果。
換句話說(shuō),就是將協(xié)程丟給其他線程中的事件循環(huán)去運(yùn)行。
值得注意的是這里的「事件循環(huán)」應(yīng)該是其他線程中的事件循環(huán),非當(dāng)前線程的事件循環(huán)。
其返回的結(jié)果是一個(gè) future 對(duì)象,如果你需要獲取協(xié)程的執(zhí)行結(jié)果可以使用 future.result()
獲取,關(guān)于 future 對(duì)象的更多介紹,見(jiàn) https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future
下方給了一個(gè)例子,一共有兩個(gè)線程:thread_with_loop
和 another_thread
,分別用于啟動(dòng)事件循環(huán)和調(diào)用 run_coroutine_threadsafe
import asyncio import threading import time loop = None def get_loop(): global loop if loop is None: loop = asyncio.new_event_loop() return loop def another_thread(): async def coro_func(): return 1 loop = get_loop() # 將協(xié)程提交到另一個(gè)線程的事件循環(huán)中執(zhí)行 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 等待協(xié)程執(zhí)行結(jié)果 print(future.result()) # 停止事件循環(huán) loop.call_soon_threadsafe(loop.stop) def thread_with_loop(): loop = get_loop() # 啟動(dòng)事件循環(huán),確保事件循環(huán)不會(huì)退出,直到 loop.stop() 被調(diào)用 loop.run_forever() loop.close() # 啟動(dòng)一個(gè)線程,線程內(nèi)部啟動(dòng)了一個(gè)事件循環(huán) threading.Thread(target=thread_with_loop).start() time.sleep(1) # 在主線程中啟動(dòng)一個(gè)協(xié)程, 并將協(xié)程提交到另一個(gè)線程的事件循環(huán)中執(zhí)行 t = threading.Thread(target=another_thread) t.start() t.join()
loop.run_until_complete
文檔: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
運(yùn)行直到 future (
Future
的實(shí)例 ) 被完成。
這個(gè)方法和
asyncio.run
類似。
具體就是傳入一個(gè)協(xié)程對(duì)象或者任務(wù),然后可以直接拿到協(xié)程的返回值。
run_until_complete
屬于 loop
對(duì)象的方法,所以這個(gè)方法的使用前提是有一個(gè)事件循環(huán),注意這個(gè)事件循環(huán)必須是非運(yùn)行狀態(tài),如果是運(yùn)行中就會(huì)拋出如下異常:
RuntimeError: This event loop is already running
例子:
loop = asyncio.new_event_loop() loop.run_until_complete(do_async_work())
create_task
文檔: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks
再次準(zhǔn)確一點(diǎn):要運(yùn)行一個(gè)協(xié)程函數(shù)的本質(zhì)是將攜帶協(xié)程函數(shù)的任務(wù)提交至事件循環(huán)中,由事件循環(huán)發(fā)現(xiàn)、調(diào)度并執(zhí)行。
其實(shí)一共就是滿足兩個(gè)條件:
- 任務(wù);
- 事件循環(huán)。
我們使用 async def func
定義的函數(shù)叫做協(xié)程函數(shù),func()
這樣調(diào)用之后返回的結(jié)果是協(xié)程對(duì)象,到這一步協(xié)程函數(shù)內(nèi)的代碼都沒(méi)有被執(zhí)行,直到協(xié)程對(duì)象被包裝成了任務(wù),事件循環(huán)才會(huì)“正眼看它們”。
所以事件循環(huán)調(diào)度運(yùn)行的基本單元就是任務(wù),那為什么我們?cè)谑褂?async/await
這些語(yǔ)句時(shí)沒(méi)有涉及到任務(wù)這個(gè)概念呢?
這是因?yàn)?await
語(yǔ)法糖在內(nèi)部將協(xié)程對(duì)象封裝成了任務(wù),再次強(qiáng)調(diào)事件循環(huán)只認(rèn)識(shí)任務(wù)。
所以,想要運(yùn)行一個(gè)協(xié)程對(duì)象,其實(shí)就是將協(xié)程對(duì)象封裝成一個(gè)任務(wù),至于事件循環(huán)是如何發(fā)現(xiàn)、調(diào)度和執(zhí)行的,這個(gè)我們不用關(guān)心。
那將協(xié)程封裝成的任務(wù)的方法有哪些呢?
asyncio.create_task
asyncio.ensure_future
loop.create_task
看著有好幾個(gè)的,沒(méi)關(guān)系,我們只關(guān)心 loop.create_task
,因?yàn)槠渌椒ㄗ罱K都是調(diào)用 loop.create_task
。
使用起來(lái)也是很簡(jiǎn)單的,將協(xié)程對(duì)象傳入,返回值是一個(gè)任務(wù)對(duì)象。
async def do_work(): return 222 task = loop.create_task(do_work())
do_work
會(huì)被異步執(zhí)行,那么 do_work 的結(jié)果怎么獲取呢,task.result()
可以嗎?
分情況:
- 如果是在一個(gè)協(xié)程函數(shù)內(nèi)使用
await task.result()
,這是可以的; - 如果是在普通函數(shù)內(nèi)則不行。你不可能立即獲得協(xié)程函數(shù)的返回值,因?yàn)閰f(xié)程函數(shù)還沒(méi)有被執(zhí)行呢。
asyncio.Task 運(yùn)行使用 add_done_callback 添加完成時(shí)的回調(diào)函數(shù),所以我們可以「曲線救國(guó)」,使用回調(diào)函數(shù)將結(jié)果添加到隊(duì)列、Future 等等。
我這里給個(gè)基于 concurrent.futures.Future
獲取結(jié)果的例子,如下:
import asyncio from asyncio import Task from concurrent.futures import Future from fastapi import FastAPI app = FastAPI() loop = asyncio.get_event_loop() async def do_work1(): return 222 @app.get("/") def root(): # 新建一個(gè) future 對(duì)象,用于接受結(jié)果值 future = Future() # 提交任務(wù)至事件循環(huán) task = loop.create_task(do_work1()) # 回調(diào)函數(shù) def done_callback(task: Task): # 設(shè)置結(jié)果 future.set_result(task.result()) # 為這個(gè)任務(wù)添加回調(diào)函數(shù) task.add_done_callback(done_callback) # future.result 會(huì)被阻塞,直到有結(jié)果返回為止 return future.result() # 222
總結(jié)
協(xié)程函數(shù)中調(diào)用同步函數(shù)
核心思想:為了避免同步函數(shù)的執(zhí)行阻塞當(dāng)前線程(當(dāng)前線程中的事件循環(huán)),應(yīng)該由其他線程或進(jìn)程執(zhí)行后獲取結(jié)果。
同步函數(shù)中調(diào)用協(xié)程函數(shù)
核心思想:任務(wù)協(xié)程對(duì)象都需要依賴事件循環(huán)。
所以要么啟動(dòng)新的事件循環(huán)以執(zhí)行協(xié)程,要么依賴已有的事件循環(huán), 至于實(shí)現(xiàn)方式就根據(jù)實(shí)際情況而選擇。
到此這篇關(guān)于Python混合使用同步和異步函數(shù)的方法的文章就介紹到這了,更多相關(guān)Python混合使用同步和異步函數(shù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Keras模型轉(zhuǎn)成tensorflow的.pb操作
這篇文章主要介紹了Keras模型轉(zhuǎn)成tensorflow的.pb操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-07-07Python3爬蟲(chóng)之自動(dòng)查詢天氣并實(shí)現(xiàn)語(yǔ)音播報(bào)
這篇文章主要介紹了Python3爬蟲(chóng)之自動(dòng)查詢天氣并實(shí)現(xiàn)語(yǔ)音播報(bào),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02Django項(xiàng)目中使用JWT的實(shí)現(xiàn)代碼
這篇文章主要介紹了Django項(xiàng)目中使用JWT的實(shí)現(xiàn)代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11Pygame游戲開(kāi)發(fā)之太空射擊實(shí)戰(zhàn)敵人精靈篇
相信大多數(shù)8090后都玩過(guò)太空射擊游戲,在過(guò)去游戲不多的年代太空射擊自然屬于經(jīng)典好玩的一款了,今天我們來(lái)自己動(dòng)手實(shí)現(xiàn)它,在編寫學(xué)習(xí)中回顧過(guò)往展望未來(lái),下面開(kāi)始講解敵人精靈的使用2022-08-08