亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring Reactor基本介紹和案例解析

 更新時(shí)間:2024年07月05日 09:13:54   作者:songtianer  
Spring Reactor 是一個(gè)響應(yīng)式編程框架,非常適合類(lèi)似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,這篇文章主要介紹了Spring Reactor基本介紹和案例解析,需要的朋友可以參考下

1. Reactor 對(duì)比

1.1 Reactor 線程模型

Reactor 線程模型就是通過(guò) 單個(gè)線程 使用 Java NIO 包中的 Selector 的 select()方法,進(jìn)行監(jiān)聽(tīng)。當(dāng)獲取到事件(如 accept、read 等)后,就會(huì)分配(dispatch)事件進(jìn)行相應(yīng)的事件處理(handle)。

如果要給 Reactor 線程模型 下一個(gè)更明確的定義,應(yīng)該是:

Reactor線程模式 = Reactor(I/O多路復(fù)用)+ 線程池

Netty、Redis 使用了此模型,主要是解決 C10K 問(wèn)題

C10K 問(wèn)題:服務(wù)器如何支持 10K 個(gè)并發(fā)連接

1.2 Spring Reactor

Reactor 是 JVM 完全非阻塞的響應(yīng)式編程基礎(chǔ),響應(yīng)式編程是一種關(guān)注數(shù)據(jù)流和變化傳播的異步編程范式。這意味著可以通過(guò)所采用的編程語(yǔ)言輕松地表達(dá)靜態(tài)(例如數(shù)組)或動(dòng)態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。

Mono<List<String>> cartInfoMono = Mono.just( "songjiyang" )
        .map( UserService::findUserByName )
        .map( UserService::findUserShoppingCart );
String user = UserService.findUserByName( "songjiyang" );
List<String> userShoppingCart = UserService.findUserShoppingCart( user );

聯(lián)系:

兩者都是使用異步的手段來(lái)提高系統(tǒng)的性能

區(qū)別:

Reactor 模型主要異步的處理新連接、連接和讀寫(xiě),而 Spring Reactor 在更高的代碼級(jí)別提供了異步框架

或者反過(guò)來(lái)說(shuō),新連接、連接和讀寫(xiě)等事件觸發(fā)了 Netty Reactor 的某些管道處理器流程,某些事件觸發(fā)了 Spring Reactor 的執(zhí)行流程,這也是 Reactor(反應(yīng)器)名稱(chēng)的由來(lái)

2. Java 中的異步

上面我們一直在講異步,異步其實(shí)是針對(duì)調(diào)用者的,也就是調(diào)用者調(diào)用完方法之后就可以做的別的事情了,Java 中實(shí)現(xiàn)異步就兩種方式:

  • 回調(diào)
  • 多線程

2.1 回調(diào)

回調(diào)其實(shí)就是把當(dāng)前的事情完成之后,后面需要做的事當(dāng)成函數(shù)傳進(jìn)行,等完成之后調(diào)用就行

    public static void main( String[] args ){
       doA( ( next ) -> {
          log.info( "doB" );
          next.run();
       }, () -> log.info( "doC" ) );
    }
    public static void doA( Consumer<Runnable> next, Runnable nextNext ){
       log.info( "doA" );
       next.accept( nextNext );
    }
// output
15:06:52.818 [main] INFO concurrent.CompleteTest - doA
15:06:52.820 [main] INFO concurrent.CompleteTest - doB
15:06:52.820 [main] INFO concurrent.CompleteTest - doC

回調(diào)是在一個(gè)線程中來(lái)完成的,很容易理解,但問(wèn)題是回調(diào)太多代碼就變的很復(fù)雜,有回調(diào)地域的問(wèn)題

回調(diào)只是一種異步的編程方式,本身實(shí)現(xiàn)異步其實(shí)還是需要多線程,例如單獨(dú)起一個(gè)監(jiān)聽(tīng)線程來(lái)執(zhí)行回調(diào)函數(shù),例如 EventListener

如果執(zhí)行的任務(wù)不考慮線程安全問(wèn)題的話,可以使用 CompletableFuture 來(lái)解決,會(huì)更加易于閱讀

CompletableFuture
       .runAsync( ()-> log.info("doA") )
       .thenRunAsync( ()-> log.info("doB") )
       .thenRunAsync( ()->log.info("doC") )
       .get();
// output
15:08:04.407 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doA
15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doB
15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doC

CompletableFuture 的 thenRunAsync 也是基于回調(diào),每個(gè)任務(wù) Class 會(huì)有一個(gè) next, 多個(gè)任務(wù)組成一個(gè)回調(diào)鏈

Mono.just("")
       .doOnNext( (x)-> log.info("doA") )
       .doOnNext( (x)-> log.info("doB") )
       .doOnNext( (x)-> log.info("doC") )
       .block();
15:12:56.160 [main] INFO concurrent.CompleteTest - doA
15:12:56.160 [main] INFO concurrent.CompleteTest - doB
15:12:56.161 [main] INFO concurrent.CompleteTest - doC

2.2 多線程

多線程的方式,大家應(yīng)該都很熟悉

  • Thread
  • ExecutorService 線程池
  • CompletionService 帶結(jié)果隊(duì)列的線程池
  • CompletableFuture 用于任務(wù)編排
  • Runable、Callable、Future、CompletableFuture

3. Spring Reactor

從上面可以看到一些使用 Reactor 的代碼中,都可以在原生 JDK 中找到替換,那我們?yōu)槭裁催€需要它呢?

  • 可組合和可讀性
  • 豐富的操作
  • 訂閱之前什么都不會(huì)發(fā)生
  • 背壓

下面是 Java9 中 Flow 類(lèi)的類(lèi)圖,SpringReactor 也是使用這四個(gè)類(lèi),在 Java9 中已經(jīng)成了規(guī)范

3.1 Publisher

Mono,提供 0 到 1 個(gè) Item

Flux,提供 0 到 N 個(gè) Item

發(fā)布者提供 n 個(gè) Item, 經(jīng)過(guò)一些 operator(數(shù)據(jù)處理操作),完成或者異常中止

核心方法:

subscribe

3.1.1 創(chuàng)建

Mono<String> noData = Mono.empty(); 
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 
Mono.fromSupplier( ()->1 );
Mono.fromFuture( CompletableFuture.runAsync( ()-> {} ) );
Flux.create((sink)->{
    for( int i = 0; i < 5; i++ ){
       sink.next( i ) ;
    }
    sink.complete();
});

下面這些都稱(chēng)為 operator,可以很靈活處理其中的 Item

  • 轉(zhuǎn)化 map、flatMap、
  • 消費(fèi) doOnNext、doNextError、doOnCancel
  • 過(guò)濾 filter、distinct、take
  • 錯(cuò)誤處理 onErrorReturn、onErrorComplete、onErrorResume、doFinally
  • 時(shí)間相關(guān) timeout、interval、delay
  • 分隔 window、buffer
  • 轉(zhuǎn)同步 block、toStream

3.1.3 訂閱

訂閱然后消費(fèi)發(fā)布者的內(nèi)容

subscribe(); 
subscribe(Consumer<? super T> consumer); 

訂閱之后的返回值是Disposable****,可以使用這個(gè)對(duì)象來(lái)取消訂閱,會(huì)告訴發(fā)布者停止生產(chǎn)對(duì)象,但不保證會(huì)立即終止

  • 當(dāng)然可以給 subscribe 傳遞參數(shù),自定義 complete 或者 error 時(shí)需要做的時(shí)
  • 同時(shí)可以使用 BaseSubscriber 類(lèi)來(lái)實(shí)現(xiàn)訂閱,可以控制消費(fèi)的數(shù)量

3.2 Subscriber

消費(fèi)者一般不用手動(dòng)創(chuàng)建,通過(guò) subscribe 傳進(jìn) Consumer 函數(shù)后,會(huì)自動(dòng)生成一個(gè) LambdaSubscriber,核心方法:

  • onSubscribe
  • onNext
  • onError
  • onComplete

3.3 Processor

既是發(fā)布者,又是訂閱者

3.4 Subscription

訂閱,消費(fèi)者調(diào)用 subscribe 方法之后可以在 onSubscribe 回調(diào)中獲取,可以請(qǐng)求下一個(gè) Item 或者取消訂閱

  • request
  • cancel

3.5 Thread 和 Scheduler

沒(méi)有指定的情況下:

  • 當(dāng)前的 operator 使用上一個(gè) operator 的線程,最先的 operator 使用調(diào)用 subscribe 的線程來(lái)執(zhí)行

Reactor 中使用 Scheduler 來(lái)執(zhí)行流程,類(lèi)似 ExecutorService

  • subscribeOn 可以指定訂閱時(shí)使用的線程,這樣可以不阻塞的訂閱
  • publishOn 指定發(fā)布時(shí)使用的線程

4. Spring Reactor 優(yōu)化案例

流程中可以?xún)?yōu)化的點(diǎn):

  • 準(zhǔn)備數(shù)據(jù)可以異步,等需要用的時(shí)候在去阻塞獲取,相當(dāng)于一個(gè) Future
  • 召回可以完成之后就去等正排數(shù)據(jù),新的問(wèn)題,如何去重?本來(lái)拿一次正排數(shù)據(jù),現(xiàn)在拿 N 個(gè)召回次數(shù)據(jù),請(qǐng)求量是不是會(huì)變大,耗時(shí)是不是也會(huì)增加
  • 過(guò)濾的準(zhǔn)備數(shù)據(jù)也可以異步,也就是說(shuō)某個(gè)過(guò)濾策略的數(shù)據(jù)準(zhǔn)備好了,就可以去執(zhí)行過(guò)濾了,而且還存在很多不需要依賴(lài)數(shù)據(jù)的過(guò)濾策略也需要等
  • 一般粗排只需要 1000 條數(shù)據(jù),過(guò)濾時(shí)已經(jīng)拿夠了 1000 條就可以跳過(guò)了

我們上面所說(shuō)的異步,其實(shí)就是說(shuō)流程中某些節(jié)點(diǎn)是在同時(shí)執(zhí)行的,不必等一個(gè)節(jié)點(diǎn)完成后再執(zhí)行另外一個(gè),這其實(shí)一個(gè)統(tǒng)籌學(xué)的問(wèn)題

4.1 解決方法對(duì)比

問(wèn)題Java 原生Reactor
準(zhǔn)備數(shù)據(jù)異步Future,缺點(diǎn):1. 需要調(diào)用方處理異常 2. 不能編排后續(xù)流程,eg: 拿完企業(yè)信息后繼續(xù)拿企業(yè)治理信息,F(xiàn)uture 需要 get 阻塞Mono, 使用 onErrorResume 處理異常,使用 map 編排后續(xù)流程
召回完成拿正排需要一個(gè)阻塞隊(duì)列,召回把結(jié)果往里面 push,另外一個(gè)線程從隊(duì)列里面拿同時(shí)去取正排數(shù)據(jù),需要自己維護(hù) map 來(lái)去重,需要循環(huán)等待到達(dá)批次后去取正排Flux,召回使用 sink.next 把結(jié)果放進(jìn)去合并節(jié)點(diǎn)訂閱,使用 distinct 來(lái)去重,使用 buffer 來(lái)實(shí)現(xiàn)批次數(shù)據(jù)
過(guò)濾準(zhǔn)備數(shù)據(jù)異步需要阻塞隊(duì)列Flux, 在依賴(lài)任務(wù)中把準(zhǔn)備好的過(guò)濾策略放進(jìn)去,過(guò)濾節(jié)點(diǎn)訂閱 Flux 并過(guò)濾
粗排取 1000 條異步執(zhí)行過(guò)濾,把過(guò)濾結(jié)果放到一個(gè)容器中,粗排節(jié)點(diǎn)不斷查看這個(gè)容器的結(jié)果是否夠 1000 條,夠了就可以執(zhí)行粗排了Flux, 使用 take(1000)
for (StrategyConfig filterConfig : filterConfigList) {
    doStrategyFilter(filterChainContext, recommendContext, recRequest, filterConfig, allFilters, partitionContext, partitionTrace);
}
readyStrategyFlux.publishOn(ExecutorServiceHolder.scheduler).doOnNext((readyStrategyName) -> {
    try {
        List<StrategyConfig> strategyConfigs = strategyNameToConfigs.get(readyStrategyName);
        for (StrategyConfig strategyConfig : strategyConfigs) {
            doStrategyFilter(filterChainContext, recommendContext, recRequest, strategyConfig, allFilters, partitionContext, partitionTrace);
        }
    } catch (Exception e) {
        LOGGER.error("doOnNext filter error", e);
    }
}).blockLast();

這里的 blockLast 又回到了同步世界,可以很好的和已有的代碼兼容

下面是 20240629 到 20240702 某個(gè)場(chǎng)景優(yōu)化過(guò)濾階段的耗時(shí)對(duì)比

pvqpstp99avg
實(shí)驗(yàn)組405186546.90369.00230.88
對(duì)照組405407446.92397.00251.55

業(yè)務(wù)指標(biāo)對(duì)比

無(wú)明顯波動(dòng)

5. 總結(jié)

Spring Reactor 是一個(gè)響應(yīng)式編程框架,非常適合類(lèi)似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,但也會(huì)有一些其他的問(wèn)題,例如潛在的線程安全問(wèn)題,已有框架的沖突 ThreadLocal 等

參考資料

【1】深入 Netty 邏輯架構(gòu),從 Reactor 線程模型開(kāi)始(一)-阿里云開(kāi)發(fā)者社區(qū)

【2】Reactor 3 Reference Guide

【3】C10k 問(wèn)題簡(jiǎn)述-CSDN 博客

到此這篇關(guān)于Spring Reactor基本介紹和案例的文章就介紹到這了,更多相關(guān)Spring Reactor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論