Spring Reactor基本介紹和案例解析
1. Reactor 對(duì)比
1.1 Reactor 線程模型
Reactor 線程模型就是通過(guò) 單個(gè)線程 使用 Java NIO 包中的 Selector 的 select()方法,進(jìn)行監(jiān)聽(tīng)。當(dāng)獲取到事件(如 accept、read 等)后,就會(huì)分配(dispatch)事件進(jìn)行相應(yīng)的事件處理(handle)。
如果要給 Reactor 線程模型 下一個(gè)更明確的定義,應(yīng)該是:
Reactor線程模式 = Reactor(I/O多路復(fù)用)+ 線程池
Netty、Redis 使用了此模型,主要是解決 C10K 問(wèn)題
C10K 問(wèn)題:服務(wù)器如何支持 10K 個(gè)并發(fā)連接
1.2 Spring Reactor
Reactor 是 JVM 完全非阻塞的響應(yīng)式編程基礎(chǔ),響應(yīng)式編程是一種關(guān)注數(shù)據(jù)流和變化傳播的異步編程范式。這意味著可以通過(guò)所采用的編程語(yǔ)言輕松地表達(dá)靜態(tài)(例如數(shù)組)或動(dòng)態(tài)(例如事件發(fā)射器)數(shù)據(jù)流。
Mono<List<String>> cartInfoMono = Mono.just( "songjiyang" ) .map( UserService::findUserByName ) .map( UserService::findUserShoppingCart ); String user = UserService.findUserByName( "songjiyang" ); List<String> userShoppingCart = UserService.findUserShoppingCart( user );
聯(lián)系:
兩者都是使用異步的手段來(lái)提高系統(tǒng)的性能
區(qū)別:
Reactor 模型主要異步的處理新連接、連接和讀寫(xiě),而 Spring Reactor 在更高的代碼級(jí)別提供了異步框架
或者反過(guò)來(lái)說(shuō),新連接、連接和讀寫(xiě)等事件觸發(fā)了 Netty Reactor 的某些管道處理器流程,某些事件觸發(fā)了 Spring Reactor 的執(zhí)行流程,這也是 Reactor(反應(yīng)器)名稱(chēng)的由來(lái)
2. Java 中的異步
上面我們一直在講異步,異步其實(shí)是針對(duì)調(diào)用者的,也就是調(diào)用者調(diào)用完方法之后就可以做的別的事情了,Java 中實(shí)現(xiàn)異步就兩種方式:
回調(diào)- 多線程
2.1 回調(diào)
回調(diào)其實(shí)就是把當(dāng)前的事情完成之后,后面需要做的事當(dāng)成函數(shù)傳進(jìn)行,等完成之后調(diào)用就行
public static void main( String[] args ){ doA( ( next ) -> { log.info( "doB" ); next.run(); }, () -> log.info( "doC" ) ); } public static void doA( Consumer<Runnable> next, Runnable nextNext ){ log.info( "doA" ); next.accept( nextNext ); } // output 15:06:52.818 [main] INFO concurrent.CompleteTest - doA 15:06:52.820 [main] INFO concurrent.CompleteTest - doB 15:06:52.820 [main] INFO concurrent.CompleteTest - doC
回調(diào)是在一個(gè)線程中來(lái)完成的,很容易理解,但問(wèn)題是回調(diào)太多代碼就變的很復(fù)雜,有回調(diào)地域的問(wèn)題
回調(diào)只是一種異步的編程方式,本身實(shí)現(xiàn)異步其實(shí)還是需要多線程,例如單獨(dú)起一個(gè)監(jiān)聽(tīng)線程來(lái)執(zhí)行回調(diào)函數(shù),例如 EventListener
如果執(zhí)行的任務(wù)不考慮線程安全問(wèn)題的話,可以使用 CompletableFuture 來(lái)解決,會(huì)更加易于閱讀
CompletableFuture .runAsync( ()-> log.info("doA") ) .thenRunAsync( ()-> log.info("doB") ) .thenRunAsync( ()->log.info("doC") ) .get(); // output 15:08:04.407 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doA 15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doB 15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doC
CompletableFuture 的 thenRunAsync 也是基于回調(diào),每個(gè)任務(wù) Class 會(huì)有一個(gè) next, 多個(gè)任務(wù)組成一個(gè)回調(diào)鏈
Mono.just("") .doOnNext( (x)-> log.info("doA") ) .doOnNext( (x)-> log.info("doB") ) .doOnNext( (x)-> log.info("doC") ) .block(); 15:12:56.160 [main] INFO concurrent.CompleteTest - doA 15:12:56.160 [main] INFO concurrent.CompleteTest - doB 15:12:56.161 [main] INFO concurrent.CompleteTest - doC
2.2 多線程
多線程的方式,大家應(yīng)該都很熟悉
- Thread
- ExecutorService 線程池
- CompletionService 帶結(jié)果隊(duì)列的線程池
- CompletableFuture 用于任務(wù)編排
- Runable、Callable、Future、CompletableFuture
3. Spring Reactor
從上面可以看到一些使用 Reactor 的代碼中,都可以在原生 JDK 中找到替換,那我們?yōu)槭裁催€需要它呢?
- 可組合和可讀性
- 豐富的操作
- 訂閱之前什么都不會(huì)發(fā)生
- 背壓
下面是 Java9 中 Flow 類(lèi)的類(lèi)圖,SpringReactor 也是使用這四個(gè)類(lèi),在 Java9 中已經(jīng)成了規(guī)范
3.1 Publisher
Mono,提供 0 到 1 個(gè) Item
Flux,提供 0 到 N 個(gè) Item
發(fā)布者提供 n 個(gè) Item, 經(jīng)過(guò)一些 operator(數(shù)據(jù)處理操作),完成或者異常中止
核心方法:
subscribe
3.1.1 創(chuàng)建
Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono.fromSupplier( ()->1 ); Mono.fromFuture( CompletableFuture.runAsync( ()-> {} ) ); Flux.create((sink)->{ for( int i = 0; i < 5; i++ ){ sink.next( i ) ; } sink.complete(); });
下面這些都稱(chēng)為 operator,可以很靈活處理其中的 Item
- 轉(zhuǎn)化 map、flatMap、
- 消費(fèi) doOnNext、doNextError、doOnCancel
- 過(guò)濾 filter、distinct、take
- 錯(cuò)誤處理 onErrorReturn、onErrorComplete、onErrorResume、doFinally
- 時(shí)間相關(guān) timeout、interval、delay
- 分隔 window、buffer
- 轉(zhuǎn)同步 block、toStream
3.1.3 訂閱
訂閱然后消費(fèi)發(fā)布者的內(nèi)容
subscribe(); subscribe(Consumer<? super T> consumer);
訂閱之后的返回值是Disposable****,可以使用這個(gè)對(duì)象來(lái)取消訂閱,會(huì)告訴發(fā)布者停止生產(chǎn)對(duì)象,但不保證會(huì)立即終止
- 當(dāng)然可以給 subscribe 傳遞參數(shù),自定義 complete 或者 error 時(shí)需要做的時(shí)
- 同時(shí)可以使用 BaseSubscriber 類(lèi)來(lái)實(shí)現(xiàn)訂閱,可以控制消費(fèi)的數(shù)量
3.2 Subscriber
消費(fèi)者一般不用手動(dòng)創(chuàng)建,通過(guò) subscribe 傳進(jìn) Consumer 函數(shù)后,會(huì)自動(dòng)生成一個(gè) LambdaSubscriber,核心方法:
- onSubscribe
- onNext
- onError
- onComplete
3.3 Processor
既是發(fā)布者,又是訂閱者
3.4 Subscription
訂閱,消費(fèi)者調(diào)用 subscribe 方法之后可以在 onSubscribe 回調(diào)中獲取,可以請(qǐng)求下一個(gè) Item 或者取消訂閱
- request
- cancel
3.5 Thread 和 Scheduler
沒(méi)有指定的情況下:
- 當(dāng)前的 operator 使用上一個(gè) operator 的線程,最先的 operator 使用調(diào)用 subscribe 的線程來(lái)執(zhí)行
Reactor 中使用 Scheduler 來(lái)執(zhí)行流程,類(lèi)似 ExecutorService
- subscribeOn 可以指定訂閱時(shí)使用的線程,這樣可以不阻塞的訂閱
- publishOn 指定發(fā)布時(shí)使用的線程
4. Spring Reactor 優(yōu)化案例
流程中可以?xún)?yōu)化的點(diǎn):
- 準(zhǔn)備數(shù)據(jù)可以異步,等需要用的時(shí)候在去阻塞獲取,相當(dāng)于一個(gè) Future
- 召回可以完成之后就去等正排數(shù)據(jù),新的問(wèn)題,如何去重?本來(lái)拿一次正排數(shù)據(jù),現(xiàn)在拿 N 個(gè)召回次數(shù)據(jù),請(qǐng)求量是不是會(huì)變大,耗時(shí)是不是也會(huì)增加
- 過(guò)濾的準(zhǔn)備數(shù)據(jù)也可以異步,也就是說(shuō)某個(gè)過(guò)濾策略的數(shù)據(jù)準(zhǔn)備好了,就可以去執(zhí)行過(guò)濾了,而且還存在很多不需要依賴(lài)數(shù)據(jù)的過(guò)濾策略也需要等
- 一般粗排只需要 1000 條數(shù)據(jù),過(guò)濾時(shí)已經(jīng)拿夠了 1000 條就可以跳過(guò)了
我們上面所說(shuō)的異步,其實(shí)就是說(shuō)流程中某些節(jié)點(diǎn)是在同時(shí)執(zhí)行的,不必等一個(gè)節(jié)點(diǎn)完成后再執(zhí)行另外一個(gè),這其實(shí)一個(gè)統(tǒng)籌學(xué)的問(wèn)題
4.1 解決方法對(duì)比
問(wèn)題 | Java 原生 | Reactor |
---|---|---|
準(zhǔn)備數(shù)據(jù)異步 | Future,缺點(diǎn):1. 需要調(diào)用方處理異常 2. 不能編排后續(xù)流程,eg: 拿完企業(yè)信息后繼續(xù)拿企業(yè)治理信息,F(xiàn)uture 需要 get 阻塞 | Mono, 使用 onErrorResume 處理異常,使用 map 編排后續(xù)流程 |
召回完成拿正排 | 需要一個(gè)阻塞隊(duì)列,召回把結(jié)果往里面 push,另外一個(gè)線程從隊(duì)列里面拿同時(shí)去取正排數(shù)據(jù),需要自己維護(hù) map 來(lái)去重,需要循環(huán)等待到達(dá)批次后去取正排 | Flux,召回使用 sink.next 把結(jié)果放進(jìn)去合并節(jié)點(diǎn)訂閱,使用 distinct 來(lái)去重,使用 buffer 來(lái)實(shí)現(xiàn)批次數(shù)據(jù) |
過(guò)濾準(zhǔn)備數(shù)據(jù)異步 | 需要阻塞隊(duì)列 | Flux, 在依賴(lài)任務(wù)中把準(zhǔn)備好的過(guò)濾策略放進(jìn)去,過(guò)濾節(jié)點(diǎn)訂閱 Flux 并過(guò)濾 |
粗排取 1000 條 | 異步執(zhí)行過(guò)濾,把過(guò)濾結(jié)果放到一個(gè)容器中,粗排節(jié)點(diǎn)不斷查看這個(gè)容器的結(jié)果是否夠 1000 條,夠了就可以執(zhí)行粗排了 | Flux, 使用 take(1000) |
for (StrategyConfig filterConfig : filterConfigList) { doStrategyFilter(filterChainContext, recommendContext, recRequest, filterConfig, allFilters, partitionContext, partitionTrace); } readyStrategyFlux.publishOn(ExecutorServiceHolder.scheduler).doOnNext((readyStrategyName) -> { try { List<StrategyConfig> strategyConfigs = strategyNameToConfigs.get(readyStrategyName); for (StrategyConfig strategyConfig : strategyConfigs) { doStrategyFilter(filterChainContext, recommendContext, recRequest, strategyConfig, allFilters, partitionContext, partitionTrace); } } catch (Exception e) { LOGGER.error("doOnNext filter error", e); } }).blockLast();
這里的 blockLast 又回到了同步世界,可以很好的和已有的代碼兼容
下面是 20240629 到 20240702 某個(gè)場(chǎng)景優(yōu)化過(guò)濾階段的耗時(shí)對(duì)比
pv | qps | tp99 | avg | |
---|---|---|---|---|
實(shí)驗(yàn)組 | 4051865 | 46.90 | 369.00 | 230.88 |
對(duì)照組 | 4054074 | 46.92 | 397.00 | 251.55 |
業(yè)務(wù)指標(biāo)對(duì)比
無(wú)明顯波動(dòng)
5. 總結(jié)
Spring Reactor 是一個(gè)響應(yīng)式編程框架,非常適合類(lèi)似 MXN 這樣的流程編排系統(tǒng),也是 Java 中異步編程的一種補(bǔ)充,但也會(huì)有一些其他的問(wèn)題,例如潛在的線程安全問(wèn)題,已有框架的沖突 ThreadLocal 等
參考資料
【1】深入 Netty 邏輯架構(gòu),從 Reactor 線程模型開(kāi)始(一)-阿里云開(kāi)發(fā)者社區(qū)
【3】C10k 問(wèn)題簡(jiǎn)述-CSDN 博客
到此這篇關(guān)于Spring Reactor基本介紹和案例的文章就介紹到這了,更多相關(guān)Spring Reactor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot實(shí)現(xiàn)在webapp下直接訪問(wèn)html,jsp
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)在webapp下直接訪問(wèn)html,jsp問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10mybatis實(shí)現(xiàn)獲取入?yún)⑹荓ist和Map的取值
這篇文章主要介紹了mybatis實(shí)現(xiàn)獲取入?yún)⑹荓ist和Map的取值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06JavaEE中用response向客戶(hù)端輸出中文數(shù)據(jù)亂碼問(wèn)題分析
這篇文章主要介紹了JavaEE中用response向客戶(hù)端輸出中文數(shù)據(jù)亂碼問(wèn)題分析,需要的朋友可以參考下2014-10-10MyBatis結(jié)果映射(ResultMap)的使用
在MyBatis中,結(jié)果映射是實(shí)現(xiàn)數(shù)據(jù)庫(kù)結(jié)果集到Java對(duì)象映射的核心,它不僅支持簡(jiǎn)單的字段映射,還能處理字段名不一致、嵌套對(duì)象和集合映射等復(fù)雜場(chǎng)景,通過(guò)ResultMap,開(kāi)發(fā)者可以靈活定義映射關(guān)系,以適應(yīng)各種需求,感興趣的可以了解一下2024-09-09springboot使用logback自定義日志的詳細(xì)過(guò)程
這篇文章主要介紹了springboot使用logback自定義日志的詳細(xì)過(guò)程,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-12-12Java 獲取指定日期的實(shí)現(xiàn)方法總結(jié)
以下是對(duì)Java中獲取指定日期的實(shí)現(xiàn)方法進(jìn)行了歸納總結(jié),需要的朋友可以參考下2013-07-07使用Lombok子類(lèi)繼承父類(lèi),父類(lèi)屬性不生效問(wèn)題及解決
在使用Lombok庫(kù)時(shí),若子類(lèi)繼承父類(lèi),父類(lèi)的屬性可能不會(huì)自動(dòng)生效,為解決此問(wèn)題,可通過(guò)在父類(lèi)上添加@Getter和@Setter注解,或使用@SuperBuilder注解來(lái)確保父類(lèi)屬性在子類(lèi)中有效,同時(shí),需注意確保Lombok版本一致且正確配置了相關(guān)插件2024-10-10關(guān)于HashMap的put方法執(zhí)行全過(guò)程
這篇文章主要介紹了關(guān)于HashMap的put方法執(zhí)行全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06SpringBoot Maven打包失敗報(bào):class lombok.javac.apt.Lombo
最新項(xiàng)目部署的時(shí)候,出現(xiàn)了一個(gè)maven打包失敗的問(wèn)題,報(bào):class lombok.javac.apt.LombokProcessor錯(cuò)誤,所以本文給大家介紹了如何解決SpringBoot Maven 打包失?。篶lass lombok.javac.apt.LombokProcessor 錯(cuò)誤,需要的朋友可以參考下2023-12-12