Python通過future處理并發(fā)問題
future初識
通過下面腳本來對future進(jìn)行一個初步了解:
例子1:普通通過循環(huán)的方式
import os import time import sys import requests POP20_CC = ( "CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR" ).split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' def save_flag(img,filename): path = os.path.join(DEST_DIR,filename) with open(path,'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL,cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text,end=" ") sys.stdout.flush() def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) show(cc) save_flag(image,cc.lower()+".gif") return len(cc_list) def main(download_many): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time()-t0 msg = "\n{} flags downloaded in {:.2f}s" print(msg.format(count,elapsed)) if __name__ == '__main__': main(download_many)
例子2:通過future方式實(shí)現(xiàn),這里對上面的部分代碼進(jìn)行了復(fù)用
from concurrent import futures from flags import save_flag, get_flag, show, main MAX_WORKERS = 20 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower()+".gif") return cc def download_many(cc_list): workers = min(MAX_WORKERS,len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one, sorted(cc_list)) return len(list(res)) if __name__ == '__main__': main(download_many)
分別運(yùn)行三次,兩者的平均速度:13.67和1.59s,可以看到差別還是非常大的。
future
future是concurrent.futures模塊和asyncio模塊的重要組件
從python3.4開始標(biāo)準(zhǔn)庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個Future類的實(shí)例都表示可能完成或者尚未完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能類似
注意:通常情況下自己不應(yīng)該創(chuàng)建future,而是由并發(fā)框架(concurrent.futures或asyncio)實(shí)例化
原因:future表示終將發(fā)生的事情,而確定某件事情會發(fā)生的唯一方式是執(zhí)行的時間已經(jīng)安排好,因此只有把某件事情交給concurrent.futures.Executor子類處理時,才會創(chuàng)建concurrent.futures.Future實(shí)例。
如:Executor.submit()方法的參數(shù)是一個可調(diào)用的對象,調(diào)用這個方法后會為傳入的可調(diào)用對象排定時間,并返回一個
future
客戶端代碼不能應(yīng)該改變future的狀態(tài),并發(fā)框架在future表示的延遲計算結(jié)束后會改變期物的狀態(tài),我們無法控制計算何時結(jié)束。
這兩種future都有.done()方法,這個方法不阻塞,返回值是布爾值,指明future鏈接的可調(diào)用對象是否已經(jīng)執(zhí)行??蛻舳舜a通常不會詢問future是否運(yùn)行結(jié)束,而是會等待通知。因此兩個Future類都有.add_done_callback()方法,這個方法只有一個參數(shù),類型是可調(diào)用的對象,future運(yùn)行結(jié)束后會調(diào)用指定的可調(diào)用對象。
.result()方法是在兩個Future類中的作用相同:返回可調(diào)用對象的結(jié)果,或者重新拋出執(zhí)行可調(diào)用的對象時拋出的異常。但是如果future沒有運(yùn)行結(jié)束,result方法在兩個Futrue類中的行為差別非常大。
對concurrent.futures.Future實(shí)例來說,調(diào)用.result()方法會阻塞調(diào)用方所在的線程,直到有結(jié)果可返回,此時,result方法可以接收可選的timeout參數(shù),如果在指定的時間內(nèi)future沒有運(yùn)行完畢,會拋出TimeoutError異常。
而asyncio.Future.result方法不支持設(shè)定超時時間,在獲取future結(jié)果最好使用yield from結(jié)構(gòu),但是concurrent.futures.Future不能這樣做
不管是asyncio還是concurrent.futures.Future都會有幾個函數(shù)是返回future,其他函數(shù)則是使用future,在最開始的例子中我們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法調(diào)用各個future的result方法,因此我們得到的是各個futrue的結(jié)果,而不是future本身
關(guān)于future.as_completed函數(shù)的使用,這里我們用了兩個循環(huán),一個用于創(chuàng)建并排定future,另外一個用于獲取future的結(jié)果
from concurrent import futures from flags import save_flag, get_flag, show, main MAX_WORKERS = 20 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower()+".gif") return cc def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in sorted(cc_list): future = executor.submit(download_one,cc) to_do.append(future) msg = "Secheduled for {}:{}" print(msg.format(cc,future)) results = [] for future in futures.as_completed(to_do): res = future.result() msg = "{}result:{!r}" print(msg.format(future,res)) results.append(res) return len(results) if __name__ == '__main__': main(download_many)
結(jié)果如下:
注意:Python代碼是無法控制GIL,標(biāo)準(zhǔn)庫中所有執(zhí)行阻塞型IO操作的函數(shù),在等待操作系統(tǒng)返回結(jié)果時都會釋放GIL.運(yùn)行其他線程執(zhí)行,也正是因?yàn)檫@樣,Python線程可以在IO密集型應(yīng)用中發(fā)揮作用
以上都是concurrent.futures啟動線程,下面通過它啟動進(jìn)程
concurrent.futures啟動進(jìn)程
concurrent.futures中的ProcessPoolExecutor類把工作分配給多個Python進(jìn)程處理,因此,如果需要做CPU密集型處理,使用這個模塊能繞開GIL,利用所有的CPU核心。
其原理是一個ProcessPoolExecutor創(chuàng)建了N個獨(dú)立的Python解釋器,N是系統(tǒng)上面可用的CPU核數(shù)。
使用方法和ThreadPoolExecutor方法一樣
總結(jié)
相關(guān)文章
Python打包工具PyInstaller的安裝與pycharm配置支持PyInstaller詳細(xì)方法
這篇文章主要介紹了Python打包工具PyInstaller的安裝與pycharm配置支持PyInstaller詳細(xì)方法,需要的朋友可以參考下2020-02-02Scrapy將數(shù)據(jù)保存到Excel和MySQL中的方法實(shí)現(xiàn)
本文主要介紹了Scrapy將數(shù)據(jù)保存到Excel和MySQL中的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02基于python實(shí)現(xiàn)的抓取騰訊視頻所有電影的爬蟲
這篇文章主要介紹了用python實(shí)現(xiàn)的抓取騰訊視頻所有電影的爬蟲,這個程序使用芒果存, 所以大家需要下載使用mongodb才可以2016-04-04Python隨機(jī)生成一個6位的驗(yàn)證碼代碼分享
這篇文章主要介紹了Python隨機(jī)生成一個6位的驗(yàn)證碼代碼分享,本文直接給出代碼實(shí)例,需要的朋友可以參考下2015-03-03Python常見庫matplotlib學(xué)習(xí)筆記之多個子圖繪圖
Matplotlib是Python提供的一個繪圖庫,通過該庫我們可以很容易的繪制出折線圖、直方圖、散點(diǎn)圖、餅圖等豐富的統(tǒng)計圖,下面這篇文章主要給大家介紹了關(guān)于Python常見庫matplotlib學(xué)習(xí)筆記之多個子圖繪圖的相關(guān)資料,需要的朋友可以參考下2023-05-05