Java基于阻塞隊列實現(xiàn)生產(chǎn)者消費者模型示例詳解
一、阻塞式隊列
什么是阻塞式隊列(有兩點):
- 第一點:當隊列滿的時候,如果此時入隊列的話就會出現(xiàn)阻塞,直到其它線程從隊列中取走元素為止。
- 第二點:當隊列為空的時候,如果繼續(xù)出隊列,此時就會出現(xiàn)阻塞,一直阻塞到其它線程往隊列中添加元素為止。
二、生產(chǎn)者消費者模型
什么是生產(chǎn)者消費者模型
生產(chǎn)者消費者模型是常見的多線程編程模型,可以用來解決生產(chǎn)者和消費者之間的數(shù)據(jù)交互問題。
阻塞隊列的最主要的一個目的之一就是實現(xiàn)生產(chǎn)者消費者模型(基于阻塞隊列實現(xiàn)),生產(chǎn)者消費主模型是處理多線程問題的一種方式。
生產(chǎn)消費者模型的優(yōu)勢
生產(chǎn)者消費主模型的優(yōu)勢:針對分布式系統(tǒng)有兩個優(yōu)勢,一個是解耦合(耦合我們可以理解為依賴程度)、另一個是削峰填谷。
- 解耦合:生產(chǎn)者和消費主之間通過緩沖區(qū)進行解耦合,而不會對彼此產(chǎn)生直接的依賴,我們通過引入生產(chǎn)者消費者模型(即阻塞隊列)就可以達到解耦合的效果,但是付出的代價就是效率有所降低。
- 削峰填谷:服務(wù)器接收到的來自用戶端的請求數(shù)量可能會因為一些突發(fā)時間而暴增,此時服務(wù)器面臨的壓力就非常大了。我們要知道一臺服務(wù)器承擔的上限是一樣的,不同的服務(wù)器所能承擔的上限又是不同的。(機器的硬件資源(CPU、內(nèi)存、硬盤、網(wǎng)絡(luò)帶寬等等)是有限的,而服務(wù)器每處理一個請求都需要消耗一定的資源,請求足夠多直到機器的硬件資源招架不住的時候服務(wù)器也就掛了)通過引入生產(chǎn)消費者模型(即阻塞隊列)就可以起到一個緩沖的作用,其中阻塞隊列就承擔了服務(wù)器的一部分壓力,然后當峰值消退的時候,服務(wù)器接收到的請求就相對較少了,此時服務(wù)器由于阻塞隊列的原因依然可以按照既定的順序處理請求。
阻塞隊列只是一個數(shù)據(jù)結(jié)構(gòu),如果我們把這個數(shù)據(jù)結(jié)構(gòu)單獨實現(xiàn)稱了一個服務(wù)器程序,并且使用單獨的主機或者主機群來進行部署的話,此時阻塞式隊列就進化成了消息隊列。而在Java標準庫中已經(jīng)實現(xiàn)了阻塞隊列,并且實現(xiàn)了三種阻塞隊列的實現(xiàn)方式:
三、生產(chǎn)者消費者舉例代碼
生產(chǎn)消費者模型代碼如下(基于阻塞式隊列):
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; // 生產(chǎn)消費者模型——阻塞隊列 public class Demo20 { public static void main(String[] args) { // 創(chuàng)建一個阻塞隊列來作為交易場所 BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count); System.out.println("生產(chǎn)元素:" + count); count++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { while(true) { try { Integer n = queue.take(); System.out.println("消費元素:" + n); } catch (InterruptedException e) { e.printStackTrace(); } } } }); t1.start(); t2.start(); } }
代碼運行結(jié)果如下:
四、基于阻塞式隊列實現(xiàn)生產(chǎn)者消費者模型
現(xiàn)在,我們自己來基于循環(huán)隊列來實現(xiàn)阻塞式隊列。注意我們這里實現(xiàn)的阻塞隊列是基于數(shù)組、基于循環(huán)隊列的阻塞隊列。
我們在實現(xiàn)阻塞隊列的時候有以下幾點需要注意:
- 線程安全問題:需要給put方法和take()方法進行加鎖操作。
- 經(jīng)過加鎖之后還需要考慮到內(nèi)存可見性問題,這里就涉及到volatile關(guān)鍵字的使用。
- 阻塞狀態(tài)以及阻塞狀態(tài)的解除時機要把握好(即wait()方法和notify()方法的使用)。
- wait()方法不一定是被notify()方法喚醒的,還有可能是被interrupt()方法喚醒的:如果interrupt方法是按照try catch的形式來進行編寫的,一旦interrupt方法喚醒wait方法,接著執(zhí)行完catch之后,代碼并不會結(jié)束而是繼續(xù)往后執(zhí)行,此時就會出現(xiàn)覆蓋元素的問題。(解決方法,使用while循環(huán)不斷等待和檢查條件。如果不使用 while 循環(huán)在狀態(tài)被滿足之前不斷地等待和檢查條件,就有可能在 wait 方法返回之后仍然不能安全地進行操作,這可能導(dǎo)致程序出現(xiàn)異常和錯誤。)強烈建議使用wait方法的時候搭配while循環(huán)來判定條件
代碼如下:
class MyBlockQueue { // 使用string類型的數(shù)組來保存元素,我們假設(shè)這里只存string private String[] items = new String[1000]; //head表示指向隊列的頭部 volatile private int head = 0; volatile private int tail = 0; volatile private int size = 0; // size表示元素個數(shù) private Object locker = new Object(); public void put(String elem) throws InterruptedException { synchronized(locker) { while(size >= items.length) { //隊列已滿 locker.wait(); //return; } items[tail] = elem; tail++; if(tail >= items.length) { tail = 0; } //tail++和下面的if判斷可以替換成tail = (tail + 1) % (items.length) //但是站在CPU的角度來看,其實還是簡單的if判斷比較快 size++; locker.notify(); // 用來喚醒隊列為空的阻塞情況 } } //出隊列 public String take() throws InterruptedException { synchronized(locker) { while(size == 0) { locker.wait(); } String elem = items[head]; head++; if(head >= items.length) { head = 0; } size--; //使用notify來喚醒隊列阻塞滿的情況 locker.notify(); return elem; } } } public class Demo21 { public static void main(String[] args) { // 創(chuàng)建兩個線程分別表示消費者和生產(chǎn)者 MyBlockQueue queue = new MyBlockQueue(); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素: " + count); count++; } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { try { String count = queue.take(); System.out.println("消費元素: " + count); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
以上就是Java基于阻塞隊列實現(xiàn)生產(chǎn)者消費者模型示例詳解的詳細內(nèi)容,更多關(guān)于Java阻塞隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java多態(tài)(動力節(jié)點Java學(xué)院整理)
多態(tài)是指允許不同類的對象對同一消息做出響應(yīng)。即同一消息可以根據(jù)發(fā)送對象的不同而采用多種不同的行為方式。接下來通過本文給大家介紹java多態(tài)相關(guān)知識,感興趣的朋友一起學(xué)習(xí)吧2017-04-04Java自帶定時任務(wù)ScheduledThreadPoolExecutor實現(xiàn)定時器和延時加載功能
今天小編就為大家分享一篇關(guān)于Java自帶定時任務(wù)ScheduledThreadPoolExecutor實現(xiàn)定時器和延時加載功能,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12SpringMvc靜態(tài)資源訪問實現(xiàn)方法代碼實例
這篇文章主要介紹了SpringMvc靜態(tài)資源訪問實現(xiàn)方法代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08SpringBoot應(yīng)用部署于外置Tomcat容器的方法
這篇文章主要介紹了SpringBoot應(yīng)用部署于外置Tomcat容器的方法,本文分步驟給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2018-06-06