詳解java中的阻塞隊(duì)列
阻塞隊(duì)列簡(jiǎn)介
阻塞隊(duì)列(BlockingQueue)首先是一個(gè)支持先進(jìn)先出的隊(duì)列,與普通的隊(duì)列完全相同;
其次是一個(gè)支持阻塞操作的隊(duì)列,即:
- 當(dāng)隊(duì)列滿時(shí),會(huì)阻塞執(zhí)行插入操作的線程,直到隊(duì)列不滿。
- 當(dāng)隊(duì)列為空時(shí),會(huì)阻塞執(zhí)行獲取操作的線程,直到隊(duì)列不為空。
阻塞隊(duì)列用在多線程的場(chǎng)景下,因此阻塞隊(duì)列使用了鎖機(jī)制來(lái)保證同步,這里使用的可重入鎖;
而對(duì)于阻塞與喚醒機(jī)制則有與鎖綁定的Condition
實(shí)現(xiàn)
應(yīng)用場(chǎng)景:生產(chǎn)者消費(fèi)者模式
java中的阻塞隊(duì)列
java中的阻塞隊(duì)列根據(jù)容量可以分為有界隊(duì)列和無(wú)界隊(duì)列:
- 有界隊(duì)列:隊(duì)列中只能存儲(chǔ)有限個(gè)元素,超出后存放元素線程會(huì)被阻塞或者失敗。
- 無(wú)界隊(duì)列:隊(duì)列中可以存儲(chǔ)無(wú)限個(gè)元素。
java8中提供了7種阻塞隊(duì)列阻塞隊(duì)列供開(kāi)發(fā)者使用,如下表:
類名 | 描述 |
ArrayBlockingQueue | 一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列 |
LinkedBlockingQueue | 由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列(默認(rèn)大小Integer.MAX_VALUE) |
PriorityBlockingQueue | 支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列 |
DelayQueue | 使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的延遲無(wú)界阻塞隊(duì)列 |
SynchronousQueue | 不存儲(chǔ)元素的阻塞隊(duì)列,即單個(gè)元素的隊(duì)列 |
LinkedTransferQueue | 由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列 |
LinkedBlockingDeque | 由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列 |
另外還有一個(gè)在ScheduledThreadPoolExecutor
中實(shí)現(xiàn)的DelayedWorkQueue
阻塞隊(duì)列,
但這個(gè)阻塞隊(duì)列開(kāi)發(fā)者不能使用。它們之間的UML類圖如下圖:
BlockingQueue接口是阻塞隊(duì)列對(duì)外的訪問(wèn)接口,所有的阻塞隊(duì)列都實(shí)現(xiàn)了BlockQueue中的方法
BlockQueue中方法
作為一個(gè)隊(duì)列的核心方法就是入隊(duì)和出隊(duì)。由于存在阻塞策略,BlockQueue
將出隊(duì)入隊(duì)的情況分為了四組,每組提供不同的方法:
- 拋出異常:當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列中插入元素,則拋出
IllegalStateException
異常;當(dāng)隊(duì)列為空時(shí),從隊(duì)列中獲取元素則拋出NoSuchElementException
異常。 - 返回特定值(布爾值):當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列中插入元素,則返回false;當(dāng)隊(duì)列為空時(shí),從隊(duì)列中獲取元素則返回null。
- 一直阻塞:當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列中插入元素,阻塞當(dāng)前線程直到隊(duì)列中至少一個(gè)被移除或者響應(yīng)中斷退出;當(dāng)隊(duì)列為空時(shí),則阻塞當(dāng)前線程直到至少一個(gè)元素元素入隊(duì)或者響應(yīng)中斷退出。
- 超時(shí)退出:當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列中插入元素,阻塞當(dāng)前線程直到隊(duì)列中至少一個(gè)被移除或者達(dá)到指定的等待時(shí)間退出或者響應(yīng)中斷退出;當(dāng)隊(duì)列為空時(shí),則阻塞當(dāng)前線程直到至少一個(gè)元素元素入隊(duì)或者達(dá)到指定的等待時(shí)間退出或者響應(yīng)中斷退出。
對(duì)于每種情況BlockingQueue
提供的方法如下表:
方法\處理方式 | 拋出異常 | 返回特定值(布爾值) | 一直阻塞 | 超時(shí)退出 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time.unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
上述方法一般用于生產(chǎn)者-消費(fèi)者模型中,是其中的生產(chǎn)和消費(fèi)操作隊(duì)列的核心方法。
除了這些方法,BlockingQueue
還提供了一些其他的方法如下表:
方法名稱 | 描述 |
remove(Object o) | 從隊(duì)列中移除一個(gè)指定值 |
size() | 獲取隊(duì)列中元素的個(gè)數(shù) |
contains(Object o) | 判斷隊(duì)列是否包含指定的元素,但是這個(gè)元素在這次判斷完可能就會(huì)被消費(fèi) |
drainTo(Collection<? super E> c) | 將隊(duì)列中元素放在給定的集合中,并返回添加的元素個(gè)數(shù) |
drainTo(Collection<? super E> c, int maxElements) | 將隊(duì)列中元素取maxElements(不超過(guò)隊(duì)列中元素個(gè)數(shù))個(gè)放在給定的集合中,并返回添加的元素個(gè)數(shù) |
remainingCapacity() | 計(jì)算隊(duì)列中還可以存放的元素個(gè)數(shù) |
toArray() | 以objetc數(shù)組的形式獲取隊(duì)列中所有的元素 |
toArray(T[] a) | 以給定類型數(shù)組的方式獲取隊(duì)列中所有的元素 |
clear() | 清空隊(duì)列,危險(xiǎn)的操作 |
阻塞隊(duì)列的實(shí)現(xiàn)原理
阻塞隊(duì)列的實(shí)現(xiàn)依靠通知模式實(shí)現(xiàn):當(dāng)生產(chǎn)者向滿了的隊(duì)列中添加元素時(shí),會(huì)阻塞住生產(chǎn)者,
直到消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后會(huì)通知消費(fèi)者隊(duì)列可用,此時(shí)再由生產(chǎn)者向隊(duì)列中添加元素。反之亦然。
阻塞隊(duì)列的阻塞喚醒依靠Condition
——條件隊(duì)列來(lái)實(shí)現(xiàn)。
以ArrayBlockingQueue
為例說(shuō)明:
ArrayBlockingQueue
的定義:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ //以數(shù)組的結(jié)構(gòu)存儲(chǔ)隊(duì)列的元素,采用的是循環(huán)數(shù)組 final Object[] items; /** items index for next take, poll, peek or remove */ //隊(duì)列的隊(duì)頭索引 int takeIndex; /** items index for next put, offer, or add */ //隊(duì)列的隊(duì)尾索引 int putIndex; /** Number of elements in the queue */ //隊(duì)列中元素的個(gè)數(shù) int count; /** Main lock guarding all access */ //對(duì)于ArrayBlockingQueue所有的操作都需要加鎖, final ReentrantLock lock; /** Condition for waiting takes */ //條件隊(duì)列,當(dāng)隊(duì)列為空時(shí)阻塞消費(fèi)者并在生產(chǎn)者生產(chǎn)后喚醒消費(fèi)者 private final Condition notEmpty; /** Condition for waiting puts */ //條件隊(duì)列,當(dāng)隊(duì)列滿時(shí)阻塞生產(chǎn)者,并在消費(fèi)者消費(fèi)隊(duì)列后喚醒生產(chǎn)者 private final Condition notFull; }
根據(jù)類的定義字段可以看到,有兩個(gè)Condition
條件隊(duì)列,猜測(cè)以下過(guò)程
- 當(dāng)隊(duì)列為空,消費(fèi)者試圖消費(fèi)時(shí)應(yīng)該調(diào)用
notEmpty.await()
方法阻塞,并在生產(chǎn)者生產(chǎn)后調(diào)用notEmpty.single()
方法 - 當(dāng)隊(duì)列已滿,生產(chǎn)者試圖放入元素應(yīng)調(diào)用
notFull.await()
方法阻塞,并在消費(fèi)者消費(fèi)隊(duì)列后調(diào)用notFull.single()
方法向隊(duì)
向隊(duì)列中添加元素put()
方法的添加過(guò)程。
/** * 向隊(duì)列中添加元素 * 當(dāng)隊(duì)列已滿時(shí)需要阻塞當(dāng)前線程 * 放入元素后喚醒因隊(duì)列為空阻塞的消費(fèi)者 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當(dāng)隊(duì)列已滿時(shí)需要notFull.await()阻塞當(dāng)前線程 //offer(e,time,unit)方法就是阻塞的時(shí)候加了超時(shí)設(shè)定 while (count == items.length) notFull.await(); //放入元素的過(guò)程 enqueue(e); } finally { lock.unlock(); } } /**enqueue實(shí)際添加元素的方法*/ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //如果條件隊(duì)列中存在等待的線程 //喚醒 notEmpty.signal(); }
從隊(duì)列中獲取元素take()
方法的獲取過(guò)程。
/** * 從隊(duì)列中獲取元素 * 當(dāng)隊(duì)列已空時(shí)阻塞當(dāng)前線程 * 從隊(duì)列中消費(fèi)元素后喚醒等待的生產(chǎn)線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //隊(duì)列為空需要阻塞當(dāng)前線程 while (count == 0) notEmpty.await(); //獲取元素的過(guò)程 return dequeue(); } finally { lock.unlock(); } } /**dequeue實(shí)際消費(fèi)元素的方法*/ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //消費(fèi)元素后從喚醒阻塞的生產(chǎn)者線程 notFull.signal(); return x; }
總結(jié)
阻塞隊(duì)列提供了不同于普通隊(duì)列的增加、刪除元素的方法,核心在與隊(duì)列滿時(shí)阻塞生產(chǎn)者和隊(duì)列空時(shí)阻塞消費(fèi)者。
這一阻塞過(guò)程依靠與鎖綁定的Condition
對(duì)象實(shí)現(xiàn)。Condition
接口的實(shí)現(xiàn)在AQS中實(shí)現(xiàn),具體的實(shí)現(xiàn)類是
ConditionObject
以上就是詳解java中的阻塞隊(duì)列的詳細(xì)內(nèi)容,更多關(guān)于java 阻塞隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java的validation參數(shù)校驗(yàn)代碼實(shí)例
這篇文章主要介紹了Java的validation參數(shù)校驗(yàn)代碼實(shí)例,Validation參數(shù)校驗(yàn)是指在程序運(yùn)行中對(duì)傳進(jìn)來(lái)的參數(shù)進(jìn)行合法性檢查,以保證程序的正確性和安全性,需要的朋友可以參考下2023-10-10基于python locust庫(kù)實(shí)現(xiàn)性能測(cè)試
這篇文章主要介紹了基于python locust庫(kù)實(shí)現(xiàn)性能測(cè)試,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05SpringSecurity自定義AuthenticationProvider無(wú)法@Autowire的解決
這篇文章主要介紹了SpringSecurity自定義AuthenticationProvider無(wú)法@Autowire的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Java實(shí)現(xiàn)簡(jiǎn)單班級(jí)管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單班級(jí)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02SpringBoot中的@EnableAutoConfiguration注解解析
這篇文章主要介紹了SpringBoot中的@EnableAutoConfiguration注解解析,@EnableAutoConfiguration也是借助@Import的幫助,將所有符合自動(dòng)配置條件的bean定義注冊(cè)到IoC容器,需要的朋友可以參考下2023-09-09mybatis查詢結(jié)果返回至實(shí)體類的示例代碼
這篇文章主要介紹了mybatis查詢結(jié)果返回至實(shí)體類的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07java中關(guān)于轉(zhuǎn)義字符的一個(gè)bug
本文主要介紹了java中關(guān)于轉(zhuǎn)義字符的一個(gè)bug。具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧2017-02-02詳解使用JavaCV/OpenCV抓取并存儲(chǔ)攝像頭圖像
本篇文章主要介紹了使用JavaCV/OpenCV抓取并存儲(chǔ)攝像頭圖像,實(shí)例分析了使用JavaCV/OpenCV抓取并存儲(chǔ)攝像頭圖像的技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-04-04