Java并發(fā)工具之CyclicBarrier使用詳解
1、CyclicBarrier簡(jiǎn)介
CyclicBarrier是一個(gè)同步器,允許一組線程相互之間等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point),再繼續(xù)執(zhí)行。
因?yàn)镃yclicBarrier 的計(jì)數(shù)器是可以循環(huán)利用的,所以稱它為循環(huán)(Cyclic) 的 Barrier。
CyclicBarrier常用于多線程計(jì)算數(shù)據(jù),當(dāng)所有線程都完成執(zhí)行后,在CyclicBarrier回調(diào)線程中合并計(jì)算。
2、使用介紹
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)其參數(shù)表示屏障(barrier )攔截的線程數(shù) 量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public class CyclicBarrierDemo { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } //在子線程輸出1 System.out.println("1"); } }).start(); try { c.await(); //在主線程輸出2 System.out.println("2"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,兩個(gè)線程都有可能先執(zhí)行,所以既有可能先執(zhí)行System.out.println("1");,也有可能先執(zhí)行 System.out.println("2");。
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會(huì)永遠(yuǎn)等待, 因?yàn)闆](méi)有第三個(gè)線程執(zhí)行await方法,即沒(méi)有第三個(gè)線程到達(dá)屏障,所以之前到達(dá)屏障的兩個(gè) 線程都不會(huì)繼續(xù)執(zhí)行。
此外CyclicBarrier還可以設(shè)置回調(diào)函數(shù),它是一個(gè) Runnable 實(shí)例,用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行 Runnable 實(shí)例,可以處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。
public class CyclicBarrierDemo { //實(shí)例化CyclicBarrier,并指定回調(diào)函數(shù) static CyclicBarrier c = new CyclicBarrier(2,new TestRunable()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("1"); } }).start(); try { c.await(); System.out.println("2"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } static class TestRunable implements Runnable{ @Override public void run() { System.out.println("3"); } } }
輸出結(jié)果:
312
CyclicBarrier方法
- await() 該方法被調(diào)用時(shí)表示當(dāng)前線程已經(jīng)到達(dá)公共屏障點(diǎn) (common barrier point),當(dāng)前線程阻塞進(jìn)入休眠狀態(tài),直到所有線程都到達(dá)屏障點(diǎn),當(dāng)前線程才會(huì)被喚醒。
- await(long timeout, TimeUnit unit) await()的重載方法,可以指定阻塞時(shí)長(zhǎng);
- getParties() 返回參與相互等待的線程數(shù);
- isBroken() 判斷此屏障是否處于中斷狀態(tài)。如果因?yàn)闃?gòu)造或最后一次重置而導(dǎo)致中斷或超時(shí),從而使一個(gè)或多個(gè)參與者擺脫此屏障點(diǎn),或者因?yàn)楫惓6鴮?dǎo)致某個(gè)屏障操作失敗,則返回true;否則返回false;
- reset() 將屏障重置為其初始狀態(tài);
- getNumberWaiting() 返回當(dāng)前在屏障處等待的參與者數(shù)目,此方法主要用于調(diào)試。
3、使用案例
如果項(xiàng)目中有這樣一個(gè)需求,計(jì)算每個(gè)客戶指定年份消費(fèi)總金額,然后對(duì)所有年份金額進(jìn)行匯總。我們可以使用一個(gè)固定數(shù)量的線程計(jì)算每年消費(fèi)總金額,將相應(yīng)的結(jié)果存儲(chǔ)在一個(gè)集合中,當(dāng)所有線程完成執(zhí)行其操作時(shí),在CyclicBarrier回調(diào)線程中進(jìn)行匯總以及顯示。代碼如下:
public class UserConsumeInfo { private String userName; private List<UserAggreInfo> userAggreInfos; //省略必要的get set 方法...... } public class UserAggreInfo { //年份 private Integer year; //消費(fèi)金額 private Integer amount; public UserAggreInfo(Integer year, Integer amount) { this.year = year; this.amount = amount; } //省略必要的get set 方法...... }
public class CyclicBarrierTest { private CyclicBarrier cyclicBarrier; //保存客戶消費(fèi)數(shù)據(jù) private List<UserConsumeInfo> partialResults = new CopyOnWriteArrayList<>(); private Random random = new Random(); //要統(tǒng)計(jì)的年份 private List<Integer> years; //統(tǒng)計(jì)的用戶 private List<String> users; /** * 構(gòu)造函數(shù) * @param years * @param users */ public CyclicBarrierTest(List<Integer> years, List<String> users) { this.years = years; this.users = users; } /** * 開(kāi)始計(jì)算用戶消費(fèi)信息 */ public void startCalc(){ cyclicBarrier = new CyclicBarrier(users.size(), new AggregatorThread()); for (int i = 0; i < users.size(); i++) { Thread worker = new Thread(new CalcUserAmountThread(users.get(i))); //把用戶設(shè)置為當(dāng)前線程名稱 worker.setName(users.get(i)); worker.start(); } } /** * 計(jì)算用戶消費(fèi)金額線程 */ final class CalcUserAmountThread implements Runnable{ //用戶名 private String userName; public CalcUserAmountThread(String userName) { this.userName = userName; } @Override public void run() { UserConsumeInfo userConsumeInfo=new UserConsumeInfo(); userConsumeInfo.setUserName(userName); //計(jì)算每年的消費(fèi)金額 List<UserAggreInfo> userAggreInfos=new ArrayList<>(); for (int i = 0; i < years.size(); i++) { Integer num = random.nextInt(1000); System.out.println("用戶:"+userName+" "+years.get(i)+"年,消費(fèi):"+num+"元"); userAggreInfos.add(new UserAggreInfo(years.get(i),num)); } userConsumeInfo.setUserAggreInfos(userAggreInfos); partialResults.add(userConsumeInfo); try { System.out.println("用戶:"+userName + "到達(dá)barrier,等待其他用戶."); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { } } } /** * 聚合線程,算出用戶總的消費(fèi)金額 */ class AggregatorThread implements Runnable { @Override public void run() { System.out.println("=====統(tǒng)計(jì)用戶消費(fèi)信息======="); int sum = 0;//所有用戶消費(fèi)金額 for (UserConsumeInfo user : partialResults) { String userName = user.getUserName(); System.out.println("用戶:"+userName+"消費(fèi)信息如下:"); for (UserAggreInfo aggre : user.getUserAggreInfos()) { System.out.print("\t"+aggre.getYear()+"年,"+"消費(fèi)"+aggre.getAmount()+"元"+"\t"); sum+=aggre.getAmount(); } System.out.println(); } System.out.println("所有用戶消費(fèi)總金額:"+sum); } } }
測(cè)試:
public static void main(String[] args) { List<String> userResults=new ArrayList<>(); userResults.add("張三"); userResults.add("李四"); userResults.add("王二"); userResults.add("小米"); userResults.add("小明"); userResults.add("小紅"); userResults.add("小張"); List<Integer> yearResults=new ArrayList<>(); yearResults.add(2018); yearResults.add(2019); yearResults.add(2020); CyclicBarrierTest demo = new CyclicBarrierTest(yearResults,userResults); demo.startCalc(); }
輸出結(jié)果:
用戶:小明 2018年,消費(fèi):310元
用戶:小紅 2018年,消費(fèi):589元
用戶:李四 2018年,消費(fèi):557元
用戶:小米 2018年,消費(fèi):946元
用戶:張三 2018年,消費(fèi):150元
用戶:王二 2018年,消費(fèi):17元
用戶:小張 2018年,消費(fèi):228元
用戶:小明 2019年,消費(fèi):29元
用戶:王二 2019年,消費(fèi):741元
用戶:小張 2019年,消費(fèi):380元
用戶:王二 2020年,消費(fèi):650元
用戶:小紅 2019年,消費(fèi):412元
用戶:李四 2019年,消費(fèi):453元
用戶:王二到達(dá)barrier,等待其他用戶.
用戶:小明 2020年,消費(fèi):582元
用戶:小米 2019年,消費(fèi):524元
用戶:張三 2019年,消費(fèi):691元
用戶:小米 2020年,消費(fèi):777元
用戶:小明到達(dá)barrier,等待其他用戶.
用戶:李四 2020年,消費(fèi):262元
用戶:小紅 2020年,消費(fèi):631元
用戶:小張 2020年,消費(fèi):34元
用戶:小紅到達(dá)barrier,等待其他用戶.
用戶:李四到達(dá)barrier,等待其他用戶.
用戶:小米到達(dá)barrier,等待其他用戶.
用戶:張三 2020年,消費(fèi):802元
用戶:小張到達(dá)barrier,等待其他用戶.
用戶:張三到達(dá)barrier,等待其他用戶.
=====統(tǒng)計(jì)用戶消費(fèi)信息=======
用戶:王二消費(fèi)信息如下:
2018年,消費(fèi)17元 2019年,消費(fèi)741元 2020年,消費(fèi)650元 共消費(fèi):1408
用戶:小明消費(fèi)信息如下:
2018年,消費(fèi)310元 2019年,消費(fèi)29元 2020年,消費(fèi)582元 共消費(fèi):921
用戶:小米消費(fèi)信息如下:
2018年,消費(fèi)946元 2019年,消費(fèi)524元 2020年,消費(fèi)777元 共消費(fèi):2247
用戶:李四消費(fèi)信息如下:
2018年,消費(fèi)557元 2019年,消費(fèi)453元 2020年,消費(fèi)262元 共消費(fèi):1272
用戶:小紅消費(fèi)信息如下:
2018年,消費(fèi)589元 2019年,消費(fèi)412元 2020年,消費(fèi)631元 共消費(fèi):1632
用戶:小張消費(fèi)信息如下:
2018年,消費(fèi)228元 2019年,消費(fèi)380元 2020年,消費(fèi)34元 共消費(fèi):642
用戶:張三消費(fèi)信息如下:
2018年,消費(fèi)150元 2019年,消費(fèi)691元 2020年,消費(fèi)802元 共消費(fèi):1643
所有用戶消費(fèi)總金額:9765
4、CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重 置。
所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景。
例如,如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù) 器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier 阻塞的線程數(shù)量。
isBroken()方法用來(lái)了解阻塞的線程是否被中斷
到此這篇關(guān)于Java并發(fā)工具之CyclicBarrier使用詳解的文章就介紹到這了,更多相關(guān)CyclicBarrier使用詳解內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基礎(chǔ)知識(shí)精選 你答對(duì)了幾道?
精選Java基礎(chǔ)知識(shí)講解,看看你能答對(duì)多少?2017-09-09java根據(jù)模板實(shí)現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf
這篇文章主要為大家詳細(xì)介紹了java如何根據(jù)模板實(shí)現(xiàn)填充word內(nèi)容并轉(zhuǎn)換為pdf,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-04-04springAOP的三種實(shí)現(xiàn)方式示例代碼
這篇文章主要介紹了springAOP的三種實(shí)現(xiàn)方式,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07聊聊BeanUtils.copyProperties和clone()方法的區(qū)別
這篇文章主要介紹了聊聊BeanUtils.copyProperties和clone()方法的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09使用springmvc運(yùn)行流程分析,手寫spring框架嘗試
這篇文章主要介紹了使用springmvc運(yùn)行流程分析,手寫spring框架嘗試,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Java中synchronized鎖升級(jí)的過(guò)程
本文主要介紹了Java中synchronized鎖升級(jí)的過(guò)程,synchronized相對(duì)于早期的synchronized做出了優(yōu)化,從以前的加鎖就是重量級(jí)鎖優(yōu)化成了有一個(gè)鎖升級(jí)的過(guò),下文詳細(xì)內(nèi)容需要的小伙伴可以參考一下2022-05-05Java中的CGLIB動(dòng)態(tài)代理的使用及原理詳解
這篇文章主要介紹了Java中的CGLIB動(dòng)態(tài)代理的使用及原理詳解,CGLIB是一個(gè)功能強(qiáng)大,高性能的代碼生成包,它為沒(méi)有實(shí)現(xiàn)接口的類提供代理,為JDK的動(dòng)態(tài)代理提供了很好的補(bǔ)充,需要的朋友可以參考下2023-09-09Java 中解決Unsupported major.minor version 51.0的問(wèn)題
本文主要介紹解決Unsupported major.minor version 51.0的問(wèn)題, 這里給大家整理了詳細(xì)資料,有需要的小伙伴可以參考下2016-08-08