Java多線程實現(xiàn)阻塞隊列的示例代碼
1. 阻塞隊列簡介
1.1 阻塞隊列概念
阻塞隊列:是一種特殊的隊列,具有隊列"先進先出"的特性,同時相較于普通隊列,阻塞隊列是線程安全
的,并且?guī)в?code>阻塞功能,表現(xiàn)形式如下:
- 當隊列滿時,繼續(xù)入隊列就會阻塞,直到有其他線程從隊列中取出元素
- 當隊列空時,繼續(xù)出隊列就會阻塞,直到有其他線程往隊列中插入元素
基于阻塞隊列我們可以實現(xiàn)生產(chǎn)者消費者模型
,這在后端開發(fā)場景中是相當重要的!
1.2 生產(chǎn)者-消費者模型優(yōu)勢
基于阻塞隊列實現(xiàn)的 生產(chǎn)者消費者模型 具有以下兩大優(yōu)勢:
- 解耦合:
以搜狗搜索的服務器舉例,用戶輸入搜索關(guān)鍵字 **美容,**客戶端的請求到達搜狗的"入口服務器"時,會將請求轉(zhuǎn)發(fā)到 廣告服務器 和 大搜索服務器,此時廣告服務器返回相關(guān)廣告內(nèi)容,大搜索服務器根據(jù)搜索算法匹配對應結(jié)果返回,如果按照這種方式通信,那么入口服務器需要編寫兩套代碼分別同廣告服務器和大搜索服務器進行交互,并且一個嚴重問題是如果其中廣告服務器宕機了,會導致入口服務器無法正常工作進而影響大搜索服務器也無法正常工作??!
而引入阻塞隊列后,入口服務器不需要知曉廣告服務器和大搜索服務器的存在,只需要往阻塞隊列中發(fā)送請求即可,而廣告服務器和大搜索服務器也不需要知道入口服務器的存在,只需要從阻塞隊列中取出請求處理完畢返回給阻塞隊列即可,并且當其中大搜索服務器宕機時,不影響其他服務器以及入口服務器的正常運作!
- 削峰填谷:
如果沒有阻塞隊列,當遇到一些突發(fā)場景例如"雙十一"大促等客戶請求量激增的時候,入口服務器轉(zhuǎn)發(fā)的請求量增多,壓力就會變大,同理廣告服務器和大搜索服務器處理過程復雜繁多,消耗的硬件資源就會激增,達到硬件瓶頸之后服務器就宕機了(直觀現(xiàn)象就是客戶端發(fā)送請求,服務器不會響應了)
而引入阻塞隊列/消息隊列之后,由于阻塞隊列只負責存儲相應的請求或者響應,無需額外的業(yè)務處理,因此抗壓能力比廣告服務器和大搜索服務器更強,當客戶請求量激增的時候交由阻塞隊列承受,而廣告服務器和大搜索服務器只需要按照特定的速率進行讀取并返回處理結(jié)果即可,就起到了 削峰填谷 的作用!
注意:此處的阻塞隊列在現(xiàn)實場景中并不是一個單純的數(shù)據(jù)結(jié)構(gòu),往往是一個基于阻塞隊列的服務器程序,例如消息隊列(MQ)
2. 標準庫中的阻塞隊列
2.1 基本介紹
Java標準庫提供了現(xiàn)成的阻塞隊列數(shù)據(jù)結(jié)構(gòu)供開發(fā)者使用,即BlockingQueue
接口
BlockingQueue:該接口具有以下實現(xiàn)類:
- ArrayBlockingQueue:基于數(shù)組實現(xiàn)的阻塞隊列
- LinkedBlockingQueue:基于鏈表實現(xiàn)的阻塞隊列
- PriorityBlockingQueue:帶有優(yōu)先級的阻塞隊列
BlockingQueue方法:該接口具有以下常用方法
- 帶有阻塞功能:
put
:向隊列中入元素,隊列滿則阻塞等待take
:向隊列中取出元素,隊列空則阻塞等待
- 不帶有阻塞功能:
peek
:返回隊頭元素(不取出)poll
:返回隊頭元素(取出)offer
:向隊列中插入元素
2.2 代碼示例
/** * 測試Java標準庫提供的阻塞隊列實現(xiàn) */ public class TestStandardBlockingQueue { private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { // 生產(chǎn)者 Thread t1 = new Thread(() -> { int i = 0; while (true) { try { queue.put(i); System.out.println("生產(chǎn)數(shù)據(jù):" + i); i++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); // 消費者 Thread t2 = new Thread(() -> { while (true) { try { Thread.sleep(1000); int ele = queue.take(); System.out.println("消費數(shù)據(jù):" + ele); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
運行效果:
我們在主線程中創(chuàng)建了兩個線程,其中t1
線程作為生產(chǎn)者不斷循環(huán)生產(chǎn)元素,而線程t2
作為消費者每隔1s消費一個數(shù)據(jù),所以我們很快看到當生產(chǎn)數(shù)據(jù)個數(shù)達到容量capacity
時就會繼續(xù)生產(chǎn)就會阻塞等待,直到消費者線程消費數(shù)據(jù)后才可以繼續(xù)入隊列,這樣就實現(xiàn)了一個 生產(chǎn)者-消費者模型 !
3. 自定義實現(xiàn)阻塞隊列
首先我們需要明確實現(xiàn)一個阻塞隊列需要哪些步驟?
- 首先我們需要實現(xiàn)一個普通隊列
- 使用鎖機制將普通隊列變成線程安全的
- 通過特殊機制讓該隊列能夠帶有"阻塞"功能
3.1 實現(xiàn)普通隊列
相信大家如果學過 數(shù)據(jù)結(jié)構(gòu)與算法 相關(guān)課程,應該對隊列這種數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)并不陌生!實現(xiàn)隊列有基于數(shù)組的也有基于鏈表的,我們此處采用基于數(shù)組實現(xiàn)的,基于數(shù)組實現(xiàn)的循環(huán)隊列也有以下兩種方式:
- 騰出一個空間用來判斷隊列空或者滿
- 使用額外的變量
size
用來記錄當前元素的個數(shù)
我們使用第二種方式實現(xiàn),實現(xiàn)代碼如下:
/** * 自定義實現(xiàn)阻塞隊列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當前元素個數(shù) private String[] array = null; private int capacity; // 容量 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊列方法 */ public void put(String elem) { if (size == capacity) { // 隊列已經(jīng)滿了 return; } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; } /** * 出隊列方法 */ public String take() { // 判斷隊列是否為空 if (size == 0) { return null; } String topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(3); queue.put("11"); queue.put("22"); queue.put("33"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
3.2 引入鎖機制實現(xiàn)線程安全
引入synchronized
關(guān)鍵字在原有隊列實現(xiàn)的基礎上實現(xiàn)線程安全,代碼如下:
/** * 自定義實現(xiàn)阻塞隊列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當前元素個數(shù) private String[] array = null; private int capacity; // 容量 private Object locker = new Object(); // 鎖對象 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊列方法 */ public void put(String elem) { synchronized (locker) { if (size == capacity) { // 隊列已經(jīng)滿了 return; } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; } } /** * 出隊列方法 */ public String take() { String topElem = ""; synchronized (locker) { // 判斷隊列是否為空 if (size == 0) { return null; } topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; } return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(3); queue.put("11"); queue.put("22"); queue.put("33"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
我們在put
、take
等關(guān)鍵方法上將 多個線程修改同一個變量 部分的操作進行加鎖處理,實現(xiàn)線程安全!
3.3 加入阻塞功能
在普通隊列的實現(xiàn)中,如果隊列滿或者空我們直接使用return
關(guān)鍵字返回,但是在多線程環(huán)境下我們希望實現(xiàn)阻塞等待的功能,這就可以使用Object類提供的wait/notify
這組方法實現(xiàn)阻塞與喚醒機制了!我們就需要考慮阻塞與喚醒的時機了!
何時阻塞:這個問題非常簡單,當隊列滿時入隊列操作就應該阻塞等待,而當隊列為空時出隊列操作就需要阻塞等待
何時喚醒:想必大家都可以想到,對于入隊列操作來說,只要隊列不滿就可以被喚醒,而對于出隊列操作來說,隊列不為空就可以被喚醒,因此,只要有線程調(diào)用take
操作出隊列,那么入隊列的線程就可以被喚醒,而只要有線程調(diào)用put
操作入隊列,那么出隊列的線程就可以被喚醒
/** * 自定義實現(xiàn)阻塞隊列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當前元素個數(shù) private String[] array = null; private int capacity; // 容量 private Object locker = new Object(); // 鎖對象 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊列方法 */ public void put(String elem) { synchronized (locker) { while (size == capacity) { // 隊列已經(jīng)滿了(進行阻塞) try { locker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; locker.notifyAll(); } } /** * 出隊列方法 */ public String take() { String topElem = ""; synchronized (locker) { // 判斷隊列是否為空 while (size == 0) { try { locker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; locker.notifyAll(); } return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(10); // 生產(chǎn)者 Thread producer = new Thread(() -> { int i = 0; while (true) { queue.put(i + ""); System.out.println("生產(chǎn)元素:" + i); i++; } }); // 消費者 Thread consumer = new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } String elem = queue.take(); System.out.println("消費元素" + elem); } }); producer.start(); consumer.start(); } }
我們使用wait/notify
這組操作實現(xiàn)了阻塞/喚醒功能,并且滿足必須使用在synchronized
關(guān)鍵字內(nèi)部的使用條件,這里有一個注意點
為什么我們將if判斷條件改成了while循環(huán)呢???這是需要考慮清楚的!
如圖所示:一開始由于隊列滿所以生產(chǎn)者1進入阻塞狀態(tài),釋放鎖,然后生產(chǎn)者2也進入阻塞狀態(tài)釋放鎖,此時消費者消費一個元素后喚醒生產(chǎn)者1,然后生產(chǎn)者1生產(chǎn)一個元素后(記住此時隊列已滿)繼續(xù)喚醒,但是此時喚醒的恰恰是 生產(chǎn)者2 ,生產(chǎn)者2繼續(xù)執(zhí)行生產(chǎn)元素,于是就出現(xiàn)問題,我們總結(jié)一下出現(xiàn)問題的原因:
notifyAll
是隨機喚醒,無法指定喚醒線程,因此可能出現(xiàn)生產(chǎn)者喚醒生產(chǎn)者,消費者喚醒消費者的情況if
判定條件一經(jīng)執(zhí)行就無法繼續(xù)判定,所以生產(chǎn)者2被喚醒后沒有再次判斷當前隊列是否滿
于是我們的應對策略就是使用while
循環(huán),當線程被喚醒使重新判斷,如果隊列仍滿,入隊列操作繼續(xù)阻塞,而隊列仍空,出隊列操作繼續(xù)阻塞!Java標準也推薦我們使用 while 關(guān)鍵字和 wait 關(guān)鍵字一起使用!
4. 應用場景(實現(xiàn)生產(chǎn)者消費者模型)
我們繼續(xù)基于我們自定義實現(xiàn)的阻塞隊列再來實現(xiàn) 生產(chǎn)者-消費者模型代碼示例(主函數(shù)):
public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(10); // 生產(chǎn)者 Thread producer = new Thread(() -> { int i = 0; while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } queue.put(i + ""); System.out.println("生產(chǎn)元素:" + i); i++; } }); // 消費者 Thread consumer = new Thread(() -> { while (true) { String elem = queue.take(); System.out.println("消費元素" + elem); } }); producer.start(); consumer.start(); }
運行效果:
此時我們創(chuàng)建兩個兩個線程,producer
作為生產(chǎn)者線程每隔1s生產(chǎn)一個元素,consumer
作為消費者線程不斷消費元素,此時我們看到的就是消費者消費很快,當阻塞隊列空時就進入阻塞狀態(tài),直到生產(chǎn)者線程生產(chǎn)元素后才被喚醒繼續(xù)執(zhí)行!此時我們真正模擬實現(xiàn)了 阻塞隊列 這樣的數(shù)據(jù)結(jié)構(gòu)!
到此這篇關(guān)于Java多線程實現(xiàn)阻塞隊列的示例代碼的文章就介紹到這了,更多相關(guān)Java 阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中的HashSet詳解和使用示例_動力節(jié)點Java學院整理
HashSet 是一個沒有重復元素的集合。接下來通過實例代碼給大家介紹java中的hashset相關(guān)知識,感興趣的朋友一起看看吧2017-05-05Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明
這篇文章主要介紹了Java中forEach使用lambda表達式,數(shù)組和集合的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07java中FileOutputStream中文亂碼問題解決辦法
這篇文章主要介紹了java中FileOutputStream中文亂碼問題解決辦法的相關(guān)資料,需要的朋友可以參考下2017-04-04