RocketMQ中消費者的消費進度管理
RocketMQ消費進度管理
業(yè)務實現消費回調的時候,當且僅當此回調函數返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才會認為這批消息(默認是1條)是消費完成的
如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業(yè)務認為消息需要重試的場景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就會認為這批消息消費失敗了。
為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消費失敗的消息重發(fā)回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業(yè)務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續(xù)失敗到一定次數(默 認16次),就會投遞到DLQ死信隊列。應用可以監(jiān)控死信隊列來做人工干預。
從哪里開始消費
當新實例啟動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度,如果這個消費進度在Broker并沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:
CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息 CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
消息ACK機制
RocketMQ是以consumer group+queue為單位是管理消費進度的,以一個consumer offset標記這個消費組在這條queue上的消費進度。
每次消息成功后,本地的消費進度會被更新,然后由定時器定時同步到broker(即,不是立刻同步到broker,有一段時間消費進度只會存在于本地,此時如果宕機,那么未提交的消費進度就會被重新消費),以此持久化消費進度。但是每次記錄消費進度的時候,只會把一批消息中最小的offset值為消費進度值,如下圖:
比如2消費失敗,rocketmq跳過2消費到了8,8消費成功了,但是提交的時候只會提交【消費到了1】,因為2失敗了,所以會提交最小成功點
重復消費問題
由于消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。
在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了(注:consumerOffset=2201)。 在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。
對于這個場景,RocketMQ暫時無能為力,所以業(yè)務必須要保證消息消費的冪等性,這也是RocketMQ官方多次強調的態(tài)度。
重復消費驗證
查看當前消費進度
檢查隊列消費的當前進度。 查看RocketMQ 的config文件夾下的 consumerOffset.json
cat consumerOffset.json
通過consumerOffset.json我們可以知道當前 topicTest
主題的 rocket_test_consumer_group
組的 queue2
消費到偏移量為32
消費者發(fā)送消息
消費者發(fā)送消息,并查看各個隊列消息的偏移量
我們發(fā)現隊列2的偏移量最小為32 消費的時候最小偏移量不提交,其他都正常
//隊列2的偏移量為32的數據在等待 if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) { System.out.println("消息消費耗時較廠接收queueId:[" + ext.getQueueId() + "],偏移量 offset:[" + ext.getQueueOffset() + "]"); //等待 模擬假死狀態(tài) try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } }
運行查看日志
我們發(fā)現只有隊列2的偏移量為32的消息消費超時,其他都已經正常消費 我們再查看下consumerOffset.json
cat consumerOffset.json
我們發(fā)現因為rocketMQ 整個消費記錄都沒有被提交,所以下次消費會全部再次消費。 這里模擬出了整個消費進度都在本地,沒來得及提交給broker。
還有一種情況就是,進度成功提交給broker了,queue0、1、3的消費進度都改變了。但是queue2的消費進度還是32,因為消費32的時候超時了,rocketmq只能提交最小成功offset!
再次消費
去掉延時代碼繼續(xù)消費
我們發(fā)現消息被重復消費了一遍
到此這篇關于RocketMQ中消費者的消費進度管理的文章就介紹到這了,更多相關RocketMQ消費進度管理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
聊聊spring boot的WebFluxTagsProvider的使用
這篇文章主要介紹了聊聊spring boot的WebFluxTagsProvider的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-07-07詳解MyBatis-Plus Wrapper條件構造器查詢大全
這篇文章主要介紹了詳解MyBatis-Plus Wrapper條件構造器查詢大全,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08Spring+SpringMVC+MyBatis深入學習及搭建(二)之MyBatis原始Dao開發(fā)和mapper代理開發(fā)
這篇文章主要介紹了Spring+SpringMVC+MyBatis深入學習及搭建(二)之MyBatis原始Dao開發(fā)和mapper代理開發(fā),需要的朋友可以參考下2017-05-05