SpringBoot基于RabbitMQ實現(xiàn)消息可靠性的方法
1.概述
消息從發(fā)送到消費者接收 會經(jīng)歷的過程如下:
丟失消息的可能性
- 發(fā)送時丟失:
- 生產(chǎn)者發(fā)送的消息未送達exchange
- 消息到達exchange后未到達queue
- MQ宕機,queue將消息丟失
- consumer接收到消息后未消費就宕機
針對這些問題,RabbitMQ分別給出了解決方案
- 生產(chǎn)者確認機制
- mq持久化
- 消費者確認機制
- 失敗重試機制
2. 生產(chǎn)者消息確認
2.1 概述
RabbitMQ 提供了 publisher confirm
機制來避免消息發(fā)送到 MQ 過程中丟失。這種機制必須給每個消息指定一個唯一ID。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認
- 消息成功投遞到交換機,返回ack
- 消息未投遞到交換機,返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機了,但是沒有路由到隊列。返回ACK,及路由失敗原因。
2.2 實戰(zhàn)
2.2.1 修改配置
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
配置說明:
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:simple
:同步等待confirm結(jié)果,直到超時correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
publish-returns
:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallbacktemplate.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
2.2.2 定義 Return 回調(diào)
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置:
修改publisher服務(wù),添加一個:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 設(shè)置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投遞失敗,記錄日志 log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機{},路由鍵{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有業(yè)務(wù)需要,可以重發(fā)消息 }); } }
2.2.3 定義ConfirmCallback
ConfirmCallback 可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理 confirm 成功或失敗的邏輯不一定相同。
public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.消息體 String message = "hello, spring amqp!"; // 2.全局唯一的消息ID,需要封裝到 CorrelationData 中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3.添加callback correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("消息發(fā)送成功, ID:{}", correlationData.getId()); }else{ // 3.2.nack,消息失敗 log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 4.發(fā)送消息 rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData); // 休眠一會兒,等待ack回執(zhí) //Thread.sleep(20); }
3. 消息持久化
生產(chǎn)者確認可以確保消息投遞到 RabbitMQ 的隊列中,但是消息發(fā)送到 RabbitMQ 以后,如果突然宕機,也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機制。
- 交換機持久化
- 隊列持久化
- 消息持久化
3.1 交換機持久化
@Bean public DirectExchange simpleExchange(){ // 三個參數(shù):①交換機名稱、②是否持久化、③當沒有queue與其綁定時是否自動刪除 return new DirectExchange("simple.direct", true, false); }
事實上,默認情況下,由SpringAMQP聲明的交換機都是持久化的。
3.2 隊列持久化
@Bean public Queue simpleQueue(){ // 使用QueueBuilder構(gòu)建隊列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
事實上,默認情況下,由SpringAMQP聲明的隊列都是持久化的。
3.3 消息持久化
默認情況下,SpringAMQP 交換機 隊列 以及發(fā)出的任何消息都是持久化的,不用特意指定。
4. 消費者消息確認
RabbitMQ 是 閱后即焚 機制,RabbitMQ 確認消息被消費者消費后會立刻刪除。
而 RabbitMQ 是通過 消費者回執(zhí) 來確認消費者是否成功處理消息的:消費者獲取消息后,應(yīng)該向 RabbitMQ 發(fā)送 ACK 回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場景:
- 1)RabbitMQ投遞消息給消費者
- 2)消費者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費者宕機,消息尚未處理
這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。
4.1 三種確認模式
而 SpringAMQP 則允許配置三種確認模式:
•manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
•auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack。
•none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除(存在丟失消息的風險)。
由此可知:
- none 模式下,消息投遞是不可靠的,可能丟失
- auto 模式類似事務(wù)機制,出現(xiàn)異常時返回nack,消息回滾到mq;沒有異常,返回ack
- manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時候該ack
一般,我們都是使用默認的 auto 即可。
相關(guān)配置:
spring: rabbitmq: listener: simple: #acknowledge-mode: none # 關(guān)閉ack #acknowledge-mode: manual # 手動ack acknowledge-mode: auto # 自動ack
4.2 消息失敗重試機制
當消費者出現(xiàn)異常后,消息會不斷 requeue(重入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次 requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:怎么辦呢?
4.2.1 本地重試機制
我們可以利用 Spring 的 retry 機制,在消費者出現(xiàn)異常時利用 本地重試,而不是無限制的 requeue 到 mq 隊列。
修改 consumer 服務(wù)的 application.yml 文件,添加內(nèi)容:
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費者失敗重試 initial-interval: 1000 # 初始的失敗等待時長為1秒 multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer服務(wù),重復(fù)之前的測試??梢园l(fā)現(xiàn):
- 在重試3次后,SpringAMQP 會拋出異常
AmqpRejectAndDontRequeueException
,說明本地重試觸發(fā)了。 - 查看 RabbitMQ 控制臺,發(fā)現(xiàn)消息被刪除了,說明最后 SpringAMQP 返回的是ack,mq刪除消息了。
結(jié)論
- 開啟本地重試時,消息處理過程中拋出異常,不會 requeue 到隊列,而是在消費者本地重試
- 重試達到最大次數(shù)后,Spring 會返回 ack,消息會被丟棄。
4.2.2 失敗策略
在之前的測試中,達到最大重試次數(shù)后,消息會被丟棄,這是由 Spring 內(nèi)部機制決定的。
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有 MessageRecovery
接口來處理,它包含三種不同的實現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接 reject,丟棄消息。默認就是這種方式ImmediateRequeueMessageRecoverer
:重試耗盡后,返回 nack,消息重新入隊RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer
,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機和隊列
2)定義一個RepublishMessageRecoverer,關(guān)聯(lián)隊列和交換機
代碼如下:
@Configuration public class ErrorMessageConfig { @Bean // 處理失敗消息的交換機 public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean // 處理失敗消息的隊列 public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
其實 我們在生產(chǎn)中會指定死信交換機來處理失敗的消息
5. 總結(jié)
如何確保RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認機制,確保生產(chǎn)者的消息能到達隊列
- 開啟持久化功能,確保消息未消費前在隊列中不會丟失
- 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack
- 開啟消費者失敗重試機制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理
以上就是SpringBoot基于RabbitMQ實現(xiàn)消息可靠性的方法的詳細內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息可靠性的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringMVC中的@RequestMapping注解的使用詳細教程
@RequestMapping注解的作用就是將請求和處理請求的控制器方法關(guān)聯(lián)起來,建立映射關(guān)系,本文主要來和大家詳細講講它的具體使用,感興趣的可以了解一下2023-07-07出現(xiàn)java.lang.NoSuchMethodException異常的解決(靠譜)
這篇文章主要介紹了出現(xiàn)java.lang.NoSuchMethodException異常的解決方案(靠譜),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03SpringBoot 創(chuàng)建容器的實現(xiàn)
這篇文章主要介紹了SpringBoot 創(chuàng)建容器的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10