Python如何實(shí)現(xiàn)線程間通信
問題
你的程序中有多個(gè)線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)
解決方案
從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對(duì)象,這些線程通過使用 put() 和 get() 操作來向隊(duì)列中添加或者刪除元素。 例如:
from queue import Queue from threading import Thread # A thread that produces data def producer(out_q): while True: # Produce some data ... out_q.put(data) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
Queue 對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過它在多個(gè)線程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的值,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。例如:
from queue import Queue from threading import Thread # Object that signals shutdown _sentinel = object() # A thread that produces data def producer(out_q): while running: # Produce some data ... out_q.put(data) # Put the sentinel on the queue to indicate completion out_q.put(_sentinel) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Check for termination if data is _sentinel: in_q.put(_sentinel) break # Process the data ...
本例中有一個(gè)特殊的地方:消費(fèi)者在讀到這個(gè)特殊值之后立即又把它放回到隊(duì)列中,將之傳遞下去。這樣,所有監(jiān)聽這個(gè)隊(duì)列的消費(fèi)者線程就可以全部關(guān)閉了。 盡管隊(duì)列是最常見的線程間通信機(jī)制,但是仍然可以自己通過創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機(jī)制來實(shí)現(xiàn)線程間通信。最常見的方法是使用 Condition
變量來包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列
import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1]
使用隊(duì)列來進(jìn)行線程間通信是一個(gè)單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。不過隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的 task_done()
和 join()
:
from queue import Queue from threading import Thread # A thread that produces data def producer(out_q): while running: # Produce some data ... out_q.put(data) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Indicate completion in_q.task_done() # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start() # Wait for all produced items to be consumed q.join()
如果一個(gè)線程需要在一個(gè)“消費(fèi)者”線程處理完特定的數(shù)據(jù)項(xiàng)時(shí)立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個(gè) Event
放到一起使用,這樣“生產(chǎn)者”就可以通過這個(gè)Event
對(duì)象來監(jiān)測(cè)處理的過程了。示例如下:
from queue import Queue from threading import Thread, Event # A thread that produces data def producer(out_q): while running: # Produce some data ... # Make an (data, event) pair and hand it to the consumer evt = Event() out_q.put((data, evt)) ... # Wait for the consumer to process the item evt.wait() # A thread that consumes data def consumer(in_q): while True: # Get some data data, evt = in_q.get() # Process the data ... # Indicate completion evt.set()
討論
基于簡單隊(duì)列編寫多線程程序在多數(shù)情況下是一個(gè)比較明智的選擇。從線程安全隊(duì)列的底層實(shí)現(xiàn)來看,你無需在你的代碼中使用鎖和其他底層的同步機(jī)制,這些只會(huì)把你的程序弄得亂七八糟。此外,使用隊(duì)列這種基于消息的通信機(jī)制可以被擴(kuò)展到更大的應(yīng)用范疇,比如,你可以把你的程序放入多個(gè)進(jìn)程甚至是分布式系統(tǒng)而無需改變底層的隊(duì)列結(jié)構(gòu)。 使用線程隊(duì)列有一個(gè)要注意的問題是,向隊(duì)列中添加數(shù)據(jù)項(xiàng)時(shí)并不會(huì)復(fù)制此數(shù)據(jù)項(xiàng),線程間通信實(shí)際上是在線程間傳遞對(duì)象引用。如果你擔(dān)心對(duì)象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結(jié)構(gòu)(如:整型、字符串或者元組)或者一個(gè)對(duì)象的深拷貝。例如:
from queue import Queue from threading import Thread import copy # A thread that produces data def producer(out_q): while True: # Produce some data ... out_q.put(copy.deepcopy(data)) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ...
Queue
對(duì)象提供一些在當(dāng)前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對(duì)象時(shí)提供可選的 size
參數(shù)來限制可以添加到隊(duì)列中的元素?cái)?shù)量。對(duì)于“生產(chǎn)者”與“消費(fèi)者”速度有差異的情況,為隊(duì)列中的元素?cái)?shù)量添加上限是有意義的。比如,一個(gè)“生產(chǎn)者”產(chǎn)生項(xiàng)目的速度比“消費(fèi)者” “消費(fèi)”的速度快,那么使用固定大小的隊(duì)列就可以在隊(duì)列已滿的時(shí)候阻塞隊(duì)列,以免未預(yù)期的連鎖效應(yīng)擴(kuò)散整個(gè)程序造成死鎖或者程序運(yùn)行失常。在通信的線程之間進(jìn)行“流量控制”是一個(gè)看起來容易實(shí)現(xiàn)起來困難的問題。如果你發(fā)現(xiàn)自己曾經(jīng)試圖通過擺弄隊(duì)列大小來解決一個(gè)問題,這也許就標(biāo)志著你的程序可能存在脆弱設(shè)計(jì)或者固有的可伸縮問題。 get()
和 put()
方法都支持非阻塞方式和設(shè)定超時(shí),例如:
import queue q = queue.Queue() try: data = q.get(block=False) except queue.Empty: ... try: q.put(item, block=False) except queue.Full: ... try: data = q.get(timeout=5.0) except queue.Empty: ...
這些操作都可以用來避免當(dāng)執(zhí)行某些特定隊(duì)列操作時(shí)發(fā)生無限阻塞的情況,比如,一個(gè)非阻塞的 put()
方法和一個(gè)固定大小的隊(duì)列一起使用,這樣當(dāng)隊(duì)列已滿時(shí)就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。
def producer(q): ... try: q.put(item, block=False) except queue.Full: log.warning('queued item %r discarded!', item)
如果你試圖讓消費(fèi)者線程在執(zhí)行像 q.get()
這樣的操作時(shí),超時(shí)自動(dòng)終止以便檢查終止標(biāo)志,你應(yīng)該使用 q.get()
的可選參數(shù) timeout
,如下:
_running = True def consumer(q): while _running: try: item = q.get(timeout=5.0) # Process item ... except queue.Empty: pass
最后,有 q.qsize()
, q.full()
, q.empty()
等實(shí)用方法可以獲取一個(gè)隊(duì)列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的??赡苣銓?duì)一個(gè)隊(duì)列使用 empty()
判斷出這個(gè)隊(duì)列為空,但同時(shí)另外一個(gè)線程可能已經(jīng)向這個(gè)隊(duì)列中插入一個(gè)數(shù)據(jù)項(xiàng)。所以,你最好不要在你的代碼中使用這些方法。
以上就是Python如何實(shí)現(xiàn)線程間通信的詳細(xì)內(nèi)容,更多關(guān)于Python 線程間通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Python TCP通信客戶端服務(wù)端代碼實(shí)例
- Python警察與小偷的實(shí)現(xiàn)之一客戶端與服務(wù)端通信實(shí)例
- Python通過隊(duì)列來實(shí)現(xiàn)進(jìn)程間通信的示例
- Python 串口通信的實(shí)現(xiàn)
- Python使用socket模塊實(shí)現(xiàn)簡單tcp通信
- Python實(shí)現(xiàn)UDP程序通信過程圖解
- 如何通過Python3和ssl實(shí)現(xiàn)加密通信功能
- Python通過4種方式實(shí)現(xiàn)進(jìn)程數(shù)據(jù)通信
- python實(shí)現(xiàn)串口通信的示例代碼
- python實(shí)現(xiàn)局域網(wǎng)內(nèi)實(shí)時(shí)通信代碼
- python 實(shí)現(xiàn)客戶端與服務(wù)端的通信
相關(guān)文章
python matplotlib畫圖庫學(xué)習(xí)繪制常用的圖
這篇文章主要為大家詳細(xì)介紹了python matplotlib畫圖庫學(xué)習(xí)繪制常用的圖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-03-03解決Pytorch 訓(xùn)練與測(cè)試時(shí)爆顯存(out of memory)的問題
今天小編就為大家分享一篇解決Pytorch 訓(xùn)練與測(cè)試時(shí)爆顯存(out of memory)的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-08-08matplotlib交互式數(shù)據(jù)光標(biāo)mpldatacursor的實(shí)現(xiàn)
這篇文章主要介紹了matplotlib交互式數(shù)據(jù)光標(biāo)mpldatacursor的實(shí)現(xiàn) ,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02Python自動(dòng)化處理Excel數(shù)據(jù)的操作過程
在實(shí)際數(shù)據(jù)處理和分析過程中,經(jīng)常會(huì)遇到需要從大量數(shù)據(jù)中提取出特定日期范圍內(nèi)的信息的需求,本文將介紹如何使用Python的pandas庫來處理Excel文件,感興趣的朋友跟隨小編一起看看吧2023-11-11Python下使用Scrapy爬取網(wǎng)頁內(nèi)容的實(shí)例
今天小編就為大家分享一篇Python下使用Scrapy爬取網(wǎng)頁內(nèi)容的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-05-05python計(jì)算書頁碼的統(tǒng)計(jì)數(shù)字問題實(shí)例
這篇文章主要介紹了python計(jì)算書頁碼的統(tǒng)計(jì)數(shù)字問題實(shí)例,對(duì)比2個(gè)實(shí)例講述了數(shù)字統(tǒng)計(jì)的技巧,非常實(shí)用,需要的朋友可以參考下2014-09-09