RabbitMQ 如何解決消息冪等性的問題
前言
關(guān)于MQ消費(fèi)者的冪等性問題,在于MQ的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。使用MQ重試機(jī)制需要注意的事項(xiàng)以及如何解決消費(fèi)者冪等性問題以下將逐一講解。
1. RabbitMQ自動重試機(jī)制
消費(fèi)者在消費(fèi)消息的時(shí)候,如果消費(fèi)者業(yè)務(wù)邏輯出現(xiàn)程序異常,這個(gè)時(shí)候我們?nèi)绾翁幚恚?/p>
使用重試機(jī)制,RabbitMQ默認(rèn)開啟重試機(jī)制。
實(shí)現(xiàn)原理:
- @RabbitHandler注解 底層使用Aop攔截,如果程序(消費(fèi)者)沒有拋出異常,自動提交事務(wù)
- 如果Aop使用異常通知攔截獲取到異常后,自動實(shí)現(xiàn)補(bǔ)償機(jī)制,消息緩存在RabbitMQ服務(wù)器端
注意:
- 默認(rèn)會一直重試到消費(fèi)者不拋異常為止,這樣顯然不好。我們需要修改重試機(jī)制策略,如間隔3s重試一次)
配置:
spring: rabbitmq: # 連接地址 host: 127.0.0.1 # 端口號 port: 5672 # 賬號 username: guest # 密碼 password: guest # 地址(類似于數(shù)據(jù)庫的概念) virtual-host: /admin_vhost # 消費(fèi)者監(jiān)聽相關(guān)配置 listener: simple: retry: # 開啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開啟并一直重試 enabled: true # 最大重試次數(shù) max-attempts: 5 # 重試間隔時(shí)間(毫秒) initial-interval: 3000
2. 如何合理選擇重試機(jī)制?
情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無法訪問,是否需要重試? 需要重試,可能是因?yàn)榫W(wǎng)絡(luò)原因短暫不能訪問
情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試? 不需要重試,因?yàn)閷儆诔绦騜ug需要重新發(fā)布版本
總結(jié):對于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù)job進(jìn)行健康檢查+人工進(jìn)行補(bǔ)償
3. 調(diào)用第三方接口自動實(shí)現(xiàn)補(bǔ)償機(jī)制
我們知道了,RabbitMQ在消費(fèi)者消費(fèi)發(fā)生異常時(shí),會自動進(jìn)行補(bǔ)償機(jī)制,所以我們(消費(fèi)者)在調(diào)用第三方接口時(shí),可以根據(jù)返回結(jié)果判斷是否成功:
- 成功:正常消費(fèi)
- 失?。菏謩訏佁幰粋€(gè)異常,這時(shí)RabbitMQ自動給我們做重試 (補(bǔ)償)。
4. 如何解決消費(fèi)者冪等性問題
防止重復(fù)消費(fèi) (MQ重試機(jī)制需要注意的問題)
產(chǎn)生原因:網(wǎng)絡(luò)延遲傳輸中,消費(fèi)者出現(xiàn)異常或者消費(fèi)者延遲消費(fèi),會造成進(jìn)行MQ重試補(bǔ)償,在重試過程中,可能會造成重復(fù)消費(fèi)。
面試題:MQ中消費(fèi)者如何保證冪等性問題,不被重復(fù)消費(fèi)?
偽代碼:
生產(chǎn)者核心代碼:
請求頭設(shè)置消息id(messageId)
@Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { String msg = "my_fanout_msg:" + System.currentTimeMillis(); //請求頭設(shè)置消息id(messageId) Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); amqpTemplate.convertAndSend(queueName, message); } }
消費(fèi)者核心代碼:
@RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); //② 判斷唯一Id是否被消費(fèi),消息消費(fèi)成功后將id和狀態(tài)保存在日志表中,我們從(①步驟)表中獲取并判斷messageId的狀態(tài)即可 //從redis中獲取messageId的value String value = redisUtils.get(messageId)+""; if(value.equals("1") ){ //表示已經(jīng)消費(fèi) return; //結(jié)束 } System.out.println("郵件消費(fèi)者獲取生產(chǎn)者消息" + "messageId:" + messageId + ",消息內(nèi)容:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 獲取email參數(shù) String email = jsonObject.getString("email"); // 請求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因?yàn)榫W(wǎng)絡(luò)原因,造成無法訪問,繼續(xù)重試 throw new Exception("調(diào)用接口失敗!"); } System.out.println("執(zhí)行結(jié)束...."); //① 執(zhí)行到這里已經(jīng)消費(fèi)成功,我們可以修改messageId的狀態(tài),并存入日志表(可以存到redis中,key為消息Id、value為狀態(tài)) }
5. SpringBoot整合RabbitMQ應(yīng)答模式(ACK)
1.修改配置simple下添加 acknowledge-mode: manual:
spring: rabbitmq: # 連接地址 host: 127.0.0.1 # 端口號 port: 5672 # 賬號 username: guest # 密碼 password: guest # 地址(類似于數(shù)據(jù)庫的概念) virtual-host: /admin_vhost # 消費(fèi)者監(jiān)聽相關(guān)配置 listener: simple: retry: # 開啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開啟并一直重試 enabled: true # 最大重試次數(shù) max-attempts: 5 # 重試間隔時(shí)間(毫秒) initial-interval: 3000 # 開啟手動ack acknowledge-mode: manual
2.消費(fèi)者增加代碼:
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手動ack channel.basicAck(deliveryTag, false);手動簽收
//郵件隊(duì)列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手動ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收 channel.basicAck(deliveryTag, false); } }
RabbitMQ 如何保證冪等性,數(shù)據(jù)一致性
mq的作用主要是用來解耦,削峰,異步,
增加MQ,系統(tǒng)的復(fù)雜性也會增加很多,
也會帶來其他的問題,比如MQ掛了怎么辦,怎么保持?jǐn)?shù)據(jù)的冪等性
冪等性問題通俗點(diǎn)講就是保證數(shù)據(jù)不被重復(fù)消費(fèi),同時(shí)數(shù)據(jù)也不能少,
也就是數(shù)據(jù)一致性問題。
下面是MQ丟失的3種情況
1,生產(chǎn)者發(fā)送消息至MQ的數(shù)據(jù)丟失
解決方法:在生產(chǎn)者端開啟comfirm 確認(rèn)模式,你每次寫的消息都會分配一個(gè)唯一的 id,
然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個(gè) ack 消息,告訴你說這個(gè)消息 ok 了
2,MQ收到消息,暫存內(nèi)存中,還沒消費(fèi),自己掛掉,數(shù)據(jù)會都丟失
解決方式:MQ設(shè)置為持久化。將內(nèi)存數(shù)據(jù)持久化到磁盤中
3,消費(fèi)者剛拿到消息,還沒處理,掛掉了,MQ又以為消費(fèi)者處理完
解決方式:用 RabbitMQ 提供的 ack 機(jī)制,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動 ack,可以通過一個(gè) api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認(rèn)為你還沒處理完,這個(gè)時(shí)候 RabbitMQ 會把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會丟的。
數(shù)據(jù)重復(fù)的問題簡單的多,就是在消費(fèi)端判斷數(shù)據(jù)是否已經(jīng)被消費(fèi)過
- 比如你拿個(gè)數(shù)據(jù)要寫庫,你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個(gè)場景,那做的稍微復(fù)雜一點(diǎn),你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時(shí)候,里面加一個(gè)全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費(fèi)到了之后,先根據(jù)這個(gè) id 去比如 Redis 里查一下,之前消費(fèi)過嗎?如果沒有消費(fèi)過,你就處理,然后這個(gè) id 寫 Redis。如果消費(fèi)過了,那你就別處理了,保證別重復(fù)處理相同的消息即可。
- 比如基于數(shù)據(jù)庫的唯一鍵來保證重復(fù)數(shù)據(jù)不會重復(fù)插入多條。因?yàn)橛形ㄒ绘I約束了,重復(fù)數(shù)據(jù)插入只會報(bào)錯(cuò),不會導(dǎo)致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼
這篇文章主要介紹了Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Java中使用同步回調(diào)和異步回調(diào)的示例詳解
這篇文章主要介紹了Java中使用同步回調(diào)和異步回調(diào)的相關(guān)資料,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04Java如何將Excel數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫
這篇文章主要為大家詳細(xì)介紹了Java將Excel數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10SpringBoot中分頁插件PageHelper的使用詳解
分頁查詢是為了高效展示大量數(shù)據(jù),通過分頁將數(shù)據(jù)劃分為多個(gè)部分逐頁展示,原生方法需手動計(jì)算數(shù)據(jù)起始行,而使用PageHelper插件則簡化這一過程,本文給大家介紹SpringBoot中分頁插件PageHelper的使用,感興趣的朋友一起看看吧2024-09-09java實(shí)現(xiàn)利用String類的簡單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容
下面小編就為大家?guī)硪黄猨ava實(shí)現(xiàn)利用String類的簡單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-12-12hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)
這篇文章主要介紹了hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié),需要的朋友可以參考下2017-09-09java實(shí)現(xiàn)支付寶支付接口的調(diào)用
本文主要介紹了java實(shí)現(xiàn)支付寶支付接口的調(diào)用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Java創(chuàng)建型設(shè)計(jì)模式之工廠方法模式深入詳解
工廠方法模式(FACTORY METHOD)是一種常用的類創(chuàng)建型設(shè)計(jì)模式,此模式的核心精神是封裝類中變化的部分,提取其中個(gè)性化善變的部分為獨(dú)立類,通過依賴注入以達(dá)到解耦、復(fù)用和方便后期維護(hù)拓展的目的。它的核心結(jié)構(gòu)有四個(gè)角色,分別是抽象工廠、具體工廠、抽象產(chǎn)品、具體產(chǎn)品2022-09-09