Spring?Boot?Reactor?整合?Resilience4j詳析
1 引入 pom 包
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
</dependency>2 配置說明
2.1 限流 ratelimiter
兩個限流配置:backendA 1s 中最多允許 10 次請求;
backendB 每 500ms 最多允許 6 次請求。
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s| 配置屬性 | 默認值 | 描述 |
|---|---|---|
| timeoutDuration | 5【s】 | 一個線程等待許可的默認等待時間 |
| limitRefreshPeriod | 500【ns】 | 限制刷新的周期。在每個周期之后,速率限制器將其權(quán)限計數(shù)設(shè)置回 limitForPeriod 值 |
| limitForPeriod | 50 | 一個 limitRefreshPeriod (周期)允許訪問的數(shù)量(許可數(shù)量) |
2.2 重試 retry
注意指定需要重試的異常,不是所有的異常重試都有效。比如 DB 相關(guān)校驗異常,如唯一約束等,重試也不會成功的。
重試配置:
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException| 配置屬性 | 默認值 | 描述 |
|---|---|---|
| maxAttempts | 3 | 最大重試次數(shù)(包括第一次) |
| waitDuration | 500【ms】 | 兩次重試之間的等待間隔 |
| intervalFunction | numOfAttempts -> waitDuration | 修改失敗后等待間隔的函數(shù)。默認情況下,等待時間是個常量。 |
| retryOnResultPredicate | result->false | 配置一個判斷結(jié)果是否應(yīng)該重試的 predicate 函數(shù)。如果結(jié)果應(yīng)該重試,Predicate 必須返回 true,否則它必須返回 false。 |
| retryExceptionPredicate | throwable -> true | 和 retryOnResultPredicate 類似,如果要重試,Predicate 必須返回true,否則返回 false。 |
| retryExceptions | 空 | 需要重試的異常類型列表 |
| ignoreExceptions | 空 | 不需要重試的異常類型列表 |
| failAfterMaxAttempts | false | 當重試達到配置的 maxAttempts 并且結(jié)果仍未通過 retryOnResultPredicate 時啟用或禁用拋出 MaxRetriesExceededException 的布爾值 |
| intervalBiFunction | (numOfAttempts, Either<throwable, result>) -> waitDuration | 根據(jù) maxAttempts 和結(jié)果或異常修改失敗后等待間隔時間的函數(shù)。與 intervalFunction 一起使用時會拋出 IllegalStateException。 |
2.3 超時 TimeLimiter
超時配置:
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false超時配置比較簡單,主要是配置 timeoutDuration 也就是超時的時間。
cancelRunningFuture 的意思是:是否應(yīng)該在運行的 Future 調(diào)用 cancel 去掉調(diào)用。
2.4 斷路器 circuitbreaker

斷路器有幾種狀態(tài):關(guān)閉、打開、半開。注意:打開,意味著不能訪問,會迅速失敗。
CircuitBreaker 使用滑動窗口來存儲和匯總調(diào)用結(jié)果。您可以在基于計數(shù)的滑動窗口和基于時間的滑動窗口之間進行選擇。基于計數(shù)的滑動窗口聚合最后 N 次調(diào)用的結(jié)果。基于時間的滑動窗口聚合了最后 N 秒的調(diào)用結(jié)果。
斷路器配置:
resilience4j.circuitbreaker:
instances:
backendA:
// 健康指標參數(shù),非斷路器屬性
registerHealthIndicator: true
slidingWindowSize: 100| 配置屬性 | 默認值 | 描述 |
|---|---|---|
| slidingWindowSize | 100 | 記錄斷路器關(guān)閉狀態(tài)下(可以訪問的情況下)的調(diào)用的滑動窗口大小 |
| failureRateThreshold | 50(百分比) | 當失敗比例超過 failureRateThreshold 的時候,斷路器會打開,并開始短路呼叫 |
| slowCallDurationThreshold | 60000【ms】 | 請求被定義為慢請求的閾值 |
| slowCallRateThreshold | 100(百分比) | 慢請求百分比大于等于該值時,打開斷路器開關(guān) |
| permittedNumberOfCalls | 10 | 半開狀態(tài)下允許通過的請求數(shù) |
| maxWaitDurationInHalfOpenState | 0 | 配置最大等待持續(xù)時間,該持續(xù)時間控制斷路器在切換到打開之前可以保持在半開狀態(tài)的最長時間。 |
值 0 表示斷路器將在 HalfOpen 狀態(tài)下無限等待,直到所有允許的調(diào)用都已完成。
2.5 壁倉 bulkhead
resilience4j 提供了兩種實現(xiàn)壁倉的方法:
SemaphoreBulkhead使用 Semaphore 實現(xiàn)FixedThreadPoolBulkhead使用有界隊列和固定線程池實現(xiàn)
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 12.5.1 SemaphoreBulkhead
| 配置屬性 | 默認值 | 描述 |
|---|---|---|
| maxConcurrentCalls | 25 | 允許的并發(fā)執(zhí)行的數(shù)量 |
| maxWaitDuration | 0 | 嘗試進入飽和隔板時線程應(yīng)被阻止的最長時間 |
2.5.2 FixedThreadPoolBulkhead
| 配置屬性 | 默認值 | 描述 |
|---|---|---|
| maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | 線程池最大線程個數(shù) |
| coreThreadPoolSize | Runtime.getRuntime().availableProcessors()-1 | 線程池核心線程個數(shù) |
| queueCapacity | 100 | 線程池隊列容量 |
| keepAliveDuration | 20【ms】 | 線程數(shù)超過核心線程數(shù)之后,空余線程在終止之前等待的最長時間 |
3 使用
3.1 配置
在 application.yml 文件中添加以下 resilience4j 配置:
resilience4j.circuitbreaker:
instances:
backendA:
registerHealthIndicator: true
slidingWindowSize: 100
resilience4j.retry:
instances:
backendA:
maxAttempts: 3
waitDuration: 10s
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
backendB:
maxAttempts: 3
waitDuration: 10s
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
resilience4j.bulkhead:
instances:
backendA:
maxConcurrentCalls: 10
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
resilience4j.thread-pool-bulkhead:
instances:
backendC:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
resilience4j.ratelimiter:
instances:
backendA:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 10ms
registerHealthIndicator: true
eventConsumerBufferSize: 100
backendB:
limitForPeriod: 6
limitRefreshPeriod: 500ms
timeoutDuration: 3s
resilience4j.timelimiter:
instances:
backendA:
timeoutDuration: 2s
cancelRunningFuture: true
backendB:
timeoutDuration: 1s
cancelRunningFuture: false3.2 使用注解實現(xiàn)
直接在需要限流的方法上增加注解@RateLimiter 實現(xiàn)限流;增加注解@Retry 實現(xiàn)重試;增加注解 @CircuitBreaker 熔斷;增加注解 @Bulkhead 實現(xiàn)壁倉。name 屬性中分別填寫限流器、重試、熔斷、壁倉組件的名字。
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")
@Retry(name = "backendA")
@RateLimiter(name = "backendA")
public Mono<List<User>> list() {
long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {
return userRepository.findAll();
}).doOnError(e -> {
// 打印異常日志&增加監(jiān)控(自行處理)
logger.error("list.user.error, e", e);
})
.doFinally(e -> {
// 耗時 & 整體健康
logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
});
}
@Bulkhead(name = "backendA")
@CircuitBreaker(name = "backendA")//最多支持10個并發(fā)量
@Retry(name = "backendA")//使用 backendA 重試器,如果拋出 IOException 會重試三次。
@RateLimiter(name = "backendA")// 限流 10 Qps
public Mono<Boolean> save(User user) {
long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {
return userRepository.save(user) != null;
})
.doOnError(e -> {
// 打印異常日志&增加監(jiān)控(自行處理)
logger.error("save.user.error, user={}, e", user, e);
})
.doFinally(e -> {
// 耗時 & 整體健康
logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
});
}注意:以上所有組件,都支持自定義。
到此這篇關(guān)于Spring Boot Reactor 整合 Resilience4j詳析的文章就介紹到這了,更多相關(guān)Spring Boot Reactor 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺析Java數(shù)據(jù)庫操作工具包jOOQ的使用
jOOQ?是一個輕量級的?Java?ORM(對象關(guān)系映射)框架,可用來構(gòu)建復(fù)雜的?SQL?查詢,這篇文章主要來和大家介紹一下jOOQ的使用,需要的可以參考下2024-04-04
Spring Security 中細化權(quán)限粒度的方法
這篇文章主要介紹了Spring Security 中細化權(quán)限粒度的方法,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09
Java并發(fā)系列之CyclicBarrier源碼分析
這篇文章主要為大家詳細分析了Java并發(fā)系列之CyclicBarrier源碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-03-03
java中Pulsar?InterruptedException?異常
這篇文章主要為大家介紹了java中Pulsar?InterruptedException?異常分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02
springboot之SpringApplication生命周期和事件機制解讀
這篇文章主要介紹了springboot之SpringApplication生命周期和事件機制,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06
Spring中的SpringApplicationRunListener詳細解析
這篇文章主要介紹了Spring中的SpringApplicationRunListener詳細解析,SpringApplicationRunListener是一個監(jiān)聽SpringApplication中run方法的接口,在項目啟動過程的各個階段進行事件的發(fā)布,需要的朋友可以參考下2023-11-11

