亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

rocketMQ如何避免消息重復(fù)消費問題

 更新時間:2024年06月17日 16:47:20   作者:qq_16570607  
這篇文章主要介紹了rocketMQ如何避免消息重復(fù)消費問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

rocketMQ如何避免消息重復(fù)消費

在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息隊列 RocketMQ 的消息有可能會出現(xiàn)重復(fù),造成重復(fù)消費。

這個重復(fù)簡單可以概括為以下情況:

  • produce發(fā)送到Broker時消息重復(fù)

當一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者producer宕機,導(dǎo)致服務(wù)端Broker 對 producer應(yīng)答失敗。

如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。

  • Broker投遞消息到Consumer時消息重復(fù)

消息消費的場景下,消息已投遞到消費者并完成業(yè)務(wù)處理,當客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷。

為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。

  • 負載均衡時消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動、Broker 重啟以及訂閱方應(yīng)用重啟)

當消息隊列 RocketMQ 的 Broker或客戶端重啟、擴容或縮容時,會觸發(fā)Rebalance重平衡機制,此時消費者可能會收到重復(fù)消息。

冪等性解決方案

冪等性:就是用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因為多次點擊而產(chǎn)生了副作用。

從上面的分析中,我們知道,在RocketMQ中,是無法保證每個消息只被投遞一次的,所以要在業(yè)務(wù)上自行來保證消息消費的冪等性。而要處理這個問題,RocketMQ的每條消息都有一個唯一的MessageId,這個參數(shù)在多次投遞的過程中是不會改變的,所以業(yè)務(wù)上可以用這個MessageId來作為判斷冪等的關(guān)鍵依據(jù)。

但是,這個MessageId是無法保證全局唯一的,也會有沖突的情況。所以在一些對冪等性要求嚴格的場景,最好是使用業(yè)務(wù)上唯一的一個標識比較靠譜。這個id可以使用分布式中間件redis,zookeeper等去生成。例如訂單ID。而這個業(yè)務(wù)標識可以使用Message的Key來進行傳遞。

同時,建立一個消息表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復(fù)消費的情況,就會導(dǎo)致主鍵沖突,那么就不再處理這條消息。

消息持久層做消息唯一性的策略(該方案未經(jīng)證實)

1.持久化過程中業(yè)務(wù)唯一標識驗證,每個消息具有業(yè)務(wù)唯一標識,在消息最終持久化之前通過驗證唯一性標識保證消息的唯一性。消息持久化位置如果出現(xiàn)同樣的消息,系統(tǒng)就不做處理,期間無任何傳遞過程,保證消息的唯一性。

2.使用過程中業(yè)務(wù)唯一標識驗證,使用過程中如果出現(xiàn)同樣的消息,系統(tǒng)進行相應(yīng)的異常處理。

rocketMQ消息重復(fù)消費場景

答案

將去重操作直接放在了消費端,消費端處理消息的業(yè)務(wù)邏輯保持冪等性。消費者收到消息后,從消息中獲取消息標識寫入到Redis(分布式鎖)或數(shù)據(jù)庫(標識作為表唯一索引插入一條記錄),當再次收到該消息時就不作處理。

在broker端對Queue加鎖(synchronized),Consumer監(jiān)聽的Queue存在已投遞但未收到ack且未超時的消息,不允許獲取鎖,直到該Queue投遞的消息全部ack或者消費超時,才允許新的Consumer獲取鎖,拉取消息。

思考問題

  • 1、為什么不在生產(chǎn)者去重?
  • 2、為什么在消費者做去重?

一、網(wǎng)絡(luò)波動導(dǎo)致系統(tǒng)A消息發(fā)送到RocketMQ后沒有收到消息發(fā)送超時,系統(tǒng)A重試導(dǎo)致消息重復(fù)  

1、RocketMQ支持消息查詢的功能,消息發(fā)送前去RocketMQ查詢一下是否已經(jīng)發(fā)送過該條消息,存在則不發(fā)送,不存在發(fā)送到RocketMQ。在高并發(fā)的場景下,每條消息在發(fā)送到RocketMQ時都去查詢一下,會影響接口的性能。

2、redis分布式鎖,在發(fā)送消息到RocketMQ成功之后,向redis中插入一條數(shù)據(jù),如果發(fā)生重試,則先去redis中查詢是否存在,存在的話不再發(fā)送消息。redis集群此時宕機,再次查詢redis判斷消息是否已經(jīng)發(fā)送過,無法得到正確結(jié)果的。

以上兩種方式只是保證只發(fā)送了一次,不能保證消費只一次的情況。

二、MQ要保證消息投遞的可靠性,對未ack的消息,會重復(fù)投遞。

  • 場景一:broker發(fā)送Consumer超時后重新發(fā)送

消費者端要保證消費的冪等性,從消息中獲取消息標識寫入到Redis或數(shù)據(jù)庫,當再次收到該消息時不作處理。

  • 場景二:負載均衡階段,前一個監(jiān)聽Queue的消費實例拉取的消息未全部ack,新的消費實例監(jiān)聽到這個Queue重新拉取消息。

在負載均衡結(jié)果變化過程增加了一個過渡態(tài),在過渡態(tài)的時候,Consumer會繼續(xù)保留上一次負載均衡的結(jié)果,直到原消費者拉取的消息全部ack,才釋放老的結(jié)果。

在broker端對Queue加鎖(synchronized),Consumer監(jiān)聽的Queue存在已投遞但未收到ack且未超時的消息,不允許獲取鎖,直到該Queue投遞的消息全部ack或者消費超時,才允許新的Consumer獲取鎖,拉取消息。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論