Java多線程使用阻塞隊列實現生產者消費者模型詳解
1. 什么是阻塞隊列
在數據結構的學習中,我們知道了隊列有普通隊列、循環(huán)隊列,它們都遵循“先進先出”的原則。
阻塞隊列也遵循這個原則,它是一種特殊的隊列(帶有阻塞功能的隊列),并且滿足以下兩點:
- 當隊列滿的時候,如果繼續(xù)往隊列中插入數據,則發(fā)生阻塞狀態(tài),直到有數據出隊列。
- 當隊列空的時候,如果往外取數據,也發(fā)生阻塞狀態(tài),直到有數序入隊列。
Java 標準庫中的阻塞隊列為:BlockingDeque<>,是一個泛型接口。因此,我們使用的時候直接遵循標準庫的寫法即可。注意以下兩點:
- BlockingDeque 是一個接口,因此我們實例對象時用的是 LinkedBlockingQueue類。
- put 方法用于阻塞式的入隊列, take 用于阻塞式的出隊列。
通過上述介紹,我們可以寫出一段簡易的阻塞隊列代碼:
public static void main(String[] args) throws InterruptedException { //BlockingQueue<>為阻塞隊列的原型 BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(); //take(取元素)、put(插入元素)為阻塞隊列的兩個核心方法 blockingQueue.put(20);//插入元素20 Integer result = blockingQueue.take();//從隊頭取元素 System.out.println(result); }
運行后打?。?/p>
通過上述代碼,大家已經對阻塞隊列有了一個淺的認識,當然你可以可以多 take 幾次來達到阻塞效果。
阻塞隊列主要用于“生產者消費者模型”,是實際開發(fā)中常用到的,下面我就來介紹它的用法。
2. 生產者消費者模型
什么是生產者消費者模型?從字面上來看,前者是生產者,后者是消費者。
因此,生產者與消費者之間進行交互需要一個中間平臺,這個平臺就是阻塞隊列,如果沒有中間平臺交易就會產生一定風險、效率也會降低很多。
生產者消費者體現:過年大家都包餃子,假設一家有三個人員,人員1 搟餃子皮,搟完后放在砧板上,人員2 和 人員3 負責包餃子。這樣一個例子中 人員1 就是生產者,砧板就是平臺,人員2 和 人員3 是消費者。如果三個人員自己搟皮自己包,這樣的效率是非常低的?。ㄖ挥幸粋€搟面杖、無砧板情況下)
中間平臺優(yōu)點體現:假如,有兩個服務器它們直接進行交互。服務器1掛了,緊接著服務器2也掛了。因此,我們需要一個中間平臺(阻塞隊列),連接這兩個服務器并進行交互。這樣無論那一個服務器掛了也不影響另一個服務器。
生產者消費者模型的優(yōu)點有很多,但最突出了有兩點:解耦合和削峰填谷。請看下方講解。
2.1 解耦合
大家都聽過高內聚低耦合這個概念,在此我來做個解釋:
何為內聚,舉個例子:在快遞站拿快遞,我們可以根據貨物號來快速的找到想要的物品,這就是高內聚。
但某一天,快遞站來了個怪人,他在找快遞的過程中把每個拿起來的快遞都隨意放在其他位置。因此別人再去找自己的快遞時就不能快速的找到自己的快遞了,這就是低內聚的一個體現。
在 Java 中高內聚主要體現在代碼的條理性,相關聯的代碼很好的放在一起。低內聚則是相關聯的代碼沒有放在一起,東一塊、西一塊。
何為耦合,主要體現一個關聯性。也是舉個例子:假設我的親人生病住院了,我會放下手中的一切去好好照顧他/她,哪怕對我現實生活影響很大,我也義無反顧。這樣的行為就是高耦合的。
但我的女神生病了,她發(fā)了個朋友圈。由于我和她只是“朋友圈點贊之交”,我只會給她點個贊并且評論句多喝熱水。因為她生病了對我的影響是很低的,所以可以稱為低耦合。
耦合高,在 Java 主要體現在多個模塊之間的關聯,關聯越強耦合越高,關聯越弱耦合越低。
回歸正題,阻塞隊列的解耦合主要體現在多個線程之間進行交互。如以下例子:
在上、下圖中,A、B、C是我們的業(yè)務服務器,會經常更改代碼, 因此會經常出現 bug 就容易掛。通過消費者模型就能很好的避免這個問題。
當然,阻塞隊列服務器也會掛,但相對于ABC業(yè)務服務器來說掛的機率較小。
2.2 削峰填谷
三峽大壩利用的就是削峰填谷機制,有效緩解了電力系統(tǒng)在高峰期的壓力和在低峰期的浪費現象。
當電力系統(tǒng)電力值達到高峰時,三峽大壩則會把部分的水存儲在水庫里面,只放出適合的水流量,減少并調節(jié)電力系統(tǒng)的負荷,有效緩解電力系統(tǒng)在高峰期的浪費現象。
當電力處于低峰期時也就是電力供給不足的情況,三峽大壩會把水庫里存儲的水給放出來,通過電站的發(fā)電量、水庫的排水等措施,緩解了電力系統(tǒng)在低峰期的電力不足。
上述例子就是削峰填谷的一個簡單理解,在 Java 中阻塞隊列就能達到削峰填谷的功能。
當服務器與服務器之間進行交互常常是以一個很平緩的速率進行的,但某一時刻突然達到了一個峰值。
這個時候阻塞隊列就能把峰值帶來的壓力給頂下來,讓服務器之間還是以平穩(wěn)的速率進行交互。
如:服務器A 作為生產者,服務器B 作為消費者,服務器A 最高可達到 1秒3萬 次的速率,服務器B 最高只能 1秒1萬 次這時候就會出現下圖這樣的問題。
上圖中 服務器A 作為生產者、服務器B 作為消費者。當 服務器A 收到的請求多了?;貜徒o阻塞隊列的內容也變多了。
但 服務器B 最多能接受 1秒1萬 次的數據。因此,阻塞隊列就會把多的請求存儲下來并按照 1秒1萬 次的速率給 服務器B 傳輸數據,這樣就不會導致 服務器B 崩潰。
以上的三峽大壩、服務器交互的例子就是對削峰填谷進行的一個講解,當然比較淺顯。具體代碼的實現,請看下方講解。
2.3 生產者消費者案例
生產者消費者主要體現一個線程生產,一個線程消費。如下代碼:
public static void main(String[] args) { BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(); //消費者 Thread thread1 = new Thread(()->{ while (true) { try { int value = blockingDeque.take(); System.out.println("消費者: "+value); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread1.start();//啟動線程1 //生產者 Thread thread2 = new Thread(()->{ int value = 1; while (true) { try { blockingDeque.put(value); System.out.println("生產者: "+value); Thread.sleep(1000); value++; } catch (InterruptedException e) { e.printStackTrace(); } } }); thread2.start();//啟動線程2 }
運行后打?。?/p>
以上代碼不難看懂,主要用到阻塞隊列的 take 和 put 方法。生產者 thread2 使用 put 方法生產元素,消費者 thread1 使用 take 方法消費元素。
注意,在線程內調用 take 或put 方法,都得 try/catch InterruptedException 這個異常。我們直接Alt+Enter take 或 put方法即可。
3. 阻塞隊列生產者消費者模型的實現
使用阻塞隊列實現生產者消費者模式過程如下:
首先我們要讓這個隊列循環(huán)下去,如何讓一個隊列循環(huán)下去,最好實現方法就是使用循環(huán)隊列。
設計中我們可以用 head 作為隊頭元素下標、tail 作為隊尾元素下標、size 作為當前元素的個數。
head 等于 tail 的時候證明是初始狀態(tài)(隊列空),或者是隊列已滿。因此,有以下幾點注意事項:
入隊列:
- 當 size 等于隊列長度時,證明隊列已滿,此時不能插入數據。
- 當 tail 等于隊列長度時,tail 置為0,從第一個位置開始插入元素。
出隊列:
- 當 size 等于 0 時,證明隊列已空,此時不能出數據。
- 當 head 等于隊列長度時候,head 置為 0 ,從第一個元素開始出元素。
當然,為了達到阻塞的效果,在隊列滿狀態(tài)或空狀態(tài)的方法里面使用 wait 方法造成阻塞狀態(tài)。在插元素方法里面里面 notify 喚醒隊列空時的阻塞狀態(tài),在拿元素里面 notify 喚醒隊列滿時的阻塞狀態(tài)。
具體代碼實現如下:
class MyBlockingQueue { int [] array = new int[100];//定義一個數組為隊列 int head = 0;//隊頭下標 int tail = 0;//隊尾下標 int size = 0;//元素個數 //模擬實現 put 方法 synchronized public void put(int value) throws InterruptedException { if (size == array.length) { this.wait();//隊列已滿設為阻塞狀態(tài) } array[tail] = value;//把value值放在數組對應下標中 tail++;//隊尾下表自增 size++;//元素個數自增 if (tail == array.length) { tail = 0;//隊尾下標重置為0 } this.notify();//喚醒隊列空的阻塞狀態(tài) } //模擬實現 take 方法 synchronized public int take() throws InterruptedException { if (size == 0){ this.wait();//隊列已空設為阻塞狀態(tài) } int value = array[head];//隊頭元素負責個value head++;//隊頭下標往后自增 size--;//元素個數自減 if (head == array.length) { head = 0;//隊頭下標置為0 } this.notify();//喚醒隊列滿的阻塞狀態(tài) return value;//返回隊頭元素 } } public class ThreadDemo2 { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(); //生產者 Thread thread1 = new Thread(()-> { int i = 1; while (true) { try { System.out.println("生產者: "+i); myBlockingQueue.put(i); i++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread1.start(); //消費者 Thread thread2 = new Thread(()-> { while (true) { try { int i = myBlockingQueue.take(); System.out.println("消費者: "+i); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread2.start(); } }
運行后打印:
以上代碼,我使用一個數組來模擬實現循環(huán)隊列的這樣更容易去理解。其他細節(jié)大家可以在代碼中的注釋進行理解。 隊列已經循環(huán)隊列不太熟悉朋友可以回頭好好復習一下。
注意,一個隊列不可能為空狀態(tài)又為滿狀態(tài),因此在上述代碼中,notify 喚醒的都是對方的狀態(tài)。這樣一個阻塞隊列生產者消費者模式就能很好的實現了。
另外,阻塞隊列不存在線程安全問題,因為阻塞隊列底層有加鎖機制。因此,大家可以安心使用。
到此這篇關于Java多線程使用阻塞隊列實現生產者消費者模型詳解的文章就介紹到這了,更多相關Java多線程生產者消費者模型內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!