Java中的CyclicBarrier循環(huán)柵欄深入解析
關(guān)于 CyclicBarrier
Java JUC 包中提供類似的工具,可以設(shè)置一個簡單的集合點,等所有成員到齊了之后,再執(zhí)行下一步操作。CycleBarrier 它就相當于是一個柵欄,所有線程在到達柵欄后都需要等待其他線程,等所有線程都到達后,再一起通過。
CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重復使用(Cyclic)。
CycleBarrier 特別適用于并行迭代計算,每個線程負責一部分計算,然后在柵欄處等待其他線程完成,所有線程到齊后,交換數(shù)據(jù)和計算結(jié)果,再進行下一次迭代。
初步理解
CyclicBarrier 有一個 parties 成員變量,表示參與的線程的個數(shù),下面來看一個例子,我們通過這個例子來理解下 Cycle 的含義:
public class Demo1 { public static void main(String[] args) { // 初始化一個 CyclicBarrier,大小為 5,當 5 個線程滿足條件之后,就執(zhí)行一次:執(zhí)行完畢! CyclicBarrier cyclicBarrier = new CyclicBarrier( 5, () -> { System.out.println("執(zhí)行完畢!"); }); // 為了測試是否能循環(huán),這里循環(huán)設(shè)置為 20,預(yù)期的結(jié)果應(yīng)該是會執(zhí)行 4 次:執(zhí)行完畢! for (int i=0; i<20; i++) { int temp = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() + ", 執(zhí)行到:" + temp ); try{ cyclicBarrier.await(); // 等待其他線程到齊,到齊了之后就會執(zhí)行一次 } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } }).start(); } } }
程序輸出
由于多線程輸出,我是多刷了幾遍(也不知道刷了多少遍了)就會出現(xiàn)下面的執(zhí)行結(jié)果,這里就可以看出來效果了,這里有 20個線程,5 個線程為一組(如果是按照源碼上面來說,就是 5 個線程 parties 為一代 generation ),可以滿足我們的預(yù)期。
每次參與的線程數(shù)為 5,然后當 5個線程都執(zhí)行完畢之后,再進入下一代的計算,看到這里應(yīng)該還是比較容易理解的。
源碼探究
成員變量
首先,我們看下 CycleBarrier 的成員變量,
//同步操作鎖 private final ReentrantLock lock = new ReentrantLock(); //線程攔截器 private final Condition trip = lock.newCondition(); //每次攔截的線程數(shù) private final int parties; //換代前執(zhí)行的任務(wù) private final Runnable barrierCommand; //表示柵欄的當前代 private Generation generation = new Generation(); //計數(shù)器 private int count; //靜態(tài)內(nèi)部類Generation private static class Generation { Generation() {} // 防止訪問構(gòu)造函數(shù)創(chuàng)建 boolean broken; // 初始化為 false }
這里說明下:
- trip 是一個 condition,就是用來阻塞與喚醒線程使用的。
- 兩個 int 類型的變量 parties 與 count,在初始化 CycleBarrier 的時候,兩個值的大小是一樣的,之后隨著線程的 await 方法的調(diào)用而 -1,直到減到 0 ,就將所有的線程全部喚醒。
- 內(nèi)部靜態(tài)變量 Generation,表示當前柵欄的代,上面的例子中,5個線程就為一代,執(zhí)行完了,就進行下一代。
- barrierCommond,表示換代前執(zhí)行的任務(wù),當 count 減為 0 時表示本次結(jié)束。在轉(zhuǎn)到下一局游戲之前,將所有的線程喚醒,在喚醒之前可以通過指定的 barrierCommond 來執(zhí)行自己的任務(wù)。
構(gòu)造器
接著,我們看下構(gòu)造器相關(guān)的源碼,這里就可以看到 count 與 parties 的初始化的時候就是一致的
//構(gòu)造器,這里就是我們上面那個例子使用的構(gòu)造器 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } // 構(gòu)造器,只傳入一個 parties 的大小,不進行自定義任務(wù) public CyclicBarrier(int parties) { this(parties, null); }
await() 方法
這里我們看下 await() 方法,在上面的例子中,是使用 await() 方法等待其他線程到齊,再去執(zhí)行后面的操作
// 非定時等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } // 定時等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
看源碼之后,發(fā)現(xiàn) await() 方法最后都是調(diào)用 dowait() 方法,我們就直接看下 核心的 dowait 方法的執(zhí)行邏輯,會發(fā)現(xiàn)這塊并沒有特別晦澀難懂,那么我們就一起繼續(xù)向下讀
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); // 開始上鎖 try { final Generation g = generation; if (g.broken) // 判斷是否當前代被打破 throw new BrokenBarrierException(); // 檢查當前線程是否被中斷 if (Thread.interrupted()) { // 如果中斷了,就會執(zhí)行下面三件事 // 1. 將 generation 的 broken 標記設(shè)置為 true // 2. 重置 count 值 // 3. 喚醒當前攔截的線程 breakBarrier(); throw new InterruptedException(); } int index = --count; // 每次都將計數(shù)器的值減 1 if (index == 0) { // // 計數(shù)器的值減為0,就會喚醒所有線程并轉(zhuǎn)換到下一代 Runnable command = barrierCommand; if (command != null) { // // 如果存在自定義的任務(wù),這里就會做判斷,在喚醒所有線程之前執(zhí)行 try { command.run(); } catch (Throwable ex) { breakBarrier(); throw ex; } } nextGeneration(); // 喚醒所有線程,并轉(zhuǎn)到下一代 return 0; } //官方注釋: loop until tripped, broken, interrupted, or timed out //這里循環(huán)是:如果計數(shù)器不為0則執(zhí)行此循環(huán) for (;;) { try { // 根據(jù)傳入的參數(shù)來決定是定時等待還是非定時等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 如果當前線程在等待期間被中斷,則打破柵欄喚醒其他線程 breakBarrier(); throw ie; } else { //若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待, 則直接調(diào)用中斷操作 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) //如果線程因為換代操作而被喚醒則返回計數(shù)器的值 return index; // 如果是因為時間到了而被喚醒則打破柵欄,拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); // 解鎖 } }
nextGeneration() 與 breakBarrier()
后面,我們看下剛剛上面提到過的切換代 nextGeneration(), 打破柵欄方法 breakBarrier()
//切換柵欄到下一代 private void nextGeneration() { //喚醒條件隊列所有線程 trip.signalAll(); //設(shè)置計數(shù)器的值為需要攔截的線程數(shù) count = parties; //重新設(shè)置柵欄代次 generation = new Generation(); } //打破當前柵欄 private void breakBarrier() { //將當前柵欄狀態(tài)設(shè)置為打翻 generation.broken = true; //設(shè)置計數(shù)器的值為需要攔截的線程數(shù) count = parties; //喚醒所有線程 trip.signalAll(); }
reset()
最后,我們看下怎么重置,這里其實就是調(diào)用了上面說的 切換代 nextGeneration() 與 breakBarrier() 方法
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
回顧
最后看一個例子回顧下
public class Demo { static class Tourist extends Thread { CyclicBarrier cyclicBarrier; public Tourist(CyclicBarrier cyclicBarrier){ this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { // 模擬點先各自獨立運行 Thread.sleep( (int) (Math.random() * 1000) ); // 集合點 A cyclicBarrier.await(); System.out.println(this.getName() + ",到達集合點 A " + System.currentTimeMillis() ); // 集合后模擬再各自獨立運行 Thread.sleep( (int) (Math.random() * 1000)); // 集合點 B cyclicBarrier.await(); System.out.println(this.getName() + ",到達集合點 B " + System.currentTimeMillis() ); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { // 定義出多個 線程 int num = 3; Tourist[] tourist = new Tourist[num]; CyclicBarrier barrier = new CyclicBarrier(3, () -> { System.out.println("集合完畢: " + System.currentTimeMillis() + " 執(zhí)行的線程為: " + Thread.currentThread().getName()); }); for (int i = 0; i< num; i++){ tourist[i] = new Tourist(barrier); tourist[i].start(); } } }
到此這篇關(guān)于Spring中的CyclicBarrier循環(huán)柵欄深入解析的文章就介紹到這了,更多相關(guān)CyclicBarrier循環(huán)柵欄內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring boot 項目中使用thymeleaf模板的案例分析
這篇文章主要介紹了spring boot 項目中使用thymeleaf模板的案例分析,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09Spring?Cloud?Hystrix?服務(wù)降級限流策略詳解
這篇文章主要為大家介紹了Spring?Cloud?Hystrix?服務(wù)降級限流策略詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01Hibernate雙向一對一映射關(guān)系配置代碼實例
這篇文章主要介紹了Hibernate雙向一對一映射關(guān)系配置代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-10-10