一文帶你掌握J(rèn)ava?LinkedBlockingQueue
開篇語
隊(duì)列在生活中隨處可見,醫(yī)院繳費(fèi)需要排隊(duì)、做核酸需要排隊(duì)、汽車等紅綠燈需要排隊(duì)等等。
隊(duì)列是一個(gè)按照先來到就排在前面,后來到排在后面的數(shù)據(jù)結(jié)構(gòu),并且出隊(duì)的時(shí)候也是按照先來到先出隊(duì)。使用數(shù)組和鏈表進(jìn)行實(shí)現(xiàn)。通常用于協(xié)調(diào)任務(wù)的執(zhí)行和數(shù)據(jù)的交換。
介紹
LinkedBlockingQueue 是一個(gè)可選有界阻塞隊(duì)列,有界指的是隊(duì)列存在一個(gè)最大容量;阻塞指的是如果隊(duì)列已經(jīng)滿了,想要往隊(duì)列繼續(xù)添加元素的話,那么這個(gè)操作將會(huì)被暫停,直到隊(duì)列中有空位才會(huì)繼續(xù)完成添加操作。如果隊(duì)列已經(jīng)為空,想要從隊(duì)列中獲取元素,那么這個(gè)操作將會(huì)被暫停,直接隊(duì)列中存在元素才會(huì)繼續(xù)完成獲取操作。
實(shí)現(xiàn)原理
LinkedBlockingQueue 內(nèi)部使用鏈表作為元素的存儲(chǔ)結(jié)構(gòu)。內(nèi)部使用了兩個(gè)鎖,分別使用于存操作和取操作。
執(zhí)行存取操作時(shí),都必須先獲取鎖,才可以執(zhí)行存取操作,保證 LinkedBlockingQueue 是線程安全。
LinkedBlockingQueue 通過兩個(gè) Condition 條件隊(duì)列,一個(gè) notFull 條件,一個(gè) notEmpty 條件。在對(duì)隊(duì)列進(jìn)行插入元素操作時(shí),判斷當(dāng)前隊(duì)列已經(jīng)滿,則通過 notFull 條件將線程阻塞,直到其他線程通知該線程隊(duì)列可以繼續(xù)插入元素。在對(duì)隊(duì)列進(jìn)行移除元素操作時(shí),判斷當(dāng)前隊(duì)列已經(jīng)空,則通過 notEmpty 條件阻塞線程,直到其他線程通過該線程可以繼續(xù)獲取元素。
這樣保證線程的存取操作不會(huì)出現(xiàn)錯(cuò)誤。避免隊(duì)列在滿時(shí),丟棄插入的元素;也避免在隊(duì)列空時(shí)取到一個(gè) null 值。
構(gòu)造函數(shù)
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
無參構(gòu)造函數(shù)中,默認(rèn)使用 Integer.MAX_VALUE
作為隊(duì)列的最大容量。
有參構(gòu)造函數(shù)中,可以自己指定隊(duì)列的最大容量,并且創(chuàng)建了頭節(jié)點(diǎn)和尾節(jié)點(diǎn)。那么 LinkedBlockingQueue 使用的是有頭單向鏈表。
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取鎖 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存鎖 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
并且在對(duì)象初始化時(shí),創(chuàng)建了兩個(gè)鎖,分別使用于存操作和取操作。創(chuàng)建了兩個(gè)條件隊(duì)列,分別用于隊(duì)列空和滿的情況。
插入函數(shù)
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1.獲取鎖
2.判斷當(dāng)前隊(duì)列是否已經(jīng)滿了
- 如果隊(duì)列已經(jīng)滿了,調(diào)用 notFull 條件隊(duì)列的 await() 方法,將該線程阻塞,暫停該線程的插入操作。避免內(nèi)部溢出的問題。
- 如果沒有滿,則直接調(diào)用入隊(duì)函數(shù) enqueue 插入到隊(duì)列末尾。
3.檢查此時(shí)隊(duì)列是否已滿
如果未滿,則調(diào)用 notFull 條件隊(duì)列的 signal() 方法,喚醒被阻塞在 notFull 條件隊(duì)列的線程。
4.解鎖
- 檢查插入元素前的隊(duì)列元素?cái)?shù)量是否等于0
- 等于 0,則調(diào)用 notEmpty 條件隊(duì)列的 signal() 方法,通知其隊(duì)列現(xiàn)在不為空,可以喚醒阻塞線程獲取元素。
為什么需要調(diào)用 notFull 條件隊(duì)列的 signal() 方法? 因?yàn)殛?duì)列取操作和存操作所使用的鎖是不一樣的,那么就說明,一個(gè)線程執(zhí)行存入操作時(shí),其他線程是可以執(zhí)行取出操作的。我們來看下面這個(gè)例子:
- 隊(duì)列總?cè)萘繛?5,當(dāng)前元素?cái)?shù)量為5。線程 A 獲取了存鎖,想要插入了元素。但是因?yàn)殛?duì)列容量已滿,釋放鎖,并且加入到條件隊(duì)列中,等待被喚醒。
- 線程 B 獲取了存鎖,想要插入了元素。但是因?yàn)殛?duì)列容量已滿,釋放鎖,并且加入到條件隊(duì)列中,等待被喚醒。
- 線程 C 獲取了取鎖,取出了元素 1。并且通過 notFull 的 signal 方法喚醒條件隊(duì)列中被阻塞的線程 A。線程 A 被喚醒后加入到同步隊(duì)列中,但是此時(shí)還沒有競(jìng)爭(zhēng)到鎖。
- 線程 D 獲取了取鎖,取出了元素 2。但是還沒有執(zhí)行到喚醒阻塞線程的代碼。
- 線程 A 競(jìng)爭(zhēng)到鎖,開始執(zhí)行插入元素操作。將元素插入之后,檢查到隊(duì)列元素?cái)?shù)量為 4,小于隊(duì)列的總?cè)萘?,因此?huì)執(zhí)行 notFull 的 signal 方法喚醒條件隊(duì)列中被阻塞的線程 B。線程 B 被喚醒后加入到同步隊(duì)列中,開始競(jìng)爭(zhēng)鎖。
- 線程 B 競(jìng)爭(zhēng)到鎖,開始執(zhí)行插入元素操作。將元素插入之后,檢查到隊(duì)列元素?cái)?shù)量等于 5,不進(jìn)行喚醒操作。
這樣做的目的是盡快的喚醒阻塞線程,可以更快的完成插入元素操作。因?yàn)榫€程存和取的操作相互之間并不是互斥的,而是獨(dú)立運(yùn)行的,提高吞吐量。
獲取函數(shù)
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.獲得取鎖
2.判斷當(dāng)前隊(duì)列是否為空
- 如果隊(duì)列沒有元素,調(diào)用 notEmpty 條件隊(duì)列的 await() 方法,將該線程阻塞,暫停該線程的獲取操作。避免獲取元素出錯(cuò)。
- 如果不為空,則直接調(diào)用出隊(duì)函數(shù) dequeue 移除隊(duì)列第一個(gè)元素,并返回給客戶端。
3.檢查此時(shí)隊(duì)列是否為空
如果不為空,則調(diào)用 notEmpty 條件隊(duì)列的 signal() 方法,喚醒被阻塞在 notEmpty 條件隊(duì)列的線程。
4.釋放鎖
5.檢查獲取元素前的隊(duì)列元素?cái)?shù)量是否等于最大容量
等于最大容量,因?yàn)榇藭r(shí)已經(jīng)取出一個(gè)元素,因此隊(duì)列處于未滿的狀態(tài),可以喚醒阻塞在 notFull 條件的線程,讓線程繼續(xù)插入元素。
步驟 3 的目的是盡快的喚醒阻塞線程,可以更快的完成取元素操作。提高吞吐量。可以嘗試自己畫出流程圖。
入隊(duì)函數(shù)
private void enqueue(Node<E> node) { last = last.next = node; }
入隊(duì)函數(shù)極其簡(jiǎn)單,只要將最后一個(gè)元素的 next 指針指向當(dāng)前元素即完成了插入操作。
出隊(duì)函數(shù)
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我們前面說 LinkedBlockingQueue 使用的是有頭鏈表。頭節(jié)點(diǎn)只是作為一個(gè)標(biāo)志,實(shí)際上并不是一個(gè)真正的元素。當(dāng)獲取元素時(shí),將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)作為頭節(jié)點(diǎn),將原來的頭節(jié)點(diǎn)取消引用,被垃圾回收即可。
應(yīng)用場(chǎng)景
適用場(chǎng)景
LinkedBlockingQueue 和 ArrayBlockingQueue 一樣適用于多個(gè)線程之間需要共享數(shù)據(jù)、協(xié)調(diào)任務(wù)執(zhí)行的場(chǎng)景。因此可以總結(jié)出以下幾個(gè)應(yīng)用場(chǎng)景:
線程池:線程池是一個(gè)常見的并發(fā)編程模型,它通過線程池中的線程執(zhí)行任務(wù)。并且可以重復(fù)使用這些線程。在線程池中,可以使用 LinkedBlockingQueue 來存儲(chǔ)需要執(zhí)行的任務(wù),以此控制任務(wù)數(shù)量和執(zhí)行順序。當(dāng)線程池中的線程執(zhí)行完任務(wù)之后,可以從 LinkedBlockingQueue 中取出下一個(gè)任務(wù)執(zhí)行。
生產(chǎn)者-消費(fèi)者:在生產(chǎn)者-消費(fèi)者模型中,生產(chǎn)者負(fù)責(zé)生產(chǎn)數(shù)據(jù),消費(fèi)者負(fù)責(zé)對(duì)數(shù)據(jù)進(jìn)行處理。在這種模式下,LinkedBlockingQueue 可以作為生產(chǎn)者與消費(fèi)者之間的數(shù)據(jù)通道,保證線程安全和數(shù)據(jù)正確。
實(shí)際應(yīng)用場(chǎng)景
- Nacos: Nacos 是一個(gè)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、配置和服務(wù)管理平臺(tái),它使用 LinkedBlockingQueue 來實(shí)現(xiàn)內(nèi)部的任務(wù)隊(duì)列。
- Tomcat:從 Tomcat 7 開始,請(qǐng)求隊(duì)列默認(rèn)使用了 LinkedBlockingQueue 實(shí)現(xiàn)。
- Hystrix: 一個(gè)流行的容錯(cuò)框架,其默認(rèn)使用 LinkedBlockingQueue 作為請(qǐng)求隊(duì)列。
總結(jié)
LinkedBlockingQueue 和 ArrayBlockQueue 的對(duì)比:
- 實(shí)現(xiàn)方式不同:LinkedBlockingQueue 是基于鏈表實(shí)現(xiàn)的隊(duì)列,而 ArrayBlockingQueue 是基于數(shù)組實(shí)現(xiàn)的隊(duì)列。
- 插入和刪除元素的效率不同:LinkedBlockingQueue 的插入和刪除元素的效率較高,因?yàn)樗捎昧藘砂焰i來控制讀寫操作;而 ArrayBlockingQueue 的插入和刪除元素的效率相對(duì)較低,因?yàn)樗捎昧艘话焰i來控制讀寫操作。
- 內(nèi)存占用不同:LinkedBlockingQueue 的內(nèi)存占用比 ArrayBlockingQueue 更高,因?yàn)殒湵斫Y(jié)構(gòu)需要額外的指針存儲(chǔ),而數(shù)組結(jié)構(gòu)則不需要。
- 阻塞模式不同:ArrayBlockingQueue支持公平模式和非公平模式,可以控制入隊(duì)和出隊(duì)的順序;而 LinkedBlockingQueue 只支持非公平模式。
到此這篇關(guān)于一文帶你掌握J(rèn)ava LinkedBlockingQueue的文章就介紹到這了,更多相關(guān)Java LinkedBlockingQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
最全面的JVM優(yōu)化經(jīng)驗(yàn)總結(jié)
這篇文章主要介紹了最全面的JVM優(yōu)化經(jīng)驗(yàn)總結(jié),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06Java?EasyExcel實(shí)現(xiàn)合并相同內(nèi)容單元格與動(dòng)態(tài)標(biāo)題功能
這篇文章主要為大家詳細(xì)介紹了Java?EasyExcel如何實(shí)現(xiàn)合并相同內(nèi)容單元格與動(dòng)態(tài)標(biāo)題功能,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考下2023-12-12詳解Guava Cache本地緩存在Spring Boot應(yīng)用中的實(shí)踐
Guava Cache是一個(gè)全內(nèi)存的本地緩存實(shí)現(xiàn),本文將講述如何將 Guava Cache緩存應(yīng)用到 Spring Boot應(yīng)用中。具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01SpringBoot+OCR?實(shí)現(xiàn)圖片文字識(shí)別
本文主要介紹了SpringBoot+OCR 實(shí)現(xiàn)圖片文字識(shí)別,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12