深度解析Java中CountDownLatch的原理
在高并發(fā)編程中,AbstractQueuedSynchronizer(簡稱AQS)抽象的隊列同步器是我們必須掌握的,AQS底層提供了二種鎖模式
- 獨占鎖:ReentrantLock就是基于獨占鎖模式實現(xiàn)的
- 共享鎖:CountDownLatch,ReadWriteLock,Semplere都是基于共享鎖模式實現(xiàn)的
接下來我們通過CountDownLatch底層實現(xiàn)原理來了解AQS共享鎖模式的實現(xiàn)原理
CountDownLatch用法
CountDownLatch一般是在需要等待多個線程全部執(zhí)行完畢之后才繼續(xù)執(zhí)行剩下的業(yè)務邏輯,舉個例子,比如你現(xiàn)在去餐廳吃飯點了份辣子雞。
這時候餐廳有處理雞塊的,有配置調(diào)料的,還有燒菜的等多個廚師一起協(xié)作最后才能完成一道辣子雞,而且這幾個步驟可以是一起執(zhí)行的。一個廚師在配置調(diào)料的同時,另外一個廚師正在處理雞塊,還有一個廚師正在熱油等。
但是作為顧客的我們來說,我們必須等到這幾個廚師全部執(zhí)行完畢之后我們才能吃到辣子雞
public static void main(String[] args) throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println("處理雞塊");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println("配置調(diào)料");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println("起鍋熱油");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
//會阻塞,等待所有的線程執(zhí)行結束之后才會繼續(xù)執(zhí)行剩下的邏輯
countDownLatch.await();
//執(zhí)行剩下業(yè)務邏輯
}首先我們看 countDownLatch.await(); 這段阻塞的代碼,看下底層是如何讓線程進入阻塞等待的
進入之后到CountDownLatch類中,然后繼續(xù)這個方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}此時就會進去AQS的內(nèi)部實現(xiàn)中
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}首先我們看下 tryAcquireShared(arg) < 0 這個判斷是干嘛的,他是進入到CountDownLatch的類中,這里判斷 state的值是否等于0,在初始化 CountDownLatch 的時候,我們將state的值初始化成了3,只有當執(zhí)行一次 countDownLatch.countDown(); 的時候,這個值才會減1,但是此時我們的線程還沒有執(zhí)行結束,所以這個值不會等于0,那么這時候就會返回 -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}返回-1以后,就會執(zhí)行 doAcquireSharedInterruptibly(arg); 這個業(yè)務邏輯了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建一個新的共享的Node節(jié)點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//嘗試判斷state是否已經(jīng)等于0了,如果是,那么主線程就不用阻塞了,
//可以繼續(xù)執(zhí)行了,以此來提高程序性能
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //這里才是真正讓主線程阻塞的核心方法
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}我們看一下這里的 addWaiter(Node.SHARED)方法
private Node addWaiter(Node mode) {
//把當前線程,也就是main線程封裝成一個Node,并設置成共享模式
Node node = new Node(Thread.currentThread(), mode);
//在第一次的時候,這個tail節(jié)點是為null的
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化鏈表
enq(node);
return node;
}分析下初始化雙向鏈表邏輯
private Node enq(final Node node) {
for (;;) { //注意:這里是死循環(huán)
Node t = tail;
//第一次進來,因為tail=null,所以會進入到if里面去
if (t == null) { // Must initialize
//這里新創(chuàng)建一個空的Node節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}第一次進來:因為第一次進來的時候tail=null,所以會進入到if中去,然后創(chuàng)建一個新的空的節(jié)點,然后將頭節(jié)點和尾節(jié)點都指向這個節(jié)點

然后進入第二次循環(huán):這時候tail已經(jīng)不為空了,所以會進入到else分支里面去,所以的操作就是將當前線程封裝成的Node設置尾巴節(jié)點,然后設置前置節(jié)點和后置節(jié)點的關系

現(xiàn)在回頭addWaiter()方法已經(jīng)清楚了,繼續(xù)分析剩下的邏輯
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建一個新的共享的Node節(jié)點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { //這里還是個死循環(huán)
//拿到頭節(jié)點
final Node p = node.predecessor();
if (p == head) {
//繼續(xù)判斷state的值是否等于0,如果已經(jīng)等于0了,那么主線程就不需要阻塞等待了,可以繼續(xù)執(zhí)行了
int r = tryAcquireShared(arg);
//如果state的值等于0,這里r=1,不等于0,r=-1
//我們假設現(xiàn)在就是不等于0,也就是其它線程還沒有執(zhí)行結束,所以不會進入到if
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}進入 shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//這里獲取Node的waitStatus,在Node初始化之后,默認是是0,
//所以會進入到 else 分支里面去,將Node的waitStatus的值修改成Node.SIGNAL
//但是在上一步中是一個死循環(huán),所以會再次進入到這個方法中,這時候waitStatus的值是Node.SIGNAL
//所以會進入到第一個if分支里面去,最后返回true
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}這時候shouldParkAfterFailedAcquire()方法返回了true,就會執(zhí)行 parkAndCheckInterrupt()方法了
private final boolean parkAndCheckInterrupt() {
//真正讓線程阻塞的核心方法
LockSupport.park(this);
return Thread.interrupted();
}當主線程掛起之后,只有全部線程執(zhí)行結束了,才會繼續(xù)執(zhí)行,所以我們來分析下 countDownLatch.countDown();
public void countDown() {
sync.releaseShared(1);
}public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}進入tryReleaseShared(arg)方法,是判斷state是否等于0的,
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
//第一次進來,因為state=3,所以不會進入if,只有在初始化的時候將state設置成0,
//或者你有10個資源,但是有11個線程來獲取資源,最后一個線程進來的時候也會等于0
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}一直到第三次進來之后,nextc就會等于0,因為一共減了三次1,也就是最后一個線程執(zhí)行到這里來了,最后返回true,返回true以后就會執(zhí)行doReleaseShared();方法了
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 核心方法,喚醒阻塞線程,這里傳入的是頭節(jié)點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到真正封裝了當前線程的Node
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 執(zhí)行喚醒操作
LockSupport.unpark(s.thread);
}以上就是深度解析Java中CountDownLatch的原理的詳細內(nèi)容,更多關于Java CountDownLatch的資料請關注腳本之家其它相關文章!
相關文章
Java并發(fā)(Runnable+Thread)實現(xiàn)硬盤文件搜索功能
這篇文章主要介紹了Java并發(fā)(Runnable+Thread)實現(xiàn)硬盤文件搜索,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
java中利用List的subList方法實現(xiàn)對List分頁(簡單易學)
本篇文章主要介紹了java中l(wèi)ist數(shù)據(jù)拆分為sublist實現(xiàn)頁面分頁的簡單代碼,具有一定的參考價值,有需要的可以了解一下。2016-11-11

