Java多線(xiàn)程之同步工具類(lèi)CyclicBarrier
前言:
CyclicBarrier
是一個(gè)同步工具類(lèi),它允許一組線(xiàn)程互相等待,直到達(dá)到某個(gè)公共屏障點(diǎn)。與CountDownLatch
不同的是該barrier在釋放線(xiàn)程等待后可以重用,所以它稱(chēng)為循環(huán)(Cyclic
)的屏障(Barrier
)。
CyclicBarrier支持一個(gè)可選的Runnable
命令,在一組線(xiàn)程中的最后一個(gè)線(xiàn)程到達(dá)之后(但在釋放所有線(xiàn)程之前),該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次。若再繼續(xù)所有的參與線(xiàn)程之前更新共享狀態(tài),此屏蔽操作很有用。
1 CyclicBarrier方法說(shuō)明
CyclicBarrier提供的方法有:
CyclicBarrier
(parties):初始化相互等待的線(xiàn)程數(shù)量的構(gòu)造方法。CyclicBarrier
(parties,Runnable barrierAction):初始化相互等待的線(xiàn)程數(shù)量以及屏障線(xiàn)程的構(gòu)造方法。
屏障線(xiàn)程的運(yùn)行時(shí)機(jī):
等待的線(xiàn)程數(shù)量=parties
之后,CyclicBarrier
打開(kāi)屏障之前。
舉例:在分組計(jì)算中,每個(gè)線(xiàn)程負(fù)責(zé)一部分計(jì)算,最終這些線(xiàn)程計(jì)算結(jié)束之后,交由屏障線(xiàn)程進(jìn)行匯總計(jì)算。
int getParties():獲取CyclicBarrier
打開(kāi)屏障的線(xiàn)程數(shù)量,也成為方數(shù)。
int getNumberWaiting():獲取正在CyclicBarrier
上等待的線(xiàn)程數(shù)量。
int await():在CyclicBarrier
上進(jìn)行阻塞等待,直到發(fā)生以下情形之一:
- 在
CyclicBarrier
上等待的線(xiàn)程數(shù)量達(dá)到parties
,則所有線(xiàn)程被釋放,繼續(xù)執(zhí)行。 - 當(dāng)前線(xiàn)程被中斷,則拋出
InterruptedException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他等待的線(xiàn)程被中斷,則當(dāng)前線(xiàn)程拋出
BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他等待的線(xiàn)程超時(shí),則當(dāng)前線(xiàn)程拋出
BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他線(xiàn)程調(diào)用CyclicBarrier.reset()方法,則當(dāng)前線(xiàn)程拋出
BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。
int await(timeout,TimeUnit):在CyclicBarrier
上進(jìn)行限時(shí)的阻塞等待,直到發(fā)生以下情形之一:
- 在
CyclicBarrier
上等待的線(xiàn)程數(shù)量達(dá)到parties
,則所有線(xiàn)程被釋放,繼續(xù)執(zhí)行。 - 當(dāng)前線(xiàn)程被中斷,則拋出
InterruptedException
異常,并停止等待,繼續(xù)執(zhí)行。 - 當(dāng)前線(xiàn)程等待超時(shí),則拋出
TimeoutException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他等待的線(xiàn)程被中斷,則當(dāng)前線(xiàn)程拋出
BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他等待的線(xiàn)程超時(shí),則當(dāng)前線(xiàn)程拋出
BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。 - 其他線(xiàn)程調(diào)用
CyclicBarrier.reset()方
法,則當(dāng)前線(xiàn)程拋出BrokenBarrierException
異常,并停止等待,繼續(xù)執(zhí)行。
boolean isBroken():獲取是否破損標(biāo)志位broken
的值,此值有以下幾種情況:
CyclicBarrier
初始化時(shí),broken=false
,表示屏障未破損。- 如果正在等待的線(xiàn)程被中斷,則
broken=true
,表示屏障破損。 - 如果正在等待的線(xiàn)程超時(shí),則
broken=true
,表示屏障破損。 - 如果有線(xiàn)程調(diào)用
CyclicBarrier.reset()
方法,則broken=false
,表示屏障回到未破損狀態(tài)。
void reset():使得CyclicBarrier
回歸初始狀態(tài),直觀(guān)來(lái)看它做了兩件事:
- 如果有正在等待的線(xiàn)程,則會(huì)拋出
BrokenBarrierException
異常,且這些線(xiàn)程停止等待,繼續(xù)執(zhí)行。 - 將是否破損標(biāo)志位
broken
置為false
。
2 CyclicBarrier實(shí)例
假若有若干個(gè)線(xiàn)程都要進(jìn)行寫(xiě)數(shù)據(jù)操作,并且只有所有線(xiàn)程都完成寫(xiě)數(shù)據(jù)操作之后,這些線(xiàn)程才能繼續(xù)做后面的事情,此時(shí)就可以利用CyclicBarrier
了:
public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"正在寫(xiě)入數(shù)據(jù)..."); try { Thread.sleep(5000); //以睡眠來(lái)模擬寫(xiě)入數(shù)據(jù)操作 System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)..."); } }
線(xiàn)程Thread-0正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-3正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-2正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-3寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-2寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-0寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
從上面輸出結(jié)果可以看出,每個(gè)寫(xiě)入線(xiàn)程執(zhí)行完寫(xiě)數(shù)據(jù)操作之后,就在等待其他線(xiàn)程寫(xiě)入操作完畢。
當(dāng)所有線(xiàn)程線(xiàn)程寫(xiě)入操作完畢之后,所有線(xiàn)程就繼續(xù)進(jìn)行后續(xù)的操作了。
如果想在所有線(xiàn)程寫(xiě)入操作完之后,進(jìn)行額外的其他操作可以為CyclicBarrier提供Runnable參數(shù):
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() { @Override public void run() { System.out.println("當(dāng)前線(xiàn)程"+Thread.currentThread().getName()); } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"正在寫(xiě)入數(shù)據(jù)..."); try { Thread.sleep(3000); //以睡眠來(lái)模擬寫(xiě)入數(shù)據(jù)操作 System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)..."); } } }
線(xiàn)程Thread-0正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-3正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-2正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-3寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-0寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-2寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
當(dāng)前線(xiàn)程Thread-2
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
從結(jié)果可以看出,當(dāng)四個(gè)線(xiàn)程都到達(dá)barrier
狀態(tài)后,會(huì)從四個(gè)線(xiàn)程中選擇一個(gè)線(xiàn)程去執(zhí)行Runnable
。
await指定時(shí)間的效果:
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for (int i = 0; i < N; i++) { if (i < N - 1) new Writer(barrier).start(); else { try { //運(yùn)行時(shí)間遠(yuǎn)小于2000(cyclicBarrier.await 指定時(shí)間) 就不會(huì)拋出TimeoutException Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } new Writer(barrier).start(); } } } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "正在寫(xiě)入數(shù)據(jù)..."); try { Thread.sleep(3000); //以睡眠來(lái)模擬寫(xiě)入數(shù)據(jù)操作 System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢"); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)..."); } } }
線(xiàn)程Thread-0正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-2正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-0寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-2寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-1寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-3正在寫(xiě)入數(shù)據(jù)...
java.util.concurrent.TimeoutException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-0所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-1所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-2所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
線(xiàn)程Thread-3寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-3所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
上面的代碼在main
方法的for
循環(huán)中,故意讓最后一個(gè)線(xiàn)程啟動(dòng)延遲,因?yàn)樵谇懊嫒齻€(gè)線(xiàn)程都達(dá)到barrier之后,等待了指定的時(shí)間發(fā)現(xiàn)第四個(gè)線(xiàn)程還沒(méi)有達(dá)到barrier
,就拋出異常并繼續(xù)執(zhí)行后面的任務(wù)。
另外CyclicBarrier是可以重用的,看下面這個(gè)例子:
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) { new Writer(barrier).start(); } try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for(int i=0;i<N;i++) { new Writer(barrier).start(); } } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"正在寫(xiě)入數(shù)據(jù)..."); try { Thread.sleep(3000); //以睡眠來(lái)模擬寫(xiě)入數(shù)據(jù)操作 System.out.println("線(xiàn)程"+Thread.currentThread().getName()+"寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)..."); } } }
線(xiàn)程Thread-0正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-3正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-2正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-1寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-0寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-3寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-2寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
Thread-2所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-1所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-3所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-0所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
CyclicBarrier重用
線(xiàn)程Thread-4正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-5正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-6正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-7正在寫(xiě)入數(shù)據(jù)...
線(xiàn)程Thread-5寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-4寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-7寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
線(xiàn)程Thread-6寫(xiě)入數(shù)據(jù)完畢,等待其他線(xiàn)程寫(xiě)入完畢
Thread-6所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-5所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-4所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
Thread-7所有線(xiàn)程寫(xiě)入完畢,繼續(xù)處理其他任務(wù)...
從執(zhí)行結(jié)果可以看出,在初次的4個(gè)線(xiàn)程越過(guò)barrier
狀態(tài)后,又可以用來(lái)進(jìn)行新一輪的使用。而CountDownLatch
無(wú)法進(jìn)行重復(fù)使用。
3 CyclicBarrier源碼解析
先看一下CyclicBarrier中成員變量的組成:
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties;//攔截的線(xiàn)程數(shù)量 /* The command to run when tripped */ private final Runnable barrierCommand; //當(dāng)屏障撤銷(xiāo)時(shí),需要執(zhí)行的屏障操作 //當(dāng)前的Generation。每當(dāng)屏障失效或者開(kāi)閘之后都會(huì)自動(dòng)替換掉。從而實(shí)現(xiàn)重置的功能。 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count;
可以看出,CyclicBarrier
是由ReentrantLock
和Condition
來(lái)實(shí)現(xiàn)的。具體每個(gè)變量都有什么意義,我們?cè)诜治鲈创a的時(shí)候具體說(shuō)。
我們主要從CyclicBarrier
的構(gòu)造方法和它的await
方法分析說(shuō)起。
CyclicBarrier構(gòu)造函數(shù)
CyclicBarrier有兩個(gè)構(gòu)造函數(shù):
//帶Runnable參數(shù)的函數(shù) public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties;//有幾個(gè)運(yùn)動(dòng)員要參賽 this.count = parties;//目前還需要幾個(gè)運(yùn)動(dòng)員準(zhǔn)備好 //你要在所有線(xiàn)程都繼續(xù)執(zhí)行下去之前要執(zhí)行什么操作,可以為空 this.barrierCommand = barrierAction; } //不帶Runnable參數(shù)的函數(shù) public CyclicBarrier(int parties) { this(parties, null); }
其中,第二個(gè)構(gòu)造函數(shù)調(diào)用的是第一個(gè)構(gòu)造函數(shù),這個(gè) Runnable barrierAction
參數(shù)是什么呢?其實(shí)在上面的小示例中我們就用到了這個(gè)Runnable
參數(shù),它就是在所有線(xiàn)程都準(zhǔn)備好之后,滿(mǎn)足Barrier
條件時(shí),并且在所有線(xiàn)程繼續(xù)執(zhí)行之前,我們可以執(zhí)行這個(gè)Runnable
。但是值得注意的是,這不是新起了一個(gè)線(xiàn)程,而是通過(guò)最后一個(gè)準(zhǔn)備好的(也就是最后一個(gè)到達(dá)Barrier
的)線(xiàn)程承擔(dān)啟動(dòng)的。這一點(diǎn)我們?cè)谏厦媸纠写蛴〉倪\(yùn)行結(jié)果中也可以看出來(lái):Thread-2
線(xiàn)程是最后一個(gè)準(zhǔn)備好的,就是它執(zhí)行的這個(gè)barrierAction
。
這里parties
和count
不要混淆,parties
是表示必須有幾個(gè)線(xiàn)程要到達(dá)Barrier
,而count
是表示目前還有幾個(gè)線(xiàn)程未到達(dá)Barrier。也就是說(shuō),只有當(dāng)count參數(shù)為0時(shí),Barrier
條件即滿(mǎn)足,所有線(xiàn)程可以繼續(xù)執(zhí)行。
count變量是怎么減少到0的呢?是通過(guò)Barrier
執(zhí)行的await方法。下面我們就看一下await
方法。
await方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen }
await方法調(diào)用的dowait方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//獲取ReentrantLock互斥鎖 try { final Generation g = generation;//獲取generation對(duì)象 if (g.broken)//如果generation損壞,拋出異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //如果當(dāng)前線(xiàn)程被中斷,則調(diào)用breakBarrier方法,停止CyclicBarrier,并喚醒所有線(xiàn)程 breakBarrier(); throw new InterruptedException(); } int index = --count;// 看到這里了吧,count減1 //index=0,也就是說(shuō),有0個(gè)線(xiàn)程未滿(mǎn)足CyclicBarrier條件,也就是條件滿(mǎn)足, //可以喚醒所有的線(xiàn)程了 if (index == 0) { // tripped boolean ranAction = false; try { //這就是構(gòu)造器的第二個(gè)參數(shù),如果不為空的話(huà),就執(zhí)行這個(gè)Runnable的run方法, //你看,這里是執(zhí)行的是run方法,也就是說(shuō),并沒(méi)有新起一個(gè)另外的線(xiàn)程, //而是最后一個(gè)執(zhí)行await操作的線(xiàn)程執(zhí)行的這個(gè)run方法。 final Runnable command = barrierCommand; if (command != null) command.run(); //同步執(zhí)行barrierCommand ranAction = true; nextGeneration(); //執(zhí)行成功設(shè)置下一個(gè)nextGeneration return 0; } finally { if (!ranAction) . //如果barrierCommand執(zhí)行失敗,進(jìn)行屏障破壞處理 breakBarrier(); } } //如果當(dāng)前線(xiàn)程不是最后一個(gè)到達(dá)的線(xiàn)程 // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); //調(diào)用Condition的await()方法阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //調(diào)用Condition的awaitNanos()方法阻塞 } catch (InterruptedException ie) { //如果當(dāng)前線(xiàn)程被中斷,則判斷是否有其他線(xiàn)程已經(jīng)使屏障破壞。若沒(méi)有則進(jìn)行屏障破壞處理,并拋出異常;否則再次中斷當(dāng)前線(xiàn)程 if (g == generation && ! g.broken) { breakBarrier();//執(zhí)行breakBarrier,喚醒所有線(xiàn)程 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken)//如果當(dāng)前generation已經(jīng)損壞,拋出異常 throw new BrokenBarrierException(); if (g != generation)//如果generation已經(jīng)更新?lián)Q代,則返回index return index; //如果是參數(shù)是超時(shí)等待,并且已經(jīng)超時(shí),則執(zhí)行breakBarrier()方法 //喚醒所有等待線(xiàn)程。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
簡(jiǎn)單來(lái)說(shuō),如果不發(fā)生異常,線(xiàn)程不被中斷,那么dowait
方法會(huì)調(diào)用Condition
的await
方法(具體Condition的原理請(qǐng)看前面的文章),直到所有線(xiàn)程都準(zhǔn)備好,即都執(zhí)行了dowait
方法,(做count的減操作,直到count=0),即CyclicBarrier條件已滿(mǎn)足,就會(huì)執(zhí)行喚醒線(xiàn)程操作,也就是上面的nextGeneration()方法??赡艽蠹視?huì)有疑惑,這個(gè)Generation是什么東西呢?其實(shí)這個(gè)Generation
定義的很簡(jiǎn)單,就一個(gè)布爾值的成員變量:
private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
Generation
可以理解成“代”,我們要知道,CyclicBarrier
是可以重復(fù)使用的,CyclicBarrier
中的同一批線(xiàn)程屬于同一“代”,當(dāng)所有線(xiàn)程都滿(mǎn)足了CyclicBarrier
條件,執(zhí)行喚醒操作nextGeneration()
方法時(shí),會(huì)新new 出一個(gè)Generation
,代表一下“代”。
nextGeneration的源碼
private void nextGeneration() { // signal completion of last generation trip.signalAll();//調(diào)用Condition的signalAll方法,喚醒所有await的線(xiàn)程 // set up next generation count = parties;//重置count值 //生成新的Generation,表示上一代的所有線(xiàn)程已經(jīng)喚醒,進(jìn)行更新?lián)Q代 generation = new Generation(); }
breakBarrier源碼
再來(lái)看一下breakBarrier
的代碼,breakBarrier
方法是在當(dāng)前線(xiàn)程被中斷時(shí)執(zhí)行的,用來(lái)喚醒所有的等待線(xiàn)程:
private void breakBarrier() { generation.broken = true;//表示當(dāng)代因?yàn)榫€(xiàn)程被中斷,已經(jīng)發(fā)成損壞了 count = parties;//重置count值 trip.signalAll();//調(diào)用Condition的signalAll方法,喚醒所有await的線(xiàn)程 }
isBroken方法
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
判斷此屏障是否處于中斷狀態(tài)。如果因?yàn)闃?gòu)造或最后一次重置而導(dǎo)致中斷或超時(shí),從而使一個(gè)或多個(gè)參與者擺脫此屏障點(diǎn),或者因?yàn)楫惓6鴮?dǎo)致某個(gè)屏障操作失敗,則返回true
;否則返回false
。
reset方法
//將屏障重置為其初始狀態(tài)。 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //喚醒所有等待的線(xiàn)程繼續(xù)執(zhí)行,并設(shè)置屏障中斷狀態(tài)為true breakBarrier(); // break the current generation //喚醒所有等待的線(xiàn)程繼續(xù)執(zhí)行,并設(shè)置屏障中斷狀態(tài)為false nextGeneration(); // start a new generation } finally { lock.unlock(); } }
getNumberWaiting方法
//返回當(dāng)前在屏障處等待的參與者數(shù)目,此方法主要用于調(diào)試和斷言。 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
總結(jié):
1.CyclicBarrier
可以用于多線(xiàn)程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場(chǎng)景。
2.這個(gè)等待的await
方法,其實(shí)是使用ReentrantLock
和Condition
控制實(shí)現(xiàn)的。
3.CyclicBarrier
可以重復(fù)使用。
到此這篇關(guān)于Java多線(xiàn)程之同步工具類(lèi)CyclicBarrier的文章就介紹到這了,更多相關(guān)Java多線(xiàn)程 CyclicBarrier內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用多種方式實(shí)現(xiàn)遍歷HashMap的方法
下面小編就為大家?guī)?lái)一篇使用多種方式實(shí)現(xiàn)遍歷HashMap的方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-05-05基于SpringBoot集成測(cè)試遠(yuǎn)程連接Redis服務(wù)的教程詳解
這篇文章主要介紹了基于SpringBoot集成測(cè)試遠(yuǎn)程連接的Redis服務(wù)的相關(guān)知識(shí),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03使用SpringCloud Gateway解決跨域問(wèn)題
本文給大家介紹了使用SpringCloud Gateway解決跨域問(wèn)題,Spring Cloud Gateway是一個(gè)基于Spring Framework的微服務(wù)網(wǎng)關(guān),使用Spring Cloud Gateway的跨域配置能夠有效管理不同服務(wù)之間的通信,提高系統(tǒng)的可維護(hù)性和安全性,需要的朋友可以參考下2024-02-02詳解Spring Boot加載properties和yml配置文件
本篇文章主要介紹了詳解Spring Boot加載properties和yml配置文件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04