亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中的Phaser并發(fā)階段器詳解

 更新時(shí)間:2023年12月22日 08:57:16   作者:Java面試365  
這篇文章主要介紹了Java中的Phaser并發(fā)階段器詳解,Phaser由JDK1.7提出,是一個(gè)復(fù)雜強(qiáng)大的同步輔助類,是對(duì)同步工具類CountDownLatch和CyclicBarrier的綜合升級(jí),能夠支持分階段實(shí)現(xiàn)等待的業(yè)務(wù)場(chǎng)景,需要的朋友可以參考下

Phaser并發(fā)階段器

Phaser由JDK1.7提出,是一個(gè)復(fù)雜強(qiáng)大的同步輔助類,是對(duì)同步工具類CountDownLatch和CyclicBarrier的綜合升級(jí),能夠支持分階段實(shí)現(xiàn)等待的業(yè)務(wù)場(chǎng)景。

我們可以回憶下CountDownLatch講的是先指定N個(gè)線程,在N個(gè)線程干完活之前,其它線程都需要等待(導(dǎo)游等待旅游團(tuán)所有人上車才能開車),而CyclicBarrier講的是先指定N個(gè)線程。等N個(gè)線程到齊了大家同時(shí)干活(多個(gè)驢友相約去旅游,先到的需要等待后來(lái)的),而Phaser是兩者的結(jié)合,可以理解為先指定N個(gè)線程,等N個(gè)線程到齊后開始干第一階段的活,等第一階段所有的線程都干完活了,接著N個(gè)線程開始干第二階段的活,直到所有的階段完成工作,程序結(jié)束,當(dāng)然需要注意的是每個(gè)階段可以根據(jù)業(yè)務(wù)需要新增或者刪除一些線程,并不是開始指定多少個(gè)線程每個(gè)階段就必須有多少個(gè)線程。

入門體驗(yàn)

看了概念可能不容易理解,從一個(gè)小demo入手體驗(yàn)下

public class PhaserDemo1 {
    // 指定隨機(jī)種子
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        // 將線程注冊(cè)到phaser
        phaser.register();
        for (int i = 0; i <5 ; i++) {
            Task task = new Task(phaser);
            task.start();
        }
        phaser.arriveAndAwaitAdvance();
        System.out.println("all task execute close");
    }
    static class Task extends Thread{
        Phaser phaser;
        public Task(Phaser phaser){
            this.phaser = phaser;
            this.phaser.register();
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"開始執(zhí)行");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"執(zhí)行完畢");
                // 類似CountDownLatch中的 await
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

不知道有沒有這樣的疑惑,phaser.register是向phaser去注冊(cè)這個(gè)線程,那么為什么主線程也需要注冊(cè)呢?

其實(shí)很簡(jiǎn)單主線程需要等待所有子線程執(zhí)行完畢才能繼續(xù)往下面執(zhí)行所以必須要phaser.arriveAndAwaitAdvance();阻塞等待,而這個(gè)語(yǔ)句是意思當(dāng)前線程已經(jīng)到達(dá)屏障,在此等待一段時(shí)間等條件滿足后需要向下一個(gè)屏障繼續(xù)執(zhí)行,如果沒有主線程的phaser.register,直接調(diào)用phaser.arriveAndAwaitAdvance,在源碼中提到可能會(huì)有異常,所以必須在主程序中注冊(cè)phaser.register();

/* <p>It is a usage error for an unregistered party to invoke this
* method.  However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*/
譯:
未注冊(cè)方調(diào)用此函數(shù)是一個(gè)使用錯(cuò)誤方法。但是,這個(gè)錯(cuò)誤可能會(huì)導(dǎo)致
{@codeIllegalStateException}僅在一些后續(xù)操作這個(gè)相位器,如果有的話。

Phaser解決分科考試問(wèn)題

從體驗(yàn)的示例中其實(shí)沒看出其優(yōu)勢(shì)在哪里,上訴場(chǎng)景完全可以采用CountDownLatch,所以現(xiàn)在換一種場(chǎng)景來(lái)說(shuō)明Phaser的優(yōu)勢(shì)。

假設(shè)某校舉行期末考試,有三門考試語(yǔ)文、數(shù)學(xué)、英語(yǔ),每門課允許學(xué)生提前交卷,只有當(dāng)所有學(xué)生完成考試后才能舉行下一次的考試,這就是典型的分階段任務(wù)處理,示例圖如下。

圖片

將上訴場(chǎng)景語(yǔ)義化如下

public class PhaserExam {
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        // 一次初始化2個(gè) 相當(dāng)于兩次register
        Phaser phaser = new Phaser(2);
        for (int i = 0; i <2 ; i++) {
            Exam exam = new Exam(phaser,random.nextLong());
            exam.start();
        }
    }
    static class Exam extends Thread{
        Phaser phaser;
        Long id;
        public Exam(Phaser phaser,Long id){
            this.phaser = phaser;
            this.id = id;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"===開始語(yǔ)文考試");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===結(jié)束語(yǔ)文考試");
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName()+"===開始數(shù)學(xué)考試");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===結(jié)束數(shù)學(xué)考試");
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName()+"===開始英語(yǔ)考試");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===結(jié)束英語(yǔ)考試");
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

代碼執(zhí)行結(jié)果如下,可以看到三個(gè)階段都是等待所有線程執(zhí)行完畢后才往下執(zhí)行,相當(dāng)于多個(gè)柵欄。

圖片

到這里請(qǐng)注意,通過(guò)Phaser類的構(gòu)造方法構(gòu)建的party數(shù),也就是線程數(shù)需要和循環(huán)的次數(shù)對(duì)應(yīng),不然可能影響后續(xù)階段器的正常運(yùn)行。

兩個(gè)重要狀態(tài)

在Phaser內(nèi)有2個(gè)重要狀態(tài),分別是phase和party,乍一看很難理解,他們的定義如下。

phase就是階段,如上面提到的語(yǔ)文、數(shù)學(xué)、英語(yǔ)考試這每個(gè)考試對(duì)應(yīng)一個(gè)階段,不過(guò)phase是從0開始的,當(dāng)所有任務(wù)執(zhí)行完畢,準(zhǔn)備進(jìn)入下一個(gè)階段時(shí)phase就會(huì)加一。

party對(duì)應(yīng)注冊(cè)到Phaser線程數(shù),party初始值有兩種形式

  • 方法一就是通過(guò)Phaser的有參構(gòu)造初始化party值。
  • 方法二采用動(dòng)態(tài)注冊(cè)方法phaser.register()或phaser.bulkRegister(線程數(shù))指定線程數(shù),注銷線程調(diào)用phaser.arriveAndDeregister()方法party值會(huì)減一。

Phaser常用API

Phaser常用API總結(jié)如下所示

// 獲取Phaser階段數(shù),默認(rèn)0
public final int getPhase();
// 向Phaser注冊(cè)一個(gè)線程
public int register();    
// 向Phaser注冊(cè)多個(gè)線程
public int bulkRegister(int parties);
// 獲取已經(jīng)注冊(cè)的線程數(shù),也就是重要狀態(tài)party的值
public int getRegisteredParties();
// 到達(dá)并且等待其它線程到達(dá)
public int arriveAndAwaitAdvance();
// 到達(dá)后注銷不等待其它線程,繼續(xù)往下執(zhí)行
public int arriveAndDeregister();
// 已到達(dá)線程數(shù)
public int getArrivedParties();
// 未到達(dá)線程數(shù)
public int getUnarrivedParties();
// Phaser是否結(jié)束 只有當(dāng)party的數(shù)量是0或者調(diào)用方法forceTermination時(shí)才會(huì)結(jié)束
public boolean isTerminated();
// 結(jié)束Phaser
public void forceTermination();

代碼演示如下

public class PhaserApiTest {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(5);
        System.out.println("當(dāng)前階段"+phaser.getPhase());
        System.out.println("注冊(cè)線程數(shù)==="+phaser.getRegisteredParties());
        // 向phaser注冊(cè)一個(gè)線程
        phaser.register();
        System.out.println("注冊(cè)線程數(shù)==="+phaser.getRegisteredParties());
        // 向phaser注冊(cè)多個(gè)線程,批量注冊(cè)
        phaser.bulkRegister(4);
        System.out.println("注冊(cè)線程數(shù)==="+phaser.getRegisteredParties());
        new Thread(()->{
            // 到達(dá)且等待
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName()+"===執(zhí)行1");
        }).start();
        new Thread(()->{
            // 到達(dá)不等待,從phaser中注銷一個(gè)線程
            phaser.arriveAndDeregister();
            System.out.println(Thread.currentThread().getName()+"===執(zhí)行2");
        }).start();
        TimeUnit.SECONDS.sleep(3);
        System.out.println("已到達(dá)線程數(shù)==="+phaser.getArrivedParties());
        System.out.println("未到達(dá)線程數(shù)==="+phaser.getUnarrivedParties());
        System.out.println("Phaser是否結(jié)束"+phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("Phaser是否結(jié)束"+phaser.isTerminated());
    }
}

執(zhí)行結(jié)果如下所示

圖片

arriveAndAwaitAdvance解析

arriveAndAwaitAdvance是Phaser中一個(gè)重要實(shí)現(xiàn)阻塞的API,其實(shí)arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而來(lái),兩個(gè)方法的作用分別為

  • arrive:到達(dá)屏障但不阻塞,返回值為到達(dá)的階段號(hào)。
  • awaitAdvance(int):接收一個(gè) int 值的階段號(hào),在指定的屏障處阻塞。

測(cè)試代碼如下

public class PhaserTestArrive {
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);
        for (int i = 0; i <5 ; i++) {
            new Task(i,phaser).start();
        }
        phaser.register();
        // 主線程需要調(diào)用arrive的原因是主線程注冊(cè)的第六個(gè)線程還未到達(dá),需要手動(dòng)到達(dá),才能調(diào)用awaitAdvance阻塞屏障
        phaser.arrive();
        // 因?yàn)镻haser線程數(shù)為6,所以即使5個(gè)線程已經(jīng)到達(dá),但是還差主線程的一個(gè),目前階段數(shù)就是0
        phaser.awaitAdvance(0);
        System.out.println("all task is end");
    }
    static class Task extends Thread{
        Phaser phaser;
        public Task(int num,Phaser phaser){
            super("Thread--"+String.valueOf(num));
            this.phaser = phaser;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"===task1 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task1 is end");
                // 到達(dá)且不等待
                phaser.arrive();
                System.out.println(Thread.currentThread().getName()+"===task2 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task2 is end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

中斷響應(yīng)

我們需要特別注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是響應(yīng)中斷的,其余全部不會(huì)響應(yīng)中斷所以不需要對(duì)其進(jìn)行異常處理,演示如下

public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        Thread T1 = new Thread(()->{
            try {
                phaser.awaitAdvanceInterruptibly(phaser.getPhase());
            } catch (InterruptedException e) {
                System.out.println("中斷異常");
                e.printStackTrace();
            }
            //phaser.arriveAndAwaitAdvance();
        });
        T1.start();
        T1.interrupt();
        phaser.arriveAndAwaitAdvance();
    }

圖片

到此這篇關(guān)于Java中的Phaser并發(fā)階段器詳解的文章就介紹到這了,更多相關(guān)Phaser并發(fā)階段器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • GsonFormat快速生成JSon實(shí)體類的實(shí)現(xiàn)

    GsonFormat快速生成JSon實(shí)體類的實(shí)現(xiàn)

    GsonFormat主要用于使用Gson庫(kù)將JSONObject格式的String?解析成實(shí)體,本文主要介紹了GsonFormat快速生成JSon實(shí)體類的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-05-05
  • Java消息隊(duì)列中的Kafka如何保證冪等性

    Java消息隊(duì)列中的Kafka如何保證冪等性

    這篇文章主要介紹了Java消息隊(duì)列中的Kafka如何保證冪等性,Kafka是一種消息隊(duì)列,主要用來(lái)處理大量數(shù)據(jù)狀態(tài)下的消息隊(duì)列,一般用來(lái)做日志的處理,既然是消息隊(duì)列,那么Kafka也就擁有消息隊(duì)列的相應(yīng)的特性了,需要的朋友可以參考下
    2023-07-07
  • Java中CyclicBarrier的用法分析

    Java中CyclicBarrier的用法分析

    CyclicBarrier和CountDownLatch一樣,都是關(guān)于線程的計(jì)數(shù)器。用法略有不同,測(cè)試代碼如下:
    2013-03-03
  • Spring?AOP概念及原理解析

    Spring?AOP概念及原理解析

    這篇文章主要介紹了Spring?AOP概念及原理?,通過(guò)使用?Spring?AOP?實(shí)現(xiàn)日志管理,我們可以將日志記錄的邏輯從業(yè)務(wù)邏輯中分離出來(lái),簡(jiǎn)化了代碼的維護(hù),需要的朋友可以參考下
    2024-07-07
  • FastJson踩坑:@JsonField在反序列化時(shí)失效的解決

    FastJson踩坑:@JsonField在反序列化時(shí)失效的解決

    這篇文章主要介紹了FastJson踩坑:@JsonField在反序列化時(shí)失效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • java mybatis框架實(shí)現(xiàn)多表關(guān)系查詢功能

    java mybatis框架實(shí)現(xiàn)多表關(guān)系查詢功能

    這篇文章主要介紹了java mybatis框架實(shí)現(xiàn)多表關(guān)系查詢,基于Maven框架的整體設(shè)計(jì) —— 一多一的關(guān)系,文中通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-10-10
  • Spring @Transactional工作原理詳解

    Spring @Transactional工作原理詳解

    這篇文章主要介紹了Spring @Transactional工作原理詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。
    2017-12-12
  • OpenFeign超時(shí)時(shí)間設(shè)置不生效問(wèn)題排查記錄

    OpenFeign超時(shí)時(shí)間設(shè)置不生效問(wèn)題排查記錄

    文章主要講述了在升級(jí)Spring Boot 3后,發(fā)現(xiàn)配置文件中的OpenFeign超時(shí)時(shí)間設(shè)置不生效的問(wèn)題,通過(guò)查看FeignClientFactoryBean類和FeignClientProperties類的源碼,發(fā)現(xiàn)配置讀取的方式發(fā)生了變化,從而導(dǎo)致超時(shí)時(shí)間設(shè)置不生效
    2024-11-11
  • 一文詳解Java對(duì)象的序列化和反序列化

    一文詳解Java對(duì)象的序列化和反序列化

    本文主要介紹了一文詳解Java對(duì)象的序列化和反序列化,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • java中棧和隊(duì)列的實(shí)現(xiàn)和API的用法(詳解)

    java中棧和隊(duì)列的實(shí)現(xiàn)和API的用法(詳解)

    下面小編就為大家?guī)?lái)一篇java中棧和隊(duì)列的實(shí)現(xiàn)和API的用法(詳解)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05

最新評(píng)論