Java多線程中的CountDownLatch詳細(xì)解讀
簡介
一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。用給定的計(jì)數(shù) 初始化 CountDownLatch。
由于調(diào)用了 countDown() 方法會使初始化的計(jì)數(shù)減一,所以在當(dāng)前計(jì)數(shù)到達(dá)零之前,await()方法會一直受阻塞。
之后,會釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計(jì)數(shù)無法被重置。 一個(gè)線程(或者多個(gè)), 等待另外N個(gè)線程完成某個(gè)事情之后才能執(zhí)行
CountDownLatch和CyclicBarrier的區(qū)別
- CountDownLatch的作用是允許1或N個(gè)線程等待其他線程完成執(zhí)行;而CyclicBarrier則是允許N個(gè)線程相互等待。
- CountDownLatch的計(jì)數(shù)器無法被重置;CyclicBarrier的計(jì)數(shù)器可以被重置后使用,因此它被稱為是循環(huán)的barrier。 CountDownLatch類的方法
源碼:
package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
CountDownLatch的數(shù)據(jù)結(jié)構(gòu)很簡單,它是通過共享鎖實(shí)現(xiàn)的。它包含了sync對象,sync是Sync類型。Sync是實(shí)例類,它繼承于AQS。
核心方法
下面,我們分析CountDownLatch中3個(gè)核心方法: CountDownLatch(int count), await(), countDown()。
CountDownLatch(int count)
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
此方法創(chuàng)建了一個(gè)Sync對象,而Sync是繼承于AQS類。Sync構(gòu)造函數(shù)如下:
Sync(int count) { setState(count); }
setState()在AQS中實(shí)現(xiàn),源碼如下:
private volatile int state; //state在AQS中的聲明 protected final void setState(int newState) { state = newState; }
說明:在AQS中,state是一個(gè)private volatile long類型的對象。對于CountDownLatch而言,state表示的”鎖計(jì)數(shù)器“。CountDownLatch中的getCount()最終是調(diào)用AQS中的getState(),返回的state對象,即”鎖計(jì)數(shù)器“。
//CountDownLatch類的getCount方法 public long getCount() { return sync.getCount(); } //Sync類中的方法 int getCount() { return getState();//調(diào)用AQS中的getState()方法 } //AQS中的方法 protected final int getState() { return state; }
countDown()
//CountDownLatch類中的countDown()方法 public void countDown() { sync.releaseShared(1);//調(diào)用的是AQS中的releaseShared()方法 } //AQS類中的releaseShared()方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
說明:releaseShared()的目的是讓當(dāng)前線程釋放它所持有的共享鎖。 它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
tryReleaseShared()在CountDownLatch.java中被重寫,源碼如下:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 獲取“鎖計(jì)數(shù)器”的狀態(tài) int c = getState(); if (c == 0) return false; // “鎖計(jì)數(shù)器”-1 int nextc = c-1; // 通過CAS函數(shù)進(jìn)行賦值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }
說明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計(jì)數(shù)器”的值-1。
await()、await(long timeout, TimeUnit unit)
當(dāng)計(jì)數(shù)值不為零,調(diào)用await(),會一直阻塞當(dāng)前線程,直到計(jì)數(shù)值為0,才會喚醒執(zhí)行。
有參的awaitawait(long timeout, TimeUnit unit)方法,設(shè)置一個(gè)阻塞的超時(shí) 時(shí)間timeout。最多阻塞這個(gè)時(shí)間
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
接下來說無參構(gòu)造方法await():
說明:該函數(shù)實(shí)際上是調(diào)用的AQS的acquireSharedInterruptibly(1);
AQS中的acquireSharedInterruptibly()的源碼如下:
public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
說明:acquireSharedInterruptibly()的作用是獲取共享鎖。 如果當(dāng)前線程是中斷狀態(tài),則拋出異常InterruptedException。否則,調(diào)用tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就調(diào)用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會使當(dāng)前線程一直等待,直到當(dāng)前線程獲取到共享鎖(或被中斷)才返回。
tryAcquireShared()在CountDownLatch.java中被重寫,它的源碼如下:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
說明:tryAcquireShared()的作用是嘗試獲取共享鎖。 如果"鎖計(jì)數(shù)器=0",即鎖是可獲取狀態(tài),則返回1;否則,鎖是不可獲取狀態(tài),則返回-1。
AQS中的doAcquireSharedInterruptibly(long arg)方法
private void doAcquireSharedInterruptibly(long arg) throws InterruptedException { // 創(chuàng)建"當(dāng)前線程"的Node節(jié)點(diǎn),且Node中記錄的鎖是"共享鎖"類型;并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 獲取上一個(gè)節(jié)點(diǎn)。 // 如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則"嘗試獲取共享鎖"。 final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (上一節(jié)點(diǎn)不是CLH隊(duì)列的表頭) 當(dāng)前線程一直等待,直到獲取到共享鎖。 // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態(tài))。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
說明:
(1) addWaiter(Node.SHARED)的作用是,創(chuàng)建”當(dāng)前線程“的Node節(jié)點(diǎn),且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。
(2) node.predecessor()的作用是,獲取上一個(gè)節(jié)點(diǎn)。如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則”嘗試獲取共享鎖“。
(3) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之后,線程應(yīng)該等待,則返回true;否則,返回false。
(4) 當(dāng)shouldParkAfterFailedAcquire()返回ture時(shí),則調(diào)用parkAndCheckInterrupt(),當(dāng)前線程會進(jìn)入等待狀態(tài),直到獲取到共享鎖才繼續(xù)運(yùn)行。
總結(jié):
CountDownLatch是通過“共享鎖”實(shí)現(xiàn)的。在創(chuàng)建CountDownLatch中時(shí),會傳遞一個(gè)int類型參數(shù)count,該參數(shù)是“鎖計(jì)數(shù)器”的初始狀態(tài),表示該“共享鎖”最多能被count給線程同時(shí)獲取。當(dāng)某線程調(diào)用該CountDownLatch對象的await()方法時(shí),該線程會等待“共享鎖”可用時(shí),才能獲取“共享鎖”進(jìn)而繼續(xù)運(yùn)行。而“共享鎖”可用的條件,就是“鎖計(jì)數(shù)器”的值為0!而“鎖計(jì)數(shù)器”的初始值為count,每當(dāng)一個(gè)線程調(diào)用該CountDownLatch對象的countDown()方法時(shí),才將“鎖計(jì)數(shù)器”-1;通過這種方式,必須有count個(gè)線程調(diào)用countDown()之后,“鎖計(jì)數(shù)器”才為0,而前面提到的等待線程才能繼續(xù)運(yùn)行!
以上,就是CountDownLatch的實(shí)現(xiàn)原理。
CountDownLatch示例
/** * 主線程等待子線程執(zhí)行完成再執(zhí)行 */ public class CountdownLatchTest1 { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(3); final CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("子線程" + Thread.currentThread().getName() + "開始執(zhí)行"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("子線程"+Thread.currentThread().getName()+"執(zhí)行完成"); latch.countDown();//當(dāng)前線程調(diào)用此方法,則計(jì)數(shù)減一 } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { System.out.println("主線程"+Thread.currentThread().getName()+"等待子線程執(zhí)行完成..."); latch.await();//阻塞當(dāng)前線程,直到計(jì)數(shù)器的值為0 System.out.println("主線程"+Thread.currentThread().getName()+"開始執(zhí)行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
執(zhí)行結(jié)果:
public class CountdownLatchTest2 { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(4); for (int i = 0; i < 4; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發(fā)布口令"); cdOrder.await(); System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("選手" + Thread.currentThread().getName() + "到達(dá)終點(diǎn)"); cdAnswer.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("裁判"+Thread.currentThread().getName()+"即將發(fā)布口令"); cdOrder.countDown(); System.out.println("裁判"+Thread.currentThread().getName()+"已發(fā)送口令,正在等待所有選手到達(dá)終點(diǎn)"); cdAnswer.await(); System.out.println("所有選手都到達(dá)終點(diǎn)"); System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名"); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); } }
執(zhí)行結(jié)果:
到此這篇關(guān)于Java多線程中的CountDownLatch詳細(xì)解讀的文章就介紹到這了,更多相關(guān)CountDownLatch詳細(xì)解讀內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot項(xiàng)目@RestController使用重定向redirect方式
這篇文章主要介紹了Spring Boot項(xiàng)目@RestController使用重定向redirect方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09SpringBoot微信掃碼支付的實(shí)現(xiàn)示例
這篇文章主要介紹了SpringBoot微信掃碼支付的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01解決Mybatis 大數(shù)據(jù)量的批量insert問題
這篇文章主要介紹了解決Mybatis 大數(shù)據(jù)量的批量insert問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01SpringBoot 多線程事務(wù)回滾的實(shí)現(xiàn)
本文是基于springboot的@Async注解開啟多線程,并通過自定義注解和AOP實(shí)現(xiàn)的多線程事務(wù),避免繁瑣的手動提交/回滾事務(wù),感興趣的可以了解一下2024-02-02SpringBoot @PostConstruct和@PreDestroy的使用說明
這篇文章主要介紹了SpringBoot @PostConstruct和@PreDestroy的使用說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09