詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier
在JDK的并發(fā)包中,有幾個非常有用的并發(fā)工具類,它們分別是:CountDownLatch
、CyclicBarrier
、Semaphore
和Exchanger
。
CountDownLatch
(倒計時門閂):它允許一個或多個線程等待其他線程完成操作后再繼續(xù)執(zhí)行。它通過一個計數(shù)器來實現(xiàn),線程通過調(diào)用countDown()
方法來減少計數(shù)器的值,await()
方法進行阻塞等待計數(shù)器減少,當(dāng)計數(shù)器達到零時,等待的線程將被釋放。CyclicBarrier
(循環(huán)屏障):它允許一組線程互相等待,直到到達一個共同的屏障點,然后繼續(xù)執(zhí)行后續(xù)操作。與CountDownLatch
不同的是,CyclicBarrier
的計數(shù)器可以重復(fù)使用(reset()
方法),當(dāng)所有等待線程都到達屏障點后,計數(shù)器會重置,線程可以繼續(xù)下一次等待。Semaphore
(信號量):它用于控制對某個資源的訪問權(quán)限。Semaphore
維護了一組許可證,線程在訪問資源前需要獲取許可證,如果許可證不可用,則線程必須等待,直到有可用的許可證。Exchanger
(交換器):它提供了一種線程間交換數(shù)據(jù)的機制。兩個線程可以通過Exchanger
交換數(shù)據(jù),當(dāng)兩個線程都調(diào)用exchange()
方法后,他們會彼此交換數(shù)據(jù),并繼續(xù)執(zhí)行后續(xù)操作。
CountDownLatch
Latch(門閂)設(shè)計模式
當(dāng)多個線程并發(fā)執(zhí)行任務(wù),然后只有等待所有子任務(wù)全部完成進行匯總,程序的門閂才能打開讓程序繼續(xù)往下執(zhí)行。它指定了一個屏障,只有所有條件都滿足的時候,門閥才能打開。
比如小明和小紅相約周末去爬山,約定在人民廣場碰頭,然后一同出發(fā)去爬山,他們各自從家里出發(fā),無論是其中某一個先到達了人民廣場都要等待另一個到達之后才可以繼續(xù)進行下去,這里的人民廣場碰頭就相當(dāng)于上述的門閂。
示例
還是使用上面的例子,我們模擬小明和小紅從家出發(fā),設(shè)定不同的等待時間模擬到達人民廣場的路程耗時。代碼如下
public static void main(String[] args) throws InterruptedException, ExecutionException { ? ? ? ?final int threadNum = 2; ? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum); ? ? ? ?CountDownLatch countDownLatch = new CountDownLatch(threadNum); ? ? ? ? ?executorService.execute(() -> { ? ? ? ? ? ? ?System.out.println("小明開始出發(fā)"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ?countDownLatch.countDown(); ?// 計數(shù)器 -1 ? ? ? ? ? ?System.out.println("小明到達人民廣場"); ? ? ? ? ? }); ? ? ? ?executorService.execute(() -> { ? ? ? ? ? ? ?System.out.println("小紅開始出發(fā)"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ?countDownLatch.countDown(); ?// 計數(shù)器 -1 ? ? ? ? ? ?System.out.println("小紅到達人民廣場"); ? ? ? ? ? }); ? ? ? ?countDownLatch.await(); ? ? ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山"); ? ? ? ?executorService.shutdown(); ? ? } ?
結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場 // 2s后打印
小紅到達人民廣場 // 3s后打印
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
與Join()的區(qū)別
可能這里會有疑問,使用Thread.join()
也可以實現(xiàn)相同的功能,這與使用CountDownLatch
有什么區(qū)別呢?
join()的實現(xiàn)
public static void main(String[] args) throws InterruptedException { ? ?Thread thread1 = new Thread(() -> { ? ? ? ?System.out.println("小明開始出發(fā)"); ? ? ? ?try { ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? ? ? ?System.out.println("小明到達人民廣場"); ? ? }, "thread1"); ? ?Thread thread2 = new Thread(() -> { ? ? ? ?System.out.println("小紅開始出發(fā)"); ? ? ? ?try { ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? ? ? ?System.out.println("小紅到達人民廣場"); ? ? }, "thread2"); ? ? ?thread1.start(); ? ?thread2.start(); ? ?thread1.join(); ? ?thread2.join(); ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山"); ? }
結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場 // 2s后打印
小紅到達人民廣場 // 3s后打印
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
發(fā)現(xiàn)使用join()實現(xiàn)和countDownCatch
實現(xiàn)好像在代碼上的體現(xiàn)并沒有太大差異,不急,我們接著往下看
join()實現(xiàn)原理
我們點進去join的jdk
源碼查看它的實現(xiàn)邏輯
public final void join() throws InterruptedException { ? ?join(0); } ? public final synchronized void join(long millis) ? ?throws InterruptedException { ? ? ? ?long base = System.currentTimeMillis(); ? ? ? ?long now = 0; ? ? ? ? ?if (millis < 0) { ? ? ? ? ? ?throw new IllegalArgumentException("timeout value is negative"); ? ? ? } // 調(diào)用join真正執(zhí)行的方法 ? ? ? ?if (millis == 0) { ? ? ? ? ? ?while (isAlive()) { ? ? ? ? ? ? ? ?wait(0); ? ? ? ? ? } ? ? ? } else { ? ? ? ? ? ?while (isAlive()) { ? ? ? ? ? ? ? ?long delay = millis - now; ? ? ? ? ? ? ? ?if (delay <= 0) { ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?wait(delay); ? ? ? ? ? ? ? ?now = System.currentTimeMillis() - base; ? ? ? ? ? } ? ? ? } ? } ?
我們看到他的核心代碼就幾行
while (isAlive()) { ? ? wait(0); }
這幾行代碼不難理解,通過不停的檢查join線程是否存活,如果線程狀態(tài)是活動的,那么就一直等待下去(wait(0)表示永久等待),直到j(luò)oin線程中止后,線程的this.notifyAll()
方法會被調(diào)用,不過調(diào)用notifyAll()
方法是在JVM里 實現(xiàn)的,所以在JDK里看不到。
Join()與countDownLatch比較
回到上一個問題,join到底和countDownLatch
有什么區(qū)別,countDownLatch
底層使用了計數(shù)器來控制線程的喚醒,提供了更細粒度的線程控制,比如我們運行了100個線程,但是只需要80個線程執(zhí)行結(jié)束就可以繼續(xù)下去,那么使用join就不合適了。
綜上所述 CountDownLatch
相對于Join的優(yōu)勢:
CountDownLatch
可以等待多個線程的完成,而Join只能等待一個線程。CountDownLatch
可以靈活地設(shè)置計數(shù)器的值,不僅僅限于線程數(shù),可以根據(jù)需要自由控制。CountDownLatch
提供了更細粒度的線程間協(xié)作和控制,可以在任意位置進行countDown
()和await()的調(diào)用,更靈活地控制線程的流程。
CountDownLatch程序?qū)崿F(xiàn)
上面說了很多CountDownLatch
的示例和與join比較,也提了一下CountDownLatch
底層的原理,下面就看一下如何實現(xiàn)一個簡單的CountDownLatch
程序
我們先新建一個抽象類,包含countDownLatch需要的參數(shù)和方法
public abstract class Latch { ? ?// 控制了多少線程完成后門閥才能打開 ? ?protected int limit; ? ? ?// 構(gòu)造函數(shù) ? ?public Latch(int limit){ ? ? ? ?this.limit = limit; ? } ? ? ?// 方法使得線程一直等待 ? ?public abstract void await() throws InterruptedException; ? ? ?// 當(dāng)前任務(wù)線程完成工作之后調(diào)用該方法使得計數(shù)器減一 ? ?public abstract void countDown(); ? ? ?// 獲取當(dāng)前還有多少個線程沒有完成任務(wù) ? ?public abstract int getUnArrived(); ? }
然后實現(xiàn)這個抽象類,并寫入具體邏輯代碼
public class CountDownLatch extends Latch { ? ? ?private final Lock lock = new ReentrantLock(); ? ?private final Condition condition = lock.newCondition(); ? ? ? ?public CountDownLatch(int limit) { ? ? ? ?super(limit); ? } ? ? ?@Override ? ?public void await() throws InterruptedException { ? ? ? ?lock.lock(); ? ? ? ?while (limit > 0){ ? ? ? ? ? ?condition.await(); ? ? ? } ? ? ? ?lock.unlock(); ? } ? ? ?@Override ? ?public void countDown() { ? ? ? ? ?lock.lock(); ? ? ? ?if(limit < 0){ ? ? ? ? ? ?throw new IllegalStateException(); ? ? ? } ? ? ? ?limit--; ? ? ? ?condition.signalAll(); ? ? ? ? ?lock.unlock(); ? ? } ? ? ?@Override ? ?public int getUnArrived() { ? ? ? ?return limit; ? } }
測試
public class LatchDemo { ? ?public static void main(String[] args) throws InterruptedException { ? ? ? ?Latch latch = new CountDownLatch(2); ? ? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(2); ? ? ? ? ?executorService.execute(()->{ ? ? ? ? ? ?System.out.println("小明開始出發(fā)"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ?latch.countDown(); ?// 計數(shù)器 -1 ? ? ? ? ? ?System.out.println("小明到達人民廣場"); ? ? ? }); ? ? ? ?executorService.execute(()->{ ? ? ? ? ? ?System.out.println("小紅開始出發(fā)"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ?latch.countDown(); ?// 計數(shù)器 -1 ? ? ? ? ? ?System.out.println("小紅到達人民廣場"); ? ? ? }); ? ? ? ? ? ?latch.await(); ? ? ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山"); ? ? ? ? ?executorService.shutdown(); ? ? } }
結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場
小紅到達人民廣場
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山
?
Process finished with exit code 0
可以看到結(jié)果如前文一致,這就實現(xiàn)了一個簡單的CountDownLatch
,當(dāng)然具體實現(xiàn)還有更多的細節(jié),如有需要,請翻閱源碼。
總結(jié)
通過上面的簡單實現(xiàn),我們可以看到CountDownLatch
基于計數(shù)器實現(xiàn)了多線程之間的門閥攔截,底層還是通過線程之間的通訊、鎖和計數(shù)器控制。
CyclicBarrier
除了使用CountDownLatch
來實現(xiàn)多線程之間的阻塞同步,也可以使用CyclicBarrier
來實現(xiàn),并且CyclicBarrier
提供了比CountDownLatch
更強大的功能。
CyclicBarrier
的字面意思是可循環(huán)使用的屏障。它提供了一種同步機制,使一組線程能夠在達到屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開啟,所有被阻塞的線程才能繼續(xù)執(zhí)行。
網(wǎng)上找的一張示意圖
示例
還是用之前的例子,模擬小明和小紅去爬山,代碼如下,結(jié)果就不贅述了。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException { ? ? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(3); ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(2); ? ? ?executorService.execute(()->{ ? ? ? ?System.out.println("小明開始出發(fā)"); ? ? ? ?try { ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2); ? ? ? ? ? ?System.out.println("小明到達人民廣場"); ? ? ? ? ? ?cyclicBarrier.await(); // 計數(shù)器 -1 ? ? ? } catch (Exception e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? ? }); ? ?executorService.execute(()->{ ? ? ? ?System.out.println("小紅開始出發(fā)"); ? ? ? ?try { ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3); ? ? ? ? ? ?System.out.println("小紅到達人民廣場"); ? ? ? ? ? ?cyclicBarrier.await(); // 計數(shù)器 -1 ? ? ? } catch (Exception e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? }); ? ?cyclicBarrier.await(); ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山"); ? ? ?executorService.shutdown(); ? }
不同的是,這里我設(shè)置了三個屏障點 cyclicBarrier.await()
;,而使用CountDownLatch
只用了兩個計數(shù)器減一操作 + 一個wait()方法,使用起來很相似,我們說cyclicBarrier
比 CountDownLatch
功能更強大,那么強大在哪里呢?
重置計數(shù)器和獲取狀態(tài)
重置計數(shù)器
CountDownLatch
的計數(shù)器只能使用一次,而CyclicBarrier
的計數(shù)器可以使用reset()
方法重置。所以CyclicBarrier
能處理更為復(fù)雜的業(yè)務(wù)場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException { ? ? ?final int threadNum = 3; ? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum); ? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> { ? ? ? ?System.out.println("所有線程都到達屏障"); ? }); ? ? ?for (int i = 0; i < threadNum; i++) { ? ? ? ?executorService.execute(() -> { ? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達屏障"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?cyclicBarrier.await(); ? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? }); ? } ? ? ?Thread.sleep(2000); // 等待一段時間,確保所有線程都到達屏障 ? ? ?cyclicBarrier.reset(); // 重置屏障 ? ? ?System.out.println("屏障已重置"); ? ? ?for (int i = 0; i < threadNum-1; i++) { ? ? ? ?executorService.execute(() -> { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(1); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達屏障"); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?cyclicBarrier.await(); ? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? }); ? } ? ? ?System.out.println("第二次進入 循環(huán)屏障"); ? ?cyclicBarrier.await(); ? ?System.out.println("第二次循環(huán) 邁過屏障"); ? ? ? ?executorService.shutdown(); ? ? }
結(jié)果
pool-1-thread-2 到達屏障
pool-1-thread-3 到達屏障
pool-1-thread-1 到達屏障
所有線程都到達屏障
屏障已重置
第二次進入 循環(huán)屏障
pool-1-thread-2 到達屏障
pool-1-thread-1 到達屏障
所有線程都到達屏障
第二次循環(huán) 邁過屏障
?
Process finished with exit code 0
先說一下CyclicBarrier
提供的另一個構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrierAction)
,用于在線程到達屏障時,優(yōu)先執(zhí)行barrierAction
,也就是上方代碼中用到的這幾段
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> { ? ?System.out.println("所有線程都到達屏障"); });
這里用于提示所有的線程到達屏障。緊接著是比較常規(guī)的代碼,循環(huán)構(gòu)造線程并在線程中執(zhí)行了 cyclicBarrier.await();
到達屏障。重點是 cyclicBarrier.reset();
重置屏障后,我留下一個屏障給主線程測試使用,而在新構(gòu)造的線程中停留1s, System.out.println("第二次循環(huán) 邁過屏障");
打印在 System.out.println(Thread.currentThread().getName() + " 到達屏障");
之后,說明屏障計數(shù)器已經(jīng)重置并且生效了。
獲取狀態(tài)
除了上述的基本功能外,CyclicBarrier
也提供了以下API用來查看狀態(tài),
getNumberWaiting
() // 顧名思義,獲取目前正在屏障處阻塞等待的線程數(shù)量。getParties
() // 獲取屏障數(shù)量 也就是我們傳入構(gòu)造函數(shù)中的parties
參數(shù)isBroken
() // 查詢阻塞的線程是否被中斷
以上就是詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier的詳細內(nèi)容,更多關(guān)于Java CountDownLatch CyclicBarrier的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 守護線程_動力節(jié)點Java學(xué)院整理
Java語言機制是構(gòu)建在JVM的基礎(chǔ)之上的,意思是Java平臺把操作系統(tǒng)的底層給屏蔽起來,所以它可以在它自己的虛擬的平臺里面構(gòu)造出對自己有利的機制,而語言或者說平臺的設(shè)計者多多少少是收到Unix思想的影響,而守護線程機制又是對JVM這樣的平臺湊合,于是守護線程應(yīng)運而生2017-05-05線程池滿Thread?pool?exhausted排查和解決方案
這篇文章主要介紹了線程池滿Thread?pool?exhausted排查和解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11Spring裝配Bean之用Java代碼安裝配置bean詳解
這篇文章主要給大家介紹了關(guān)于Spring裝配Bean之用Java代碼安裝配置bean的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用spring具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10Java Map如何根據(jù)key取value以及不指定key取出所有的value
這篇文章主要介紹了Java Map如何根據(jù)key取value以及不指定key取出所有的value,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09關(guān)于springcloud報錯報UnsatisfiedDependencyException的問題
這篇文章主要介紹了關(guān)于springcloud報錯報UnsatisfiedDependencyException的問題,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11java 使用URLDecoder和URLEncoder對中文進行處理
這篇文章主要介紹了java 使用URLDecoder和URLEncoder對中文進行處理的相關(guān)資料,需要的朋友可以參考下2017-02-02