Python實(shí)現(xiàn)線程池之線程安全隊(duì)列
本文實(shí)例為大家分享了Python實(shí)現(xiàn)線程池之線程安全隊(duì)列的具體代碼,供大家參考,具體內(nèi)容如下
一、線程池組成
一個(gè)完整的線程池由下面幾部分組成,線程安全隊(duì)列、任務(wù)對(duì)象、線程處理對(duì)象、線程池對(duì)象。其中一個(gè)線程安全的隊(duì)列是實(shí)現(xiàn)線程池和任務(wù)隊(duì)列的基礎(chǔ),本節(jié)我們通過threading包中的互斥量threading.Lock()和條件變量threading.Condition()來實(shí)現(xiàn)一個(gè)簡單的、讀取安全的線程隊(duì)列。

二、線程安全隊(duì)列的實(shí)現(xiàn)
包括put、pop、get等方法,為保證線程安全,讀寫操作時(shí)要添加互斥鎖;并且pop操作可以設(shè)置等待時(shí)間以阻塞當(dāng)前獲取元素的線程,當(dāng)新元素寫入隊(duì)列時(shí)通過條件變量通知解除等待操作。
class ThreadSafeQueue(object): ? ? def __init__(self, max_size=0): ? ? ? ? self.queue = [] ? ? ? ? self.max_size = max_size ?# max_size為0表示無限大 ? ? ? ? self.lock = threading.Lock() ?# 互斥量 ? ? ? ? self.condition = threading.Condition() ?# 條件變量 ? ? def size(self): ? ? ? ? """ ? ? ? ? 獲取當(dāng)前隊(duì)列的大小 ? ? ? ? :return: 隊(duì)列長度 ? ? ? ? """ ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? size = len(self.queue) ? ? ? ? self.lock.release() ? ? ? ? return size ? ? def put(self, item): ? ? ? ? """ ? ? ? ? 將單個(gè)元素放入隊(duì)列 ? ? ? ? :param item: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? # 隊(duì)列已滿 max_size為0表示無限大 ? ? ? ? if self.max_size != 0 and self.size() >= self.max_size: ? ? ? ? ? ? return ThreadSafeException() ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? self.queue.append(item) ? ? ? ? self.lock.release() ? ? ? ? self.condition.acquire() ? ? ? ? # 通知等待讀取的線程 ? ? ? ? self.condition.notify() ? ? ? ? self.condition.release() ? ? ? ? return item ? ? def batch_put(self, item_list): ? ? ? ? """ ? ? ? ? 批量添加元素 ? ? ? ? :param item_list: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if not isinstance(item_list, list): ? ? ? ? ? ? item_list = list(item_list) ? ? ? ? res = [self.put(item) for item in item_list] ? ? ? ? return res ? ? def pop(self, block=False, timeout=0): ? ? ? ? """ ? ? ? ? 從隊(duì)列頭部取出元素 ? ? ? ? :param block: 是否阻塞線程 ? ? ? ? :param timeout: 等待時(shí)間 ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0: ? ? ? ? ? ? if block: ? ? ? ? ? ? ? ? self.condition.acquire() ? ? ? ? ? ? ? ? self.condition.wait(timeout) ? ? ? ? ? ? ? ? self.condition.release() ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = None ? ? ? ? if len(self.queue): ? ? ? ? ? ? item = self.queue.pop() ? ? ? ? self.lock.release() ? ? ? ? return item ? ? def get(self, index): ? ? ? ? """ ? ? ? ? 獲取指定位置的元素 ? ? ? ? :param index: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0 or index >= self.size(): ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = self.queue[index] ? ? ? ? self.lock.release() ? ? ? ? return item class ThreadSafeException(Exception): ? ? pass
三、測試邏輯
3.1、測試阻塞邏輯
def thread_queue_test_1():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer():
? ? ? ? while True:
? ? ? ? ? ? print('current time before pop is %d' % time.time())
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=3)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('get value from queue is %s' % item)
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(item)
? ? ? ? ? ? print('current time after pop is %d' % time.time())
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer)
? ? t1.start()
? ? t2.start()
? ? t1.join()
? ? t2.join()測試結(jié)果:
我們可以看到生產(chǎn)者線程每隔2s向隊(duì)列寫入一個(gè)元素,消費(fèi)者線程當(dāng)無數(shù)據(jù)時(shí)默認(rèn)阻塞3s。通過執(zhí)行時(shí)間發(fā)現(xiàn)消費(fèi)者線程確實(shí)發(fā)生了阻塞,當(dāng)生產(chǎn)者寫入數(shù)據(jù)時(shí)結(jié)束當(dāng)前等待操作。

3.2、測試讀寫加鎖邏輯
def thread_queue_test_2():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer(name):
? ? ? ? while True:
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=1)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('%s get value from queue is %s' % (name, item))
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print('%s get value from queue is None' % name)
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer, args=('thread1',))
? ? t3 = threading.Thread(target=consumer, args=('thread2',))
? ? t1.start()
? ? t2.start()
? ? t3.start()
? ? t1.join()
? ? t2.join()
? ? t3.join()測試結(jié)果:
生產(chǎn)者還是每2s生成一個(gè)元素寫入隊(duì)列,消費(fèi)者開啟兩個(gè)線程進(jìn)行消費(fèi),默認(rèn)阻塞時(shí)間為1s,打印結(jié)果顯示通過加鎖確保每次只有一個(gè)線程能獲取數(shù)據(jù),保證了線程讀寫的安全。

以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
一個(gè)小示例告訴你Python語言的優(yōu)雅之處
本篇中, 我們展示一下一段非常小的代碼, 這段代碼十分吸引我們, 因?yàn)樗褂檬謨?yōu)雅和直接的方式解決了一個(gè)常見的問題.2014-07-07
Python易忽視知識(shí)點(diǎn)小結(jié)
這篇文章主要介紹了Python易忽視知識(shí)點(diǎn),實(shí)例分析了Python中容易被忽視的常見操作技巧,需要的朋友可以參考下2015-05-05
對(duì)python中raw_input()和input()的用法詳解
下面小編就為大家分享一篇對(duì)python中raw_input()和input()的用法詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-04-04
python初學(xué)者,用python實(shí)現(xiàn)基本的學(xué)生管理系統(tǒng)(python3)代碼實(shí)例
這篇文章主要介紹了用python實(shí)現(xiàn)學(xué)生管理系統(tǒng),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04
Python 獲取div標(biāo)簽中的文字實(shí)例
今天小編就為大家分享一篇Python 獲取div標(biāo)簽中的文字實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-12-12
python [::-1] [::-1,::-1]的具體使用
本文主要介紹了python [::-1] [::-1,::-1]的具體使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05
Pytorch pth 格式轉(zhuǎn)ONNX 格式的詳細(xì)過程
PyTorch 訓(xùn)練的模型,需要在Jetson nano 上部署,jetson 原生提供了TensorRT 的支持,所以一個(gè)比較好的方式是把它轉(zhuǎn)換成ONNX 格式,然后在通過ONNX 轉(zhuǎn)換成TensorRT 格式,這篇文章主要介紹了Pytorch pth 格式轉(zhuǎn)ONNX 格式,需要的朋友可以參考下2023-05-05

