Java AQS中CyclicBarrier回環(huán)柵欄的使用
一. 簡(jiǎn)介
CyclicBarrier ,回環(huán)柵欄(循環(huán)屏障),通過它可以實(shí)現(xiàn)讓一組線程等待至某個(gè)狀態(tài)(屏障點(diǎn))之后再全部同時(shí)執(zhí)行。叫做回環(huán)是因?yàn)楫?dāng)所有等待線程都被釋放以后,CyclicBarrier可以被重用。

二. CyclicBarrier的使用
構(gòu)造方法
//parties表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
//用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行 barrierAction,
//方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景(該線程的執(zhí)行時(shí)機(jī)是在到達(dá)屏障之后再執(zhí)行)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
重要方法
//指定數(shù)量的線程全部調(diào)用await()方法時(shí),這些線程不再阻塞
// BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個(gè)線程 await() 時(shí)被中斷或者超時(shí)
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循環(huán)重置
public void reset() {}
三. CyclicBarrier的應(yīng)用場(chǎng)景
CyclicBarrier 可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景。
例子一 ,多個(gè)線程調(diào)用await之后阻塞,等到達(dá)到屏障攔截的線程數(shù)量之后,再一起執(zhí)行
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()
+ "開始等待其他線程");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "開始執(zhí)行");
//TODO 模擬業(yè)務(wù)處理
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "執(zhí)行完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
Thread-0開始等待其他線程
Thread-2開始等待其他線程
Thread-1開始等待其他線程
Thread-4開始等待其他線程
Thread-3開始等待其他線程
Thread-2開始執(zhí)行
Thread-1開始執(zhí)行
Thread-0開始執(zhí)行
例子二 用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景
//保存每個(gè)學(xué)生的平均成績(jī)
private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
private ExecutorService threadPool= Executors.newFixedThreadPool(3);
private CyclicBarrier cb=new CyclicBarrier(3,()->{
int result=0;
Set<String> set = map.keySet();
for(String s:set){
result+=map.get(s);
}
System.out.println("三人平均成績(jī)?yōu)?"+(result/3)+"分");
});
public void count(){
for(int i=0;i<3;i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
//獲取學(xué)生平均成績(jī)
int score=(int)(Math.random()*40+60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+"同學(xué)的平均成績(jī)?yōu)椋?+score);
try {
//執(zhí)行完運(yùn)行await(),等待所有學(xué)生平均成績(jī)都計(jì)算完畢
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb=new CyclicBarrierTest2();
cb.count();
}
pool-1-thread-3同學(xué)的平均成績(jī)?yōu)椋?2
pool-1-thread-1同學(xué)的平均成績(jī)?yōu)椋?4
pool-1-thread-2同學(xué)的平均成績(jī)?yōu)椋?5
三人平均成績(jī)?yōu)?73分
例子三 利用CyclicBarrier的計(jì)數(shù)器能夠重置,屏障可以重復(fù)使用的特性
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 5, 1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(r) -> new Thread(r, counter.addAndGet(1) + " 號(hào) "),
new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
() -> System.out.println("裁判:比賽開始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(new Runner(cyclicBarrier));
}
}
static class Runner extends Thread{
private CyclicBarrier cyclicBarrier;
public Runner (CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 選手已就位, 準(zhǔn)備共用時(shí): " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}
3 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 223ms0
2 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 315ms1
5 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 471ms2
1 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 556ms3
4 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 923ms4
裁判:比賽開始~~
3 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 285ms0
2 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 413ms1
1 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 533ms2
5 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 661ms3
4 號(hào) 選手已就位, 準(zhǔn)備共用時(shí): 810ms4
裁判:比賽開始~~
四. CyclicBarrier與CountDownLatch的區(qū)別
- CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景,比如如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù)器,并讓線程們重新執(zhí)行一次
- CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數(shù)量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
- CountDownLatch會(huì)阻塞主線程,CyclicBarrier不會(huì)阻塞主線程,只會(huì)阻塞子線程。
- CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同。CountDownLatch一般用于一個(gè)或多個(gè)線程,等待其他線程執(zhí)行完任務(wù)后,再執(zhí)行。CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài),然后這一組線程再同時(shí)執(zhí)行。
- CyclicBarrier 還可以提供一個(gè) barrierAction,合并多線程計(jì)算結(jié)果。
- CyclicBarrier是通過ReentrantLock的"獨(dú)占鎖"和Conditon來實(shí)現(xiàn)一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實(shí)現(xiàn)
五. CyclicBarrier源碼解析
以例子以為例,thread0 從await方法開始,await會(huì)調(diào)用dowait
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
那么dowait里又做了什么
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//如果狀態(tài)為0就將其修改為1,并設(shè)置當(dāng)前線程,調(diào)用await時(shí)外面肯定是lock
lock.lock();
try {
//每一個(gè)棧欄算是一代
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//每個(gè)線程執(zhí)行一次,count自減一
int index = --count;
// count減到0,說明幾個(gè)線程都到達(dá)屏障,就會(huì)重置 進(jìn)入下一個(gè)屏障
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//trip就是一個(gè)條件隊(duì)列condition,入條件等待隊(duì)列,單向鏈表結(jié)構(gòu)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
入隊(duì)阻塞的方法在await()中,下面看一下
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//入隊(duì) 創(chuàng)建節(jié)點(diǎn)
Node node = addConditionWaiter();
// 釋放鎖,這樣其他線程才能獲取鎖,執(zhí)行
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//不在當(dāng)前隊(duì)列中就阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter中創(chuàng)建節(jié)點(diǎn),對(duì)于thread0,頭節(jié)點(diǎn)是自己,lastWaiter也是自己
/**
* 添加條件等待節(jié)點(diǎn)
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
//節(jié)點(diǎn)取消 則移除.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//創(chuàng)建一個(gè)節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果上一個(gè)節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

釋放鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//釋放鎖時(shí)state變?yōu)?了
if (c == 0) {
free = true;
//將當(dāng)前線程設(shè)置為null
setExclusiveOwnerThread(null);
}
//修改state狀態(tài)值
setState(c);
return free;
}
thread0的大致流程
首先會(huì)調(diào)用lock.lock進(jìn)行加鎖,加鎖之后調(diào)用trip.await方法進(jìn)行入隊(duì)阻塞,入隊(duì)是通過addConditionWaiter添加進(jìn)條件等待隊(duì)列,然后通過fullyRelease釋放鎖,設(shè)置當(dāng)前線程為null,然后修改state狀態(tài)值,最后調(diào)用LockSupport.park(this);進(jìn)行阻塞。

threa1的流程大致和thread0一樣,還沒有達(dá)到屏障數(shù)量,入隊(duì)的地方和thread0不一樣
private Node addConditionWaiter() {
Node t = lastWaiter;
//節(jié)點(diǎn)取消 則移除.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//創(chuàng)建一個(gè)節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果上一個(gè)節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
if (t == null)
firstWaiter = node;
else
//thread1執(zhí)行時(shí),t已經(jīng)不為null了
t.nextWaiter = node;
//lastWaiter指向當(dāng)前
lastWaiter = node;
return node;
}

thread1流程

thread2執(zhí)行的時(shí)候count已經(jīng)減到0,會(huì)執(zhí)行nextGeneration方法
//每個(gè)線程執(zhí)行一次,count自減一
int index = --count;
// count減到0,說明幾個(gè)線程都到達(dá)屏障,就會(huì)重置 進(jìn)入下一個(gè)屏障
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//開啟下一代屏障
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
private void nextGeneration() {
// 喚醒所有線程,喚醒操作是在同步等待隊(duì)列中,所以要將條件等待隊(duì)列轉(zhuǎn)換為同步等待隊(duì)列
trip.signalAll();
// 重置count
count = parties;
//創(chuàng)建下一代屏障
generation = new Generation();
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
//將firstWaiter和lastWaiter都置為null,就沒有首尾節(jié)點(diǎn)了
lastWaiter = firstWaiter = null;
//條件隊(duì)列的出隊(duì)
do {
//循環(huán)將條件隊(duì)列轉(zhuǎn)同步隊(duì)列
Node next = first.nextWaiter;
//first的nextWaiter置為null
first.nextWaiter = null;
//條件隊(duì)列轉(zhuǎn)同步隊(duì)列,因?yàn)閱拘咽窃谕疥?duì)列中
transferForSignal(first);
first = next;
} while (first != null);
}
條件隊(duì)列的出隊(duì)

/**
* 條件隊(duì)列轉(zhuǎn)同步隊(duì)列
*/
final boolean transferForSignal(Node node) {
//將同步狀態(tài)改為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
//將ws設(shè)置為-1
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
同步隊(duì)列入隊(duì)

thread2開始解鎖
private int dowait(boolean timed, long nanos){
//....
} finally {
lock.unlock();
}
}
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒線程0
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//將ws設(shè)置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒線程0
LockSupport.unpark(s.thread);
}
Thread0喚醒之后就會(huì)獲取鎖,執(zhí)行業(yè)務(wù)邏輯然后再釋放鎖
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 在這里獲取鎖,再釋放鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//CAS嘗試獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//cas獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
同步隊(duì)列就會(huì)出隊(duì)

Thread0喚醒之后,執(zhí)行業(yè)務(wù)邏輯之后會(huì)unlock,又會(huì)喚醒Thread1,Thread1喚醒之后,又會(huì)喚醒Thread2。
總結(jié):
await方法,
前半段 釋放鎖 進(jìn)入條件隊(duì)列,阻塞線程(Thread0 Thread1),
過渡階段 其他線程調(diào)用singnal/signalAll喚醒(Thread2),條件隊(duì)列轉(zhuǎn)同步隊(duì)列,可以在釋放鎖的時(shí)候喚醒head的后續(xù)節(jié)點(diǎn)所在的線程
后半段 (Thread0)被喚醒的線程獲取鎖(如果有競(jìng)爭(zhēng),CAS獲取鎖失敗,還會(huì)阻塞),Thread0釋放鎖,喚醒同步隊(duì)列中head的后續(xù)節(jié)點(diǎn)所在的線程(獨(dú)占鎖的邏輯)
到此這篇關(guān)于Java AQS中CyclicBarrier回環(huán)柵欄的使用的文章就介紹到這了,更多相關(guān)Java CyclicBarrier內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中保證多線程間的數(shù)據(jù)共享的方法詳解
這篇文章詳解的發(fā)給大家介紹了Java中是如何保證多線程間的數(shù)據(jù)共享的,文中通過圖文介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-11-11
SpringBoot手動(dòng)開啟事務(wù):DataSourceTransactionManager問題
這篇文章主要介紹了SpringBoot手動(dòng)開啟事務(wù):DataSourceTransactionManager問題,具有很好的價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
Java程序員的10道常見的XML面試問答題(XML術(shù)語詳解)
包括web開發(fā)人員的Java面試在內(nèi)的各種面試中,XML面試題在各種編程工作的面試中很常見。XML是一種成熟的技術(shù),經(jīng)常作為從一個(gè)平臺(tái)到其他平臺(tái)傳輸數(shù)據(jù)的標(biāo)準(zhǔn)2014-04-04
IDEA關(guān)于.properties資源文件的編碼調(diào)整問題
這篇文章主要介紹了IDEA關(guān)于.properties資源文件的編碼調(diào)整問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API
這篇文章主要介紹了Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API的相關(guān)知識(shí),需要的朋友可以參考下2017-04-04
關(guān)于Springboot+gateway整合依賴并處理依賴沖突問題
這篇文章主要介紹了Springboot+gateway整合依賴并處理依賴沖突問題,給大家提到了spring boot版本和spring cloud版本,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-01-01

