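Python concurrency and asynchronous programming examples
Concepts such as concurrency, parallelism, synchronous blocking, asynchronous non-blocking, threads, processes and coroutines are hard to grasp deeply from prose alone. This article therefore implements these concurrent and asynchronous approaches step by step in code and compares them. The interpreter is Python 3. Python 3 is the future of Python, and its native libraries already make coroutines very convenient to implement.
1. Preparation
The following packages are needed by all of the test code: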
```python
#! python3
# coding:utf-8
import socket
from concurrent import futures
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
import asyncio
import aiohttp  # third-party package: pip install aiohttp
import time
from time import ctime
```
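To compare the approaches, the test scenario is a web crawler that sends a series of HTTP requests to a target site. Each approach is judged by how long it takes. Specifically, one request consists of opening a socket, requesting the Sina homepage and reading back its source. First, here is a decorator that measures a function's execution time: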
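```python
from functools import wraps


def tsfunc(func):
    @wraps(func)  # keep func.__name__ and the docstring on the wrapper
    def wrappedFunc(*args, **kargs):
        # time.clock() was removed in Python 3.8; perf_counter() measures elapsed wall-clock time
        start = time.perf_counter()
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc
```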
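The output format is: the current time, the function that was called, and the function's execution time.
2. Blocking/non-blocking and synchronous/asynchronous
These two pairs of concepts are easy to confuse. By definition:
Blocking: during socket communication, a thread issues a request. If the request has not returned a result yet, the thread goes to sleep and stays suspended, unable to do anything else, until a result arrives or a timeout expires (if one was set).
Non-blocking: similar to blocking, except that the thread is not suspended while waiting for the result and can do other work. In other words, when a result cannot be obtained immediately, the function does not suspend the current thread but returns at once.
Synchronous: synchronous is similar to blocking, but the two are not the same concept. Synchronous describes how the logic of a task is ordered: one thing is finished before the second one starts, and so on.
Asynchronous: asynchronous is similar to non-blocking and is the opposite of synchronous. After an asynchronous call is issued, the caller does not get the result right away. The component that actually handles the call notifies the caller through state, notifications or callbacks once it finishes. Put simply, asynchronous means "I'll tell you later".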
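To make the blocking/non-blocking distinction concrete without touching the network, the sketch below uses a local `socket.socketpair()` as a stand-in for the HTTP connection (an assumption for illustration; the article itself talks to www.sina.com). With no data available, a non-blocking `recv()` gives control back immediately by raising `BlockingIOError`, while a blocking socket with a timeout keeps the thread suspended until the timeout expires. The function names are hypothetical.

```python
import socket
import time


def try_nonblocking_recv():
    """Show how a non-blocking recv() behaves when no data has arrived yet."""
    a, b = socket.socketpair()
    try:
        b.setblocking(False)
        start = time.perf_counter()
        try:
            b.recv(4096)
            outcome = 'data'
        except BlockingIOError:
            # Non-blocking: nothing to read, so the call returns at once with an error
            outcome = 'would block'
        return outcome, time.perf_counter() - start
    finally:
        a.close()
        b.close()


def try_blocking_recv(timeout=0.2):
    """Show how long a blocking recv() waits when no data arrives."""
    a, b = socket.socketpair()
    try:
        b.settimeout(timeout)  # blocking mode, but give up after `timeout` seconds
        start = time.perf_counter()
        try:
            b.recv(4096)
            outcome = 'data'
        except socket.timeout:
            # Blocking: the thread stayed suspended for the whole timeout
            outcome = 'timed out'
        return outcome, time.perf_counter() - start
    finally:
        a.close()
        b.close()


if __name__ == '__main__':
    print(try_nonblocking_recv())
    print(try_blocking_recv())
```

The non-blocking call returns in microseconds. The caller must then decide what to do next, which is exactly the problem the polling and event-driven versions below address.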
1) Blocking
Back to the code. First, the blocking request function:
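```python
def blocking_way():
    # The with block closes the socket when the request is done
    with socket.socket() as sock:
        sock.connect(('www.sina.com', 80))  # blocks until the TCP connection is established
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))  # sendall() keeps sending until every byte is out
        response = b''
        chunk = sock.recv(4096)  # blocks until data arrives
        while chunk:  # b'' means the server has closed the connection
            response += chunk
            chunk = sock.recv(4096)
    return response
```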
Testing the single-threaded, multi-process and multi-threaded versions:
```python
# Blocking, no concurrency
@tsfunc
def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


# Blocking, multiple processes
# (on Windows and macOS, which start workers with "spawn", call this only under if __name__ == '__main__':)
@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])


# Blocking, multiple threads
@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])
```
Results:
```text
[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727
```
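Compared with the non-concurrent version, starting 10 processes to make 10 requests takes by far the longest, because processes carry a large system overhead. Multithreading does much better: 10 threads issuing the requests concurrently finish roughly 6 times faster than sequential requests.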
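Threads help here even though CPython has a GIL, because a blocking socket call releases the GIL while it waits and other threads can run in the meantime. The sketch below swaps the network request for `time.sleep()`, which also releases the GIL. This assumption keeps the example self-contained and repeatable. `fake_request` is a hypothetical stand-in for `blocking_way`. The sketch compares sequential calls against a `ThreadPoolExecutor`.

```python
import time
from concurrent import futures


def fake_request(delay=0.1):
    """Hypothetical stand-in for blocking_way(): sleeps to simulate network latency."""
    time.sleep(delay)  # releases the GIL, like a blocking socket call
    return b'response'


def sequential(n=10, delay=0.1):
    start = time.perf_counter()
    results = [fake_request(delay) for _ in range(n)]
    return len(results), time.perf_counter() - start


def threaded(n=10, delay=0.1, workers=10):
    start = time.perf_counter()
    with futures.ThreadPoolExecutor(workers) as executor:
        futs = [executor.submit(fake_request, delay) for _ in range(n)]
        results = [f.result() for f in futs]
    return len(results), time.perf_counter() - start


if __name__ == '__main__':
    print('sequential: %d requests in %.2fs' % sequential())
    print('threaded:   %d requests in %.2fs' % threaded())
```

With ten workers and ten requests all the waits overlap, so the threaded run takes about one delay instead of ten. A process pool would overlap the waits as well, but it also pays to start and tear down worker interpreters.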
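2) Non-blocking
The non-blocking request code differs from the blocking version in that it returns immediately instead of suspending while it waits. To make sure the message is still read correctly, the most primitive approach is to keep retrying in a loop until reading is complete, and only then break out of the loop. The code is as follows: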
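```python
def nonblocking_way():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        pass  # expected: the connection is still being established
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    # Busy-wait: keep retrying send() until the connection is ready
    while True:
        try:
            sock.send(data)
            break
        except OSError:
            pass
    response = b''
    # Busy-wait: keep retrying recv() until the server closes the connection (recv returns b'')
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    sock.close()
    return response
```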
Testing the single-threaded asynchronous non-blocking version:
```python
# Non-blocking, single thread: the ten requests still run one after another
@tsfunc
def async_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)
```
Results compared with the single-threaded synchronous blocking version:
```text
[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886
```
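The non-blocking version helps a little, but not noticeably. The thread is no longer suspended while it waits, but that time is now spent spinning in the loops that keep retrying send() and recv(). In addition, async_way() still issues the ten requests one after another, so no two requests ever overlap. When most of the time is wasted this way, asynchronous programming cannot show its strength. The fix is event-driven programming, described next.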
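Event-driven I/O removes the polling loop. Instead of retrying `recv()` until it succeeds, one thread asks the operating system which sockets are ready and only touches those. The minimal sketch below uses the standard `selectors` module. Local `socketpair()` connections serve as hypothetical stand-ins for the HTTP sockets, and `event_driven_demo` is an illustrative name. Each registration stores a callback in the key's `data` field, the same pattern the `Crawler` class in the callback section uses.

```python
import selectors
import socket


def event_driven_demo(n=3):
    """Read from n sockets in one thread, touching each only when it is readable."""
    sel = selectors.DefaultSelector()
    received = {}
    pairs = [socket.socketpair() for _ in range(n)]

    def make_on_readable(i, sock):
        def on_readable():
            chunk = sock.recv(4096)  # will not block: select() said the socket is ready
            if chunk:
                received[i] = received.get(i, b'') + chunk
            else:
                # b'' means the peer closed the connection: stop watching this socket
                sel.unregister(sock)
                sock.close()
        return on_readable

    for i, (writer, reader) in enumerate(pairs):
        reader.setblocking(False)
        sel.register(reader, selectors.EVENT_READ, make_on_readable(i, reader))
        # Simulate a server sending its response and then closing the connection
        writer.sendall(('reply %d' % i).encode('ascii'))
        writer.close()

    # Event loop: wait in select() until some socket is ready, then run its callback
    while sel.get_map():
        for key, mask in sel.select():
            key.data()
    sel.close()
    return received


if __name__ == '__main__':
    print(event_driven_demo())
```

The loop exits when every socket has been unregistered. The thread sleeps inside `select()` instead of burning CPU in a retry loop.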
3. Callbacks, generators and coroutines
a. Callbacks
```python
selector = DefaultSelector()
stopped = False
urls_todo = ['/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9']


class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        # Call self.connected once the socket becomes writable (connection established)
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        # Call self.read_response whenever data is readable
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # Read one chunk per readiness event; the selector calls back again for more data
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            # b'' means the server closed the connection: this response is complete
            selector.unregister(key.fd)
            self.sock.close()
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True


def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)


@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()  # these callbacks take (key, mask), so use loop(), not loop1()
```
This is asynchronous programming implemented with traditional callbacks. The result:
```text
[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374
```
b. Generators
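```python
# Future1, connect(), read() and read_all() are defined in the full listing at the end of this article;
# selector, stopped and urls_todo are the same module-level globals as in the callback version.

class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        # Suspends here until the connection is established
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        # Suspends on every chunk until the server closes the connection
        self.response = yield from read_all(sock)
        sock.close()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) until its next yield;
            # next_future is the object produced by that yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def loop1():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()  # on_connected / on_readable take no arguments


@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()
```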
The result:
```text
[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473
```
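The mechanics of `Future1`, `Task` and `yield from` are easier to see without sockets. The sketch below is a self-contained re-creation, not the article's code. The classes are renamed `Future` and `Task`, `fake_io`, `worker` and `run` are hypothetical, and a plain list of pending callbacks stands in for the selector. Each coroutine yields a future. `Task.step` resumes the generator with that future's result once `set_result()` fires, so two coroutines interleave on a single thread.

```python
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self           # hand this future to the Task and suspend
        return self.result   # becomes the value of `yield from future`


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)  # start the generator

    def step(self, future):
        try:
            # Resume the generator, sending in the finished future's result
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def fake_io(pending, value):
    """Hypothetical I/O operation that completes when the loop fires its callback."""
    f = Future()
    pending.append(lambda: f.set_result(value))
    result = yield from f
    return result


def worker(name, pending, log):
    for i in range(2):
        data = yield from fake_io(pending, '%s-%d' % (name, i))
        log.append(data)


def run():
    pending = []  # stand-in for the selector: callbacks waiting to fire
    log = []
    Task(worker('a', pending, log))
    Task(worker('b', pending, log))
    while pending:  # stand-in for loop1(): fire one ready callback at a time
        pending.pop(0)()
    return log


if __name__ == '__main__':
    print(run())  # ['a-0', 'b-0', 'a-1', 'b-1']
```

The output alternates between the two workers. Neither generator blocks the other, and the only "scheduler" is `Task.step` reacting to completed futures. Since Python 3.5, `async def`/`await` express the same idea with native coroutines.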
c. Coroutines
```python
# asyncio coroutines (aiohttp is a third-party package: pip install aiohttp)
host = 'http://www.sina.com'


async def fetch(url):
    # One session per request mirrors the original test;
    # aiohttp recommends reusing a single ClientSession in real code
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()


async def fetch_all():
    # Helper coroutine: run all fetches concurrently and collect the results
    tasks = [fetch(host + url) for url in urls_todo]
    return await asyncio.gather(*tasks)


@tsfunc
def asyncio_way():
    # asyncio.run() (Python 3.7+) creates an event loop, runs fetch_all() and closes the loop
    results = asyncio.run(fetch_all())
    return len(results)
```
Result:
```text
[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
```
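With native coroutines, the event loop, the futures and the task stepping are all provided by `asyncio`. The sketch below assumes Python 3.7+ for `asyncio.run()`. It replaces the HTTP request with `asyncio.sleep()` so that it runs without `aiohttp` or network access. `fake_fetch`, `crawl` and `asyncio_demo` are hypothetical names. Because the ten waits overlap, the total time is close to one delay rather than ten.

```python
import asyncio
import time


async def fake_fetch(url, delay=0.1):
    """Hypothetical stand-in for fetch(): awaiting sleep hands control back to the event loop."""
    await asyncio.sleep(delay)
    return url


async def crawl(urls, delay=0.1):
    # gather() runs the coroutines concurrently and returns results in input order
    return await asyncio.gather(*(fake_fetch(u, delay) for u in urls))


def asyncio_demo(n=10, delay=0.1):
    urls = ['/'] + ['/%d' % i for i in range(1, n)]
    start = time.perf_counter()
    results = asyncio.run(crawl(urls, delay))
    return results, time.perf_counter() - start


if __name__ == '__main__':
    results, elapsed = asyncio_demo()
    print(len(results), 'responses in %.2fs' % elapsed)
```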
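That completes the concurrency and asynchronous programming examples. The complete code is below for readers to test themselves. With a heavier workload, the results will likely look quite different.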
```python
#! python3
# coding:utf-8
import socket
import time
import asyncio
from concurrent import futures
from functools import wraps
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from time import ctime

import aiohttp  # third-party package: pip install aiohttp


def tsfunc(func):
    @wraps(func)
    def wrappedFunc(*args, **kargs):
        # time.clock() was removed in Python 3.8; perf_counter() measures elapsed time
        start = time.perf_counter()
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc


def blocking_way():
    with socket.socket() as sock:
        sock.connect(('www.sina.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
    return response


def nonblocking_way():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    while True:  # busy-wait until the connection accepts data
        try:
            sock.send(data)
            break
        except OSError:
            pass
    response = b''
    while True:  # busy-wait until the server closes the connection
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    sock.close()
    return response


selector = DefaultSelector()
stopped = False
urls_todo = ['/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9']


# Callback-based version
class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # One recv() per readiness event; the selector calls back again for more data
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            self.sock.close()
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True


def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)


# Generator-based coroutines
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Crawler1:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                sock.close()
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break


# Generator coroutines improved with yield from
class Future1:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result


def connect(sock, address):
    f = Future1()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())


def read(sock):
    f = Future1()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)


class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        sock.close()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) until its next yield;
            # next_future is the object produced by that yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def loop1():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


# asyncio coroutines
host = 'http://www.sina.com'


async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()


async def fetch_all():
    tasks = [fetch(host + url) for url in urls_todo]
    return await asyncio.gather(*tasks)


@tsfunc
def asyncio_way():
    results = asyncio.run(fetch_all())  # Python 3.7+
    return len(results)


@tsfunc
def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])


@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])


@tsfunc
def async_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)


@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()


@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()


if __name__ == '__main__':
    # Run one test per process: callback_way() and generate_way() empty urls_todo as they finish
    # sync_way()
    # process_way()
    # thread_way()
    # async_way()
    # callback_way()
    # generate_way()
    asyncio_way()
```