redis在spring boot中異常退出的問題解決方案
問題:
Exception in thread "rtsp-consumer-3" org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1689)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1597)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.doGetAsyncDedicatedConnection(LettuceConnection.java:1006)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.getOrCreateDedicatedConnection(LettuceConnection.java:1069)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.getAsyncDedicatedConnection(LettuceConnection.java:990)
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.getAsyncDedicatedConnection(LettuceStreamCommands.java:395)
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:346)
at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:592)
at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310)
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:387)
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:382)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305)
at com.ruoyi.vedioFrame.utils.RedisStreamOperations.readGroup(RedisStreamOperations.java:70)
at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.consumeFrames(StreamConsumerService.java:200)
at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.lambda$null$0(StreamConsumerService.java:108)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:330)
at io.lettuce.core.RedisClient.connect(RedisClient.java:216)
at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:115)
at java.util.Optional.orElseGet(Optional.java:267)
at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:115)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1595)
... 19 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
解決
根據(jù)代碼和現(xiàn)象分析,根本原因可能是處理時(shí)間過長導(dǎo)致Redis消息積壓觸發(fā)超時(shí)異常。以下是詳細(xì)分析和解決方案:
?? 問題根源
同步阻塞模型:
frameProcessor.process()調(diào)用外部接口時(shí)存在網(wǎng)絡(luò)延遲,導(dǎo)致消息處理時(shí)間超過Redis的BLOCK超時(shí)(2000ms)。- 未及時(shí)ACK會導(dǎo)致消息重新進(jìn)入Pending狀態(tài),積累后觸發(fā)
READONLY錯(cuò)誤或消費(fèi)者組失效。
Thread.sleep(1000)加劇阻塞:
- 即使沒有消息,每次循環(huán)強(qiáng)制休眠1秒,降低消費(fèi)吞吐量,無法及時(shí)處理積壓消息。
異常未妥善處理:
- 注釋掉的try-catch導(dǎo)致處理過程中的異常(如網(wǎng)絡(luò)超時(shí))未被捕獲,中斷循環(huán)后Redis連接未正確恢復(fù)。
??? 解決方案
1. 異步處理 + 提前ACK(關(guān)鍵步驟)
將消息處理與ACK分離,處理前立即ACK避免消息積壓,異步處理業(yè)務(wù)邏輯:
ExecutorService executor = Executors.newCachedThreadPool(); // 全局線程池
while (!Thread.currentThread().isInterrupted()) {
List<MapRecord<String, Object, Object>> records = redisStreamOperations.readGroup(...);
if (records != null) {
for (MapRecord<String, Object, Object> record : records) {
// 提前ACK釋放消息
redisStreamOperations.ack(streamKey, groupName, record.getId());
// 提交異步任務(wù)處理業(yè)務(wù)
executor.submit(() -> {
try {
String strdata = (String) record.getValue().get("frameData");
byte[] frameData = Base64.getDecoder().decode(strdata);
BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
frameProcessor.process(image, streamId, commonQueues, regions);
} catch (Exception e) {
log.error("處理幀失敗: {}", e.getMessage());
}
});
}
}
// 移除Thread.sleep(1000) 提升吞吐量
}2. 調(diào)整Redis消費(fèi)者組參數(shù)
增加XGROUP CREATE時(shí)的BLOCK和IDLE超時(shí):
XGROUP CREATE stream:1234 mygroup $ MKSTREAM BLOCK 5000 IDLE 30000
3. 優(yōu)化Redis連接配置
確保RedisTemplate的讀寫超時(shí)大于業(yè)務(wù)處理最長時(shí)間:
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory());
// 設(shè)置超時(shí)時(shí)間(示例:10秒)
RedisConnectionFactory factory = template.getConnectionFactory();
if (factory instanceof LettuceConnectionFactory) {
((LettuceConnectionFactory) factory).setTimeout(Duration.ofSeconds(10));
}
return template;
}4. 監(jiān)控Pending隊(duì)列
定期檢查Pending消息數(shù)量,異常時(shí)重置消費(fèi)者:
// 在循環(huán)中添加檢查
Long pending = redisTemplate.opsForStream().pending(streamKey, groupName);
if (pending > 1000) { // 閾值根據(jù)業(yè)務(wù)設(shè)定
redisTemplate.opsForStream().destroyGroup(streamKey, groupName);
redisTemplate.opsForStream().createGroup(streamKey, groupName);
}5. 添加熔斷機(jī)制
當(dāng)外部接口連續(xù)失敗時(shí),暫停消費(fèi)避免雪崩:
// 全局熔斷器
CircuitBreaker breaker = CircuitBreaker.ofDefaults("frameProcessor");
// 處理邏輯改為
breaker.executeSupplier(() -> {
frameProcessor.process(...);
return null;
});?? 驗(yàn)證方案
日志監(jiān)控:
// 在異步任務(wù)中添加耗時(shí)日志
long start = System.currentTimeMillis();
frameProcessor.process(...);
log.info("處理耗時(shí): {}ms", System.currentTimeMillis() - start);確保單次處理時(shí)間低于Redis超時(shí)閾值。
Redis CLI檢查:
XINFO GROUPS stream:1234 # 查看消費(fèi)者組狀態(tài) XPENDING stream:1234 mygroup # 查看Pending消息
?? 關(guān)鍵優(yōu)化點(diǎn)總結(jié)
| 措施 | 作用 |
|---|---|
| 異步處理 + 提前ACK | 徹底解耦處理耗時(shí)與Redis消費(fèi) |
| 增大Redis超時(shí)參數(shù) | 避免服務(wù)端主動斷開連接 |
| 線程池隔離 | 防止外部接口故障拖垮整個(gè)消費(fèi)服務(wù) |
| 熔斷機(jī)制 | 快速失敗,保護(hù)系統(tǒng)穩(wěn)定性 |
通過以上改造,即使frameProcessor.process()調(diào)用外部平臺接口,也能通過異步化和資源隔離保證Redis消費(fèi)鏈路的穩(wěn)定性。最終只需確保線程池和熔斷參數(shù)適配業(yè)務(wù)負(fù)載即可。
代碼具體添加位置:
以下是針對 優(yōu)化Redis連接配置、監(jiān)控Pending隊(duì)列 和 添加熔斷機(jī)制 的代碼具體添加位置的詳細(xì)說明:
一、優(yōu)化Redis連接配置
位置:Redis配置類
在Spring Boot的Redis配置類中調(diào)整超時(shí)參數(shù)(通常為RedisConfig.java):
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
// 設(shè)置連接超時(shí)和讀寫超時(shí)(關(guān)鍵參數(shù))
lettuceConnectionFactory.setTimeout(Duration.ofSeconds(10)); // 命令超時(shí)時(shí)間
lettuceConnectionFactory.setShareNativeConnection(false); // 禁用共享連接,避免阻塞
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(lettuceConnectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}關(guān)鍵參數(shù)說明:
setTimeout(10秒):確保超時(shí)時(shí)間大于frameProcessor.process()的最長處理時(shí)間setShareNativeConnection(false):避免多個(gè)線程共享同一個(gè)連接導(dǎo)致阻塞。
二、監(jiān)控Pending隊(duì)列
位置:consumeFrames方法內(nèi)的循環(huán)中
在消費(fèi)消息的主循環(huán)中定期檢查Pending隊(duì)列:
private void consumeFrames(String streamId, String groupName, String consumerName,
CommonQueues commonQueues, String regions) throws InterruptedException, IOException {
// ... 其他初始化代碼 ...
int checkPendingInterval = 10; // 每處理10次循環(huán)檢查一次Pending隊(duì)列
int loopCount = 0;
while (!Thread.currentThread().isInterrupted()) {
// ... 原有代碼讀取消息 ...
// 監(jiān)控Pending隊(duì)列的邏輯(添加位置)
loopCount++;
if (loopCount % checkPendingInterval == 0) {
String streamKey = "stream:" + streamId;
PendingMessages pending = redisStreamOperations.pending(streamKey, groupName);
if (pending != null && pending.getTotalPendingMessages() > 1000) { // 閾值根據(jù)業(yè)務(wù)調(diào)整
log.warn("檢測到Pending消息積壓 {} 條,重置消費(fèi)者組", pending.getTotalPendingMessages());
redisStreamOperations.destroyGroup(streamKey, groupName);
redisStreamOperations.createGroup(StreamKey.of(streamKey), groupName);
}
}
// ... 后續(xù)處理代碼 ...
}
}說明:
- 通過
redisStreamOperations.pending()獲取當(dāng)前Pending消息數(shù)。 - 當(dāng)Pending消息超過閾值時(shí),強(qiáng)制銷毀并重建消費(fèi)者組,避免消息卡死。
三、添加熔斷機(jī)制
位置:處理消息的業(yè)務(wù)邏輯外層
使用Resilience4j熔斷器包裹frameProcessor.process()調(diào)用:
1. 熔斷器配置類
@Configuration
public class CircuitBreakerConfig {
@Bean
public CircuitBreaker frameProcessorCircuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 失敗率閾值50%
.slidingWindowType(SlidingWindowType.COUNT_BASED)
.slidingWindowSize(10) // 基于最近10次調(diào)用統(tǒng)計(jì)
.minimumNumberOfCalls(5) // 最少5次調(diào)用后開始計(jì)算
.waitDurationInOpenState(Duration.ofSeconds(30)) // 熔斷后30秒進(jìn)入半開狀態(tài)
.build();
return CircuitBreakerRegistry.of(config).circuitBreaker("frameProcessor");
}
}2. 在消費(fèi)代碼中使用熔斷器
public class YourConsumerClass {
@Autowired
private CircuitBreaker frameProcessorCircuitBreaker; // 注入熔斷器
private void consumeFrames(...) {
// ... 原有代碼 ...
for (MapRecord<String, Object, Object> record : records) {
redisStreamOperations.ack(...); // 提前ACK
// 使用熔斷器保護(hù)處理邏輯(添加位置)
Try.runRunnable(() -> frameProcessorCircuitBreaker.executeRunnable(() -> {
String strdata = (String) record.getValue().get("frameData");
byte[] frameData = Base64.getDecoder().decode(strdata);
BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
frameProcessor.process(image, streamId, commonQueues, regions);
})).onFailure(e -> log.error("處理失敗且熔斷: {}", e.getMessage()));
}
// ... 后續(xù)代碼 ...
}
}熔斷邏輯說明:
- 當(dāng)
frameProcessor.process()連續(xù)失敗觸發(fā)閾值時(shí),熔斷器會暫時(shí)阻止后續(xù)調(diào)用,避免雪崩效應(yīng)。 - 熔斷期間直接跳過處理,但仍會ACK消息(根據(jù)業(yè)務(wù)需求選擇是否重試)。
四、代碼集成位置總結(jié)
| 優(yōu)化措施 | 代碼位置 | 關(guān)鍵注解 |
|---|---|---|
| Redis連接配置 | Redis配置類(如RedisConfig.java) | 調(diào)整超時(shí)時(shí)間和連接池參數(shù) |
| Pending隊(duì)列監(jiān)控 | consumeFrames方法的主循環(huán)內(nèi) | 定期檢查+自動重置消費(fèi)者組 |
| 熔斷機(jī)制 | 業(yè)務(wù)處理代碼外層(包裹frameProcessor.process) | 依賴熔斷器庫(如Resilience4j) |
五、參數(shù)調(diào)整建議
Redis超時(shí):
lettuceConnectionFactory.setTimeout應(yīng)大于frameProcessor.process()的最大處理時(shí)間 + 網(wǎng)絡(luò)抖動余量(如設(shè)置為實(shí)際最大處理時(shí)間的2倍)。
Pending隊(duì)列閾值:
- 如果每秒處理100條消息,閾值可設(shè)置為
1000(相當(dāng)于10秒積壓量)。
熔斷器參數(shù):
failureRateThreshold:根據(jù)外部接口的穩(wěn)定性調(diào)整(如頻繁超時(shí)可設(shè)為70%)。waitDurationInOpenState:根據(jù)外部服務(wù)恢復(fù)時(shí)間調(diào)整(如30秒到5分鐘)。
通過以上改造,即使frameProcessor.process()調(diào)用外部平臺接口,也能通過資源隔離、快速失敗和自動恢復(fù)機(jī)制保障Redis消費(fèi)鏈路的穩(wěn)定性。
到此這篇關(guān)于redis在spring boot中異常退出的文章就介紹到這了,更多相關(guān)redis spring boot異常退出內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何利用 Redis 實(shí)現(xiàn)接口頻次限制
這篇文章主要介紹了如何利用 Redis 實(shí)現(xiàn)接口頻次限制,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
緩存替換策略及應(yīng)用(以Redis、InnoDB為例)
本文以Redis、InnoDB為例給大家講解緩存替換策略及應(yīng)用,本文給大家提到五種置換策略,通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-07-07
Redis實(shí)現(xiàn)延遲隊(duì)列的項(xiàng)目示例
延遲隊(duì)列是Redis的一個(gè)重要應(yīng)用場景,本文主要介紹了Redis實(shí)現(xiàn)延遲隊(duì)列的項(xiàng)目示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06
Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解
本文主要介紹了Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05
Redis Caffeine實(shí)現(xiàn)兩級緩存的項(xiàng)目實(shí)踐
本文介紹了使用Redis和Caffeine實(shí)現(xiàn)兩級緩存,以提高查詢接口的性能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-12-12

