Java阻塞隊列中的BlockingQueue接口詳解
BlockingQueue
對于Queue而言,BlockingQueue是主要的線程安全的版本,具有阻塞功能,可以允許添加、刪除元素被阻塞,直到成功為止,BlockingQueue相對于Queue而言增加了兩個方法put、take元素。
BlockingQueue接口
屬于并發(fā)容器中的接口,在java.util.concurrent包路徑下
- BlockingQueue不接受null元素,加入嘗試通過add、put、offer等添加一個null元素時,某些實現(xiàn)上會拋出nullpointExeception問題。
- BlockingQueue是可以指定容量,如果給定的數(shù)據(jù)超過給定容量,便無法添加元素,如果沒有指定容量約束,最大大小是Interger.MAX_VALUE值
- BlockingQueue實現(xiàn)類主要用于生產(chǎn)者-消費者隊列,另支持Collection接口。
- BlockingQueue實現(xiàn)了線程安全,所有排隊方法都可以使用內(nèi)部鎖或者其他并發(fā)控制形式來達到線程安全的目的。
三個主要實現(xiàn)類介紹:
- ArrayBlockingQueue:有界阻塞隊列
- LinkedBlockingQueue:無界阻塞隊列
- SynchronousQueue: 同步隊列
ArrayBlockingQueue:有界隊列
ArrayBlockingQueue 有界隊列底層實現(xiàn)是數(shù)組,數(shù)組大小是固定的,假如數(shù)組一端為頭,另一端為尾,那么頭和尾構(gòu)建一個FIFO隊列
屬性和默認(rèn)值:
//存儲的數(shù)據(jù) 存放在數(shù)組中 final Object[] items; //讀數(shù)據(jù)位置 int takeIndex; //寫入數(shù)據(jù)位置 int putIndex; //數(shù)據(jù)數(shù)量 int count; //隊列同步相關(guān)屬性 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
通過 ArrayBlockingQueue 數(shù)據(jù)結(jié)構(gòu)可知:首先是有一個數(shù)組 T[], 用來存儲所有的元素,由于 ArrayBlockingQueue 最終設(shè)置為一個不可擴展大小的 Queue ,所以這里items就是初始化就固定大小的數(shù)組(final),另外有兩個索引,頭索引 takeIndex ,尾索引 putIndex ,一個隊列的大小 count ,要阻塞的話就必須用到一個鎖和兩個條件(非空,非滿),這三個條件都是不可變類型。因為只有一把鎖,所以任意時刻對隊列只能有一個線程,意味著索引和大小的操作都是線程安全的,所以可以看到takeindex等不需要原子操作和volatile語義了。
構(gòu)造函數(shù):
public ArrayBlockingQueue(int capacity) { this(capacity, false); } //通過初始容量和是否公平性搶鎖標(biāo)志來進行實例化 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //通過初始容量capacity、公平性標(biāo)志fair和集合c public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //數(shù)據(jù)是不能為null checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
put操作
可阻塞的添加元素
public void put(E e) throws InterruptedException { //檢測插入數(shù)據(jù)不能為null checkNotNull(e); //添加可中斷的鎖 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) //容量滿了需要阻塞 notFull.await(); //當(dāng)前集合未滿,執(zhí)行插入操作 insert(e); } finally { //釋放鎖 lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; //通知take操作已經(jīng)有數(shù)據(jù)嗎,如果有take方法阻塞,此時可被喚醒來執(zhí)行take操作 notEmpty.signal(); } //循環(huán)數(shù)組的特殊標(biāo)志處理 ,如果是到最大值則重定向到0號索引 final int inc(int i) { return (++i == items.length) ? 0 : i; }
插入操作,在隊列滿的情況下會阻塞,直到有數(shù)據(jù)take出隊列時才能結(jié)束阻塞,將當(dāng)前數(shù)據(jù)插入隊列。
take方法
將數(shù)據(jù)從隊列中移除
public E take() throws InterruptedException { //添加可中斷的鎖 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //隊列中沒有數(shù)據(jù)時,需要阻塞,直到有數(shù)據(jù)put進入隊列通知該操作可以繼續(xù)執(zhí)行 notEmpty.await(); //有數(shù)據(jù)時 return extract(); } finally { //釋放鎖 lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; //發(fā)出通知 通知put方法,喚醒put操作 notFull.signal(); return x; }
ArrayBlockingQueue特點:
1、底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組,且數(shù)組大小一旦確定不可更改
2、不能存儲null
3、阻塞功能是通過一個鎖和兩個隸屬于該鎖的Condition進行通信完成阻塞
LinkedBlockingQueue:無界隊列
LinkedBlockingQueue有兩個lock鎖和兩個Condition以及用于計數(shù)的AtomicInteger底層數(shù)據(jù)結(jié)構(gòu)是鏈表,都是采用頭尾節(jié)點,每個節(jié)點執(zhí)行下一個節(jié)點的結(jié)構(gòu)數(shù)據(jù)存儲在Node結(jié)構(gòu)中。
引入兩把鎖,一個入隊列鎖,一個出隊列的鎖。滿足同時有一個隊列不滿的Condition和一個隊列不空的Condition。
為什么使用兩把鎖,一把鎖是否可以?
一把鎖完全可以的,一把鎖意味著入隊列和出隊列同時只能有一個在進行,另一個必須等待釋放鎖,而從實際實現(xiàn)上來看,head和last是分離的,相互獨立的,入隊列實現(xiàn)是不會修改出隊列的數(shù)據(jù)的,同理,出隊列時也不會修改入隊列的數(shù)據(jù),這兩個操作實際是相互獨立,這個鎖相當(dāng)于兩個寫入鎖,入隊列是一種寫操作,操作head,出隊列是一種寫操作,操作的是tail,這兩是無關(guān)的。
SynchronousQueue:同步隊列
SynchronousQueue 為同步隊列:每個插入操作必須等待另一個線程的移除操作,同樣,任何一個移除操作都要等待另一個線程的插入操作,因此此隊列中其實沒有任何一個數(shù)據(jù),或者說容量為0,SynchronousQueue更像一個管道,不像容器,資源從一個方向快速的傳遞到另一個方向。
隊列對比
- 如果不需要阻塞隊列,優(yōu)先選擇ConcurrentLinkedQueue;
- 如果需要阻塞隊列,隊列大小固定優(yōu)先選擇ArrayBlockingQueue;
- 隊列大小不固定優(yōu)先選擇LinkedBlockingQueue;
- 如果需要對隊列進行排序,選擇PriorityBlockingQueue;
- 如果需要一個快速交換的隊列,選擇SynchronousQueue;
- 如果需要對隊列中的元素進行延時操作,則選擇DelayQueue。
到此這篇關(guān)于Java阻塞隊列中的BlockingQueue接口詳解的文章就介紹到這了,更多相關(guān)Java阻塞隊列BlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于springboot和redis實現(xiàn)單點登錄
這篇文章主要為大家詳細(xì)介紹了基于springboot和redis實現(xiàn)單點登錄,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-06-06RabbitMQ,RocketMQ,Kafka?事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略問題
這篇文章主要介紹了RabbitMQ,RocketMQ,Kafka?事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-03-03Java實現(xiàn)限定時間CountDownLatch并行場景
本文將結(jié)合實例代碼,介紹Java實現(xiàn)限定時間CountDownLatch并行場景,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-07-07詳解SpringBoot中自定義starter的開發(fā)與使用
starter是SpringBoot中非常重要的一個機制,他是基于約定優(yōu)于配置的思想所衍生出來的,本文主要介紹了SpringBoot中自定義starter的開發(fā)與使用,感興趣的可以了解下2023-09-09linux系統(tǒng)下查看jdk版本、路徑及配置環(huán)境變量
在Linux系統(tǒng)中,配置JDK環(huán)境變量是非常重要的,它可以讓你在終端中直接使用Java命令,這篇文章主要給大家介紹了關(guān)于linux系統(tǒng)下查看jdk版本、路徑及配置環(huán)境變量的相關(guān)資料,需要的朋友可以參考下2024-01-01