JDK數(shù)組阻塞隊(duì)列源碼深入分析總結(jié)
前言
在前面一篇文章從零開始自己動(dòng)手寫阻塞隊(duì)列當(dāng)中我們仔細(xì)介紹了阻塞隊(duì)列提供給我們的功能,以及他的實(shí)現(xiàn)原理,并且基于談到的內(nèi)容我們自己實(shí)現(xiàn)了一個(gè)低配版的數(shù)組阻塞隊(duì)列。在這篇文章當(dāng)中我們將仔細(xì)介紹JDK具體是如何實(shí)現(xiàn)數(shù)組阻塞隊(duì)列的。
阻塞隊(duì)列的功能
而在本篇文章所談到的阻塞隊(duì)列當(dāng)中,是在并發(fā)的情況下使用的,上面所談到的是隊(duì)列是并發(fā)不安全的,但是阻塞隊(duì)列在并發(fā)下情況是安全的。阻塞隊(duì)列的主要的需求如下:
- 隊(duì)列基礎(chǔ)的功能需要有,往隊(duì)列當(dāng)中放數(shù)據(jù),從隊(duì)列當(dāng)中取數(shù)據(jù)。
- 所有的隊(duì)列操作都要是并發(fā)安全的。
- 當(dāng)隊(duì)列滿了之后再往隊(duì)列當(dāng)中放數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)隊(duì)列當(dāng)中的數(shù)據(jù)被取出,讓隊(duì)列當(dāng)中有空間的時(shí)候線程需要被喚醒。
- 當(dāng)隊(duì)列空了之后再往隊(duì)列當(dāng)中取數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)有線程往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候被掛起的線程需要被喚醒。
- 在我們實(shí)現(xiàn)的隊(duì)列當(dāng)中我們使用數(shù)組去存儲(chǔ)數(shù)據(jù),因此在構(gòu)造函數(shù)當(dāng)中需要提供數(shù)組的初始大小,設(shè)置用多大的數(shù)組。
上面就是數(shù)組阻塞隊(duì)列給我們提供的最核心的功能,其中將線程掛起和喚醒就是阻塞隊(duì)列的核心,掛起和喚醒體現(xiàn)了“阻塞”這一核心思想。
數(shù)組阻塞隊(duì)列設(shè)計(jì)
閱讀這部分內(nèi)容你需要熟悉可重入鎖ReentrantLock和條件變量Condition的使用。
數(shù)組的循環(huán)使用
因?yàn)槲覀兪鞘褂脭?shù)組存儲(chǔ)隊(duì)列當(dāng)中的數(shù)據(jù),從下表為0的位置開始,當(dāng)我們往隊(duì)列當(dāng)中加入一些數(shù)據(jù)之后,隊(duì)列的情況可能如下,其中head表示隊(duì)頭,tail表示隊(duì)尾。

在上圖的基礎(chǔ)之上我們?cè)谶M(jìn)行四次出隊(duì)操作,結(jié)果如下:

在上面的狀態(tài)下,我們繼續(xù)加入8個(gè)數(shù)據(jù),那么布局情況如下:

我們知道上圖在加入數(shù)據(jù)的時(shí)候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊(duì)列內(nèi)部實(shí)現(xiàn)了一個(gè)循環(huán)使用的過程。
字段設(shè)計(jì)
在JDK當(dāng)中數(shù)組阻塞隊(duì)列的實(shí)現(xiàn)是ArrayBlockingQueue類,在他的內(nèi)部是使用數(shù)組實(shí)現(xiàn)的,我們現(xiàn)在來看一下它的主要的字段,為了方便閱讀將所有的解釋說明都寫在的注釋當(dāng)中:
/** The queued items */
final Object[] items; // 這個(gè)就是具體存儲(chǔ)數(shù)據(jù)的數(shù)組
/** items index for next take, poll, peek or remove */
int takeIndex; // 因?yàn)槭顷?duì)列 因此我們需要知道下一個(gè)出隊(duì)的數(shù)據(jù)的下標(biāo) 這個(gè)就是表示下一個(gè)將要出隊(duì)的數(shù)據(jù)的下標(biāo)
/** items index for next put, offer, or add */
int putIndex; // 我們同時(shí)也需要下一個(gè)入隊(duì)的數(shù)據(jù)的下標(biāo)
/** Number of elements in the queue */
int count; // 統(tǒng)計(jì)隊(duì)列當(dāng)中一共有多少個(gè)數(shù)據(jù)
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; // 因?yàn)樽枞?duì)列是一種可以并發(fā)使用的數(shù)據(jù)結(jié)構(gòu)
/** Condition for waiting takes */
private final Condition notEmpty; // 這個(gè)條件變量主要用于喚醒被 take 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中取數(shù)據(jù)的線程
/** Condition for waiting puts */
private final Condition notFull; // 這個(gè)條件變量主要用于喚醒被 put 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中放數(shù)據(jù)的線程
構(gòu)造函數(shù)
構(gòu)造函數(shù)的主要功能是申請(qǐng)指定大小的內(nèi)存空間,并且對(duì)類的成員變量進(jìn)行賦值操作。
public ArrayBlockingQueue(int capacity) {
// capacity 表示用與存儲(chǔ)數(shù)據(jù)的數(shù)組的長(zhǎng)度
this(capacity, false);
}
// fair 這個(gè)參數(shù)主要是用于說明 是否使用公平鎖
// 如果為 true 表示使用公平鎖 執(zhí)行效率低 但是各個(gè)線程進(jìn)入臨界區(qū)的順序是先來后到的順序 更加公平
// 如果為 false 表示使用非公平鎖 執(zhí)行效率更高
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 對(duì)變量進(jìn)行賦值操作
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put函數(shù)
這個(gè)函數(shù)是阻塞隊(duì)列對(duì)核心的函數(shù)之一了,首先我們需要了解的是,如果一個(gè)線程調(diào)用了這個(gè)函數(shù)往隊(duì)列當(dāng)中加入數(shù)據(jù),如果此時(shí)隊(duì)列已經(jīng)滿了則線程需要被掛起,如果沒有滿則需要將數(shù)據(jù)加入到隊(duì)列當(dāng)中,也就是將數(shù)據(jù)存儲(chǔ)到數(shù)組當(dāng)中。注意還有一個(gè)很重要的一點(diǎn)是,當(dāng)我們往隊(duì)列當(dāng)中加入一個(gè)數(shù)據(jù)之后需要發(fā)一個(gè)信號(hào)給其他被take函數(shù)阻塞的線程,因?yàn)檫@些線程在取數(shù)據(jù)的時(shí)候可能隊(duì)列當(dāng)中已經(jīng)空了,因此需要將這些線程喚醒。
public void put(E e) throws InterruptedException {
checkNotNull(e); // 保證輸入的數(shù)據(jù)不為 null 代碼在下方
final ReentrantLock lock = this.lock;
// 進(jìn)行加鎖操作,因?yàn)橄旅媸桥R界區(qū)
lock.lockInterruptibly();
try {
while (count == items.length) // 如果隊(duì)列已經(jīng)滿了 也就是隊(duì)列當(dāng)中數(shù)據(jù)的個(gè)數(shù) count == 數(shù)組的長(zhǎng)度的話 就需要將線程掛起
notFull.await();
// 當(dāng)隊(duì)列當(dāng)中有空間的之后將數(shù)據(jù)加入到隊(duì)列當(dāng)中 這個(gè)函數(shù)在下面仔細(xì)分析 代碼在下方
enqueue(e);
} finally {
lock.unlock();
}
}
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 進(jìn)入這個(gè)函數(shù)的線程已經(jīng)在 put 函數(shù)當(dāng)中加上鎖了 因此這里不需要加鎖
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 因?yàn)檫@個(gè)數(shù)據(jù)是循環(huán)使用的 因此可以回到下標(biāo)為0的位置
// 因?yàn)殛?duì)列當(dāng)中的數(shù)據(jù)可以出隊(duì) 因此下標(biāo)為 0 的位置不存在數(shù)據(jù)可以使用
putIndex = 0;
count++;
// 在這里需要將一個(gè)被 take 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞
// 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程
notEmpty.signal();
}
注意:這里有一個(gè)地方非常容易被忽略,那就是在將線程掛起的時(shí)候使用的是while循環(huán)而不是if條件語句,代碼:
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
這是因?yàn)?,線程被喚醒之后并不會(huì)立馬執(zhí)行,因?yàn)榫€程在調(diào)用await方法的之后會(huì)釋放鎖,他想再次執(zhí)行還需要再次獲得鎖,然后就在他獲取鎖之前的這段時(shí)間里面,可能其他的線程也會(huì)從數(shù)組當(dāng)中放數(shù)據(jù),因此這個(gè)線程執(zhí)行的時(shí)候隊(duì)列可能還是滿的,因此需要再次判斷,否則就會(huì)覆蓋數(shù)據(jù),像這種喚醒之后并沒有滿足線程執(zhí)行條件的現(xiàn)象叫做虛假喚醒,因此大家在寫程序的時(shí)候要格外注意,當(dāng)需要將線程掛起或者喚醒的之后,最好考慮清楚,如果不確定可以使用while替代if,這樣的話更加保險(xiǎn)。
take函數(shù)
這個(gè)函數(shù)主要是從隊(duì)列當(dāng)中取數(shù)據(jù),但是當(dāng)隊(duì)列為空的時(shí)候需要將調(diào)用這個(gè)方法的線程阻塞。當(dāng)隊(duì)列當(dāng)中有數(shù)據(jù)的時(shí)候,就可以從隊(duì)列當(dāng)中取出數(shù)據(jù),但是有一點(diǎn)很重要的就是當(dāng)從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用signal方法,用于喚醒被 put 函數(shù)阻塞的線程,因?yàn)閺年?duì)列當(dāng)中取出數(shù)據(jù)了,隊(duì)列肯定已經(jīng)不滿了,因此可以喚醒被 put 函數(shù)阻塞的線程了。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 因?yàn)槿?shù)據(jù)的代碼涉及到數(shù)據(jù)競(jìng)爭(zhēng) 也就是說多個(gè)線程同時(shí)競(jìng)爭(zhēng) 數(shù)組數(shù)據(jù)items 因此需要用鎖保護(hù)起來
lock.lockInterruptibly();
try {
// 當(dāng) count == 0 說明隊(duì)列當(dāng)中沒有數(shù)據(jù)
while (count == 0)
notEmpty.await();
// 當(dāng)隊(duì)列當(dāng)中還有數(shù)據(jù)的時(shí)候可以將數(shù)據(jù)出隊(duì)
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取出數(shù)據(jù)
E x = (E) items[takeIndex];
items[takeIndex] = null; // 將對(duì)應(yīng)的位置設(shè)置為 null 數(shù)據(jù)就可以被垃圾收集器回收了
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 迭代器也需要出隊(duì) 如果不了
if (itrs != null)
itrs.elementDequeued();
// 調(diào)用 signal 函數(shù) 將被 put 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞
// 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程
notFull.signal();
return x;
}
同樣的道理這里也需要使用while循環(huán)去進(jìn)行阻塞,否則可能存在虛假喚醒,可能隊(duì)列當(dāng)中沒有數(shù)據(jù)返回的數(shù)據(jù)為 null,而且會(huì)破壞隊(duì)列的結(jié)構(gòu)因?yàn)闀?huì)涉及隊(duì)列的兩個(gè)端點(diǎn)的值的改變,也就是takeIndex和putIndex的改變。
offer函數(shù)
這個(gè)函數(shù)的作用和put函數(shù)一樣,只不過當(dāng)隊(duì)列滿了的時(shí)候,這個(gè)函數(shù)返回false,加入數(shù)據(jù)成功之后這個(gè)函數(shù)返回true,下面的代碼就比較簡(jiǎn)單了。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊(duì)列當(dāng)中的數(shù)據(jù)個(gè)數(shù)和數(shù)組的長(zhǎng)度相等 說明隊(duì)列滿了 直接返回 false 即可
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
add函數(shù)
這個(gè)函數(shù)和上面兩個(gè)函數(shù)的意義也是一樣的,只不過當(dāng)隊(duì)列滿了之后這個(gè)函數(shù)會(huì)拋出異常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
poll函數(shù)
這個(gè)函數(shù)和take函數(shù)的作用差不多,但是這個(gè)函數(shù)不會(huì)阻塞,當(dāng)隊(duì)列當(dāng)中沒有數(shù)據(jù)的時(shí)候直接返回null,有數(shù)據(jù)的話返回?cái)?shù)據(jù)。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
總結(jié)
在本篇文章當(dāng)中主要介紹了JDK內(nèi)部是如何實(shí)現(xiàn)ArrayBlockingQueue的,如果你對(duì)鎖和隊(duì)列的使用有一定的了解本篇文章應(yīng)該還是比較容易理解的。在實(shí)現(xiàn)ArrayBlockingQueue當(dāng)中有以下需要注意的點(diǎn):
put函數(shù),如果在往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候隊(duì)列滿了,則需要將線程掛起。在隊(duì)列當(dāng)中有空間之后,線程被喚醒繼續(xù)執(zhí)行,在往隊(duì)列當(dāng)中加入了數(shù)據(jù)之后,需要調(diào)用signal方法,喚醒被take函數(shù)阻塞的線程。
take函數(shù),如果在往隊(duì)列當(dāng)中取出數(shù)據(jù)的時(shí)候隊(duì)列空了,則需要將線程掛起。在隊(duì)列當(dāng)中有數(shù)據(jù)之后,線程被喚醒繼續(xù)執(zhí)行,在從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用signal方法,喚醒被put函數(shù)阻塞的線程。
在調(diào)用await函數(shù)的時(shí)候,需要小心虛假喚醒現(xiàn)象。
到此這篇關(guān)于JDK數(shù)組阻塞隊(duì)列源碼深入分析總結(jié)的文章就介紹到這了,更多相關(guān)JDK數(shù)組阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot+mybatis-plus 兩種方式打印sql語句的方法
這篇文章主要介紹了springboot+mybatis-plus 兩種方式打印sql語句的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
JDK8中的HashMap初始化和擴(kuò)容機(jī)制詳解
這篇文章主要介紹了JDK8中的HashMap初始化和擴(kuò)容機(jī)制,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
Java UrlRewriter偽靜態(tài)技術(shù)運(yùn)用深入分析
通常我們?yōu)榱烁玫木徑夥?wù)器壓力,和增強(qiáng)搜索引擎的友好面,都將文章內(nèi)容生成靜態(tài)頁面,這就產(chǎn)生了偽靜態(tài)技術(shù),也就是我們常說的Url Rewriter重寫技術(shù)2012-12-12
簡(jiǎn)單了解springboot中的配置文件相關(guān)知識(shí)
這篇文章主要介紹了簡(jiǎn)單了解springboot中的配置文件相關(guān)知識(shí),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能實(shí)例詳解
這篇文章主要介紹了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能,較為詳細(xì)的講述了外觀模式的概念、原理并結(jié)合實(shí)例形似詳細(xì)分析了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能的具體操作步驟與相關(guān)注意事項(xiàng),需要的朋友可以參考下2018-05-05
SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼
這篇文章主要介紹了SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07

