用Python的線程來解決生產者消費問題的示例
我們將使用Python線程來解決Python中的生產者—消費者問題。這個問題完全不像他們在學校中說的那么難。
如果你對生產者—消費者問題有了解,看這篇博客會更有意義。
為什么要關心生產者—消費者問題:
- 可以幫你更好地理解并發(fā)和不同概念的并發(fā)。
- 信息隊列中的實現中,一定程度上使用了生產者—消費者問題的概念,而你某些時候必然會用到消息隊列。
當我們在使用線程時,你可以學習以下的線程概念:
- Condition:線程中的條件。
- wait():在條件實例中可用的wait()。
- notify() :在條件實例中可用的notify()。
我假設你已經有這些基本概念:線程、競態(tài)條件,以及如何解決靜態(tài)條件(例如使用lock)。否則的話,你建議你去看我上一篇文章basics of Threads。
引用維基百科:
生產者的工作是產生一塊數據,放到buffer中,如此循環(huán)。與此同時,消費者在消耗這些數據(例如從buffer中把它們移除),每次一塊。
這里的關鍵詞是“同時”。所以生產者和消費者是并發(fā)運行的,我們需要對生產者和消費者做線程分離。
from threading import Thread class ProducerThread(Thread): def run(self): pass class ConsumerThread(Thread): def run(self): pass
再次引用維基百科:
這個為描述了兩個共享固定大小緩沖隊列的進程,即生產者和消費者。
假設我們有一個全局變量,可以被生產者和消費者線程修改。生產者產生數據并把它加入到隊列。消費者消耗這些數據(例如把它移出)。
queue = []
在剛開始,我們不會設置固定大小的條件,而在實際運行時加入(指下述例子)。
一開始帶bug的程序:
from threading import Thread, Lock import time import random queue = [] lock = Lock() class ProducerThread(Thread): def run(self): nums = range(5) #Will create the list [0, 1, 2, 3, 4] global queue while True: num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4] lock.acquire() queue.append(num) print "Produced", num lock.release() time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: lock.acquire() if not queue: print "Nothing in queue, but consumer will try to consume" num = queue.pop(0) print "Consumed", num lock.release() time.sleep(random.random()) ProducerThread().start() ConsumerThread().start()
運行幾次并留意一下結果。如果程序在IndexError異常后并沒有自動結束,用Ctrl+Z結束運行。
樣例輸出:
Produced 3 Consumed 3 Produced 4 Consumed 4 Produced 1 Consumed 1 Nothing in queue, but consumer will try to consume Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner self.run() File "producer_consumer.py", line 31, in run num = queue.pop(0) IndexError: pop from empty list
解釋:
- 我們開始了一個生產者線程(下稱生產者)和一個消費者線程(下稱消費者)。
- 生產者不停地添加(數據)到隊列,而消費者不停地消耗。
- 由于隊列是一個共享變量,我們把它放到lock程序塊內,以防發(fā)生競態(tài)條件。
- 在某一時間點,消費者把所有東西消耗完畢而生產者還在掛起(sleep)。消費者嘗試繼續(xù)進行消耗,但此時隊列為空,出現IndexError異常。
- 在每次運行過程中,在發(fā)生IndexError異常之前,你會看到print語句輸出”Nothing in queue, but consumer will try to consume”,這是你出錯的原因。
我們把這個實現作為錯誤行為(wrong behavior)。
什么是正確行為?
當隊列中沒有任何數據的時候,消費者應該停止運行并等待(wait),而不是繼續(xù)嘗試進行消耗。而當生產者在隊列中加入數據之后,應該有一個渠道去告訴(notify)消費者。然后消費者可以再次從隊列中進行消耗,而IndexError不再出現。
關于條件
條件(condition)可以讓一個或多個線程進入wait,直到被其他線程notify。參考:?http://docs.python.org/2/library/threading.html#condition-objects
這就是我們所需要的。我們希望消費者在隊列為空的時候wait,只有在被生產者notify后恢復。生產者只有在往隊列中加入數據后進行notify。因此在生產者notify后,可以確保隊列非空,因此消費者消費時不會出現異常。
- condition內含lock。
- condition有acquire()和release()方法,用以調用內部的lock的對應方法。
condition的acquire()和release()方法內部調用了lock的acquire()和release()。所以我們可以用condiction實例取代lock實例,但lock的行為不會改變。
生產者和消費者需要使用同一個condition實例, 保證wait和notify正常工作。
重寫消費者代碼:
from threading import Condition condition = Condition() class ConsumerThread(Thread): def run(self): global queue while True: condition.acquire() if not queue: print "Nothing in queue, consumer is waiting" condition.wait() print "Producer added something to queue and notified the consumer" num = queue.pop(0) print "Consumed", num condition.release() time.sleep(random.random())
重寫生產者代碼:
class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: condition.acquire() num = random.choice(nums) queue.append(num) print "Produced", num condition.notify() condition.release() time.sleep(random.random())
樣例輸出:
Produced 3 Consumed 3 Produced 1 Consumed 1 Produced 4 Consumed 4 Produced 3 Consumed 3 Nothing in queue, consumer is waiting Produced 2 Producer added something to queue and notified the consumer Consumed 2 Nothing in queue, consumer is waiting Produced 2 Producer added something to queue and notified the consumer Consumed 2 Nothing in queue, consumer is waiting Produced 3 Producer added something to queue and notified the consumer Consumed 3 Produced 4 Consumed 4 Produced 1 Consumed 1
解釋:
- 對于消費者,在消費前檢查隊列是否為空。
- 如果為空,調用condition實例的wait()方法。
- 消費者進入wait(),同時釋放所持有的lock。
- 除非被notify,否則它不會運行。
- 生產者可以acquire這個lock,因為它已經被消費者release。
- 當調用了condition的notify()方法后,消費者被喚醒,但喚醒不意味著它可以開始運行。
- notify()并不釋放lock,調用notify()后,lock依然被生產者所持有。
- 生產者通過condition.release()顯式釋放lock。
- 消費者再次開始運行,現在它可以得到隊列中的數據而不會出現IndexError異常。
為隊列增加大小限制
生產者不能向一個滿隊列繼續(xù)加入數據。
它可以用以下方式來實現:
- 在加入數據前,生產者檢查隊列是否為滿。
- 如果不為滿,生產者可以繼續(xù)正常流程。
- 如果為滿,生產者必須等待,調用condition實例的wait()。
- 消費者可以運行。消費者消耗隊列,并產生一個空余位置。
- 然后消費者notify生產者。
- 當消費者釋放lock,消費者可以acquire這個lock然后往隊列中加入數據。
最終程序如下:
from threading import Thread, Condition import time import random queue = [] MAX_NUM = 10 condition = Condition() class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: condition.acquire() if len(queue) == MAX_NUM: print "Queue full, producer is waiting" condition.wait() print "Space in queue, Consumer notified the producer" num = random.choice(nums) queue.append(num) print "Produced", num condition.notify() condition.release() time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: condition.acquire() if not queue: print "Nothing in queue, consumer is waiting" condition.wait() print "Producer added something to queue and notified the consumer" num = queue.pop(0) print "Consumed", num condition.notify() condition.release() time.sleep(random.random()) ProducerThread().start() ConsumerThread().start()
樣例輸出:
Produced 0 Consumed 0 Produced 0 Produced 4 Consumed 0 Consumed 4 Nothing in queue, consumer is waiting Produced 4 Producer added something to queue and notified the consumer Consumed 4 Produced 3 Produced 2 Consumed 3
更新:
很多網友建議我在lock和condition下使用Queue來代替使用list。我同意這種做法,但我的目的是展示Condition,wait()和notify()如何工作,所以使用了list。
以下用Queue來更新一下代碼。
Queue封裝了Condition的行為,如wait(),notify(),acquire()。
現在不失為一個好機會讀一下Queue的文檔(http://docs.python.org/2/library/queue.html)。
更新程序:
from threading import Thread import time import random from Queue import Queue queue = Queue(10) class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: num = random.choice(nums) queue.put(num) print "Produced", num time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: num = queue.get() queue.task_done() print "Consumed", num time.sleep(random.random()) ProducerThread().start() ConsumerThread().start()
解釋:
- 在原來使用list的位置,改為使用Queue實例(下稱隊列)。
- 這個隊列有一個condition,它有自己的lock。如果你使用Queue,你不需要為condition和lock而煩惱。
- 生產者調用隊列的put方法來插入數據。
- put()在插入數據前有一個獲取lock的邏輯。
- 同時,put()也會檢查隊列是否已滿。如果已滿,它會在內部調用wait(),生產者開始等待。
- 消費者使用get方法。
- get()從隊列中移出數據前會獲取lock。
- get()會檢查隊列是否為空,如果為空,消費者進入等待狀態(tài)。
- get()和put()都有適當的notify()?,F在就去看Queue的源碼吧。
相關文章
python GUI庫圖形界面開發(fā)之pyinstaller打包python程序為exe安裝文件
這篇文章主要介紹了python GUI庫圖形界面開發(fā)之pyinstaller打包python程序為exe安裝文件,需要的朋友可以參考下2020-02-02PyCharm專業(yè)最新版2019.1安裝步驟(含激活碼)
這篇文章主要介紹了PyCharm專業(yè)最新版2019.1安裝步驟(含激活碼),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-10-10