python 并發(fā)編程 非阻塞IO模型原理解析
非阻塞IO(non-blocking IO)
Linux下,可以通過設(shè)置socket使其變?yōu)閚on-blocking。當(dāng)對一個(gè)non-blocking socket執(zhí)行讀操作時(shí),流程是這個(gè)樣子:
從圖中可以看出,當(dāng)用戶進(jìn)程發(fā)出read操作時(shí),如果kernel中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會block用戶進(jìn)程,而是立刻返回一個(gè)error。從用戶進(jìn)程角度講 ,它發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時(shí)間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的,這段是本地拷貝,copy data ),然后返回。
也就是說非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進(jìn)程并沒有被阻塞,內(nèi)核馬上返回給進(jìn)程,如果數(shù)據(jù)還沒準(zhǔn)備好,
此時(shí)會返回一個(gè)error。進(jìn)程在返回之后,可以干點(diǎn)別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過程,
循環(huán)往復(fù)的進(jìn)行recvform系統(tǒng)調(diào)用。這個(gè)過程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準(zhǔn)備好,再拷貝數(shù)據(jù)到進(jìn)程,
進(jìn)行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個(gè)過程,進(jìn)程仍然是屬于阻塞的狀態(tài)。
所以,在非阻塞式IO中,用戶進(jìn)程其實(shí)是需要不斷的主動詢問kernel操作系統(tǒng)內(nèi)存 數(shù)據(jù)準(zhǔn)備好了沒有。
非阻塞IO示例
- 設(shè)置socket接口為 非阻塞IO接口
- 默認(rèn)是True 為阻塞
- server.setblocking(False)
- 處理一下這個(gè)異常
BlockingIOError: [WinError 10035] 無法立即完成一個(gè)非阻止性套接字操作。
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) print("starting...") while True: try: conn,addr = server.accept() print(addr) except BlockingIOError: print("干其他的工作") server.close()
執(zhí)行結(jié)果,如上面的圖,一直返回error消息
starting... 干其他的工作 干其他的工作 干其他的工作 干其他的工作
服務(wù)端 可以與 多個(gè)客戶端建立連接,實(shí)現(xiàn)服務(wù)端可以不停的建立連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: pass server.close()
起三個(gè)客戶端與服務(wù)端建立連接
r_list 存著所有建立的連接
有連接來,就建立連接,沒有連接來,就拋出異常
實(shí)現(xiàn)IO非阻塞 并發(fā) 多個(gè)連接
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時(shí)候 if not data: del_rlist.append(conn) continue conn.send(data.upper()) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 結(jié)束上面循環(huán)之后,循環(huán)del_list 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
BUG:send也是IO阻塞接口
當(dāng)send在數(shù)據(jù)量過大時(shí)候,也會阻塞。
send操作是,把應(yīng)用程序把數(shù)據(jù)發(fā)送到操作系統(tǒng)緩存區(qū)里,而操作系統(tǒng)緩存區(qū)空間也是有限的。緩存區(qū)也會滿了,后面還有數(shù)據(jù)需要發(fā)送,那只能等緩存區(qū)清掉數(shù)據(jù),有空間了,才能發(fā)送數(shù)據(jù)。所以在這里緩存區(qū)滿了,就阻塞。
修改后服務(wù)端的代碼 可以自己檢測IO,遇到IO切換單個(gè)線程的其他任務(wù),去運(yùn)行,實(shí)現(xiàn)單線程并發(fā)
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8000)) server.listen(5) # 設(shè)置socket接口為 非阻塞IO接口 # 默認(rèn)是True 為阻塞 server.setblocking(False) r_list = [] w_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 收消息 # 定義刪除連接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空數(shù)據(jù)時(shí)候 if not data: del_rlist.append(conn) continue '''加入元祖 元祖有兩個(gè)元素 1.存放套接字連接 2.準(zhǔn)備要發(fā)送的的數(shù)據(jù) ''' w_list.append((conn, data.upper())) # 沒有連接,拋出異常,就結(jié)束這次循環(huán),繼續(xù) except BlockingIOError: continue # 套接字出現(xiàn)異常,客戶端單方面連接斷開 except Exception: conn.close() del_rlist.append(conn) break # 發(fā)消息 # 用于 發(fā)成功數(shù)據(jù)后,刪除套接字連接的列表 del_wlist = [] for item in w_list: try: conn = item[0] data = item[1] conn.send(data) # 發(fā)成功后,從列表刪除連接 del_wlist.append(item) # send 有可能出現(xiàn)異常 沒發(fā)完情況 except BlockingIOError: pass # 結(jié)束上面循環(huán)之后,循環(huán)del_wlist 連接元素 刪除連接 for item in del_wlist: del_wlist.remove(item) # 結(jié)束上面循環(huán)之后,循環(huán)del_rlist 連接元素 刪除連接 for conn in del_rlist: del_rlist.remove(conn) server.close()
這就是非阻塞IO
但是非阻塞IO模型絕不被推薦。
我們不能否則其優(yōu)點(diǎn):能夠在等待任務(wù)完成的時(shí)間里干其他活了(包括提交其他任務(wù),也就是 “后臺” 可以有多個(gè)任務(wù)在“”同時(shí)“”執(zhí)行)。
干其他活時(shí)候,有可能來新的連接,新的連接來了,不能及時(shí)響應(yīng)與該新的連接,建立連接。所以會導(dǎo)致問題:數(shù)據(jù)不會及時(shí)響應(yīng)
但是也難掩其缺點(diǎn):
1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機(jī)下極容易出現(xiàn)卡機(jī)情況
2. 任務(wù)完成的響應(yīng)延遲增大了,因?yàn)槊窟^一段時(shí)間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時(shí)間完成。
這會導(dǎo)致整體數(shù)據(jù)吞吐量的降低。
3.死循環(huán)While True會導(dǎo)致CPU的無用的耗用、占用
此外,在這個(gè)方案中recv()更多的是起到檢測“操作是否完成”的作用,實(shí)際操作系統(tǒng)提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復(fù)用模式,可以一次檢測多個(gè)連接是否活躍
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python 比較兩個(gè) CSV 文件的三種方法并打印出差異
這篇文章主要介紹了Python 比較兩個(gè) CSV 文件并打印出差異,本文將討論比較兩個(gè) CSV 文件的各種方法,我們將包括執(zhí)行此操作的最“Pythonic”方式和可幫助簡化此任務(wù)的外部 Python 模塊,需要的朋友可以參考下2023-06-06Python實(shí)現(xiàn)發(fā)送QQ郵件的封裝
這篇文章主要為大家詳細(xì)介紹了Python實(shí)現(xiàn)發(fā)送QQ郵件的具體代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-07-07python編程簡單幾行代碼實(shí)現(xiàn)視頻轉(zhuǎn)換Gif示例
這篇文章主要為大家介紹了簡單使用幾行python代碼就可以實(shí)現(xiàn)將視頻轉(zhuǎn)換Gif的示例過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10