Java隊列同步器之CountDownLatch實現(xiàn)詳解
CountDownLatch使用場景
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
例如,應(yīng)用程序的主線程希望在負責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有框架服務(wù)之后執(zhí)行。CountDownLatch在多線程并發(fā)編程中充當(dāng)一個計時器的功能。
典型的使用例子如下:
public class CountDownLatchTest { private static CountDownLatch latch = new CountDownLatch(2); public static void main(String[] args) throws Exception { long now = System.currentTimeMillis(); ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); executor.execute(new QuickTask(latch)); executor.execute(new SlowTask(latch)); latch.await(); System.out.println("Both QuickTask and SlowTask are finished cost: " + (System.currentTimeMillis() - now)); executor.shutdown(); } static final class QuickTask implements Runnable { private CountDownLatch countDownLatch; public QuickTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("QuickTaskThread: " + Thread.currentThread().getName()); Thread.sleep(3000); System.out.println("QuickTaskThread finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) { countDownLatch.countDown(); } } } } static final class SlowTask implements Runnable { private CountDownLatch countDownLatch; public SlowTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("SlowTaskThread: " + Thread.currentThread().getName()); Thread.sleep(5000); System.out.println("SlowTaskThread finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) { countDownLatch.countDown(); } } } } }
運行結(jié)果:
CountDownLatch實現(xiàn)分析
CountDownLatch類的源碼很簡單,如下:
public class CountDownLatch { private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * 這里繼承隊列同步器,并重寫tryAcquireShared(int acquires)、tryReleaseShared(int releases)方法 */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; /** * 設(shè)置AQS同步狀態(tài),同步狀態(tài)變量定義在抽象類AbstractQueuedSynchronizer中 * private volatile int state; */ Sync(int count) { setState(count); } int getCount() { return getState(); } /** * 只有當(dāng)CountDownLatch里面的計數(shù)器為0時,才會返回1 * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式獲取同步狀態(tài), * 只有返回值大于等于0的時候表示獲取成功,反之則表示獲取失敗 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式釋放同步狀態(tài) * 每成功調(diào)用一次這個方法Sync的實例的status值就會減一,當(dāng)status的值減為0時,則不會再減。 */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // compareAndSetState方法執(zhí)行不成功就一直循環(huán)執(zhí)行直到成功 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * countDown:每調(diào)用一次countDown()方法都會使sync成員變量status減一(直到status為0),status為0,則會讓調(diào)用await()方法的地方不在阻塞, * 從而達到可以等待多個并發(fā)事件完成的目標 * void */ public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
CountDownLatch內(nèi)部依賴Sync實現(xiàn),而Sync繼承AQS。
CountDownLatch主要分析以下三點:
- 構(gòu)造方法,創(chuàng)建CountDownLatch對象時指定count值,即線程個數(shù)
- countDown()方法的實現(xiàn),每執(zhí)行一個線程方法就將計數(shù)器減一,當(dāng)計數(shù)為0時,啟用當(dāng)前線程
- await()方法的實現(xiàn),當(dāng)前線程在計數(shù)器為0之前一直等待,除非線程被中斷
countDown()方法實現(xiàn)
countDown()方法源碼如下:
public void countDown() { //遞減鎖重入次數(shù),當(dāng)state=0時喚醒所有阻塞線程 sync.releaseShared(1); }
releaseShared()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryReleaseShard()共享式釋放鎖方法交給子類去實現(xiàn),doReleaseShared()方法在抽象類中實現(xiàn)。
AbstractQueuedSynchronizer中的doReleaseShared()方法如下:
private void doReleaseShared() { //喚醒所有阻塞隊列里面的線程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //節(jié)點是否在等待喚醒狀態(tài) if (ws == Node.SIGNAL) { //修改狀態(tài)為初始 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //成功則喚醒線程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch內(nèi)部通過共享鎖實現(xiàn)。在創(chuàng)建CountDownLatch實例時,需要傳遞一個int型的參數(shù):count,該參數(shù)為計數(shù)器的初始值,也可以理解為該共享鎖可以獲取的總次數(shù)。
await()方法實現(xiàn)
await()方法源碼如下:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
acquireSharedInterruptibly()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryAcquiredShard()共享式獲取鎖方法交給子類去實現(xiàn),doAcquiredSharedInterruptibly()方法在抽象類中實現(xiàn)。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CountDownLatch中的內(nèi)部類Sync對AQS的tryAcquireShared方法進行了復(fù)寫。
- 當(dāng)前計數(shù)器的值為0的時候返回1,表示獲取鎖成功,此時acquireSharedInterruptibly()方法直接返回,線程可繼續(xù)操作。
- 當(dāng)前計數(shù)器的值不為0的時候返回-1,表示獲取鎖失敗 ,進入doAcquiredSharedInterruptibly()方法,進入隊列中排隊等待。
AQS中doAcquiredSharedInterruptibly()方法實現(xiàn)如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入等待隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; // 進入 CAS 循環(huán) try { for (;;) { //當(dāng)一個節(jié)點進入等待隊列后, 獲取此節(jié)點的prev節(jié)點 final Node p = node.predecessor(); // 如果獲取到的prev是head,也就是隊列中第一個等待線程 if (p == head) { //再次嘗試申請,反應(yīng)到CountDownLatch就是查看是否還有線程需要等待(state是否為0) int r = tryAcquireShared(arg); // 如果 r >=0 說明 沒有線程需要等待了state==0 if (r >= 0) { //嘗試將第一個線程關(guān)聯(lián)的節(jié)點設(shè)置為head setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //經(jīng)過自旋tryAcquireShared后,state還不為0,就會到這里,第一次的時候,waitStatus是0,那么node的waitStatus就會被置為SIGNAL,第二次再走到這里,就會用LockSupport的park方法把當(dāng)前線程阻塞住 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個方法進來執(zhí)行的第一個動作就是嘗試將當(dāng)前線程封裝成Node加入同步隊列 ,即調(diào)用addWaiter()方法。到這里就走到了AQS的核心部分,AQS用內(nèi)部的一個Node類維護一個Node FIFO隊列。
addWaiter()方法內(nèi)部用CAS實現(xiàn)隊列出入不會發(fā)生阻塞。
LockSupport是JDK中比較底層的類,用來創(chuàng)建鎖和其他同步工具類的基本線程阻塞原語。
setHeadAndPropagate()方法負責(zé)將自旋等待或被LockSupport阻塞的線程喚醒。
private void setHeadAndPropagate(Node node, int propagate) { //備份現(xiàn)在的head Node h = head; //搶到鎖的線程被喚醒,將這個節(jié)點設(shè)置為head setHead(node) // propagate 一般都會大于0 或者存在可被喚醒的線程 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 只有一個節(jié)點或者是共享模式 釋放所有等待線程各自嘗試搶占鎖 if (s == null || s.isShared()) doReleaseShared(); } }
線程封裝成Node對象時,waitStatus是volatile變量,初始值是0,對其賦值可能有4個取值
//當(dāng)前節(jié)點線程由于超時或中斷被取消,這種狀態(tài)的節(jié)點將會被忽略,并移出隊列 static final int CANCELLED = 1; //表示當(dāng)前線程已被掛起,其后繼節(jié)點可以嘗試搶占鎖 static final int SIGNAL = -1; //線程在Condition條件隊列中等待,當(dāng)從同步隊列中復(fù)制到條件隊列時變?yōu)? static final int CONDITION = -2; //共享模式下,釋放共享資源時通知其他節(jié)點 static final int PROPAGATE = -3;
AQS獨占與共享小結(jié)
AQS的功能可以分為兩類:獨占與共享;如ReentrantLock利用了其獨占功能,CountDownLatch利用了其共享功能。AQS的靜態(tài)內(nèi)部類Node里有兩個變量,獨占鎖與共享鎖在創(chuàng)建自己的節(jié)點時(addWaiter方法)用于表明身份,它們會被賦值給Node的nextWaiter變量。
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; final boolean isShared() { return nextWaiter == SHARED; } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
獨占鎖就是每次只允許一個線程執(zhí)行,當(dāng)前線程執(zhí)行完會release將同步狀態(tài)歸零,再喚醒后繼節(jié)點。通過自定義tryAcquire()方法來實現(xiàn)公平與非公平。
獨占式獲取及釋放資源acquire & release
//成功代表同步狀態(tài)的變更,排斥其他線程;否則加入等待隊列 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //同步狀態(tài)歸0后,喚醒后繼節(jié)點 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
獨占式地釋放和獲取資源都是嚴格按照FIFO的,即通過鏈表的前后驅(qū)指針按順序來的。在獲取資源時,節(jié)點每次都看自身前節(jié)點是否是頭節(jié)點,若是就嘗試獲取資源;獲取沒成功不要緊,此時頭節(jié)點狀態(tài)是SIGNAL,此時該節(jié)點會使用LokSupport的park掛起自己,頭節(jié)點釋放資源后就會unpark該節(jié)點線程,下一輪循環(huán)中該節(jié)點就可以成功獲取資源啦! 如果前節(jié)點不是頭節(jié)點,那就繼續(xù)自旋。
不同于ReentrantLock,CountDownLatch調(diào)用的是AQS里的acquireSharedInterruptibly()與releaseShared()方法,這兩個方法是共享式獲取與釋放資源的實現(xiàn)。CountDownLatch實現(xiàn)了自己的tryAcquireShared()與tryReleaseShared()方法。
共享式獲取及釋放資源acquireSharedInterruptibly & releaseShared
//tryAcquireShared()方法返回正數(shù),代表資源獲取成功,只要不為0嘗試獲取資源的線程就一直成功,返回0代表最后一個資源被獲取了,返回負數(shù)代表資源已經(jīng)沒有了,加入等待隊列 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //tryReleaseShared()方法返回true,代表資源釋放成功,喚醒所有等待資源的阻塞線程 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
獨占鎖的tryAcquire()及tryRelease()返回boolean代表同步狀態(tài)更改的成功與否;tryReleaseShared()方法也返回boolean值代表資源釋放成功與否,但是AQS中定義的tryAcquireShared()方法返回的卻是int值,這正好體現(xiàn)了獨占與共享的區(qū)別。
來看tryAcquireShared()方法對返回值的注釋
* @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
翻譯一下,就是tryAcquireShared()返回大于0的正數(shù)代表當(dāng)前線程能夠正常獲取資源,其之后的線程也可能正常獲取資源,返回0代表當(dāng)前線程能夠正常獲取資源,但之后的線程將會進入等待隊列中。獨占與共享最大不同就在各自的tryAcquire里,對于獨占來說只有true或false,只有一個線程得以執(zhí)行任務(wù);而對于共享鎖的tryAcquireShared()來說,線程數(shù)沒達到限制都可以直接執(zhí)行。 但本質(zhì)上都是對AQS同步狀態(tài)的修改,一個是0與1之間,另一個允許更多而已。
到此這篇關(guān)于Java隊列同步器之CountDownLatch實現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Java的CountDownLatch實現(xiàn)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于Spring?Boot內(nèi)存泄露排查的記錄
這篇文章主要介紹了關(guān)于Spring?Boot內(nèi)存泄露排查的記錄,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06SpringBoot使用slf4j日志并輸出到文件中的操作方法
這篇文章主要介紹了SpringBoot使用slf4j日志并輸出到文件中,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08postman測試post請求參數(shù)為json類型的實例講解
下面小編就為大家分享一篇postman測試post請求參數(shù)為json類型的實例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-03-03JDBC操作數(shù)據(jù)庫的增加、刪除、更新、查找實例分析
這篇文章主要介紹了JDBC操作數(shù)據(jù)庫的增加、刪除、更新、查找方法,以完整實例形式分析了Java基于JDBC連接數(shù)據(jù)庫及進行數(shù)據(jù)的增刪改查等技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-10-10