JDK數(shù)組阻塞隊(duì)列源碼深入分析總結(jié)
前言
在前面一篇文章從零開始自己動(dòng)手寫阻塞隊(duì)列當(dāng)中我們仔細(xì)介紹了阻塞隊(duì)列提供給我們的功能,以及他的實(shí)現(xiàn)原理,并且基于談到的內(nèi)容我們自己實(shí)現(xiàn)了一個(gè)低配版的數(shù)組阻塞隊(duì)列。在這篇文章當(dāng)中我們將仔細(xì)介紹JDK具體是如何實(shí)現(xiàn)數(shù)組阻塞隊(duì)列的。
阻塞隊(duì)列的功能
而在本篇文章所談到的阻塞隊(duì)列當(dāng)中,是在并發(fā)的情況下使用的,上面所談到的是隊(duì)列是并發(fā)不安全的,但是阻塞隊(duì)列在并發(fā)下情況是安全的。阻塞隊(duì)列的主要的需求如下:
- 隊(duì)列基礎(chǔ)的功能需要有,往隊(duì)列當(dāng)中放數(shù)據(jù),從隊(duì)列當(dāng)中取數(shù)據(jù)。
- 所有的隊(duì)列操作都要是并發(fā)安全的。
- 當(dāng)隊(duì)列滿了之后再往隊(duì)列當(dāng)中放數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)隊(duì)列當(dāng)中的數(shù)據(jù)被取出,讓隊(duì)列當(dāng)中有空間的時(shí)候線程需要被喚醒。
- 當(dāng)隊(duì)列空了之后再往隊(duì)列當(dāng)中取數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)有線程往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候被掛起的線程需要被喚醒。
- 在我們實(shí)現(xiàn)的隊(duì)列當(dāng)中我們使用數(shù)組去存儲(chǔ)數(shù)據(jù),因此在構(gòu)造函數(shù)當(dāng)中需要提供數(shù)組的初始大小,設(shè)置用多大的數(shù)組。
上面就是數(shù)組阻塞隊(duì)列給我們提供的最核心的功能,其中將線程掛起和喚醒就是阻塞隊(duì)列的核心,掛起和喚醒體現(xiàn)了“阻塞”這一核心思想。
數(shù)組阻塞隊(duì)列設(shè)計(jì)
閱讀這部分內(nèi)容你需要熟悉可重入鎖ReentrantLock和條件變量Condition的使用。
數(shù)組的循環(huán)使用
因?yàn)槲覀兪鞘褂脭?shù)組存儲(chǔ)隊(duì)列當(dāng)中的數(shù)據(jù),從下表為0的位置開始,當(dāng)我們往隊(duì)列當(dāng)中加入一些數(shù)據(jù)之后,隊(duì)列的情況可能如下,其中head表示隊(duì)頭,tail表示隊(duì)尾。
在上圖的基礎(chǔ)之上我們?cè)谶M(jìn)行四次出隊(duì)操作,結(jié)果如下:
在上面的狀態(tài)下,我們繼續(xù)加入8個(gè)數(shù)據(jù),那么布局情況如下:
我們知道上圖在加入數(shù)據(jù)的時(shí)候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊(duì)列內(nèi)部實(shí)現(xiàn)了一個(gè)循環(huán)使用的過程。
字段設(shè)計(jì)
在JDK當(dāng)中數(shù)組阻塞隊(duì)列的實(shí)現(xiàn)是ArrayBlockingQueue類,在他的內(nèi)部是使用數(shù)組實(shí)現(xiàn)的,我們現(xiàn)在來看一下它的主要的字段,為了方便閱讀將所有的解釋說明都寫在的注釋當(dāng)中:
/** The queued items */ final Object[] items; // 這個(gè)就是具體存儲(chǔ)數(shù)據(jù)的數(shù)組 /** items index for next take, poll, peek or remove */ int takeIndex; // 因?yàn)槭顷?duì)列 因此我們需要知道下一個(gè)出隊(duì)的數(shù)據(jù)的下標(biāo) 這個(gè)就是表示下一個(gè)將要出隊(duì)的數(shù)據(jù)的下標(biāo) /** items index for next put, offer, or add */ int putIndex; // 我們同時(shí)也需要下一個(gè)入隊(duì)的數(shù)據(jù)的下標(biāo) /** Number of elements in the queue */ int count; // 統(tǒng)計(jì)隊(duì)列當(dāng)中一共有多少個(gè)數(shù)據(jù) /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; // 因?yàn)樽枞?duì)列是一種可以并發(fā)使用的數(shù)據(jù)結(jié)構(gòu) /** Condition for waiting takes */ private final Condition notEmpty; // 這個(gè)條件變量主要用于喚醒被 take 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中取數(shù)據(jù)的線程 /** Condition for waiting puts */ private final Condition notFull; // 這個(gè)條件變量主要用于喚醒被 put 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中放數(shù)據(jù)的線程
構(gòu)造函數(shù)
構(gòu)造函數(shù)的主要功能是申請(qǐng)指定大小的內(nèi)存空間,并且對(duì)類的成員變量進(jìn)行賦值操作。
public ArrayBlockingQueue(int capacity) { // capacity 表示用與存儲(chǔ)數(shù)據(jù)的數(shù)組的長度 this(capacity, false); } // fair 這個(gè)參數(shù)主要是用于說明 是否使用公平鎖 // 如果為 true 表示使用公平鎖 執(zhí)行效率低 但是各個(gè)線程進(jìn)入臨界區(qū)的順序是先來后到的順序 更加公平 // 如果為 false 表示使用非公平鎖 執(zhí)行效率更高 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 對(duì)變量進(jìn)行賦值操作 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
put函數(shù)
這個(gè)函數(shù)是阻塞隊(duì)列對(duì)核心的函數(shù)之一了,首先我們需要了解的是,如果一個(gè)線程調(diào)用了這個(gè)函數(shù)往隊(duì)列當(dāng)中加入數(shù)據(jù),如果此時(shí)隊(duì)列已經(jīng)滿了則線程需要被掛起,如果沒有滿則需要將數(shù)據(jù)加入到隊(duì)列當(dāng)中,也就是將數(shù)據(jù)存儲(chǔ)到數(shù)組當(dāng)中。注意還有一個(gè)很重要的一點(diǎn)是,當(dāng)我們往隊(duì)列當(dāng)中加入一個(gè)數(shù)據(jù)之后需要發(fā)一個(gè)信號(hào)給其他被take函數(shù)阻塞的線程,因?yàn)檫@些線程在取數(shù)據(jù)的時(shí)候可能隊(duì)列當(dāng)中已經(jīng)空了,因此需要將這些線程喚醒。
public void put(E e) throws InterruptedException { checkNotNull(e); // 保證輸入的數(shù)據(jù)不為 null 代碼在下方 final ReentrantLock lock = this.lock; // 進(jìn)行加鎖操作,因?yàn)橄旅媸桥R界區(qū) lock.lockInterruptibly(); try { while (count == items.length) // 如果隊(duì)列已經(jīng)滿了 也就是隊(duì)列當(dāng)中數(shù)據(jù)的個(gè)數(shù) count == 數(shù)組的長度的話 就需要將線程掛起 notFull.await(); // 當(dāng)隊(duì)列當(dāng)中有空間的之后將數(shù)據(jù)加入到隊(duì)列當(dāng)中 這個(gè)函數(shù)在下面仔細(xì)分析 代碼在下方 enqueue(e); } finally { lock.unlock(); } } private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 進(jìn)入這個(gè)函數(shù)的線程已經(jīng)在 put 函數(shù)當(dāng)中加上鎖了 因此這里不需要加鎖 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 因?yàn)檫@個(gè)數(shù)據(jù)是循環(huán)使用的 因此可以回到下標(biāo)為0的位置 // 因?yàn)殛?duì)列當(dāng)中的數(shù)據(jù)可以出隊(duì) 因此下標(biāo)為 0 的位置不存在數(shù)據(jù)可以使用 putIndex = 0; count++; // 在這里需要將一個(gè)被 take 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞 // 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程 notEmpty.signal(); }
注意:這里有一個(gè)地方非常容易被忽略,那就是在將線程掛起的時(shí)候使用的是while循環(huán)而不是if條件語句,代碼:
final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }
這是因?yàn)?,線程被喚醒之后并不會(huì)立馬執(zhí)行,因?yàn)榫€程在調(diào)用await方法的之后會(huì)釋放鎖,他想再次執(zhí)行還需要再次獲得鎖,然后就在他獲取鎖之前的這段時(shí)間里面,可能其他的線程也會(huì)從數(shù)組當(dāng)中放數(shù)據(jù),因此這個(gè)線程執(zhí)行的時(shí)候隊(duì)列可能還是滿的,因此需要再次判斷,否則就會(huì)覆蓋數(shù)據(jù),像這種喚醒之后并沒有滿足線程執(zhí)行條件的現(xiàn)象叫做虛假喚醒,因此大家在寫程序的時(shí)候要格外注意,當(dāng)需要將線程掛起或者喚醒的之后,最好考慮清楚,如果不確定可以使用while替代if,這樣的話更加保險(xiǎn)。
take函數(shù)
這個(gè)函數(shù)主要是從隊(duì)列當(dāng)中取數(shù)據(jù),但是當(dāng)隊(duì)列為空的時(shí)候需要將調(diào)用這個(gè)方法的線程阻塞。當(dāng)隊(duì)列當(dāng)中有數(shù)據(jù)的時(shí)候,就可以從隊(duì)列當(dāng)中取出數(shù)據(jù),但是有一點(diǎn)很重要的就是當(dāng)從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用signal方法,用于喚醒被 put 函數(shù)阻塞的線程,因?yàn)閺年?duì)列當(dāng)中取出數(shù)據(jù)了,隊(duì)列肯定已經(jīng)不滿了,因此可以喚醒被 put 函數(shù)阻塞的線程了。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 因?yàn)槿?shù)據(jù)的代碼涉及到數(shù)據(jù)競爭 也就是說多個(gè)線程同時(shí)競爭 數(shù)組數(shù)據(jù)items 因此需要用鎖保護(hù)起來 lock.lockInterruptibly(); try { // 當(dāng) count == 0 說明隊(duì)列當(dāng)中沒有數(shù)據(jù) while (count == 0) notEmpty.await(); // 當(dāng)隊(duì)列當(dāng)中還有數(shù)據(jù)的時(shí)候可以將數(shù)據(jù)出隊(duì) return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取出數(shù)據(jù) E x = (E) items[takeIndex]; items[takeIndex] = null; // 將對(duì)應(yīng)的位置設(shè)置為 null 數(shù)據(jù)就可以被垃圾收集器回收了 if (++takeIndex == items.length) takeIndex = 0; count--; // 迭代器也需要出隊(duì) 如果不了 if (itrs != null) itrs.elementDequeued(); // 調(diào)用 signal 函數(shù) 將被 put 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞 // 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程 notFull.signal(); return x; }
同樣的道理這里也需要使用while循環(huán)去進(jìn)行阻塞,否則可能存在虛假喚醒,可能隊(duì)列當(dāng)中沒有數(shù)據(jù)返回的數(shù)據(jù)為 null,而且會(huì)破壞隊(duì)列的結(jié)構(gòu)因?yàn)闀?huì)涉及隊(duì)列的兩個(gè)端點(diǎn)的值的改變,也就是takeIndex和putIndex的改變。
offer函數(shù)
這個(gè)函數(shù)的作用和put函數(shù)一樣,只不過當(dāng)隊(duì)列滿了的時(shí)候,這個(gè)函數(shù)返回false,加入數(shù)據(jù)成功之后這個(gè)函數(shù)返回true,下面的代碼就比較簡單了。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 如果隊(duì)列當(dāng)中的數(shù)據(jù)個(gè)數(shù)和數(shù)組的長度相等 說明隊(duì)列滿了 直接返回 false 即可 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
add函數(shù)
這個(gè)函數(shù)和上面兩個(gè)函數(shù)的意義也是一樣的,只不過當(dāng)隊(duì)列滿了之后這個(gè)函數(shù)會(huì)拋出異常。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
poll函數(shù)
這個(gè)函數(shù)和take函數(shù)的作用差不多,但是這個(gè)函數(shù)不會(huì)阻塞,當(dāng)隊(duì)列當(dāng)中沒有數(shù)據(jù)的時(shí)候直接返回null,有數(shù)據(jù)的話返回?cái)?shù)據(jù)。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
總結(jié)
在本篇文章當(dāng)中主要介紹了JDK內(nèi)部是如何實(shí)現(xiàn)ArrayBlockingQueue的,如果你對(duì)鎖和隊(duì)列的使用有一定的了解本篇文章應(yīng)該還是比較容易理解的。在實(shí)現(xiàn)ArrayBlockingQueue當(dāng)中有以下需要注意的點(diǎn):
put函數(shù),如果在往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候隊(duì)列滿了,則需要將線程掛起。在隊(duì)列當(dāng)中有空間之后,線程被喚醒繼續(xù)執(zhí)行,在往隊(duì)列當(dāng)中加入了數(shù)據(jù)之后,需要調(diào)用signal方法,喚醒被take函數(shù)阻塞的線程。
take函數(shù),如果在往隊(duì)列當(dāng)中取出數(shù)據(jù)的時(shí)候隊(duì)列空了,則需要將線程掛起。在隊(duì)列當(dāng)中有數(shù)據(jù)之后,線程被喚醒繼續(xù)執(zhí)行,在從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用signal方法,喚醒被put函數(shù)阻塞的線程。
在調(diào)用await函數(shù)的時(shí)候,需要小心虛假喚醒現(xiàn)象。
到此這篇關(guān)于JDK數(shù)組阻塞隊(duì)列源碼深入分析總結(jié)的文章就介紹到這了,更多相關(guān)JDK數(shù)組阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot+mybatis-plus 兩種方式打印sql語句的方法
這篇文章主要介紹了springboot+mybatis-plus 兩種方式打印sql語句的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10JDK8中的HashMap初始化和擴(kuò)容機(jī)制詳解
這篇文章主要介紹了JDK8中的HashMap初始化和擴(kuò)容機(jī)制,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java UrlRewriter偽靜態(tài)技術(shù)運(yùn)用深入分析
通常我們?yōu)榱烁玫木徑夥?wù)器壓力,和增強(qiáng)搜索引擎的友好面,都將文章內(nèi)容生成靜態(tài)頁面,這就產(chǎn)生了偽靜態(tài)技術(shù),也就是我們常說的Url Rewriter重寫技術(shù)2012-12-12簡單了解springboot中的配置文件相關(guān)知識(shí)
這篇文章主要介紹了簡單了解springboot中的配置文件相關(guān)知識(shí),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能實(shí)例詳解
這篇文章主要介紹了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能,較為詳細(xì)的講述了外觀模式的概念、原理并結(jié)合實(shí)例形似詳細(xì)分析了Java基于外觀模式實(shí)現(xiàn)美食天下食譜功能的具體操作步驟與相關(guān)注意事項(xiàng),需要的朋友可以參考下2018-05-05SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼
這篇文章主要介紹了SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07