JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐
1. 前言
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?/li>
- 當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
2. 什么是生產(chǎn)者-消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是一種多線程并發(fā)協(xié)作的模型,由兩類線程和一個(gè)緩沖區(qū)組成:生產(chǎn)者線程生產(chǎn)數(shù)據(jù)并把數(shù)據(jù)放在緩沖區(qū),消費(fèi)者線程從緩沖區(qū)取數(shù)據(jù)并消費(fèi)。生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)存儲(chǔ)空間,生產(chǎn)者往存儲(chǔ)空間中添加產(chǎn)品,消費(fèi)者從存儲(chǔ)空間中取走產(chǎn)品。當(dāng)存儲(chǔ)空間為空時(shí),消費(fèi)者阻塞;當(dāng)存儲(chǔ)空間滿時(shí),生產(chǎn)者阻塞。
3. 生產(chǎn)者-消費(fèi)者模型的意義
- 解耦合
- 削峰填谷
3.1 解耦合
兩個(gè)模塊的聯(lián)系越緊密,耦合度就越高,耦合度越高就意味著兩個(gè)模塊的影響程度很大,當(dāng)一個(gè)模塊出現(xiàn)問(wèn)題的時(shí)候,另一個(gè)模塊也會(huì)受到影響而導(dǎo)致出現(xiàn)問(wèn)題,特別是對(duì)分布式系統(tǒng)來(lái)說(shuō):解耦合是非常重要的。
假設(shè)上面是一個(gè)簡(jiǎn)單的分布式系統(tǒng),服務(wù)器 A 和服務(wù)器 B 之間直接進(jìn)行交互(服務(wù)器 A 向服務(wù)器 B 發(fā)送請(qǐng)求并接收服務(wù)器 B 返回的信息,服務(wù)器 B 向服務(wù)器 A 發(fā)送請(qǐng)求,以及接收服務(wù)器 A 返回的信息),服務(wù)器 A 和服務(wù)器 B 之間的耦合度比較高,當(dāng)兩個(gè)服務(wù)器之間的一個(gè)發(fā)生故障的時(shí)候就會(huì)導(dǎo)致兩個(gè)服務(wù)器都無(wú)法使用。
不僅如此,當(dāng)我們想要再添加一個(gè)服務(wù)器 C 與服務(wù)器 A 之間進(jìn)行交互的時(shí)候,不僅需要對(duì)服務(wù)器 C 做出修改,還需要對(duì)服務(wù)器 A 作出修改。
相比上面的情況,如果我們使用生產(chǎn)者-消費(fèi)者模型的話就可以解決上面的耦合度過(guò)高的問(wèn)題。
服務(wù)器 A 接收到客戶端發(fā)來(lái)的請(qǐng)求不是直接發(fā)送給服務(wù)器 B ,而是將接收到的請(qǐng)求加入到阻塞隊(duì)列中,然后服務(wù)器 B 從阻塞隊(duì)列中獲取到請(qǐng)求,這樣就避免了兩個(gè)服務(wù)器之間進(jìn)行直接的交互,降低了耦合性;不僅如此,當(dāng)我們需要額外添加一個(gè)服務(wù)器 C 的時(shí)候,就不需要對(duì)服務(wù)器 A 做出修改,而是直接從阻塞隊(duì)列獲取請(qǐng)求信息。
3.2 削峰填谷
當(dāng)客戶端向服務(wù)器 A 短時(shí)間發(fā)出大量請(qǐng)求信息的話,那么當(dāng)服務(wù)器 A 接收到客戶端發(fā)來(lái)的請(qǐng)求的時(shí)候,就會(huì)立即將收到的所有信息都發(fā)送給服務(wù)器 B ,但是由于雖然服務(wù)器 A 能夠接收的請(qǐng)求量可以很多,但是服務(wù)器 B 卻不能一次接收這么多請(qǐng)求,就會(huì)導(dǎo)致服務(wù)器 B 會(huì)掛掉。
如果使用生產(chǎn)者-消費(fèi)者莫模型的話,當(dāng)客戶端向服務(wù)器 A 短時(shí)間發(fā)送大量請(qǐng)求的話,服務(wù)器 A 不會(huì)將請(qǐng)求發(fā)送給 B ,而是發(fā)送給阻塞隊(duì)列中,當(dāng)阻塞隊(duì)列滿了的時(shí)候,服務(wù)器 A 就會(huì)停止向阻塞隊(duì)列中發(fā)送請(qǐng)求,陷入阻塞狀態(tài),等服務(wù)器 B 向阻塞隊(duì)列中受到請(qǐng)求使得阻塞隊(duì)列容量減少的時(shí)候,服務(wù)器 A 才會(huì)繼續(xù)向阻塞隊(duì)列中發(fā)送收到的請(qǐng)求,這樣就避免了服務(wù)器 B 短時(shí)間內(nèi)受到大量的請(qǐng)求而掛掉的情況;如果阻塞隊(duì)列中收到的請(qǐng)求信息被讀取完的時(shí)候,服務(wù)器 B 就會(huì)停止從阻塞隊(duì)列中讀取請(qǐng)求,進(jìn)入阻塞狀態(tài),直到服務(wù)器 A 向阻塞隊(duì)列中發(fā)送請(qǐng)求。
4. 如何使用Java標(biāo)準(zhǔn)庫(kù)提供的阻塞隊(duì)列
當(dāng)知道了生產(chǎn)者-消費(fèi)者模型的意義之后,我們就來(lái)看看如何使用阻塞隊(duì)列。在Java標(biāo)準(zhǔn)庫(kù)中提供了阻塞隊(duì)列 BlockingQueue 可以直接使用。
因?yàn)?BlockingQueu 是一個(gè)接口,無(wú)法實(shí)例化,所以需要?jiǎng)?chuàng)建出實(shí)現(xiàn)了 BlockingQueue 接口的類,而 ArrayBlockingQueue 和 LinkedBlockingQueue 實(shí)現(xiàn)了這個(gè)接口。
我們還可以觀察到,BlockingQueue 還繼承了 Queue ,也就是說(shuō)我們也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞隊(duì)列中不使用這兩個(gè)方法,因?yàn)檫@兩個(gè)方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingQueue<>(); queue.put("123"); queue.put("234"); queue.put("345"); queue.put("456"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
這里向阻塞隊(duì)列中加入了四個(gè)數(shù)據(jù),但是讀取的時(shí)候讀取了五次,所以看到線程進(jìn)入了阻塞狀態(tài)。
5. 自己實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
阻塞隊(duì)列是建立在隊(duì)列的基礎(chǔ)上的,所以要想實(shí)現(xiàn)一個(gè)阻塞隊(duì)列,首先需要實(shí)現(xiàn)出來(lái)一個(gè)隊(duì)列,那么就先來(lái)看看如何實(shí)現(xiàn)出一個(gè)循環(huán)隊(duì)列。
5.1 實(shí)現(xiàn)出循環(huán)隊(duì)列
隊(duì)列比較容易實(shí)現(xiàn),但是循環(huán)隊(duì)列該如何實(shí)現(xiàn)呢?當(dāng)數(shù)據(jù)到達(dá)數(shù)組的最后的時(shí)候,將數(shù)組的下標(biāo)修改為0,這樣就可以達(dá)到循環(huán)的目的。
當(dāng) tail == head 的時(shí)候有兩種情況:
- 隊(duì)列中沒有數(shù)據(jù)的時(shí)候
- 隊(duì)列滿了的時(shí)候
為了區(qū)分這兩種情況,我們可以使用兩種方法:
- 浪費(fèi)一個(gè)空間,當(dāng)tail++之后,如果tail + 1 == head,則表示隊(duì)列滿了,將 tail 修改為 0
- 定義一個(gè)size變量來(lái)表示隊(duì)列中有效數(shù)據(jù)的個(gè)數(shù),當(dāng)size == queue.length的時(shí)候,表示隊(duì)列滿了
class MyQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { //當(dāng)添加數(shù)據(jù)的時(shí)候需要判斷隊(duì)列的容量是否已經(jīng)滿了 if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } public String take() { //讀取數(shù)據(jù)的時(shí)候需要判斷隊(duì)列是否為空 if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; } }
5.2 實(shí)現(xiàn)阻塞隊(duì)列
阻塞隊(duì)列就是在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?;?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。并且因?yàn)樽枞?duì)列運(yùn)用的環(huán)境是多線程,需要考慮到線程安全的問(wèn)題。
5.2.1 加鎖
當(dāng)需要進(jìn)行查詢和修改的操作時(shí),需要對(duì)該操作進(jìn)行加鎖。因?yàn)槲覀兊?put 和 take 基本上都在查詢和修改數(shù)據(jù),所以可以將這兩個(gè)操作直接進(jìn)行加鎖操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { synchronized (this) { if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } } public String take() { synchronized (this) { if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; } } }
5.2.2 進(jìn)行阻塞操作
當(dāng)進(jìn)行完加鎖操作之后,我們還需要實(shí)現(xiàn)阻塞的作用,當(dāng)添加數(shù)據(jù)的時(shí)候,如果隊(duì)列中容量滿了的時(shí)候就進(jìn)入阻塞等待狀態(tài),直到進(jìn)行了 take 讀取數(shù)據(jù)操作刪除數(shù)據(jù)的時(shí)候,才停止等待;當(dāng)讀取數(shù)據(jù)的時(shí)候,如果隊(duì)列為空,那么該線程就進(jìn)入阻塞等待狀態(tài),直到進(jìn)行了 put 操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { if(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個(gè) notify 用來(lái)喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { if(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個(gè) notify 用來(lái)喚醒 put 操作的等待 this.notify(); return ret; } } }
5.2.3 解決因被 interrupt 喚醒 waiting 狀態(tài)的問(wèn)題
當(dāng)使用了 wait 和 notify 對(duì) put 和 take 操作進(jìn)行了阻塞等待和喚醒操作之后,我們還需要注意,難道只有 notify 才會(huì)喚醒 WAITING 狀態(tài)嗎?前面我們學(xué)習(xí)了使用 interrupt 來(lái)終止線程,但是 interrup 還會(huì)喚醒處于 WAITING 狀態(tài)的線程,也就是說(shuō)這里的 WAITING 狀態(tài)的線程不僅可以被 notify 喚醒,還可以被 interrupt 喚醒。
- 當(dāng)線程是因?yàn)?put 操作隊(duì)列滿了的時(shí)候進(jìn)入阻塞等待狀態(tài)的時(shí)候,如果是因?yàn)楸?interrupt 喚醒而不是 take 操作的 notify 喚醒的時(shí)候就意味著此時(shí)隊(duì)列還是滿的,當(dāng)進(jìn)行添加操作的時(shí)候,就會(huì)將有效的數(shù)據(jù)覆蓋掉;
- 當(dāng)線程是因?yàn)?take 操作隊(duì)列為空的時(shí)候進(jìn)入阻塞等待狀態(tài)的時(shí)候,如果是因?yàn)楸?interrupt 喚醒而不是 put 操作的 notify 喚醒的時(shí)候就意味著此時(shí)隊(duì)列還是空的,如果進(jìn)行刪除操作,并沒有意義。
為了解決 WAITING 狀態(tài)被 interrupt 喚醒而造成的問(wèn)題,當(dāng)線程被喚醒的時(shí)候,需要進(jìn)行判斷 size 是否還等于 0 或者 queue.length,如果還等于,就繼續(xù)進(jìn)入 WAITING 狀態(tài),但是光一次判斷是不夠的,因?yàn)檫€可能是被 interrupt 喚醒的,所以需要進(jìn)行多次判斷,可以用 while 循環(huán)來(lái)解決。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個(gè) notify 用來(lái)喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個(gè) notify 用來(lái)喚醒 put 操作的等待 this.notify(); return ret; } } }
5.2.4 解決因指令重排序造成的問(wèn)題
因?yàn)?put 和 take 操作要進(jìn)行讀和寫的操作,可能會(huì)因?yàn)橹噶钪嘏判虻膯?wèn)題造成其他問(wèn)題,這里就需要使用 volatile 解決指令重排序問(wèn)題。
class MyBlockingQueue { private final String[] data = new String[1000]; private volatile int size; private volatile int head = 0; private volatile int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個(gè) notify 用來(lái)喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個(gè) notify 用來(lái)喚醒 put 操作的等待 this.notify(); return ret; } } }
測(cè)試實(shí)現(xiàn)的阻塞隊(duì)列
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消費(fèi)元素" + queue.take()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素" + count); count++; Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
讓生產(chǎn)速度較慢,使得讀取操作阻塞等待插入數(shù)據(jù)才執(zhí)行。
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消費(fèi)元素" + queue.take()); Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素" + count); count++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
讓生產(chǎn) put 操作進(jìn)入阻塞等待狀態(tài)。
到此這篇關(guān)于JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)JavaEE 阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java的PriorityBlockingQueue優(yōu)先級(jí)阻塞隊(duì)列代碼實(shí)例
- Java中的SynchronousQueue阻塞隊(duì)列使用代碼實(shí)例
- Java中的SynchronousQueue阻塞隊(duì)列及使用場(chǎng)景解析
- java中的BlockingQueue(阻塞隊(duì)列)解析
- Java中的BlockingQueue阻塞隊(duì)列原理以及實(shí)現(xiàn)詳解
- Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀
- java中阻塞隊(duì)列和非阻塞隊(duì)列的實(shí)現(xiàn)
- Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼
相關(guān)文章
Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
這篇文章主要介紹了Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-05-05SpringCloud實(shí)現(xiàn)基于RabbitMQ消息隊(duì)列的詳細(xì)步驟
在Spring Cloud框架中,我們可以利用RabbitMQ實(shí)現(xiàn)強(qiáng)大而可靠的消息隊(duì)列系統(tǒng),本篇將詳細(xì)介紹如何在Spring Cloud項(xiàng)目中集成RabbitMQ,并創(chuàng)建一個(gè)簡(jiǎn)單的消息隊(duì)列,感興趣的朋友一起看看吧2024-03-03基于Spring中的線程池和定時(shí)任務(wù)功能解析
下面小編就為大家?guī)?lái)一篇基于Spring中的線程池和定時(shí)任務(wù)功能解析。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-09-09Spring Boot集成spring-boot-devtools開發(fā)時(shí)實(shí)現(xiàn)熱部署的方式
這篇文章主要介紹了Spring Boot集成spring-boot-devtools開發(fā)時(shí)實(shí)現(xiàn)熱部署的方式,文中還給大家提到了spring boot 實(shí)現(xiàn)熱部署的方式及集成注意事項(xiàng),感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-05-05使用Filter過(guò)濾器中訪問(wèn)getSession()要轉(zhuǎn)化
這篇文章主要介紹了使用Filter過(guò)濾器中訪問(wèn)getSession()要轉(zhuǎn)化,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01還在用if(obj!=null)做非空判斷,帶你快速上手Optional
這篇文章主要介紹了還在用if(obj!=null)做非空判斷,帶你快速上手Optional,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換詳解
以下是對(duì)JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換進(jìn)行了詳細(xì)的分析介紹,需要的朋友可以過(guò)來(lái)參考下2013-09-09