python3?queue多線(xiàn)程通信
queue分類(lèi)
python3 queue分三類(lèi):
- 先進(jìn)先出隊(duì)列
- 后進(jìn)先出的棧
- 優(yōu)先級(jí)隊(duì)列
他們的導(dǎo)入方式分別是:
from queue import Queue from queue import LifoQueue from queue import
具體方法見(jiàn)下面引用說(shuō)明。
例子一、生產(chǎn)消費(fèi)模式
Queue
對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過(guò)它在多個(gè)線(xiàn)程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問(wèn)題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的值,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。
例如:
from queue import Queue from threading import Thread # 用來(lái)表示終止的特殊對(duì)象 _sentinel = object() # A thread that produces data def producer(out_q): for i in range(10): print("生產(chǎn)") out_q.put(i) out_q.put(_sentinel) # A thread that consumes data def consumer(in_q): while True: data = in_q.get() if data is _sentinel: in_q.put(_sentinel) break else: print("消費(fèi)", data) # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
結(jié)果:
本例中有一個(gè)特殊的地方:消費(fèi)者在讀到這個(gè)特殊值之后立即又把它放回到隊(duì)列中,將之傳遞下去。這樣,所有監(jiān)聽(tīng)這個(gè)隊(duì)列的消費(fèi)者線(xiàn)程就可以全部關(guān)閉了。 盡管隊(duì)列是最常見(jiàn)的線(xiàn)程間通信機(jī)制,但是仍然可以自己通過(guò)創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機(jī)制來(lái)實(shí)現(xiàn)線(xiàn)程間通信。最常見(jiàn)的方法是使用 Condition
變量來(lái)包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線(xiàn)程安全的優(yōu)先級(jí)隊(duì)列。
import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1]
例子二、task_done和join
使用隊(duì)列來(lái)進(jìn)行線(xiàn)程間通信是一個(gè)單向、不確定的過(guò)程。通常情況下,你沒(méi)有辦法知道接收數(shù)據(jù)的線(xiàn)程是什么時(shí)候接收到的數(shù)據(jù)并開(kāi)始工作的。不過(guò)隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的task_done()
和 join()
:
from queue import Queue from threading import Thread class Producer(Thread): def __init__(self, q): super().__init__() self.count = 5 self.q = q def run(self): while self.count > 0: print("生產(chǎn)") if self.count == 1: self.count -= 1 self.q.put(2) else: self.count -= 1 self.q.put(1) class Consumer(Thread): def __init__(self, q): super().__init__() self.q = q def run(self): while True: print("消費(fèi)") data = self.q.get() if data == 2: print("stop because data=", data) # 任務(wù)完成,從隊(duì)列中清除一個(gè)元素 self.q.task_done() break else: print("data is good,data=", data) # 任務(wù)完成,從隊(duì)列中清除一個(gè)元素 self.q.task_done() def main(): q = Queue() p = Producer(q) c = Consumer(q) p.setDaemon(True) c.setDaemon(True) p.start() c.start() # 等待隊(duì)列清空 q.join() print("queue is complete") if __name__ == '__main__': main()
結(jié)果:
例子三、多線(xiàn)程里用queue
設(shè)置倆隊(duì)列,一個(gè)是要做的任務(wù)隊(duì)列todo_queue
,一個(gè)是已經(jīng)完成的隊(duì)列done_queue
。
每次執(zhí)行線(xiàn)程,先從todo_queue
隊(duì)列里取出一個(gè)值,然后執(zhí)行完,放入done_queue
隊(duì)列。
如果todo_queue
為空,就退出。
import logging import logging.handlers import threading import queue log_mgr = None todo_queue = queue.Queue() done_queue = queue.Queue() class LogMgr: def __init__(self, logpath): self.LOG = logging.getLogger('log') loghd = logging.handlers.RotatingFileHandler(logpath, "a", 0, 1) fmt = logging.Formatter("%(asctime)s %(threadName)-10s %(message)s", "%Y-%m-%d %H:%M:%S") loghd.setFormatter(fmt) self.LOG.addHandler(loghd) self.LOG.setLevel(logging.INFO) def info(self, msg): if self.LOG is not None: self.LOG.info(msg) class Worker(threading.Thread): global log_mgr def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): while True: try: task = todo_queue.get(False) if task: log_mgr.info("HANDLE_TASK: %s" % task) done_queue.put(1) except queue.Empty: break return def main(): global log_mgr log_mgr = LogMgr("mylog") for i in range(30): todo_queue.put("data"+str(i)) workers = [] for i in range(3): w = Worker("worker"+str(i)) workers.append(w) for i in range(3): workers[i].start() for i in range(3): workers[i].join() total_num = done_queue.qsize() log_mgr.info("TOTAL_HANDLE_TASK: %d" % total_num) exit(0) if __name__ == '__main__': main()
輸出日志文件結(jié)果:
到此這篇關(guān)于python3 queue多線(xiàn)程通信的文章就介紹到這了,更多相關(guān)python queue多線(xiàn)程通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python字符串對(duì)象實(shí)現(xiàn)原理詳解
這篇文章主要介紹了Python字符串對(duì)象實(shí)現(xiàn)原理詳解,在Python世界中將對(duì)象分為兩種:一種是定長(zhǎng)對(duì)象,比如整數(shù),整數(shù)對(duì)象定義的時(shí)候就能確定它所占用的內(nèi)存空間大小,另一種是變長(zhǎng)對(duì)象,在對(duì)象定義時(shí)并不知道是多少,需要的朋友可以參考下2019-07-07一文帶你解密Python可迭代對(duì)象的排序問(wèn)題
這篇文章主要為大家詳細(xì)介紹一下Python中可迭代對(duì)象的排序問(wèn)題,文中的示例代碼講解詳細(xì),對(duì)我們深入了解Python有一定幫助,感興趣的可以了解一下2022-07-07Pandas進(jìn)行數(shù)據(jù)編碼的十種方式總結(jié)
在機(jī)器學(xué)習(xí)中,很多算法都需要我們對(duì)分類(lèi)特征進(jìn)行轉(zhuǎn)換(編碼),即根據(jù)某一列的值,新增(修改)一列。本文為大家總結(jié)了Pandas中十種數(shù)據(jù)編碼的方式,需要的可以參考一下2022-04-04在Python中使用defaultdict初始化字典以及應(yīng)用方法
今天小編就為大家分享一篇在Python中使用defaultdict初始化字典以及應(yīng)用方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-10-10小議Python中自定義函數(shù)的可變參數(shù)的使用及注意點(diǎn)
Python函數(shù)的默認(rèn)值參數(shù)只會(huì)在函數(shù)定義處被解析一次,以后再使用時(shí)這個(gè)默認(rèn)值還是一樣,這在與可變參數(shù)共同使用時(shí)便會(huì)產(chǎn)生困惑,下面就來(lái)小議Python中自定義函數(shù)的可變參數(shù)的使用及注意點(diǎn)2016-06-06windows下python安裝paramiko模塊和pycrypto模塊(簡(jiǎn)單三步)
這篇文章主要給大家介紹了通過(guò)簡(jiǎn)單的三個(gè)步驟在windows下python中安裝paramiko模塊和pycrypto模塊的相關(guān)資料,文中安裝的步驟,簡(jiǎn)單而且又易于大家理解,需要的朋友們下面跟著小編一起來(lái)學(xué)習(xí)學(xué)習(xí)吧。2017-07-07