亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Reactor 多任務(wù)并發(fā)執(zhí)行且結(jié)果按順序返回第一個

 更新時間:2022年09月22日 15:06:38   作者:???????六七十三  
這篇文章主要介紹了Reactor 多任務(wù)并發(fā)執(zhí)行且結(jié)果按順序返回第一個,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下

1 場景

調(diào)用多個平級服務(wù),按照服務(wù)優(yōu)先級返回第一個有效數(shù)據(jù)。

具體case:一個頁面可能有很多的彈窗,彈窗之間又有優(yōu)先級。每次只需要返回第一個有數(shù)據(jù)的彈窗。但是又希望所有彈窗之間的數(shù)據(jù)獲取是異步的。這種場景使用 Reactor 怎么實現(xiàn)呢?

2 創(chuàng)建 service

2.1 創(chuàng)建基本接口和實體類

public interface TestServiceI {
    Mono request();
}

提供一個 request 方法,返回一個 Mono 對象。

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
    private String name;
}

2.2 創(chuàng)建 service 實現(xiàn)

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

第一個 service 執(zhí)行耗時 500ms。返回空對象;

創(chuàng)建第二個 service 執(zhí)行耗時 1000ms。返回空對象;代碼如上,改一下sleep時間即可。

繼續(xù)創(chuàng)建第三個 service 執(zhí)行耗時 1000ms。返回 name3。代碼如上,改一下 sleep 時間,以及返回為 name3。

3 主體方法

public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        TestServiceI testServiceImpl4 = new TestServiceImpl4();
        TestServiceI testServiceImpl5 = new TestServiceImpl5();
        TestServiceI testServiceImpl6 = new TestServiceImpl6();
        List<TestServiceI> serviceIList = new ArrayList<>();
        serviceIList.add(testServiceImpl4);
        serviceIList.add(testServiceImpl5);
        serviceIList.add(testServiceImpl6);

    // 執(zhí)行 service 列表,這樣有多少個 service 都可以
        Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
                .map(service -> {
                    return service.request();
                });
    // flatMap(或者flatMapSequential) + map 實現(xiàn)異常繼續(xù)下一個執(zhí)行
        Flux flux = monoFlux.flatMapSequential(mono -> {
            return mono.map(user -> {
                        TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
                        if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
                            return testUser;
                        }
            // null 在 reactor 中是異常數(shù)據(jù)。
                        return null;
                    })
                    .onErrorContinue((err, i) -> {
                        log.info("onErrorContinue={}", i);
                    });
        });
        Mono mono = flux.elementAt(0, Mono.just(""));
        Object block = mono.block();
        System.out.println(block + "blockFirst 執(zhí)行耗時ms:" + (System.currentTimeMillis() - startTime));
    }
  • 1、Flux.fromIterable 執(zhí)行 service 列表,可以隨意增刪 service 服務(wù)。
  • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 實現(xiàn)異常繼續(xù)下一個執(zhí)行。具體參考:Reactor中的onErrorContinue 和 onErrorResume
  • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一個正常數(shù)據(jù)。

執(zhí)行輸出:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 執(zhí)行耗時ms:2895

  • 1、service1 和 service2 因為返回空,所以繼續(xù)下一個,最終返回 name3。
  • 2、查看總耗時:2895ms。service1 耗時 500,service2 耗時1000,service3 耗時 1000。發(fā)現(xiàn)耗時基本上等于 service1 + service2 + service3 。這是怎么回事呢?查看返回執(zhí)行的線程,都是 main。

總結(jié):這樣實現(xiàn)按照順序返回第一個正常數(shù)據(jù)。但是執(zhí)行并沒有異步。下一步:如何實現(xiàn)異步呢?

4 實現(xiàn)異步

4.1 subcribeOn 實現(xiàn)異步

修改 service 實現(xiàn)。增加 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                //增加subscribeOn
                .subscribeOn(Schedulers.boundedElastic())
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

再次執(zhí)行輸出如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 執(zhí)行耗時ms:1242

  • 1、發(fā)現(xiàn)具體實現(xiàn) sleep 的線程都不是 main 線程,而是 boundedElastic;
  • 2、最終執(zhí)行耗時 1242ms,只比執(zhí)行時間最長的 service2 和 service3 耗時 1000ms,多一些。證明是異步了。

4.2 CompletableFuture 實現(xiàn)異步

修改 service 實現(xiàn),使用 CompletableFuture 執(zhí)行耗時操作(這里是sleep,具體到項目中可能是外部接口調(diào)用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 對象。

@Slf4j
public class TestServiceImpl1 implements TestServiceI{
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("service1.threadName=" + Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "testname1";
        });

        return Mono.fromFuture(uCompletableFuture).map(name -> {
            return new TestUser(name);
        });
    }
}

執(zhí)行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 執(zhí)行耗時ms:1238

  • 1、耗時操作都是使用 ForkJoinPool 線程池中的線程執(zhí)行。
  • 2、最終耗時和方法1基本差不多。

到此這篇關(guān)于Reactor 多任務(wù)并發(fā)執(zhí)行且結(jié)果按順序返回第一個的文章就介紹到這了,更多相關(guān)Reactor 多任務(wù)執(zhí)行內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中的內(nèi)部類使用詳情

    Java中的內(nèi)部類使用詳情

    說起內(nèi)部類這個詞,想必很多人都不陌生,但是又會覺得不熟悉。原因是平時編寫代碼時可能用到的場景不多,用得最多的是在有事件監(jiān)聽的情況下,并且即使用到也很少去總結(jié)內(nèi)部類的用法。今天我們就來一探究竟
    2022-03-03
  • 解決問題:Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources

    解決問題:Failed to execute goal org.apache.m

    這篇文章主要給大家介紹了關(guān)于解決問題:Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources的相關(guān)資料,文中將解決的辦法介紹的非常詳細(xì),需要的朋友可以參考下
    2023-03-03
  • Spring AOP的概念與實現(xiàn)過程詳解

    Spring AOP的概念與實現(xiàn)過程詳解

    AOP為Aspect Oriented Programming的縮寫,意為:面向切面編程,可通過運行期動態(tài)代理實現(xiàn)程序功能的統(tǒng)一維護的一種技術(shù)。AOP是 Spring框架中的一個重要內(nèi)容
    2023-02-02
  • 基于JAVA文件中獲取路徑及WEB應(yīng)用程序獲取路徑的方法

    基于JAVA文件中獲取路徑及WEB應(yīng)用程序獲取路徑的方法

    下面小編就為大家?guī)硪黄贘AVA文件中獲取路徑及WEB應(yīng)用程序獲取路徑的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-11-11
  • MybatisPlus出現(xiàn)Error attempting to get column ‘xxx字段‘ from result set異常解決

    MybatisPlus出現(xiàn)Error attempting to get col

    本文重點分析使用@EnumValue注解轉(zhuǎn)換時遇到的一下錯誤原因,及解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-11-11
  • java使用itext如何直接生成pdf

    java使用itext如何直接生成pdf

    在工作中,制作PDF文件是常見需求,尤其是需要插入動態(tài)數(shù)據(jù)或圖像時,使用PDF模板填充表單域通常足夠,但對于復(fù)雜文件,可以通過拼接PDF內(nèi)容來靈活排版,iText庫提供了豐富的PDF操作功能,如設(shè)置頁面大小、邊距、字體、生成動態(tài)表格、添加水印、設(shè)置密碼等
    2024-09-09
  • 詳解SpringCloud LoadBalancer 新一代負(fù)載均衡器

    詳解SpringCloud LoadBalancer 新一代負(fù)載均衡器

    這篇文章主要為大家介紹了SpringCloud LoadBalancer新一代負(fù)載均衡器詳解使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • Java知識梳理之泛型用法詳解

    Java知識梳理之泛型用法詳解

    從JDK?5.0以后,Java引入了“參數(shù)化類型(Parameterized?type)”的概念,允許我們在創(chuàng)建集合時再指定集合元素的類型。本文就來和大家深入聊聊Java泛型的使用
    2022-08-08
  • Java?C++題解leetcode消失的兩個數(shù)字實例

    Java?C++題解leetcode消失的兩個數(shù)字實例

    這篇文章主要介紹了Java?C++題解leetcode消失的兩個數(shù)字實例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • Spring MVC Controller傳遞枚舉值的實例

    Spring MVC Controller傳遞枚舉值的實例

    這篇文章主要介紹了Spring MVC Controller傳遞枚舉值的實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09

最新評論