Python中Async語(yǔ)法協(xié)程的實(shí)現(xiàn)
前言
在io比較多的場(chǎng)景中, Async
語(yǔ)法編寫的程序會(huì)以更少的時(shí)間, 更少的資源來(lái)完成相同的任務(wù), 這篇文章則是介紹了Python
的Async
語(yǔ)法的協(xié)程是如何實(shí)現(xiàn)的。
1.傳統(tǒng)的Sync語(yǔ)法請(qǐng)求例子
還是一樣, 在了解Async
語(yǔ)法的實(shí)現(xiàn)之前, 先從一個(gè)Sync
的語(yǔ)法例子開(kāi)始, 現(xiàn)在假設(shè)有一個(gè)HTTP請(qǐng)求, 這個(gè)程序會(huì)通過(guò)這個(gè)請(qǐng)求獲取對(duì)應(yīng)的響應(yīng)內(nèi)容, 并打印出來(lái), 代碼如下:
import socket def request(host: str) -> None: """模擬請(qǐng)求并打印響應(yīng)體""" url: str = f"http://{host}" sock: socket.SocketType = socket.socket() sock.connect((host, 80)) sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii")) response_bytes: bytes = b"" chunk: bytes = sock.recv(4096) while chunk: response_bytes += chunk chunk = sock.recv(4096) print("\n".join([i for i in response_bytes.decode().split("\r\n")])) if __name__ == "__main__": request("so1n.me")
運(yùn)行程序, 程序能夠正常輸出, 上部分打印了對(duì)應(yīng)的HTTP響應(yīng)Header, 下部分打印了HTTP響應(yīng)體, , 可以看到服務(wù)端叫我們以https的形式重新請(qǐng)求, 輸出結(jié)果如下:
HTTP/1.1 301 Moved Permanently Server: GitHub.com Content-Type: text/html Location: https://so1n.me/ X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50 Content-Length: 162 Accept-Ranges: bytes Date: Mon, 08 Nov 2021 08:11:37 GMT Via: 1.1 varnish Age: 104 Connection: close X-Served-By: cache-qpg1272-QPG X-Cache: HIT X-Cache-Hits: 2 X-Timer: S1636359097.026094,VS0,VE0 Vary: Accept-Encoding X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e <html> <head><title>301 Moved Permanently</title></head> <body> <center><h1>301 Moved Permanently</h1></center> <hr><center>nginx</center> </body> </html>
不過(guò)這里并不是想說(shuō)HTTP請(qǐng)求是如何實(shí)現(xiàn)的, 具體我也不太了解, 在這個(gè)代碼中, socket的默認(rèn)調(diào)用是阻塞的, 當(dāng)線程調(diào)用connect
或者recv
時(shí)(send
是不用等待的, 但在高并發(fā)下需要先等待drain
后才可以send
, 小demo不需要用到drain
方法), 程序?qū)?huì)暫停直到操作完成。 當(dāng)一次要下載很多網(wǎng)頁(yè)的話, 這將會(huì)如上篇文章所說(shuō)的一樣, 大部分的等待時(shí)間都花在io上面, cpu卻一直空閑時(shí), 而使用線程池雖然可以解決這個(gè)問(wèn)題, 但是開(kāi)銷是很大的, 同時(shí)操作系統(tǒng)往往會(huì)限制一個(gè)進(jìn)程,用戶或者機(jī)器可以使用的線程數(shù), 而協(xié)程卻沒(méi)有這些限制, 占用的資源少, 也沒(méi)有系統(tǒng)限制瓶頸。
2.異步的請(qǐng)求
異步可以讓一個(gè)單獨(dú)的線程處理并發(fā)的操作, 不過(guò)在上面已經(jīng)說(shuō)過(guò)了, socket是默認(rèn)阻塞的, 所以需要把socket設(shè)置為非阻塞的, socket提供了setblocking
這個(gè)方法供開(kāi)發(fā)者選擇是否阻塞, 在設(shè)置了非阻塞后, connect
和recv
方法也要進(jìn)行更改。
由于沒(méi)有了阻塞, 程序在調(diào)用了connect
后會(huì)馬上返回, 只不過(guò)Python
的底層是C
, 這段代碼在C
中調(diào)用非阻塞的socket.connect后會(huì)拋出一個(gè)異常, 我們需要捕獲它, 就像這樣:
import socket sock: socket.SocketType = socket.socket() sock.setblocking(Flase) try: sock.connect(("so1n.me", 80)) except BlockingIOError: pass
經(jīng)過(guò)一頓操作后, 就開(kāi)始申請(qǐng)建立連接了, 但是我們還不知道連接啥時(shí)候完成建立, 由于連接沒(méi)建立時(shí)調(diào)用send
會(huì)報(bào)錯(cuò), 所以可以一直輪詢調(diào)用send
直到?jīng)]報(bào)錯(cuò)就認(rèn)為是成功(真實(shí)代碼需要加超時(shí)):
while True: try: sock.send(request) break except OSError as e: pass
但是這樣讓CPU空轉(zhuǎn)太浪費(fèi)性能了, 而且期間還不能做別的事情, 就像我們點(diǎn)外賣后一直打電話過(guò)去問(wèn)飯菜做好了沒(méi)有, 十分浪費(fèi)電話費(fèi)用, 要是飯菜做完了就打電話告訴我們, 那就只產(chǎn)生了一筆費(fèi)用, 非常的省錢(正常情況下也是這樣子)。
這時(shí)就需要事件循環(huán)登場(chǎng)了,在類UNIX中, 有一個(gè)叫select
的功能, 它可以等待事件發(fā)生后再調(diào)用監(jiān)聽(tīng)的函數(shù), 不過(guò)一開(kāi)始的實(shí)現(xiàn)性能不是很好, 在Linux
上被epoll
取代, 不過(guò)接口是類似的, 所在在Python
中把這幾個(gè)不同的事件循環(huán)都封裝在selectors
庫(kù)中, 同時(shí)可以通過(guò)DefaultSelector
從系統(tǒng)中挑出最好的類select
函數(shù)。
這里先暫時(shí)不說(shuō)事件循環(huán)的原理, 事件循環(huán)最主要的是他名字的兩部分, 一個(gè)是事件, 一個(gè)是循環(huán), 在Python
中, 可以通過(guò)如下方法把事件注冊(cè)到事件循環(huán)中:
def demo(): pass selector.register(fd, EVENT_WRITE, demo)
這樣這個(gè)事件循環(huán)就會(huì)監(jiān)聽(tīng)對(duì)應(yīng)的文件描述符fd, 當(dāng)這個(gè)文件描述符觸發(fā)寫入事件(EVENT_WRITE)時(shí),事件循環(huán)就會(huì)告訴我們可以去調(diào)用注冊(cè)的函數(shù)demo
。不過(guò)如果把上面的代碼都改為這種方法去運(yùn)行的話就會(huì)發(fā)現(xiàn), 程序好像沒(méi)跑就結(jié)束了, 但程序其實(shí)是有跑的, 只不過(guò)他們是完成的了注冊(cè), 然后就等待開(kāi)發(fā)者接收事件循環(huán)的事件進(jìn)行下一步的操作, 所以我們只需要在代碼的最后面寫上如下代碼:
while True: for key, mask in selector.select(): key.data()
這樣程序就會(huì)一直運(yùn)行, 當(dāng)捕獲到事件的時(shí)候, 就會(huì)通過(guò)for循環(huán)告訴我們, 其中key.data
是我們注冊(cè)的回調(diào)函數(shù), 當(dāng)事件發(fā)生時(shí), 就會(huì)通知我們, 我們可以通過(guò)拿到回調(diào)函數(shù)然后就運(yùn)行, 了解完畢后, 我們可以來(lái)編寫我們的第一個(gè)并發(fā)程序, 他實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的I/O復(fù)用的小邏輯, 代碼和注釋如下:
import socket from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE # 選擇事件循環(huán) selector: DefaultSelector = DefaultSelector() # 用于判斷是否有事件在運(yùn)行 running_cnt: int = 0 def request(host: str) -> None: """模擬請(qǐng)求并打印響應(yīng)體""" # 告訴主函數(shù), 自己的事件還在運(yùn)行 global running_cnt running_cnt += 1 # 初始化socket url: str = f"http://{host}" sock: socket.SocketType = socket.socket() sock.setblocking(False) try: sock.connect((host, 80)) except BlockingIOError: pass response_bytes: bytes = b"" def read_response() -> None: """接收響應(yīng)參數(shù), 并判斷請(qǐng)求是否結(jié)束""" nonlocal response_bytes chunk: bytes = sock.recv(4096) print(f"recv {host} body success") if chunk: response_bytes += chunk else: # 沒(méi)有數(shù)據(jù)代表請(qǐng)求結(jié)束了, 注銷監(jiān)聽(tīng) selector.unregister(sock.fileno()) global running_cnt running_cnt -= 1 def connected() -> None: """socket建立連接時(shí)的回調(diào)""" # 取消監(jiān)聽(tīng) selector.unregister(sock.fileno()) print(f"{host} connect success") # 發(fā)送請(qǐng)求, 并監(jiān)聽(tīng)讀事件, 以及注冊(cè)對(duì)應(yīng)的接收響應(yīng)函數(shù) sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii")) selector.register(sock.fileno(), EVENT_READ, read_response) selector.register(sock.fileno(), EVENT_WRITE, connected) if __name__ == "__main__": # 同時(shí)多個(gè)請(qǐng)求 request("so1n.me") request("github.com") request("google.com") request("baidu.com") # 監(jiān)聽(tīng)是否有事件在運(yùn)行 while running_cnt > 0: # 等待事件循環(huán)通知事件是否已經(jīng)完成 for key, mask in selector.select(): key.data()
這段代碼接近同時(shí)注冊(cè)了4個(gè)請(qǐng)求并注冊(cè)建立連接回調(diào), 然后就進(jìn)入事件循環(huán)邏輯, 也就是把控制權(quán)交給事件循環(huán), 直到事件循環(huán)告訴程序說(shuō)收到了socket建立的通知, 程序就會(huì)取消注冊(cè)的回調(diào)然后發(fā)送請(qǐng)求, 并注冊(cè)一個(gè)讀的事件回調(diào), 然后又把控制權(quán)交給事件循環(huán), 直到收到了響應(yīng)的結(jié)果才進(jìn)入處理響應(yīng)結(jié)果函數(shù)并且只有收完所有響應(yīng)結(jié)果才會(huì)退出程序。
下面是我其中的一次執(zhí)行結(jié)果
so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success
可以看到他們的執(zhí)行順序是隨機(jī)的, 不是嚴(yán)格的按照so1n.me
, github.com
, google.com
, baidu.com
順序執(zhí)行, 同時(shí)他們執(zhí)行速度很快, 這個(gè)程序的耗時(shí)約等于響應(yīng)時(shí)長(zhǎng)最長(zhǎng)的函數(shù)耗時(shí)。
但是可以看出, 這個(gè)程序里面出現(xiàn)了兩個(gè)回調(diào), 回調(diào)會(huì)讓代碼變得非常的奇怪, 降低可讀性, 也容易造成回調(diào)地獄, 而且當(dāng)回調(diào)發(fā)生報(bào)錯(cuò)的時(shí)候, 我們是很難知道這是由于什么導(dǎo)致的錯(cuò)誤, 因?yàn)樗纳舷挛膩G失了, 這樣子排查問(wèn)題十分的困惑。 作為程序員, 一般都不止?jié)M足于速度快的代碼, 真正想要的是又快, 又能像Sync
的代碼一樣簡(jiǎn)單, 可讀性強(qiáng), 也能容易排查問(wèn)題的代碼, 這種組合形式的代碼的設(shè)計(jì)模式就叫協(xié)程。
協(xié)程出現(xiàn)得很早, 它不像線程一樣, 被系統(tǒng)調(diào)度, 而是能自主的暫停, 并等待事件循環(huán)通知恢復(fù)。由于協(xié)程是軟件層面實(shí)現(xiàn)的, 所以它的實(shí)現(xiàn)方式有很多種, 這里要說(shuō)的是基于生成器的協(xié)程, 因?yàn)樯善鞲鷧f(xié)程一樣, 都有暫停讓步和恢復(fù)的方法(還可以通過(guò)throw
來(lái)拋錯(cuò)), 同時(shí)它跟Async
語(yǔ)法的協(xié)程很像, 通過(guò)了解基于生成器的協(xié)程, 可以了解Async
的協(xié)程是如何實(shí)現(xiàn)的。
3.基于生成器的協(xié)程
3.1生成器
在了解基于生成器的協(xié)程之前, 需要先了解下生成器, Python
的生成器函數(shù)與普通的函數(shù)會(huì)有一些不同, 只有普通函數(shù)中帶有關(guān)鍵字yield
, 那么它就是生成器函數(shù), 具體有什么不同可以通過(guò)他們的字節(jié)碼來(lái)了解:
In [1]: import dis # 普通函數(shù) In [2]: def aaa(): pass In [3]: dis.dis(aaa) 1 0 LOAD_CONST 0 (None) 2 RETURN_VALUE # 普通函數(shù)調(diào)用函數(shù) In [4]: def bbb(): ...: aaa() ...: In [5]: dis.dis(bbb) 2 0 LOAD_GLOBAL 0 (aaa) 2 CALL_FUNCTION 0 4 POP_TOP 6 LOAD_CONST 0 (None) 8 RETURN_VALUE # 普通生成器函數(shù) In [6]: def ccc(): yield In [7]: dis.dis(ccc) 1 0 LOAD_CONST 0 (None) 2 YIELD_VALUE 4 POP_TOP 6 LOAD_CONST 0 (None) 8 RETURN_VALUE
上面分別是普通函數(shù), 普通函數(shù)調(diào)用函數(shù)和普通生成器函數(shù)的字節(jié)碼, 從字節(jié)碼可以看出來(lái), 最簡(jiǎn)單的函數(shù)只需要LOAD_CONST
來(lái)加載變量None壓入自己的棧, 然后通過(guò)RETURN_VALUE
來(lái)返回值, 而有函數(shù)調(diào)用的普通函數(shù)則先加載變量, 把全局變量的函數(shù)aaa
加載到自己的棧里面, 然后通過(guò)CALL_FUNCTION
來(lái)調(diào)用函數(shù), 最后通過(guò)POP_TOP
把函數(shù)的返回值從棧里拋出來(lái), 再把通過(guò)LOAD_CONST
把None壓入自己的棧, 最后返回值。
而生成器函數(shù)則不一樣, 它會(huì)先通過(guò)LOAD_CONST
來(lái)加載變量None壓入自己的棧, 然后通過(guò)YIELD_VALUE
返回值, 接著通過(guò)POP_TOP
彈出剛才的棧并重新把變量None壓入自己的棧, 最后通過(guò)RETURN_VALUE
來(lái)返回值。從字節(jié)碼來(lái)分析可以很清楚的看到, 生成器能夠在yield
區(qū)分兩個(gè)棧幀, 一個(gè)函數(shù)調(diào)用可以分為多次返回, 很符合協(xié)程多次等待的特點(diǎn)。
接著來(lái)看看生成器的一個(gè)使用, 這個(gè)生成器會(huì)有兩次yield
調(diào)用, 并在最后返回字符串'None'
, 代碼如下:
In [8]: def demo(): ...: a = 1 ...: b = 2 ...: print('aaa', locals()) ...: yield 1 ...: print('bbb', locals()) ...: yield 2 ...: return 'None' ...: In [9]: demo_gen = demo() In [10]: demo_gen.send(None) aaa {'a': 1, 'b': 2} Out[10]: 1 In [11]: demo_gen.send(None) bbb {'a': 1, 'b': 2} Out[11]: 2 In [12]: demo_gen.send(None) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-12-8f8cb075d6af> in <module> ----> 1 demo_gen.send(None) StopIteration: None
這段代碼首先通過(guò)函數(shù)調(diào)用生成一個(gè)demo_gen
的生成器對(duì)象, 然后第一次send
調(diào)用時(shí)返回值1, 第二次send
調(diào)用時(shí)返回值2, 第三次send
調(diào)用則拋出StopIteration
異常, 異常提示為None
, 同時(shí)可以看到第一次打印aaa
和第二次打印bbb
時(shí), 他們都能打印到當(dāng)前的函數(shù)局部變量, 可以發(fā)現(xiàn)在即使在不同的棧幀中, 他們讀取到當(dāng)前的局部函數(shù)內(nèi)的局部變量是一致的, 這意味著如果使用生成器來(lái)模擬協(xié)程時(shí), 它還是會(huì)一直讀取到當(dāng)前上下文的, 非常的完美。
此外, Python
還支持通過(guò)yield from
語(yǔ)法來(lái)返回一個(gè)生成器, 代碼如下:
In [1]: def demo_gen_1(): ...: for i in range(3): ...: yield i ...: In [2]: def demo_gen_2(): ...: yield from demo_gen_1() ...: In [3]: demo_gen_obj = demo_gen_2() In [4]: demo_gen_obj.send(None) Out[4]: 0 In [5]: demo_gen_obj.send(None) Out[5]: 1 In [6]: demo_gen_obj.send(None) Out[6]: 2 In [7]: demo_gen_obj.send(None) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-7-f9922a2f64c9> in <module> ----> 1 demo_gen_obj.send(None) StopIteration:
通過(guò)yield from
就可以很方便的支持生成器調(diào)用, 假如把每個(gè)生成器函數(shù)都當(dāng)做一個(gè)協(xié)程, 那通過(guò)yield from
就可以很方便的實(shí)現(xiàn)協(xié)程間的調(diào)用, 此外生成器的拋出異常后的提醒非常人性化, 也支持throw
來(lái)拋出異常, 這樣我們就可以實(shí)現(xiàn)在協(xié)程運(yùn)行時(shí)設(shè)置異常, 比如Cancel
,演示代碼如下:
In [1]: def demo_exc(): ...: yield 1 ...: raise RuntimeError() ...: In [2]: def demo_exc_1(): ...: for i in range(3): ...: yield i ...: In [3]: demo_exc_gen = demo_exc() In [4]: demo_exc_gen.send(None) Out[4]: 1 In [5]: demo_exc_gen.send(None) --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-5-09fbb75fdf7d> in <module> ----> 1 demo_exc_gen.send(None) <ipython-input-1-69afbc1f9c19> in demo_exc() 1 def demo_exc(): 2 yield 1 ----> 3 raise RuntimeError() 4 RuntimeError: In [6]: demo_exc_gen_1 = demo_exc_1() In [7]: demo_exc_gen_1.send(None) Out[7]: 0 In [8]: demo_exc_gen_1.send(None) Out[8]: 1 In [9]: demo_exc_gen_1.throw(RuntimeError) --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-9-1a1cc55d71f4> in <module> ----> 1 demo_exc_gen_1.throw(RuntimeError) <ipython-input-2-2617b2366dce> in demo_exc_1() 1 def demo_exc_1(): 2 for i in range(3): ----> 3 yield i 4 RuntimeError:
從中可以看到在運(yùn)行中拋出異常時(shí), 會(huì)有一個(gè)非常清楚的拋錯(cuò), 可以明顯看出錯(cuò)誤堆棧, 同時(shí)throw
指定異常后, 會(huì)在下一處yield
拋出異常(所以協(xié)程調(diào)用Cancel
后不會(huì)馬上取消, 而是下一次調(diào)用的時(shí)候才被取消)。
3.2用生成器實(shí)現(xiàn)協(xié)程
我們已經(jīng)簡(jiǎn)單的了解到了生成器是非常的貼合協(xié)程的編程模型, 同時(shí)也知道哪些生成器API是我們需要的API, 接下來(lái)可以模仿Asyncio
的接口來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的協(xié)程。
首先是在Asyncio
中有一個(gè)封裝叫Feature
, 它用來(lái)表示協(xié)程正在等待將來(lái)時(shí)的結(jié)果, 以下是我根據(jù)asyncio.Feature
封裝的一個(gè)簡(jiǎn)單的Feature
, 它的API沒(méi)有asyncio.Feature
全, 代碼和注釋如下:
class Status: """用于判斷Future狀態(tài)""" pending: int = 1 finished: int = 2 cancelled: int = 3 class Future(object): def __init__(self) -> None: """初始化時(shí), Feature處理pending狀態(tài), 等待set result""" self.status: int = Status.pending self._result: Any = None self._exception: Optional[Exception] = None self._callbacks: List[Callable[['Future'], None]] = [] def add_done_callback(self, fn: [['Future'], None]Callable) -> None: """添加完成時(shí)的回調(diào)""" self._callbacks.append(fn) def cancel(self): """取消當(dāng)前的Feature""" if self.status != Status.pending: return False self.status = Status.cancelled for fn in self._callbacks: fn(self) return True def set_exception(self, exc: Exception) -> None: """設(shè)置異常""" if self.status != Status.pending: raise RuntimeError("Can not set exc") self._exception = exc self.status = Status.finished def set_result(self, result: Any) -> None: """設(shè)置結(jié)果""" if self.status != Status.pending: raise RuntimeError("Can not set result") self.status = Status.finished self._result = result for fn in self._callbacks: fn(self) def result(self): """獲取結(jié)果""" if self.status == Status.cancelled: raise asyncio.CancelledError elif self.status != Status.finished: raise RuntimeError("Result is not read") elif self._exception is not None: raise self._exception return self._result def __iter__(self): """通過(guò)生成器來(lái)模擬協(xié)程, 當(dāng)收到結(jié)果通知時(shí), 會(huì)返回結(jié)果""" if self.status == Status.pending: yield self return self.result()
在理解Future
時(shí), 可以把它假想為一個(gè)狀態(tài)機(jī), 在啟動(dòng)初始化的時(shí)候是peding
狀態(tài), 在運(yùn)行的時(shí)候我們可以切換它的狀態(tài), 并且通過(guò)__iter__
方法來(lái)支持調(diào)用者使用yield from Future()
來(lái)等待Future
本身, 直到收到了事件通知時(shí), 可以得到結(jié)果。
但是可以發(fā)現(xiàn)這個(gè)Future
是無(wú)法自我驅(qū)動(dòng), 調(diào)用了__iter__
的程序不知道何時(shí)被調(diào)用了set_result
, 在Asyncio
中是通過(guò)一個(gè)叫Task
的類來(lái)驅(qū)動(dòng)Future
, 它將一個(gè)協(xié)程的執(zhí)行過(guò)程安排好, 并負(fù)責(zé)在事件循環(huán)中執(zhí)行該協(xié)程。它主要有兩個(gè)方法:
- 1.初始化時(shí), 會(huì)先通過(guò)
send
方法激活生成器 - 2.后續(xù)被調(diào)度后馬上安排下一次等待, 除非拋出
StopIteration
異常
還有一個(gè)支持取消運(yùn)行托管協(xié)程的方法(在原代碼中, Task
是繼承于Future
, 所以Future
有的它都有), 經(jīng)過(guò)簡(jiǎn)化后的代碼如下:
class Task: def __init__(self, coro: Generator) -> None: # 初始化狀態(tài) self.cancelled: bool = False self.coro: Generator = coro # 預(yù)激一個(gè)普通的future f: Future = Future() f.set_result(None) self.step(f) def cancel(self) -> None: """用于取消托管的coro""" self.coro.throw(asyncio.CancelledError) def step(self, f: Future) -> None: """用于調(diào)用coro的下一步, 從第一次激活開(kāi)始, 每次都添加完成時(shí)的回調(diào), 直到遇到取消或者StopIteration異常""" try: _future = self.coro.send(f.result()) except asyncio.CancelledError: self.cancelled = True return except StopIteration: return _future.add_done_callback(self.step)
這樣Future
和Task
就封裝好了, 可以簡(jiǎn)單的試一試效果如何:
In [2]:def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]: ...: result = yield from f ...: print(flag_int, result) ...: ...:future: Future = Future() ...:for i in range(3): ...: coro = wait_future(future, i) ...: # 托管wait_future這個(gè)協(xié)程, 里面的Future也會(huì)通過(guò)yield from被托管 ...: Task(coro) ...: ...:print('ready') ...:future.set_result('ok') ...: ...:future = Future() ...:Task(wait_future(future, 3)).cancel() ...: ready 0 ok 1 ok 2 ok --------------------------------------------------------------------------- CancelledError Traceback (most recent call last) <ipython-input-2-2d1b04db2604> in <module> 12 13 future = Future() ---> 14 Task(wait_future(future, 3)).cancel() <ipython-input-1-ec3831082a88> in cancel(self) 81 82 def cancel(self) -> None: ---> 83 self.coro.throw(asyncio.CancelledError) 84 85 def step(self, f: Future) -> None: <ipython-input-2-2d1b04db2604> in wait_future(f, flag_int) 1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]: ----> 2 result = yield from f 3 print(flag_int, result) 4 5 future: Future = Future() <ipython-input-1-ec3831082a88> in __iter__(self) 68 """通過(guò)生成器來(lái)模擬協(xié)程, 當(dāng)收到結(jié)果通知時(shí), 會(huì)返回結(jié)果""" 69 if self.status == Status.pending: ---> 70 yield self 71 return self.result() 72 CancelledError:
這段程序會(huì)先初始化Future
, 并把Future
傳給wait_future
并生成生成器, 再交由給Task
托管, 預(yù)激, 由于Future
是在生成器函數(shù)wait_future
中通過(guò)yield from
與函數(shù)綁定的, 真正被預(yù)激的其實(shí)是Future
的__iter__
方法中的yield self
, 此時(shí)代碼邏輯會(huì)暫停在yield self
并返回。
在全部預(yù)激后, 通過(guò)調(diào)用Future
的set_result
方法, 使Future
變?yōu)榻Y(jié)束狀態(tài), 由于set_result
會(huì)執(zhí)行注冊(cè)的回調(diào), 這時(shí)它就會(huì)執(zhí)行托管它的Task
的step
方法中的send
方法, 代碼邏輯回到Future
的__iter__
方法中的yield self
, 并繼續(xù)往下走, 然后遇到return
返回結(jié)果, 并繼續(xù)走下去, 從輸出可以發(fā)現(xiàn)程序封裝完成且打印了ready
后, 會(huì)依次打印對(duì)應(yīng)的返回結(jié)果, 而在最后一個(gè)的測(cè)試cancel
方法中可以看到,Future
拋出異常了, 同時(shí)這些異常很容易看懂, 能夠追隨到調(diào)用的地方。
現(xiàn)在Future
和Task
正常運(yùn)行了, 可以跟我們一開(kāi)始執(zhí)行的程序進(jìn)行整合, 代碼如下:
class HttpRequest(object): def __init__(self, host: str): """初始化變量和sock""" self._host: str = host global running_cnt running_cnt += 1 self.url: str = f"http://{host}" self.sock: socket.SocketType = socket.socket() self.sock.setblocking(False) try: self.sock.connect((host, 80)) except BlockingIOError: pass def read(self) -> Generator[Future, None, bytes]: """從socket獲取響應(yīng)數(shù)據(jù), 并set到Future中, 并通過(guò)Future.__iter__方法或得到數(shù)據(jù)并通過(guò)變量chunk_future返回""" f: Future = Future() selector.register(self.sock.fileno(), EVENT_READ, lambda: f.set_result(self.sock.recv(4096))) chunk_future = yield from f selector.unregister(self.sock.fileno()) return chunk_future # type: ignore def read_response(self) -> Generator[Future, None, bytes]: """接收響應(yīng)參數(shù), 并判斷請(qǐng)求是否結(jié)束""" response_bytes: bytes = b"" chunk = yield from self.read() while chunk: response_bytes += chunk chunk = yield from self.read() return response_bytes def connected(self) -> Generator[Future, None, None]: """socket建立連接時(shí)的回調(diào)""" # 取消監(jiān)聽(tīng) f: Future = Future() selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None)) yield f selector.unregister(self.sock.fileno()) print(f"{self._host} connect success") def request(self) -> Generator[Future, None, None]: # 發(fā)送請(qǐng)求, 并監(jiān)聽(tīng)讀事件, 以及注冊(cè)對(duì)應(yīng)的接收響應(yīng)函數(shù) yield from self.connected() self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii")) response = yield from self.read_response() print(f"request {self._host} success, length:{len(response)}") global running_cnt running_cnt -= 1 if __name__ == "__main__": # 同時(shí)多個(gè)請(qǐng)求 Task(HttpRequest("so1n.me").request()) Task(HttpRequest("github.com").request()) Task(HttpRequest("google.com").request()) Task(HttpRequest("baidu.com").request()) # 監(jiān)聽(tīng)是否有事件在運(yùn)行 while running_cnt > 0: # 等待事件循環(huán)通知事件是否已經(jīng)完成 for key, mask in selector.select(): key.data()
這段代碼通過(guò)Future
和生成器方法盡量的解耦回調(diào)函數(shù), 如果忽略了HttpRequest
中的connected
和read
方法則可以發(fā)現(xiàn)整段代碼跟同步的代碼基本上是一樣的, 只是通過(guò)yield
和yield from
交出控制權(quán)和通過(guò)事件循環(huán)恢復(fù)控制權(quán)。 同時(shí)通過(guò)上面的異常例子可以發(fā)現(xiàn)異常排查非常的方便, 這樣一來(lái)就沒(méi)有了回調(diào)的各種糟糕的事情, 開(kāi)發(fā)者只需要按照同步的思路進(jìn)行開(kāi)發(fā)即可, 不過(guò)我們的事件循環(huán)是一個(gè)非常簡(jiǎn)單的事件循環(huán)例子, 同時(shí)對(duì)于socket相關(guān)都沒(méi)有進(jìn)行封裝, 也缺失一些常用的API, 而這些都會(huì)被Python
官方封裝到Asyncio
這個(gè)庫(kù)中, 通過(guò)該庫(kù), 我們可以近乎完美的編寫Async
語(yǔ)法的代碼。
NOTE: 由于生成器協(xié)程中無(wú)法通過(guò)
yield from
語(yǔ)法使用生成器, 所以Python
在3.5之后使用了Await
的原生協(xié)程。
到此這篇關(guān)于Python中Async語(yǔ)法協(xié)程的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Python協(xié)程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章

利用python操作SQLite數(shù)據(jù)庫(kù)及文件操作詳解

python動(dòng)態(tài)進(jìn)度條的實(shí)現(xiàn)代碼

帶你徹底搞懂python操作mysql數(shù)據(jù)庫(kù)(cursor游標(biāo)講解)

python實(shí)現(xiàn)簡(jiǎn)單登陸流程的方法

pytorch加載語(yǔ)音類自定義數(shù)據(jù)集的方法教程

使用python如何提取JSON數(shù)據(jù)指定內(nèi)容

使用Python實(shí)現(xiàn)圖像標(biāo)記點(diǎn)的坐標(biāo)輸出功能