rocketMQ如何避免消息重復(fù)消費問題
rocketMQ如何避免消息重復(fù)消費
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息隊列 RocketMQ 的消息有可能會出現(xiàn)重復(fù),造成重復(fù)消費。
這個重復(fù)簡單可以概括為以下情況:
- produce發(fā)送到Broker時消息重復(fù)
當一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者producer宕機,導(dǎo)致服務(wù)端Broker 對 producer應(yīng)答失敗。
如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
- Broker投遞消息到Consumer時消息重復(fù)
消息消費的場景下,消息已投遞到消費者并完成業(yè)務(wù)處理,當客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷。
為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
- 負載均衡時消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動、Broker 重啟以及訂閱方應(yīng)用重啟)
當消息隊列 RocketMQ 的 Broker或客戶端重啟、擴容或縮容時,會觸發(fā)Rebalance重平衡機制,此時消費者可能會收到重復(fù)消息。
冪等性解決方案
冪等性:就是用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因為多次點擊而產(chǎn)生了副作用。
從上面的分析中,我們知道,在RocketMQ中,是無法保證每個消息只被投遞一次的,所以要在業(yè)務(wù)上自行來保證消息消費的冪等性。而要處理這個問題,RocketMQ的每條消息都有一個唯一的MessageId,這個參數(shù)在多次投遞的過程中是不會改變的,所以業(yè)務(wù)上可以用這個MessageId來作為判斷冪等的關(guān)鍵依據(jù)。
但是,這個MessageId是無法保證全局唯一的,也會有沖突的情況。所以在一些對冪等性要求嚴格的場景,最好是使用業(yè)務(wù)上唯一的一個標識比較靠譜。這個id可以使用分布式中間件redis,zookeeper等去生成。例如訂單ID。而這個業(yè)務(wù)標識可以使用Message的Key來進行傳遞。
同時,建立一個消息表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復(fù)消費的情況,就會導(dǎo)致主鍵沖突,那么就不再處理這條消息。
消息持久層做消息唯一性的策略(該方案未經(jīng)證實)
1.持久化過程中業(yè)務(wù)唯一標識驗證,每個消息具有業(yè)務(wù)唯一標識,在消息最終持久化之前通過驗證唯一性標識保證消息的唯一性。消息持久化位置如果出現(xiàn)同樣的消息,系統(tǒng)就不做處理,期間無任何傳遞過程,保證消息的唯一性。
2.使用過程中業(yè)務(wù)唯一標識驗證,使用過程中如果出現(xiàn)同樣的消息,系統(tǒng)進行相應(yīng)的異常處理。
rocketMQ消息重復(fù)消費場景
答案
將去重操作直接放在了消費端,消費端處理消息的業(yè)務(wù)邏輯保持冪等性。消費者收到消息后,從消息中獲取消息標識寫入到Redis(分布式鎖)或數(shù)據(jù)庫(標識作為表唯一索引插入一條記錄),當再次收到該消息時就不作處理。
在broker端對Queue加鎖(synchronized),Consumer監(jiān)聽的Queue存在已投遞但未收到ack且未超時的消息,不允許獲取鎖,直到該Queue投遞的消息全部ack或者消費超時,才允許新的Consumer獲取鎖,拉取消息。
思考問題
- 1、為什么不在生產(chǎn)者去重?
- 2、為什么在消費者做去重?
一、網(wǎng)絡(luò)波動導(dǎo)致系統(tǒng)A消息發(fā)送到RocketMQ后沒有收到消息發(fā)送超時,系統(tǒng)A重試導(dǎo)致消息重復(fù)
1、RocketMQ支持消息查詢的功能,消息發(fā)送前去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息,存在則不發(fā)送,不存在發(fā)送到RocketMQ。在高并發(fā)的場景下,每條消息在發(fā)送到RocketMQ時都去查詢一下,會影響接口的性能。
2、redis分布式鎖,在發(fā)送消息到RocketMQ成功之后,向redis中插入一條數(shù)據(jù),如果發(fā)生重試,則先去redis中查詢是否存在,存在的話不再發(fā)送消息。redis集群此時宕機,再次查詢redis判斷消息是否已經(jīng)發(fā)送過,無法得到正確結(jié)果的。
以上兩種方式只是保證只發(fā)送了一次,不能保證消費只一次的情況。
二、MQ要保證消息投遞的可靠性,對未ack的消息,會重復(fù)投遞。
- 場景一:broker發(fā)送Consumer超時后重新發(fā)送
消費者端要保證消費的冪等性,從消息中獲取消息標識寫入到Redis或數(shù)據(jù)庫,當再次收到該消息時不作處理。
- 場景二:負載均衡階段,前一個監(jiān)聽Queue的消費實例拉取的消息未全部ack,新的消費實例監(jiān)聽到這個Queue重新拉取消息。
在負載均衡結(jié)果變化過程增加了一個過渡態(tài),在過渡態(tài)的時候,Consumer會繼續(xù)保留上一次負載均衡的結(jié)果,直到原消費者拉取的消息全部ack,才釋放老的結(jié)果。
在broker端對Queue加鎖(synchronized),Consumer監(jiān)聽的Queue存在已投遞但未收到ack且未超時的消息,不允許獲取鎖,直到該Queue投遞的消息全部ack或者消費超時,才允許新的Consumer獲取鎖,拉取消息。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java批量導(dǎo)入導(dǎo)出文件的實例分享(兼容xls,xlsx)
這篇文章主要給大家介紹了利用java批量導(dǎo)入導(dǎo)出文件的相關(guān)資料,文中給出了詳細的實例代碼,并且兼容xls,xlsx,對大家具有一定的參考學(xué)習(xí)價值,下面跟著小編一起來看看詳細的介紹吧。2017-06-06springboot的異步任務(wù):無返回值和有返回值問題
這篇文章主要介紹了springboot的異步任務(wù):無返回值和有返回值問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07Java中的強制類型轉(zhuǎn)換 大數(shù)轉(zhuǎn)小數(shù)
這里主要討論一下大數(shù)轉(zhuǎn)小數(shù),比如int類型轉(zhuǎn)short類型。小數(shù)轉(zhuǎn)大數(shù),如short 轉(zhuǎn) int不做討論,需要的朋友可以參考下2020-02-02SpringBoot項目為何引入大量的starter?如何自定義starter?
這篇文章主要介紹了SpringBoot項目為何引入大量的starter?如何自定義starter?文章基于這兩個問題展開全文,需要的小伙伴可以參考一下2022-04-04Spring Boot 啟動、停止、重啟、狀態(tài)腳本
今天給大家分享Spring Boot 項目腳本(啟動、停止、重啟、狀態(tài)),通過示例代碼給大家介紹的非常詳細,需要的朋友參考下吧2021-06-06