Java并發(fā)編程之CountDownLatch的使用
前言
CountDownLatch是一個倒數(shù)的同步器,和其他同步器不同的是,state為0時表示獲取鎖成功。常用來讓一個線程等待其他N個線程執(zhí)行完成再繼續(xù)向下執(zhí)行,比如主線程等待多個請求返回結(jié)果之后再進(jìn)行匯總處理。
基本使用
public class Test { // 定義CountDownLatch private static CountDownLatch latch = new CountDownLatch(2); public static void main(String[] args) { new Thread(() -> { try { // do something... latch.countDown(); // 計數(shù)減一 } catch (InterruptException e) { // 處理異常 } }, "t1").start(); new Thread(() -> { try { // do something... latch.countDown(); // 計數(shù)減一 } catch (InterruptException e) { // 處理異常 } }, "t2").start(); } latch.await(); // 主線程進(jìn)行阻塞,等待兩個子線程執(zhí)行完 System.out.println("子線程都結(jié)束了..."); ???????}
上面的示例演示了CountDownLatch的一般使用流程:
首先定義一個CountDownLatch對象,并指定計數(shù)的初始值為2;
創(chuàng)建兩個子線程分別去處理任務(wù),完成之后調(diào)用latch.countDownLatch()對計數(shù)進(jìn)行減1;
主線程調(diào)用latch.await()方法進(jìn)行等待,計數(shù)編程1之后會喚醒主線程繼續(xù)向下執(zhí)行。
await
嘗試獲取鎖
// CountDownLatch public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // CountDownLatch.Sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CountDownLatch的await調(diào)用的是AQS的acquireSharedInterruptibly方法,根據(jù)名稱可以推斷是共享模式并且可以中斷,所以首先判斷是否已經(jīng)產(chǎn)生了中斷,是的話就拋出異常。
沒有中斷就通過tryAcquireShared方法嘗試去獲取鎖,如果state的值是0表示獲取鎖成功,返回1,失敗就返回-1。
獲取鎖失敗
private void doAcquireSharedInterruptibly(int arg)vthrows InterruptedException { final Node node = addWaiter(Node.SHARED); // 添加節(jié)點到隊列 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 獲取后繼節(jié)點 if (p == head) { int r = tryAcquireShared(arg); // 嘗試獲取鎖 if (r >= 0) { setHeadAndPropagate(node, r); // 更新頭節(jié)點并喚醒后繼節(jié)點 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 是否可以阻塞,可以就阻塞 throw new InterruptedException(); // 阻塞被中斷拋出異常 } } finally { if (failed) cancelAcquire(node); } }
獲取鎖失敗會執(zhí)行doAcquireSharedInterruptibly方法,主要流程為:
1.當(dāng)前線程封裝成Node
節(jié)點添加到隊列中;
2.如果前驅(qū)節(jié)點是頭節(jié)點,就嘗試去獲取鎖:
- 獲取鎖成功,更新頭節(jié)點并喚醒后繼節(jié)點;
- 獲取鎖失敗,進(jìn)入第
3
步阻塞流程;
3.判斷是否可以阻塞:
- 可以阻塞,就通過
park
進(jìn)行阻塞; - 不可以阻塞,重新回到第
2
步
添加節(jié)點
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 創(chuàng)建Node Node pred = tail; // 尾節(jié)點 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 更新尾節(jié)點 pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 初始化隊列 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 更新尾節(jié)點 t.next = node; return t; } } } }
首先根據(jù)當(dāng)前線程和等待模式創(chuàng)建一個Node節(jié)點對象,如果隊列有尾節(jié)點的話,就直接把當(dāng)前節(jié)點添加到尾節(jié)點的后面,成為新的尾節(jié)點;如果沒有尾節(jié)點就先初始化隊列,然后再把當(dāng)前節(jié)點添加到尾節(jié)點后面。
更新頭結(jié)點并喚醒后繼節(jié)點
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
如果前驅(qū)節(jié)點是頭節(jié)點head,表明當(dāng)前節(jié)點的位置有資格去獲取鎖,于是調(diào)用tryAcquireShared方法嘗試獲取鎖,如果成功了會返回1,并作為參數(shù)propagate傳遞給setHeadAndPropagate方法。首先把當(dāng)前節(jié)點更新為頭節(jié)點head,然后如果后面有等待的共享節(jié)點,就嘗試喚醒后繼節(jié)點。
阻塞等待
// 判斷是否可以阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 阻塞等待 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
沒有資格獲取鎖或者獲取鎖失敗,就會進(jìn)入阻塞流程。因為阻塞的節(jié)點是通過前驅(qū)節(jié)點來喚醒的,所以需要找到一個waitStatus為Node.SIGNAL的前驅(qū)節(jié)點,才能進(jìn)入阻塞,如果沒有符合條件的前驅(qū)節(jié)點就重新嘗試去獲取鎖,不進(jìn)行阻塞,一直重試直到獲取鎖成功為止。如果找到了符合的前驅(qū)節(jié)點,就通過LockSupport.park(this)阻塞當(dāng)前線程。
countDown方法
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
因為CountDownLatch允許多個線程同時持有鎖,所以是屬于共享模式,通過countDown()方法釋放鎖時調(diào)用的是releaseShared方法,執(zhí)行流程為:
- 嘗試釋放鎖
- 釋放鎖成功,喚醒后繼節(jié)點
釋放鎖
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
釋放鎖的流程很簡單,如果state已經(jīng)為0了說明已經(jīng)沒有鎖了,釋放鎖就失敗了;否則就讓state減1表示減少一次鎖,然后更新鎖數(shù)量。這里和其他同步器不同的是,只有鎖數(shù)量為0了在阻塞的線程才能去獲取鎖,所以返回的是nextc == 0。
釋放鎖成功
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 隊列中有等待的節(jié)點 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 頭節(jié)點可以喚醒后繼節(jié)點 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 防止多個線程同時喚醒 continue; unparkSuccessor(h); // 執(zhí)行喚醒操作 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
釋放鎖成功后會執(zhí)行doReleaseShared方法,如果隊列中有等待的節(jié)點,并且頭節(jié)點標(biāo)記為可以喚醒,那就嘗試去喚醒后繼節(jié)點,被喚醒的節(jié)點如果獲取鎖成功了會成為新的頭節(jié)點,這里的h==head條件為false,因為阻塞的節(jié)點是共享模式,多個節(jié)點都可以去獲取鎖,所以就繼續(xù)嘗試去喚醒后繼節(jié)點。
喚醒后繼節(jié)點
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
如果頭節(jié)點的后繼節(jié)點不可用了,就不能通過這個節(jié)點繼續(xù)向后查找,所以這里采用的是倒序查找的方法,最終找到一個離頭節(jié)點head最近的可以被喚醒的節(jié)點,然后調(diào)用LockSupport.unpark(s.thread)喚醒該節(jié)點。
以上就是Java并發(fā)編程之CountDownLatch的使用的詳細(xì)內(nèi)容,更多關(guān)于Java CountDownLatch的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
dubbo服務(wù)引用之創(chuàng)建Invoker流程詳解
這篇文章主要為大家介紹了dubbo服務(wù)引用二之創(chuàng)建Invoker流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08

Java的接口調(diào)用時的權(quán)限驗證功能的實現(xiàn)

Java JSON轉(zhuǎn)成List結(jié)構(gòu)數(shù)據(jù)