Java中的CyclicBarrier循環(huán)柵欄深入解析
關于 CyclicBarrier
Java JUC 包中提供類似的工具,可以設置一個簡單的集合點,等所有成員到齊了之后,再執(zhí)行下一步操作。CycleBarrier 它就相當于是一個柵欄,所有線程在到達柵欄后都需要等待其他線程,等所有線程都到達后,再一起通過。
CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重復使用(Cyclic)。
CycleBarrier 特別適用于并行迭代計算,每個線程負責一部分計算,然后在柵欄處等待其他線程完成,所有線程到齊后,交換數據和計算結果,再進行下一次迭代。

初步理解
CyclicBarrier 有一個 parties 成員變量,表示參與的線程的個數,下面來看一個例子,我們通過這個例子來理解下 Cycle 的含義:
public class Demo1 {
public static void main(String[] args) {
// 初始化一個 CyclicBarrier,大小為 5,當 5 個線程滿足條件之后,就執(zhí)行一次:執(zhí)行完畢!
CyclicBarrier cyclicBarrier = new CyclicBarrier( 5, () -> {
System.out.println("執(zhí)行完畢!");
});
// 為了測試是否能循環(huán),這里循環(huán)設置為 20,預期的結果應該是會執(zhí)行 4 次:執(zhí)行完畢!
for (int i=0; i<20; i++) {
int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ", 執(zhí)行到:" + temp );
try{
cyclicBarrier.await(); // 等待其他線程到齊,到齊了之后就會執(zhí)行一次
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}程序輸出
由于多線程輸出,我是多刷了幾遍(也不知道刷了多少遍了)就會出現下面的執(zhí)行結果,這里就可以看出來效果了,這里有 20個線程,5 個線程為一組(如果是按照源碼上面來說,就是 5 個線程 parties 為一代 generation ),可以滿足我們的預期。
每次參與的線程數為 5,然后當 5個線程都執(zhí)行完畢之后,再進入下一代的計算,看到這里應該還是比較容易理解的。

源碼探究
成員變量
首先,我們看下 CycleBarrier 的成員變量,
//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數
private final int parties;
//換代前執(zhí)行的任務
private final Runnable barrierCommand;
//表示柵欄的當前代
private Generation generation = new Generation();
//計數器
private int count;
//靜態(tài)內部類Generation
private static class Generation {
Generation() {} // 防止訪問構造函數創(chuàng)建
boolean broken; // 初始化為 false
}這里說明下:
- trip 是一個 condition,就是用來阻塞與喚醒線程使用的。
- 兩個 int 類型的變量 parties 與 count,在初始化 CycleBarrier 的時候,兩個值的大小是一樣的,之后隨著線程的 await 方法的調用而 -1,直到減到 0 ,就將所有的線程全部喚醒。
- 內部靜態(tài)變量 Generation,表示當前柵欄的代,上面的例子中,5個線程就為一代,執(zhí)行完了,就進行下一代。
- barrierCommond,表示換代前執(zhí)行的任務,當 count 減為 0 時表示本次結束。在轉到下一局游戲之前,將所有的線程喚醒,在喚醒之前可以通過指定的 barrierCommond 來執(zhí)行自己的任務。
構造器
接著,我們看下構造器相關的源碼,這里就可以看到 count 與 parties 的初始化的時候就是一致的
//構造器,這里就是我們上面那個例子使用的構造器
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// 構造器,只傳入一個 parties 的大小,不進行自定義任務
public CyclicBarrier(int parties) {
this(parties, null);
}await() 方法
這里我們看下 await() 方法,在上面的例子中,是使用 await() 方法等待其他線程到齊,再去執(zhí)行后面的操作
// 非定時等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
// 定時等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}看源碼之后,發(fā)現 await() 方法最后都是調用 dowait() 方法,我們就直接看下 核心的 dowait 方法的執(zhí)行邏輯,會發(fā)現這塊并沒有特別晦澀難懂,那么我們就一起繼續(xù)向下讀
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 開始上鎖
try {
final Generation g = generation;
if (g.broken) // 判斷是否當前代被打破
throw new BrokenBarrierException();
// 檢查當前線程是否被中斷
if (Thread.interrupted()) {
// 如果中斷了,就會執(zhí)行下面三件事
// 1. 將 generation 的 broken 標記設置為 true
// 2. 重置 count 值
// 3. 喚醒當前攔截的線程
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 每次都將計數器的值減 1
if (index == 0) { // // 計數器的值減為0,就會喚醒所有線程并轉換到下一代
Runnable command = barrierCommand;
if (command != null) { // // 如果存在自定義的任務,這里就會做判斷,在喚醒所有線程之前執(zhí)行
try {
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration(); // 喚醒所有線程,并轉到下一代
return 0;
}
//官方注釋: loop until tripped, broken, interrupted, or timed out
//這里循環(huán)是:如果計數器不為0則執(zhí)行此循環(huán)
for (;;) {
try {
// 根據傳入的參數來決定是定時等待還是非定時等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 如果當前線程在等待期間被中斷,則打破柵欄喚醒其他線程
breakBarrier();
throw ie;
} else {
//若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接調用中斷操作
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation) //如果線程因為換代操作而被喚醒則返回計數器的值
return index;
// 如果是因為時間到了而被喚醒則打破柵欄,拋出異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock(); // 解鎖
}
}nextGeneration() 與 breakBarrier()
后面,我們看下剛剛上面提到過的切換代 nextGeneration(), 打破柵欄方法 breakBarrier()
//切換柵欄到下一代
private void nextGeneration() {
//喚醒條件隊列所有線程
trip.signalAll();
//設置計數器的值為需要攔截的線程數
count = parties;
//重新設置柵欄代次
generation = new Generation();
}
//打破當前柵欄
private void breakBarrier() {
//將當前柵欄狀態(tài)設置為打翻
generation.broken = true;
//設置計數器的值為需要攔截的線程數
count = parties;
//喚醒所有線程
trip.signalAll();
}reset()
最后,我們看下怎么重置,這里其實就是調用了上面說的 切換代 nextGeneration() 與 breakBarrier() 方法
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}回顧
最后看一個例子回顧下
public class Demo {
static class Tourist extends Thread {
CyclicBarrier cyclicBarrier;
public Tourist(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
// 模擬點先各自獨立運行
Thread.sleep( (int) (Math.random() * 1000) );
// 集合點 A
cyclicBarrier.await();
System.out.println(this.getName() + ",到達集合點 A " + System.currentTimeMillis() );
// 集合后模擬再各自獨立運行
Thread.sleep( (int) (Math.random() * 1000));
// 集合點 B
cyclicBarrier.await();
System.out.println(this.getName() + ",到達集合點 B " + System.currentTimeMillis() );
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 定義出多個 線程
int num = 3;
Tourist[] tourist = new Tourist[num];
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("集合完畢: " + System.currentTimeMillis() + " 執(zhí)行的線程為: " + Thread.currentThread().getName());
});
for (int i = 0; i< num; i++){
tourist[i] = new Tourist(barrier);
tourist[i].start();
}
}
}到此這篇關于Spring中的CyclicBarrier循環(huán)柵欄深入解析的文章就介紹到這了,更多相關CyclicBarrier循環(huán)柵欄內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring boot 項目中使用thymeleaf模板的案例分析
這篇文章主要介紹了spring boot 項目中使用thymeleaf模板的案例分析,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
Spring?Cloud?Hystrix?服務降級限流策略詳解
這篇文章主要為大家介紹了Spring?Cloud?Hystrix?服務降級限流策略詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01

