深入理解Java中的并發(fā)工具類CountDownLatch
CountDownLatch 概述及使用方式
本篇文章想要講解 JUC 工具類 CountDownLatch,因為 CountDownLatch 提供了簡單有效的線程協(xié)調(diào)和控制機制,所以實際開發(fā)中是比較常用的,所以有必要了解一下 CountDownLatch。
初識 CountDownLatch
CountDownLatch 作為 Java 中的一個同步工具類,用于在多線程間實現(xiàn)協(xié)調(diào)和控制,允許一個或多個線程等待其他線程完成操作后再繼續(xù)執(zhí)行。
CountDownLatch 內(nèi)部維護(hù)了一個計數(shù)器,可以通過構(gòu)造函數(shù)指定初始計數(shù)值。當(dāng)一個線程完成了自己的任務(wù)后,可以調(diào)用 countDown()
方法將計數(shù)值減一。而其他線程可以通過調(diào)用 await()
方法等待計數(shù)值減為零,然后再繼續(xù)執(zhí)行。
一般情況下,主線程會創(chuàng)建 CountDownLatch 對象,然后傳遞給其他線程。其他線程執(zhí)行完自己的任務(wù)后,調(diào)用 countDown()
方法進(jìn)行計數(shù),主線程調(diào)用 await()
方法等待計數(shù)值為零。
CountDownLatch 的核心方法
CountDownLatch 提供了四個核心方法來實現(xiàn)線程的協(xié)調(diào)和控制,核心方法如下:
public CountDownLatch(int count)
CountDownLatch 的構(gòu)造方法,用于創(chuàng)建一個 CountDownLatch 對象,并指定初始計數(shù)值(計數(shù)值表示需要等待的線程數(shù)量)。
public void countDown()
當(dāng)一個線程完成任務(wù)后,可以調(diào)用該方法將計數(shù)器的值減一(如果計數(shù)器的值已經(jīng)為零,那么調(diào)用該方法沒有任何影響,即計數(shù)器的值不會再減,而是一直為零)。
public void await()
- 當(dāng)一個線程需要等待其他線程完成任務(wù)后再繼續(xù)執(zhí)行時,可以調(diào)用該方法進(jìn)行等待(如果計數(shù)器的值已經(jīng)為零,那么調(diào)用該方法會立即返回)。
- 如果在等待過程中,當(dāng)前線程被中斷,則會拋出
InterruptedException
異常。 - 需要注意的是調(diào)用該方法時,計數(shù)器的值應(yīng)當(dāng)在所有線程都能夠完成任務(wù)后變?yōu)榱?,否則可能導(dǎo)致線程一直等待或提前繼續(xù)執(zhí)行的問題。
public boolean await(long timeout, TimeUnit unit)
- 與
await()
方法作用一樣都能使當(dāng)前線程等待,不同點在于允許設(shè)置超時時間(即如果計數(shù)器的值在超時時間內(nèi)變?yōu)榱?,那么方法會返?true,否則返回 false)。 - 參數(shù)中的
timeout
表示超時時間的數(shù)值,unit
表示超時時間的單位。 - 如果在等待過程中,當(dāng)前線程被中斷,則會拋出
InterruptedException
異常。
CountDownLatch 的應(yīng)用場景
通過上面的介紹,應(yīng)該能了解到 CountDownLatch 是什么以及如何使用,接下來通過具體的應(yīng)用場景來看看 CountDownLatch 都可以在實際開發(fā)中起到怎樣的作用。
應(yīng)用場景一:等待多個線程任務(wù)執(zhí)行完成
場景:如果需要等待多個線程執(zhí)行完成后,才能進(jìn)行下一步操作,就可以使用 CountDownLatch 來實現(xiàn)。通過創(chuàng)建一個 CountDownLatch 對象,并將計數(shù)器的值初始化為線程數(shù)(任務(wù)數(shù)),每個線程執(zhí)行完成后,調(diào)用 countDown()
方法將計數(shù)器減一,主線程通過調(diào)用 await()
方法等待所有線程執(zhí)行完成后執(zhí)行下一步操作。
示例:有一個主線程需要等待五個子任務(wù)(線程)都完成后再進(jìn)行后續(xù)操作(匯總子任務(wù)的結(jié)果)。
示例代碼:
/** * CountDownLatch 示例 * @author 單程車票 */ public class CountDownLatchDemo { public static void main(String[] args) { // 任務(wù)數(shù)為5 CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { int task = i; // 創(chuàng)建線程 new Thread(() -> { try { System.out.println("執(zhí)行任務(wù)" + task + "業(yè)務(wù)"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace();} } finally { countDownLatch.countDown(); } }).start(); } // 阻塞直到所有任務(wù)執(zhí)行完成或超出超時時間(30min) try { countDownLatch.await(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子線程任務(wù)完成,主線程合并子線程結(jié)果"); } }
示例結(jié)果:
應(yīng)用場景二:等待外部資源初始化
場景:當(dāng)多個線程在執(zhí)行前需要初始化某個系統(tǒng)組件或外部資源(如數(shù)據(jù)庫連接池)時,可以使用 CountDownLatch 實現(xiàn)。通過主線程創(chuàng)建 CountDownLatch 對象,設(shè)定計數(shù)值為 1。初始化線程在完成資源初始化后調(diào)用 countDown()
方法,然后其他線程通過 await()
方法等待初始化完成后再開始使用資源。
示例:有三個線程等待外部資源初始化線程執(zhí)行完成后再執(zhí)行各自線程的業(yè)務(wù)。
示例代碼:
/** * CountDownLatch 示例 * @author 單程車票 */ public class CountDownLatchDemo { public static void main(String[] args) { // 初始計數(shù)值為1 CountDownLatch countDownLatch = new CountDownLatch(1); // 三個線程等待外部資源線程初始化后在執(zhí)行 for (int i = 0; i < 3; i++) { int task = i; // 創(chuàng)建線程 new Thread(() -> { // 阻塞直到外部資源初始化完成 try { countDownLatch.await(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("外部資源初始化完成,執(zhí)行任務(wù)" + task + "業(yè)務(wù)"); }).start(); } // 創(chuàng)建線程進(jìn)行外部資源初始化 new Thread(() -> { try { System.out.println("初始化外部資源"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace();} } finally { countDownLatch.countDown(); } }).start(); } }
示例結(jié)果:
應(yīng)用場景三:控制線程執(zhí)行順序
場景:當(dāng)需要保證多個線程按照特定的順序執(zhí)行時,可以通過 CountDownLatch 實現(xiàn)。主線程可以根據(jù)特定執(zhí)行順序創(chuàng)建多個 CountDownLatch 對象對應(yīng)多個線程,每個 CountDownLatch 對象的初始計數(shù)值都為 1,保證某一時刻只有指定順序的線程執(zhí)行,執(zhí)行完成后,調(diào)用下一個 CountDownLatch 對象的 countDown()
方法喚醒下一個指定順序線程執(zhí)行。
示例:有三個線程,需要按照 3 1 2 的順序依次執(zhí)行各自線程的業(yè)務(wù)。
示例代碼:
/** * CountDownLatch 示例 * @author 單程車票 */ public class CountDownLatchDemo { public static void main(String[] args) { // 初始計數(shù)值為1 CountDownLatch order1 = new CountDownLatch(1); CountDownLatch order2 = new CountDownLatch(1); CountDownLatch order3 = new CountDownLatch(1); // 三個線程按照 3 1 2 的順序執(zhí)行 order3.countDown(); // 開啟多個線程順序執(zhí)行 // 創(chuàng)建線程1 new Thread(() -> { // 阻塞直到線程3完成 try { order1.await(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("執(zhí)行任務(wù) 1 的業(yè)務(wù)"); } finally { order2.countDown(); } }).start(); // 創(chuàng)建線程2 new Thread(() -> { // 阻塞直到線程1完成 try { order2.await(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("執(zhí)行任務(wù) 2 的業(yè)務(wù)"); }).start(); // 創(chuàng)建線程3 new Thread(() -> { // 阻塞直到主線程開啟順序執(zhí)行 try { order3.await(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("執(zhí)行任務(wù) 3 的業(yè)務(wù)"); } finally { order1.countDown(); } }).start(); } }
示例結(jié)果:
CountDownLatch 的源碼分析
通過前兩部分的內(nèi)容可以了解到 CountDownLatch 的使用方式和應(yīng)用場景了,可以看到 CountDownLatch 最為核心的兩個方法是 countDown()
和 await()
。接下來通過源碼分析來看看這兩個方法是如何實現(xiàn)的。
通過源碼可以看到 CountDownLatch 其實是基于 AQS 實現(xiàn)的(想進(jìn)一步了解 AQS 的,可以查看深入理解AbstractQueuedSynchronizer), CountDownLatch 內(nèi)部通過一個靜態(tài)內(nèi)部類 Sync 繼承 AQS 來實現(xiàn)構(gòu)建同步鎖。下面從 countDown()
和 await()
這兩個方法開始進(jìn)行源碼分析。
核心方法一:await()
await()
源碼:
可以看到 await()
方法中調(diào)用了 Sync 的 acquireSharedInterruptibly()
方法,但是 Sync 中并沒有實現(xiàn)該方法,所以實際上調(diào)用的是 AQS 中的 acquireSharedInterruptibly()
方法,進(jìn)入方法:
方法中先判斷線程是否被中斷,如果被中斷則拋出 InterruptedException
異常,通過調(diào)用 tryAcquireShared()
方法嘗試搶占共享鎖,這個方法是 AQS 的抽象方法由子類實現(xiàn),這里實際上調(diào)用的就是 Sync 的 tryAcquireShared()
方法,進(jìn)入方法:
該方法調(diào)用 getState()
方法獲取當(dāng)前計數(shù)器的值,并判斷是否為 0,若為 0 則返回 1,不為 0 則返回 -1?;氐缴厦娴?tryAcquireShared()
中可以看到當(dāng)計數(shù)器的值為 0 時則不需要進(jìn)入等待隊列,當(dāng)計數(shù)器的值不為 0 時,則調(diào)用 doAcquireSharedInterruptibly()
)。進(jìn)入方法:
深入方法代碼可以分為以下幾步:
1.首先通過 addWaiter()
構(gòu)建一個共享模式的 Node 并加入等待隊列。
2.然后通過無限循環(huán),判斷當(dāng)前節(jié)點的前驅(qū)節(jié)點是否是頭節(jié)點(前驅(qū)節(jié)點為頭節(jié)點表示意味著具有嘗試資源獲取的機會)
- 前驅(qū)節(jié)點是頭節(jié)點,則不斷地嘗試獲取資源(即調(diào)用
tryAcquireShared()
這個方法前面有提到,用于判斷計數(shù)器的值是否為 0),計數(shù)值為 0,則表示獲取資源成功,即線程可以運行,所以執(zhí)行setHeadAndPropagate()
將當(dāng)前節(jié)點設(shè)置為新的頭結(jié)點,并設(shè)置p.next=null
等待 GC 回收。 - 前驅(qū)結(jié)點不是頭節(jié)點,則執(zhí)行
shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
根據(jù)一定條件判斷線程是否應(yīng)該被阻塞并檢查是否發(fā)生中斷,等待后續(xù)喚醒。
3.最后的 finally
通過標(biāo)志 failed
(表示是否獲取資源失?。?,如果為 true,則執(zhí)行 cancelAcquire()
方法取消對資源的獲取,并移出等待隊列。
所以這個方法核心為通過無限循環(huán)不斷地嘗試獲取共享資源,獲取成功則將當(dāng)前節(jié)點設(shè)置為頭結(jié)點,獲取失敗則判斷是否需要阻塞并檢查是否被中斷,如果最后獲取失敗,則放棄獲取資源并移出等待隊列。
到這里就是 await()
方法的整個實現(xiàn)流程了,底層通過調(diào)用 AQS 的 doAcquireSharedInterruptibly()
方法以及 CountDownLatch 實現(xiàn) AQS 的抽象方法 tryAcquireShared()
實現(xiàn)線程阻塞和喚醒。
核心方法二:countDown()
countDown()
源碼:
可以看到 countDown()
方法中調(diào)用了 Sync 的 releaseShared()
方法,但是 Sync 中并沒有實現(xiàn)該方法,所以實際上調(diào)用的是 AQS 中的 releaseShared()
方法,進(jìn)入方法:
方法中調(diào)用 Sync 實現(xiàn) AQS 的抽象方法 tryReleaseShared()
來進(jìn)行判斷,進(jìn)入方法:
方法中判斷當(dāng)前計數(shù)器值是否為 0,是則返回 false 不做任何操作,也就是當(dāng)計數(shù)器值為 0 時調(diào)用 CountDownLatch()
方法不會做任何操作。不是 0 則進(jìn)行計數(shù)器值減一,并通過 CAS 操作更新計數(shù)器值,如果更新后的值為 0,則調(diào)用 AQS 內(nèi)部的 doReleaseShared()
方法釋放共享資源,否則除了更新計數(shù)器值之外不做任何操作。進(jìn)入 doReleaseShared()
方法:
doReleaseShared()
方法的目的是在釋放共享資源時,確保喚醒等待的線程,并通過循環(huán)和 CAS 操作來處理并發(fā)情況和頭節(jié)點的變化。
到這里就是 countDown()
方法的整個實現(xiàn)過程了,底層通過 CountDownLatch 實現(xiàn) AQS 的抽象方法 tryReleaseShared()
采用 CAS 來完成計數(shù)器減一,并通過 AQS 的內(nèi)部方法 doReleaseShared()
實現(xiàn)釋放資源。
以上就是深入理解Java中的并發(fā)工具類CountDownLatch的詳細(xì)內(nèi)容,更多關(guān)于Java并發(fā)工具類CountDownLatch的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決RedisTemplate的key默認(rèn)序列化器的問題
這篇文章主要介紹了解決RedisTemplate的key默認(rèn)序列化器的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03Java并發(fā)系列之AbstractQueuedSynchronizer源碼分析(獨占模式)
這篇文章主要為大家詳細(xì)介紹了Java并發(fā)系列之AbstractQueuedSynchronizer源碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02SpringSecurity實現(xiàn)自定義用戶認(rèn)證方案
Spring?Security?實現(xiàn)自定義用戶認(rèn)證方案可以根據(jù)具體需求和業(yè)務(wù)場景進(jìn)行設(shè)計和實施,滿足不同的安全需求和業(yè)務(wù)需求,這種靈活性使得認(rèn)證機制能夠更好地適應(yīng)各種復(fù)雜的環(huán)境和變化?,本文給大家介紹了SpringSecurity實現(xiàn)自定義用戶認(rèn)證方案,需要的朋友可以參考下2025-01-01mybatis-flex與springBoot整合的實現(xiàn)示例
Mybatis-flex提供了簡單易用的API,開發(fā)者只需要簡單的配置即可使用,本文主要介紹了mybatis-flex與springBoot整合,具有一定的參考價值,感興趣的可以了解一下2024-01-01