Python使用future處理并發(fā)問題方案詳解
從Python3.2引入的concurrent.futures模塊,Python2.5以上需要在pypi中安裝futures包。
future指一種對象,表示異步執(zhí)行的操作。這個(gè)概念的作用很大,是concurrent.futures模塊和asyncio包的基礎(chǔ)。
網(wǎng)絡(luò)下載的三種風(fēng)格
為了高效的處理網(wǎng)絡(luò)IO,需要使用并發(fā),因?yàn)榫W(wǎng)絡(luò)有很高的延遲,所以為了不浪費(fèi)CPU周期去等待,最好再收到網(wǎng)絡(luò)響應(yīng)之前去做其他的事情。
下面有三種示例程序,
第一個(gè)程序是依序下載的,第二個(gè)是使用theadpool來自concurrent.futures模塊,第三個(gè)是使用asyncio包
按照順序下載
下面示例是依次下載
import os import time import sys import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中顯示 def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return len(cc_list) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags download in 24.16s
知識點(diǎn):
- 按照慣例,requests不在標(biāo)準(zhǔn)庫中,在導(dǎo)入標(biāo)準(zhǔn)庫之后,用一個(gè)空行分隔開
- sys.stdout.flush() : 顯示一個(gè)字符串,然后刷新sys.stdout,這樣能在一行消息中看到進(jìn)度。Python中正常情況下,遇到換行才會刷新stdout緩沖。
使用conrurrent.futures模塊多線程下載
concurrent.futures模塊的主要特色是TheadPoolExecutor和ProcessPoolExecutor類,這兩個(gè)類實(shí)現(xiàn)的結(jié)構(gòu)能分別在不同線程或者進(jìn)程中執(zhí)行可調(diào)用對象。
這兩個(gè)類內(nèi)部維護(hù)著一個(gè)工作線程池或者進(jìn)程池,以及要執(zhí)行的任務(wù)隊(duì)列。不過,這個(gè)接口抽象的層級很高,無需關(guān)心任何實(shí)現(xiàn)細(xì)節(jié)。
下面展示如何使用TheadPoolExecutor.map方法,最簡單的方式實(shí)現(xiàn)并發(fā)下載。
import os import time import sys from concurrent import futures import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' max_workers = 20 # 設(shè)定線程數(shù) def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中顯示 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") def download_many(cc_list): works = min(len(cc_list), max_workers) # 取其中的最小值,以免創(chuàng)建多余的線程 with futures.ThreadPoolExecutor(works) as executor: res = executor.map(download_one, cc_list) return len(list(res)) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK
20 flags download in 2.92s
知識點(diǎn):
- 使用線程數(shù)實(shí)例化ThreadPoolExecute類。executor.__exit__方法會調(diào)用executor.shutdown(wait=True)方法,它會在所有線程執(zhí)行完畢前阻塞線程。
- executor.map方法的作用于內(nèi)置map函數(shù)類似,區(qū)別是在多個(gè)線程中并發(fā)調(diào)用,此map方法會返回一個(gè)生成器,因此可以迭代,獲取各個(gè)函數(shù)返回的值。
- return len(list(res))這里要注意下,讀取res也就是executor.map各個(gè)函數(shù)的返回值,如果線程中有異常,會在這里拋出,這與隱式調(diào)用next()函數(shù)從迭代器中獲取相應(yīng)的返回值一樣。
使用asyncio異步下載
后續(xù)章節(jié)會介紹
future是什么
從Python3.4開始,標(biāo)準(zhǔn)庫中有兩個(gè)名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個(gè)類的作用相同:兩個(gè)future類的實(shí)例都可以表示已經(jīng)完成或者尚未完成的延遲計(jì)算。這與Twister引擎中的Deferred類、Tornado框架中的Future類,以及多個(gè)JavaScript庫中的Promise對象類似。
future封裝待完成的操作,可以放入隊(duì)列,完成的狀態(tài)可以查詢,得到結(jié)果(或拋出異常)。
通常情況下自己不應(yīng)該創(chuàng)建future,而只能由并發(fā)框架concurrent.futures和asyncio實(shí)例化。
原因很簡單:future表示終將發(fā)生的事情,而確定某件時(shí)間發(fā)生的唯一方式是執(zhí)行的時(shí)間(順序)已經(jīng)排定。因此,只有排定把某件事情交給concurrent.futures.Executor子類處理時(shí),才會創(chuàng)建concurrent.futures.Future實(shí)例。
Executor.submit()方法的參數(shù)是一個(gè)可調(diào)用的對象,調(diào)用這個(gè)方法后會為傳入的可調(diào)用對象排期,并返回一個(gè)future。
客戶端代碼不應(yīng)該改變future的狀態(tài),并發(fā)框架在future表示的延遲計(jì)算結(jié)束后會改變future的狀態(tài),而我們無法控制計(jì)算何時(shí)結(jié)束。
兩種future都有.done()方法,這個(gè)方法并不阻塞,返回值是布爾值,指明future鏈接的可調(diào)用對象是否已經(jīng)執(zhí)行。
客戶端代碼通常不會詢問future是否運(yùn)行結(jié)束,而是會等待通知。因
兩個(gè)Future類都有.add_done_callback()方法:這個(gè)方法只有一個(gè)參數(shù),類型是可調(diào)用對象,future運(yùn)行結(jié)束后會調(diào)用此可調(diào)用對象。
還有.result()方法,如果在future運(yùn)行結(jié)束后調(diào)用的haunt,這個(gè)方法在兩個(gè)Future類的作用相同:返回可調(diào)用對象的結(jié)果,或者拋出異常(重新拋出執(zhí)行可調(diào)用對象時(shí)拋出的異常)??墒牵绻鹒uture沒有運(yùn)行結(jié)束,result方法在兩個(gè)Future類中的行為相差很大:
對于concurrent.futures.Future實(shí)例來說,調(diào)用了f.result()方法會阻塞調(diào)用方所在的線程,直到有結(jié)果返回,此時(shí)的result方法,可以接受可選的timeout參數(shù),如果在指定實(shí)現(xiàn)內(nèi)future沒有運(yùn)行完畢,會拋出TimeoutError異常。
對于asyncio.Future.result方法,不支持設(shè)置timeout超時(shí)時(shí)間,在那個(gè)庫中獲取future結(jié)果最好使用yield from結(jié)構(gòu)。
這兩個(gè)庫中有幾個(gè)函數(shù)會返回future,其他函數(shù)則使用future以用戶易于理解的方式實(shí)現(xiàn)自身。
Executor.map方法是使用future:返回值是一個(gè)迭代器,迭代器的__next__方法調(diào)用各個(gè)future的result方法,因此得到各個(gè)future的結(jié)果。
concurrent.futures.as_completed函數(shù)參數(shù)是一個(gè)列表,返回值是一個(gè)迭代器,在future運(yùn)行結(jié)束后產(chǎn)出future。
為了理解future,使用as_completed函數(shù),把較為抽象的executor.map調(diào)換成兩個(gè)for循環(huán):一個(gè)用戶創(chuàng)建并排定future(使用summit方法),一個(gè)用于獲取future的結(jié)果。
示例,一窺神秘的future。
...其余代碼省略 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): cc_list = cc_list[:5] # 這次演示5個(gè)國家 future_list = [] with futures.ThreadPoolExecutor(max_workers=3) as executor: # 線程池為3,便于觀察 for cc in sorted(cc_list): future = executor.submit(download_one, cc) # submit方法排定可調(diào)用對象的執(zhí)行時(shí)間,然后返回一個(gè)future,表示待執(zhí)行操作 future_list.append(future) print("{}: {}".format(cc, future)) res = [] for future in futures.as_completed(future_list): # as_completed函數(shù)在future都執(zhí)行完畢后,返回future result = future.result() # 獲取future結(jié)果 print('{} result: {!r}'.format(future, result)) res.append(result)
打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'
知識點(diǎn):
- summit方法把一個(gè)可執(zhí)行對象(函數(shù)),變?yōu)橐粋€(gè)future對象,并且記錄了執(zhí)行時(shí)間,表示待執(zhí)行打操作。
- futures.as_completed在所有的future執(zhí)行完畢后,產(chǎn)出future對象。而后可以使用future.result()獲取結(jié)果
- 直接打印future對象(調(diào)用future的repr()方法),會顯示future的狀態(tài),例如running、pending(等待)、finished
GIL和阻塞型I/O
嚴(yán)格來說,上面實(shí)現(xiàn)的多線程并發(fā)腳本都不能實(shí)現(xiàn)并行下載。
使用concurrent.futures庫實(shí)現(xiàn)的示例,都會受到GIL(Global Interpreter Lock,全局解釋器鎖)的限制,腳本只能在單個(gè)線程中執(zhí)行。
CPython解釋器本身就不是線程安全的,因此會有全局解釋器鎖GIL,一次只允許使用一個(gè)線程執(zhí)行Python字節(jié)碼。
因此一個(gè)Python進(jìn)程通常不能同時(shí)使用多個(gè)CPU核心。(在Jython和IronPython中沒有此限制,目前最快的PyPy解釋器也存在GIL) IronPython是.net實(shí)現(xiàn)的
Python代碼無法控制GIL,然而,標(biāo)準(zhǔn)庫中所以執(zhí)行阻塞型IO操作的函數(shù),在等待操作系統(tǒng)返回結(jié)果時(shí)都會釋放GIL。這意味著在Python語言這個(gè)層次上可以使用多線程,而IO密集型操作能從中受益:一個(gè)Python線程等待網(wǎng)絡(luò)響應(yīng)時(shí),阻塞型IO函數(shù)會釋放GIL,再運(yùn)行一個(gè)線程。比如time.sleep()函數(shù)也會釋放GIL。
GIL簡化了CPython和C語言擴(kuò)展的實(shí)現(xiàn)。得益于GIL,Python有很多C語言擴(kuò)展。
使用concurrent.futures模塊多進(jìn)程
在處理CPU密集型操作時(shí),可以使用多進(jìn)程,實(shí)現(xiàn)真正的并行計(jì)算。
使用ProcessPoolExecutor類把任務(wù)分配給多個(gè)Python進(jìn)程處理。因此如果需要做CPU密集型操作,使用這個(gè)模塊多進(jìn)程能繞開GIL,利用所有可用的CPU核心。
ProcessPoolExecutor和ThreadPoolExecutor類都實(shí)現(xiàn)了通用的Executor接口,因此使用concurrent.futures模塊能輕松的把基于線程的方案轉(zhuǎn)成基于進(jìn)程的方案。
這兩個(gè)實(shí)現(xiàn)Executor接口的類,唯一的區(qū)別是,ThreadPoolExecutor.__init__方法需要max_workers參數(shù),指定線程池中線程的數(shù)量。
但是在ProcessPoolExecutor類中,這個(gè)參數(shù)是可選的,而且大多數(shù)情況下使用默認(rèn)值:os.cpu_count()函數(shù)返回的CPU數(shù)量。因?yàn)閷τ贑PU密集型操作來說,不可能要求使用超過CPU數(shù)量的進(jìn)程。
經(jīng)過測試,使用ProcessPoolExecutor實(shí)例下載20個(gè)國旗的時(shí)間,要比ThreadPoolExecutor要慢,主要原因是我電腦是四核八線程,八個(gè)邏輯處理器,因此限制只有4個(gè)并發(fā)下載,而使用線程池的版本有20個(gè)工作線程。
ProcessPoolExecutor的價(jià)值體現(xiàn)在CPU密集型操作上。比如對于加密算法上,使用ProcessPoolExecutor類派生出四個(gè)工作進(jìn)程后,性能可以提高兩倍。
如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python進(jìn)行CPU密集型操作,應(yīng)該試試PyPy,普遍快3.8~5.1倍。
實(shí)驗(yàn)Executor.map方法
若想并發(fā)運(yùn)行多個(gè)可調(diào)用對象,最簡單是是使用Executor.map方法。
示例,演示Executor.map方法的某些運(yùn)作細(xì)節(jié)
import time from concurrent import futures def display(*args): """把參數(shù)打印前,加上時(shí)間顯示""" print(time.strftime('[%H:%M:%S]'), end=" ") print(*args) def loiter(n): """開始時(shí)顯示一個(gè)消息,然后休眠n秒,最后再結(jié)束的時(shí)候在顯示一個(gè)消息 消息使用制表符縮進(jìn),縮進(jìn)量由n值確定 loiter:徘徊,閑著,閑蕩 """ msg = '{}loiter({}):doing nothing for {}s' display(msg.format('\t' * n, n, n)) time.sleep(n) msg = '{}loiter({}):done' display(msg.format('\t' * n, n)) return n * 10 # 隨意返回一個(gè)結(jié)果 def main(): display('Script starting.') # 腳本開始 executor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(loiter, range(5)) # 把5個(gè)任務(wù)交個(gè)3個(gè)線程 display('results :', results) # 打印調(diào)用executor.map的結(jié)果,是一個(gè)生成器 display('waiting for individual results:') # 等待個(gè)體結(jié)果 for i, result in enumerate(results): display('result {}: {}'.format(i, result)) if __name__ == '__main__': main()
打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08] loiter(1):doing nothing for 1s
[17:40:08] loiter(2):doing nothing for 2s
[17:40:08][17:40:08] results : loiter(3):doing nothing for 3s
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09] loiter(1):done
[17:40:09] loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10] loiter(2):done
[17:40:10] result 2: 20
[17:40:11] loiter(3):done
[17:40:11] result 3: 30
[17:40:13] loiter(4):done
[17:40:13] result 4: 40
知識點(diǎn):
- 示例中把5個(gè)任務(wù)交給executor(3個(gè)線程),其中前三個(gè)任務(wù)會立即開始;這是非阻塞調(diào)用。
- 在for循環(huán)中會隱式調(diào)用next(results),這個(gè)函數(shù)又會在第一個(gè)任務(wù)的future上調(diào)用future.result()方法。result方法會阻塞,直到這個(gè)future運(yùn)行結(jié)束。所以這個(gè)for循環(huán)每次迭代都會阻塞,等到結(jié)果出來后,才會繼續(xù)。
- 每次的打印結(jié)果都可能不一樣。由于sleep函數(shù)總會釋放GIL,即使是sleep(0),所以loiter(1)有可能在loiter(0)結(jié)束之前開始運(yùn)行,但是這個(gè)示例中沒有。三個(gè)線程是同時(shí)開始。
- executor.map的結(jié)果是一個(gè)生成器,這個(gè)操作不會阻塞。
- loiter(0)的結(jié)果result 0: 0打印沒有阻塞的原因是,在for循環(huán)之前future已經(jīng)執(zhí)行完成,可以看到輸出了done。
綜上,Executor.map函數(shù)易于使用,有個(gè)特征算是優(yōu)點(diǎn),但也可能沒用變成缺點(diǎn),具體情況取決于需求:map函數(shù)返回的結(jié)果順序于調(diào)用開始的順序一致。
如果第一個(gè)任務(wù)生成結(jié)果用時(shí)10秒,而其他任務(wù)調(diào)用只用1秒,代碼就會阻塞10秒,獲取map方法返回生成器的第一個(gè)結(jié)果。在此之后,獲取后續(xù)結(jié)果時(shí)不會阻塞,因?yàn)楹罄m(xù)的調(diào)用已經(jīng)結(jié)束,所以,獲取循環(huán)所有結(jié)果,阻塞的用時(shí)等于最長的任務(wù)時(shí)間。
如果必須等待獲取所有結(jié)果后再處理的場景,這種行為沒問題;不過,通常更常用的方式是,不管提交的順序,只有有結(jié)果就獲取。這樣就要使用executor.submit方法和futures.as_completed函數(shù)結(jié)合起來使用。
executor.submit和futures.as_completed這個(gè)組合比executor.map更靈活:
- 因?yàn)閟ubmit方法能處理不同的可調(diào)用對象和參數(shù),而executor.map只能處理參數(shù)不同的同一個(gè)可調(diào)用對象
- 此外,傳給future.as_completed函數(shù)的future集合可以來自多個(gè)Executor實(shí)例
- futures.as_completed只返回已經(jīng)運(yùn)行結(jié)束的future
顯示下載進(jìn)度條
Python內(nèi)置庫有tqdm包,taqadum在阿拉伯語中的意思是進(jìn)展。
tqdm可以在長循環(huán)中添加一個(gè)進(jìn)度提示信息,用戶只需要封裝任意的迭代器 tqdm(iterator)
import time from tqdm import tqdm for i in tqdm(range(1000)): time.sleep(.01) 100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]
tqdm能處理任何可迭代對象,生成一個(gè)迭代器;使用這個(gè)迭代器時(shí),顯示進(jìn)度條和完成全部迭代
為了計(jì)算剩余時(shí)間,tqdm函數(shù)要獲取可以使用len函數(shù)的可迭代對象,或者在第二個(gè)參數(shù)中指定預(yù)期的元素?cái)?shù)量。
例如,futures.as_completed函數(shù)的結(jié)果,就不支持len函數(shù),只能使用tdqm的第二個(gè)參數(shù)total=來指定數(shù)量。
網(wǎng)絡(luò)下載增加錯(cuò)誤處理和進(jìn)度條
下面的示例中負(fù)責(zé)下載一個(gè)文件的函數(shù)(download_one)中使用相同的策略處理HTTP 404錯(cuò)誤。其他異常則向上冒泡,交給download_many函數(shù)處理。
示例,負(fù)責(zé)下載的基本函數(shù)。
def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: resp.raise_for_status() # 如果狀態(tài)碼不是200,產(chǎn)生一個(gè)HttpError的異常 return resp.content def download_one(cc, base_url, verbose=False): try: image = get_flag(base_url, cc) except requests.exceptions.HTTPError as exc: res = exc.response if res.status_code == 404: status = HTTPStatus.not_found msg = 'not found' else: # 如果不是404異常,向上冒泡,傳給調(diào)用方 raise else: sava_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose: print(cc, msg) return Result(status, cc)
示例,依序下載的download_many函數(shù)
def download_many(cc_list, base_url, verbose, max_req): """實(shí)現(xiàn)依序下載""" counter = collections.Counter() # 統(tǒng)計(jì)不同的下載狀態(tài):HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error cc_iter = sorted(cc_list) if not verbose: cc_iter = tqdm.tqdm(cc_iter) # 如果不需要詳細(xì)模式,就使用進(jìn)度條展示 for cc in cc_iter: try: res = download_one(cc, base_url, verbose) except requests.exceptions.HTTPError as exc: error_msg = "HTTP error {res.status_code} - {res.reason}" error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = 'Connection error' else: error_msg = '' status = res.status if error_msg: status = HTTPStatus.error counter[status] += 1 if verbose and error_msg: print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知識點(diǎn):
requests.exceptions中有所有的requests相關(guān)的異常類,可以用來捕獲相關(guān)異常。
如果有響應(yīng)信息后,產(chǎn)生的異常,異常對象exc.response的status_code狀態(tài)碼和reason異常原因
示例,多線程下載的download_many函數(shù)
default_concur_req = 30 # 默認(rèn)的線程池大小 max_concur_req = 1000 # 最大并發(fā)請求數(shù),這是一個(gè)安全措施 def download_many(cc_list, base_url, verbose, concur_req): counter = collections.Counter() with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: to_do_map = {} for cc in sorted(cc_list): future = executor.submit(download_one, cc, base_url, verbose) # submit排定一個(gè)可調(diào)用對象的執(zhí)行時(shí)間,返回一個(gè)Future實(shí)例 to_do_map[future] = cc # 把各個(gè)Future實(shí)例映射到國家代碼上,在錯(cuò)誤處理時(shí)使用 done_iter = futures.as_completed(to_do_map) # 返回一個(gè)迭代器,在future運(yùn)行結(jié)束后產(chǎn)出future if not verbose: done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是詳細(xì)模式,就顯示進(jìn)度條,因?yàn)閐one_iter沒有l(wèi)en函數(shù),只能通過total參數(shù)傳入 for future in done_iter: try: res = future.result() except requests.exceptions.HTTPError as exc: error_msg = 'HTTP {res.status_code} - {res.reason}' error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = "Connection error" else: error_msg = "" status = res.status if error_msg: status = HTTPStatus.error counter[status] +=1 if verbose and error_msg: cc = to_do_map[future] print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知識點(diǎn):
對futures.as_completed函數(shù)的慣用法:構(gòu)建一個(gè)字典,把各個(gè)future映射到其他數(shù)據(jù)上,future運(yùn)行結(jié)束后可能會有用。比如上述示例,把future映射到國家代碼上。
線程和多進(jìn)程的代替方案
對于多線程,如果futures.ThreadPoolExecutor類對某個(gè)作業(yè)來首不夠靈活,可能要使用到threading模塊中的組件(如Thread、Lock、Semaphore等)自行制定方案,
比如使用queue模塊創(chuàng)建線程安全的隊(duì)列,在線程之間傳遞數(shù)據(jù)。futures.ThreadPoolExecutor類已經(jīng)封裝好了這些組件。
對于CPU密集型工作來說,要啟動多個(gè)進(jìn)程,規(guī)避GIL。創(chuàng)建多個(gè)進(jìn)程的最簡單方式是用futures.ProcessPoolExecutor類。如果使用場景較復(fù)雜,需要更高級的工具,multiprocessing模塊的API和threading模塊相仿,不過作業(yè)交給多個(gè)進(jìn)程處理。
到此這篇關(guān)于Python使用future處理并發(fā)問題方案詳解的文章就介紹到這了,更多相關(guān)Python future處理并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python使用pyppeteer模塊實(shí)現(xiàn)無頭瀏覽器自動化
pyppeteer是一個(gè)基于Python的模塊,它提供了一個(gè)高級的API,可以通過控制無頭瀏覽器來實(shí)現(xiàn)自動化網(wǎng)頁操作,下面我們就來看看Python如何使用pyppeteer模塊實(shí)現(xiàn)無頭瀏覽器自動化吧2024-02-02Python和JS反爬之解決反爬參數(shù)?signKey
這篇文章主要介紹了Python和JS反爬之解決反爬參數(shù)?signKey,Python?反爬中有一大類,叫做字體反爬,核心的理論就是通過字體文件或者?CSS?偏移,接下來文章的詳細(xì)介紹,需要的小伙伴可以參考一下2022-05-05python SVD壓縮圖像的實(shí)現(xiàn)代碼
這篇文章主要介紹了python SVD壓縮圖像的實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11關(guān)于PyTorch 自動求導(dǎo)機(jī)制詳解
今天小編就為大家分享一篇關(guān)于PyTorch 自動求導(dǎo)機(jī)制詳解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08Python之urlencode和urldecode案例講解
這篇文章主要介紹了Python之urlencode和urldecode案例講解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08淺談django model的get和filter方法的區(qū)別(必看篇)
下面小編就為大家?guī)硪黄獪\談django model的get和filter方法的區(qū)別(必看篇)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05