Java并發(fā)工具類之CountDownLatch詳解
CountDownLatch
1.概述
CountDownLatch可以使一個獲多個線程等待其他線程各自執(zhí)行完畢后再執(zhí)行。
CountDownLatch 定義了一個計數(shù)器,和一個阻塞隊列, 當計數(shù)器的值遞減為0之前,阻塞隊列里面的線程處于掛起狀態(tài),當計數(shù)器遞減到0時會喚醒阻塞隊列所有線程,這里的計數(shù)器是一個標志,可以表示一個任務一個線程,也可以表示一個倒計時器,CountDownLatch可以解決那些一個或者多個線程在執(zhí)行之前必須依賴于某些必要的前提業(yè)務先執(zhí)行的場景。
2.常用方法
CountDownLatch(int count); //構(gòu)造方法,創(chuàng)建一個值為count 的計數(shù)器。 ? await();//阻塞當前線程,將當前線程加入阻塞隊列。 ? await(long timeout, TimeUnit unit);//在timeout的時間之內(nèi)阻塞當前線程,時間一過則當前線程可以執(zhí)行, ? countDown();//對計數(shù)器進行遞減1操作,當計數(shù)器遞減至0時,當前線程會去喚醒阻塞隊列里的所有線程。
3.應用
我們經(jīng)常會在一個接口中調(diào)用多個第三方接口,然后將結(jié)果返回,其實就可以通過CountDownLatch來實現(xiàn)
public static void main(String[] args) { CountDownLatch count = new CountDownLatch(3); Thread thread = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((int)(Math.random()*1000)); System.out.println("獲取接口一的數(shù)據(jù)"); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((int)(Math.random()*1000)); System.out.println("獲取接口二的數(shù)據(jù)"); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread thread3 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((int)(Math.random()*1000)); System.out.println("獲取接口三的數(shù)據(jù)"); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); thread2.start(); thread3.start(); try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("執(zhí)行成功"); }
4.實現(xiàn)原理
(1)創(chuàng)建計數(shù)器
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//創(chuàng)建同步隊列,并設置初始計數(shù)器值 }
(2)Sync類
可以看出該類是繼承AQS的,所以CountDownLatch的實現(xiàn)大多都是通過AQS來實現(xiàn)
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
(3)await方法
當我們調(diào)用countDownLatch.wait()的時候,會創(chuàng)建一個節(jié)點,加入到AQS阻塞隊列,并同時把當前線程掛起,其實就是調(diào)用共享模式下的鎖獲取,詳情看AQS文章
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
在Sync類重寫的tryAcquireShared()方法中g(shù)etState()只有等于0才會獲取到鎖,所以當countDownLatch待執(zhí)行的任務數(shù)大于0都會堵塞該線程直到所有任務都完成
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
AQS中觸發(fā)堵塞線程的源碼:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新建節(jié)點加入阻塞隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲得當前節(jié)點pre節(jié)點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回鎖的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重組雙向鏈表,清空無效節(jié)點,掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
(4)countDown()方法
當我們調(diào)用countDownLatch.countDown()方法的時候,會對計數(shù)器進行減1操作,AQS內(nèi)部是通過釋放鎖的方式,對state進行減1操作,當state=0的時候證明計數(shù)器已經(jīng)遞減完畢,此時會將AQS阻塞隊列里的節(jié)點線程全部喚醒。
public void countDown() { //遞減鎖重入次數(shù),當state=0時喚醒所有阻塞線程 sync.releaseShared(1); }
public final boolean releaseShared(int arg) { //遞減鎖的重入次數(shù) if (tryReleaseShared(arg)) { doReleaseShared();//喚醒隊列所有阻塞的節(jié)點 return true; } return false; } private void doReleaseShared() { //喚醒所有阻塞隊列里面的線程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//節(jié)點是否在等待喚醒狀態(tài) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改狀態(tài)為初始 continue; unparkSuccessor(h);//成功則喚醒線程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
到此這篇關(guān)于Java并發(fā)工具類之CountDownLatch詳解的文章就介紹到這了,更多相關(guān)CountDownLatch詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合SpringSecurity和JWT和Redis實現(xiàn)統(tǒng)一鑒權(quán)認證
Spring Security是一個可以為Java應用程序提供全面安全服務的框架,同時它也可以輕松擴展以滿足自定義需求,本文主要介紹了SpringBoot整合SpringSecurity和JWT和Redis實現(xiàn)統(tǒng)一鑒權(quán)認證,感興趣的可以了解一下2023-11-11HashMap原理及put方法與get方法的調(diào)用過程
這篇文章主要介紹了HashMap原理及put方法與get方法的調(diào)用過程,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09MultipartFile中transferTo(File file)的路徑問題及解決
這篇文章主要介紹了MultipartFile中transferTo(File file)的路徑問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Java中如何使用?byte?數(shù)組作為?Map?的?key
本文將討論在使用HashMap時,當byte數(shù)組作為key時所遇到的問題及其解決方案,介紹使用String和List這兩種數(shù)據(jù)結(jié)構(gòu)作為臨時解決方案的方法,感興趣的朋友跟隨小編一起看看吧2023-06-06