Java并發(fā)編程中的阻塞隊(duì)列解析
1. 什么是阻塞隊(duì)列?
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。
這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
- 當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。
阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
阻塞隊(duì)列提供了處理方法:
- 返回特殊值:插入方法會(huì)返回是否成功,成功則返回 true。移除方法,則是從隊(duì)列里拿出一個(gè)元素,如果沒(méi)有則返回 null
- 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里 put 元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),消費(fèi)者線程試圖從隊(duì)列里 take 元素,隊(duì)列也會(huì)阻塞消費(fèi)者線程,直到隊(duì)列可用。
- 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)一定的時(shí)間,生產(chǎn)者線程就會(huì)退出。
2. Java 里的阻塞隊(duì)列
JDK7 提供了 7 個(gè)阻塞隊(duì)列。分別是
- ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
- LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
- PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
- DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
ArrayBlockingQueue 是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下不保證訪問(wèn)者公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)隊(duì)列可用時(shí),可以按照阻塞的先后順序訪問(wèn)隊(duì)列,即先阻塞的生產(chǎn)者線程,可以先往隊(duì)列里插入元素,先阻塞的消費(fèi)者線程,可以先從隊(duì)列里獲取元素。通常情況下為了保證公平性會(huì)降低吞吐量。我們可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問(wèn)者的公平性是使用可重入鎖實(shí)現(xiàn)的,代碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue 是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為 Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。
PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的無(wú)界隊(duì)列。默認(rèn)情況下元素采取自然順序排列,也可以通過(guò)比較器 comparator 來(lái)指定元素的排序規(guī)則。元素按照升序排列。
DelayQueue 是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用 PriorityQueue 來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn) Delayed 接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。我們可以將 DelayQueue 運(yùn)用在以下應(yīng)用場(chǎng)景:
- 緩存系統(tǒng)的設(shè)計(jì):可以用 DelayQueue 保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時(shí),表示緩存有效期到了。
- 定時(shí)任務(wù)調(diào)度。使用 DelayQueue 保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從 DelayQueue 中獲取到任務(wù)就開(kāi)始執(zhí)行,從比如 TimerQueue 就是使用 DelayQueue 實(shí)現(xiàn)的。
隊(duì)列中的 Delayed 必須實(shí)現(xiàn) compareTo 來(lái)指定元素的順序。比如讓延時(shí)時(shí)間最長(zhǎng)的放在隊(duì)列的末尾。實(shí)現(xiàn)代碼如下:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask x = (ScheduledFutureTask)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
如何實(shí)現(xiàn) Delayed 接口
我們可以參考 ScheduledThreadPoolExecutor 里 ScheduledFutureTask 類。
這個(gè)類實(shí)現(xiàn)了 Delayed 接口。
首先:在對(duì)象創(chuàng)建的時(shí)候,使用 time 記錄前對(duì)象什么時(shí)候可以使用,代碼如下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
然后使用 getDelay 可以查詢當(dāng)前元素還需要延時(shí)多久,代碼如下:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
通過(guò)構(gòu)造函數(shù)可以看出延遲時(shí)間參數(shù) ns 的單位是納秒,自己設(shè)計(jì)的時(shí)候最好使用納秒,因?yàn)?getDelay 時(shí)可以指定任意單位,一旦以納秒作為單位,而延時(shí)的時(shí)間又精確不到納秒就麻煩了。使用時(shí)請(qǐng)注意當(dāng) time 小于當(dāng)前時(shí)間時(shí),getDelay 會(huì)返回負(fù)數(shù)。
如何實(shí)現(xiàn)延時(shí)隊(duì)列
延時(shí)隊(duì)列的實(shí)現(xiàn)很簡(jiǎn)單,當(dāng)消費(fèi)者從隊(duì)列里獲取元素時(shí),如果元素沒(méi)有達(dá)到延時(shí)時(shí)間,就阻塞當(dāng)前線程。
long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await();
SynchronousQueue 是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè) put 操作必須等待一個(gè) take 操作,否則不能繼續(xù)添加元素。SynchronousQueue 可以看成是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊(duì)列本身并不存儲(chǔ)任何元素,非常適合于傳遞性場(chǎng)景, 比如在一個(gè)線程中使用的數(shù)據(jù),傳遞給另外一個(gè)線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
LinkedTransferQueue 是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞 TransferQueue 隊(duì)列。相對(duì)于其他阻塞隊(duì)列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
transfer 方法。如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用 take() 方法或帶時(shí)間限制的 poll() 方法時(shí)),transfer 方法可以把生產(chǎn)者傳入的元素立刻 transfer(傳輸)給消費(fèi)者。如果沒(méi)有消費(fèi)者在等待接收元素,transfer 方法會(huì)將元素存放在隊(duì)列的 tail 節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。transfer 方法的關(guān)鍵代碼如下:
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當(dāng)前元素的 s 節(jié)點(diǎn)作為 tail 節(jié)點(diǎn)。第二行代碼是讓 CPU 自旋等待消費(fèi)者消費(fèi)元素。因?yàn)樽孕龝?huì)消耗 CPU,所以自旋一定的次數(shù)后使用 Thread.yield() 方法來(lái)暫停當(dāng)前正在執(zhí)行的線程,并執(zhí)行其他線程。
tryTransfer 方法。則是用來(lái)試探下生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒(méi)有消費(fèi)者等待接收元素,則返回 false。和 transfer 方法的區(qū)別是 tryTransfer 方法無(wú)論消費(fèi)者是否接收,方法立即返回。而 transfer 方法是必須等到消費(fèi)者消費(fèi)了才返回。
對(duì)于帶有時(shí)間限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,則是試圖把生產(chǎn)者傳入的元素直接傳給消費(fèi)者,但是如果沒(méi)有消費(fèi)者消費(fèi)該元素則等待指定的時(shí)間再返回,如果超時(shí)還沒(méi)消費(fèi)元素,則返回 false,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素,則返回 true。
LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。相比其他的阻塞隊(duì)列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊(duì)列的第一個(gè)元素。以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法卻等同于 takeFirst,不知道是不是 Jdk 的 bug,使用時(shí)還是用帶有 First 和 Last 后綴的方法更清楚。
在初始化 LinkedBlockingDeque 時(shí)可以設(shè)置容量防止其過(guò)渡膨脹。另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。
3. 阻塞隊(duì)列的實(shí)現(xiàn)原理
如果隊(duì)列是空的,消費(fèi)者會(huì)一直等待,當(dāng)生產(chǎn)者添加元素時(shí)候,消費(fèi)者是如何知道當(dāng)前隊(duì)列有元素的呢?如果讓你來(lái)設(shè)計(jì)阻塞隊(duì)列你會(huì)如何設(shè)計(jì),讓生產(chǎn)者和消費(fèi)者能夠高效率的進(jìn)行通訊呢?讓我們先來(lái)看看 JDK 是如何實(shí)現(xiàn)的。
使用通知模式實(shí)現(xiàn)。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊(duì)列里添加元素時(shí)會(huì)阻塞住生產(chǎn)者,當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后,會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用。通過(guò)查看 JDK 源碼發(fā)現(xiàn) ArrayBlockingQueue 使用了 Condition 來(lái)實(shí)現(xiàn),代碼如下:
private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair) { // 省略其他代碼 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
當(dāng)我們往隊(duì)列里插入一個(gè)元素時(shí),如果隊(duì)列不可用,阻塞生產(chǎn)者主要通過(guò) LockSupport.park(this); 來(lái)實(shí)現(xiàn)
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
繼續(xù)進(jìn)入源碼,發(fā)現(xiàn)調(diào)用 setBlocker 先保存下將要阻塞的線程,然后調(diào)用 unsafe.park 阻塞當(dāng)前線程。
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
unsafe.park 是個(gè) native 方法,代碼如下:
public native void park(boolean isAbsolute, long time);
park 這個(gè)方法會(huì)阻塞當(dāng)前線程,只有以下四種情況中的一種發(fā)生時(shí),該方法才會(huì)返回。
- 與 park 對(duì)應(yīng)的 unpark 執(zhí)行或已經(jīng)執(zhí)行時(shí)。注意:已經(jīng)執(zhí)行是指 unpark 先執(zhí)行,然后再執(zhí)行的 park。
- 線程被中斷時(shí)。
- 如果參數(shù)中的 time 不是零,等待了指定的毫秒數(shù)時(shí)。
- 發(fā)生異?,F(xiàn)象時(shí)。這些異常事先無(wú)法確定。
我們繼續(xù)看一下 JVM 是如何實(shí)現(xiàn) park 方法的,park 在不同的操作系統(tǒng)使用不同的方式實(shí)現(xiàn),在 linux 下是使用的是系統(tǒng)方法 pthread_cond_wait 實(shí)現(xiàn)。
實(shí)現(xiàn)代碼在 JVM 源碼路徑 src/os/linux/vm/os_linux.cpp 里的 os::PlatformEvent::park 方法,代碼如下:
void os::PlatformEvent::park() { int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ; } guarantee (v >= 0, "invariant") ; if (v == 0) { // Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0, status, "mutex_lock"); guarantee (_nParked == 0, "invariant") ; ++ _nParked ; while (_Event < 0) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait"); } -- _nParked ; // In theory we could move the ST of 0 into _Event past the unlock(), // but then we'd need a MEMBAR after the ST. _Event = 0 ; status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock"); } guarantee (_Event >= 0, "invariant") ; } }
pthread_cond_wait 是一個(gè)多線程的條件變量函數(shù),cond 是 condition 的縮寫(xiě),字面意思可以理解為線程在等待一個(gè)條件發(fā)生,這個(gè)條件是一個(gè)全局變量。
這個(gè)方法接收兩個(gè)參數(shù),一個(gè)共享變量 _cond,一個(gè)互斥量 _mutex。
而 unpark 方法在 linux 下是使用 pthread_cond_signal 實(shí)現(xiàn)的。
park 在 windows 下則是使用 WaitForSingleObject 實(shí)現(xiàn)的。
當(dāng)隊(duì)列滿時(shí),生產(chǎn)者往阻塞隊(duì)列里插入一個(gè)元素,生產(chǎn)者線程會(huì)進(jìn)入 WAITING (parking) 狀態(tài)。
我們可以使用 jstack dump 阻塞的生產(chǎn)者線程看到這點(diǎn):
"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324) at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
到此這篇關(guān)于Java并發(fā)編程中的阻塞隊(duì)列解析的文章就介紹到這了,更多相關(guān)Java阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot深入講解單元測(cè)試與熱部署應(yīng)用
這篇文章介紹了SpringBoot單元測(cè)試與熱部署,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06Java?HttpURLConnection使用方法與實(shí)例演示分析
這篇文章主要介紹了Java?HttpURLConnection使用方法與實(shí)例演示,HttpURLConnection一個(gè)抽象類是標(biāo)準(zhǔn)的JAVA接口,該類位于java.net包中,它提供了基本的URL請(qǐng)求,響應(yīng)等功能,下面我們來(lái)深入看看2023-10-10springboot中JSONObject遍歷并替換部分json值
這篇文章主要介紹了springboot中JSONObject遍歷并替換部分json值,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java 實(shí)現(xiàn)常見(jiàn)的非對(duì)稱加密算法
這篇文章主要介紹了Java 實(shí)現(xiàn)常見(jiàn)的非對(duì)稱加密算法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-11-11Spring AOP實(shí)現(xiàn)功能權(quán)限校驗(yàn)功能的示例代碼
本篇文章主要介紹了Spring AOP實(shí)現(xiàn)功能權(quán)限校驗(yàn)功能的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-12-12java反射機(jī)制的一些學(xué)習(xí)心得小結(jié)
這篇文章主要給大家介紹了關(guān)于java反射機(jī)制的一些學(xué)習(xí)心得,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02javaweb圖書(shū)商城設(shè)計(jì)之圖書(shū)模塊(4)
這篇文章主要介紹了javaweb圖書(shū)商城設(shè)計(jì)之圖書(shū)模塊的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-11-11詳解springboot?springsecuroty中的注銷(xiāo)和權(quán)限控制問(wèn)題
這篇文章主要介紹了springboot-springsecuroty?注銷(xiāo)和權(quán)限控制,賬戶注銷(xiāo)需要在SecurityConfig中加入開(kāi)啟注銷(xiāo)功能的代碼,權(quán)限控制要導(dǎo)入springsecurity和thymeleaf的整合依賴,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-03-03Java實(shí)現(xiàn)公眾號(hào)功能、關(guān)注及消息推送實(shí)例代碼
公眾號(hào)開(kāi)發(fā)近些年是一個(gè)比較熱門(mén)的方向,下面這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)公眾號(hào)功能、關(guān)注及消息推送的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-11-11