亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解

 更新時(shí)間:2023年10月11日 09:22:55   作者:金甲蟲Scarb  
這篇文章主要介紹了RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解,RocketMQ是一款高性能、高可靠性的分布式消息中間件,消費(fèi)者是RocketMQ中的重要組成部分,消費(fèi)者負(fù)責(zé)從消息隊(duì)列中獲取消息并進(jìn)行處理,需要的朋友可以參考下

1. 背景

RocketMQ 的消費(fèi)可以算是 RocketMQ 的業(yè)務(wù)邏輯中最復(fù)雜的一塊。這里面涉及到許多消費(fèi)模式和特性。本想一篇文章寫完,寫到后面發(fā)現(xiàn)消費(fèi)涉及到的內(nèi)容太多,于是決定分多篇來寫。本文作為消費(fèi)系列的第一篇,主要講述 RocketMQ 消費(fèi)涉及到的模式和特性,也會(huì)概括性地講一下消費(fèi)流程。

我將 RocketMQ 的消費(fèi)流程大致分成 4 個(gè)步驟

重平衡消費(fèi)者拉取消息Broker 接收拉取請(qǐng)求后從存儲(chǔ)中查詢消息并返回消費(fèi)者消費(fèi)消息

每個(gè)步驟都會(huì)用一篇文章來講解。

先了解一下 RocketMQ 消費(fèi)涉及到地概念

2. 概念簡(jiǎn)述

2.1 消費(fèi)組概念與消費(fèi)模式

和大多數(shù)消息隊(duì)列一樣,RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。在了解它們之前,需要先引入消費(fèi)組的概念。

2.1.1 消費(fèi)組

一個(gè)消費(fèi)者實(shí)例即是一個(gè)消費(fèi)者進(jìn)程,負(fù)責(zé)消費(fèi)消息。單個(gè)消費(fèi)者速度有限,在實(shí)際使用中通常會(huì)采用多個(gè)消費(fèi)者共同消費(fèi)同樣的 Topic 以加快消費(fèi)速度。這多個(gè)消費(fèi)同樣 Topic 的消費(fèi)者組成了消費(fèi)者組。

消費(fèi)組是一個(gè)邏輯概念,它包含了多個(gè)同一類的消費(fèi)者實(shí)例,通常這些消費(fèi)者都消費(fèi)同一類消息(都消費(fèi)相同的 Topic)且消費(fèi)邏輯一致。

消費(fèi)組的引入是用來在消費(fèi)消息時(shí)更好地進(jìn)行負(fù)載均衡和容錯(cuò)。

2.1.2 廣播消費(fèi)模式(BROADCASTING)

廣播消費(fèi)模式即全部的消息會(huì)廣播分發(fā)到所有的消費(fèi)者實(shí)例,每個(gè)消費(fèi)者實(shí)例會(huì)收到全量的消息(即便消費(fèi)組中有多個(gè)消費(fèi)者都訂閱同一 Topic)。

如下圖所示,生產(chǎn)者發(fā)送了 5 條消息,每個(gè)消費(fèi)組中的消費(fèi)者都收到全部的 5 條消息。

廣播模式使用較少,適合各個(gè)消費(fèi)者都需要通知的場(chǎng)景,如刷新應(yīng)用中的緩存。

廣播消費(fèi)模式

注意事項(xiàng):

  1. 廣播消費(fèi)模式下不支持 順序消息。
  2. 廣播消費(fèi)模式下不支持 重置消費(fèi)位點(diǎn)。
  3. 每條消息都需要被相同訂閱邏輯的多臺(tái)機(jī)器處理。
  4. 消費(fèi)進(jìn)度在客戶端維護(hù),出現(xiàn)重復(fù)消費(fèi)的概率稍大于集群模式。如果消費(fèi)進(jìn)度文件丟失,存在消息丟失的可能。
  5. 廣播模式下,消息隊(duì)列 RocketMQ 版保證每條消息至少被每臺(tái)客戶端消費(fèi)一次,但是并不會(huì)重投消費(fèi)失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費(fèi)失敗的情況。
  6. 廣播模式下,客戶端每一次重啟都會(huì)從最新消息消費(fèi)??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會(huì)被自動(dòng)跳過,請(qǐng)謹(jǐn)慎選擇。
  7. 廣播模式下,每條消息都會(huì)被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
  8. 廣播模式下服務(wù)端不維護(hù)消費(fèi)進(jìn)度,所以消息隊(duì)列 RocketMQ 版控制臺(tái)不支持消息堆積查詢、消息堆積報(bào)警和訂閱關(guān)系查詢功能。

2.1.3 集群消費(fèi)模式(CLUSTERING)

集群消費(fèi)模式下,同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說,消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

更具體一點(diǎn),在同一消費(fèi)組中的不同消費(fèi)者會(huì)根據(jù)負(fù)載機(jī)制來平均地訂閱 Topic 中的每個(gè) Queue。(默認(rèn) AVG 負(fù)載方式)

廣播消費(fèi)模式

RocketMQ 默認(rèn)使用集群消費(fèi)模式,這也是大部分場(chǎng)景下會(huì)使用到的消費(fèi)模式。

2.2 消費(fèi)者拉取消息模式

2.2.1 Pull

指消費(fèi)者主動(dòng)拉取消息進(jìn)行消費(fèi),主動(dòng)從 Broker 拉取消息,主動(dòng)權(quán)由消費(fèi)者應(yīng)用控制。

2.2.2 Push

Broker 主動(dòng)將消息 Push 給消費(fèi)者,Broker 收到消息就會(huì)主動(dòng)推送到消費(fèi)者端。該模式的消費(fèi)實(shí)時(shí)性較高,也是主流場(chǎng)景中普遍采用的消費(fèi)形式。

消費(fèi)者組中的消費(fèi)者實(shí)例會(huì)根據(jù)預(yù)設(shè)的負(fù)載均衡算法對(duì) Topic 中的 Queue 進(jìn)行均勻的訂閱,每個(gè) Queue 最多只能被一個(gè)消費(fèi)者訂閱。

在 RocketMQ 中,Push 消費(fèi)其實(shí)也是由 Pull 消費(fèi)(拉?。?shí)現(xiàn)。Push 消費(fèi)只是通過客戶端 API 層面的封裝讓用戶感覺像是 Broker 在推送消息給消費(fèi)者。

2.2.3 POP

RocketMQ 5.0 引入的新消費(fèi)形式,是 Pull 拉取的另一種實(shí)現(xiàn)。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分別消費(fèi)重試 Topic 和普通 Topic)。

POP 與 Pull 可以通過一個(gè)開關(guān)實(shí)時(shí)進(jìn)行切換。POP 模式下,Broker 來控制每個(gè)消費(fèi)者消費(fèi)的隊(duì)列和拉取的消息,把重平衡邏輯從客戶端移到了服務(wù)端。

主要解決了原來 Push 模式消費(fèi)的以下痛點(diǎn):

富客戶端:客戶端邏輯比較重,多語言支持不友好隊(duì)列獨(dú)占:Topic 中的一個(gè) Queue 最多只能被 1 個(gè) Push 消費(fèi)者消費(fèi),消費(fèi)者數(shù)量無法無限擴(kuò)展。且消費(fèi)者 hang 住時(shí)該隊(duì)列的消息會(huì)堆積。消費(fèi)后更新 offset:本地消費(fèi)成功才會(huì)提交 offset

RocketMQ 5.0 的輕量化 gRPC 客戶端就是基于 POP 消費(fèi)模式開發(fā)

2.3 隊(duì)列負(fù)載機(jī)制與重平衡

在集群消費(fèi)模式下,消費(fèi)組中的消費(fèi)者共同消費(fèi)訂閱的 Topic 中的所有消息,這里就存在 Topic 中的隊(duì)列如何分配給消費(fèi)者的問題。

2.3.1 隊(duì)列負(fù)載機(jī)制

RocketMQ Broker 中的隊(duì)列負(fù)載機(jī)制將一個(gè) Topic 的不同隊(duì)列按照算法盡可能平均地分配給消費(fèi)者組中的所有消費(fèi)者。RocketMQ 預(yù)設(shè)了多種負(fù)載算法供不同場(chǎng)景下的消費(fèi)。

AVG:將隊(duì)列按數(shù)量平均分配給多個(gè)消費(fèi)者,按 Broker 順序先分配第一個(gè) Broker 的所有隊(duì)列給第一個(gè)消費(fèi)者,然后給第二個(gè)。

AVG_BY_CIRCLE:將 Broker 上的隊(duì)列輪流分給不同消費(fèi)者,更適用于 Topic 在不同 Broker 之間分布不均勻的情況。

默認(rèn)采用 AVG 負(fù)載方式。

2.3.2 重平衡(Rebalance)

為消費(fèi)者分配隊(duì)列消費(fèi)的這一個(gè)負(fù)載過程并不是一勞永逸的,比如當(dāng)消費(fèi)者數(shù)量變化、Broker 掉線等情況發(fā)生后,原先的負(fù)載就變得不再均衡,此時(shí)就需要重新進(jìn)行負(fù)載均衡,這一過程被稱為重平衡機(jī)制。

每隔 20s,RocketMQ 會(huì)進(jìn)行一次檢查,檢查隊(duì)列數(shù)量、消費(fèi)者數(shù)量是否發(fā)生變化,如果變化則觸發(fā)消費(fèi)隊(duì)列重平衡,重新執(zhí)行上述負(fù)載算法。

2.4 消費(fèi)端高可靠

2.4.1 重試-死信機(jī)制

在實(shí)際使用中,消息的消費(fèi)可能出現(xiàn)失敗。RocketMQ 擁有重試機(jī)制和死信機(jī)制來保證消息消費(fèi)的可靠性。

正常消費(fèi):消費(fèi)成功則提交消費(fèi)位點(diǎn)

重試機(jī)制:如果正常消費(fèi)失敗,消息會(huì)被消費(fèi)者發(fā)回 Broker,放入重試 Topic: %RETRY%消費(fèi)者組 。最多重試消費(fèi) 16 次,重試的時(shí)間間隔逐漸變長。(消費(fèi)者組會(huì)自動(dòng)訂閱重試 Topic)。

這里地延遲重試采用了 RocketMQ 的延遲消息,重試的 16 次時(shí)間間隔為延遲消息配置的每個(gè)延遲等級(jí)的時(shí)間(從第三個(gè)等級(jí)開始)。如果修改延遲等級(jí)時(shí)間的配置,重試的時(shí)間間隔也會(huì)相應(yīng)發(fā)生變化。但即便延遲等級(jí)時(shí)間間隔配置不足 16 個(gè),仍會(huì)重試 16 次,后面按照最大的時(shí)間間隔來重試。

死信機(jī)制:如果正常消費(fèi)和重試 16 次均失敗,消息會(huì)保存到死信 Topic %DLQ%消費(fèi)者組 中,此時(shí)需人工介入處理

2.4.2 隊(duì)列負(fù)載機(jī)制與重平衡

當(dāng)發(fā)生 Broker 掛掉或者消費(fèi)者掛掉時(shí),會(huì)引發(fā)重平衡,可以自動(dòng)感知有組件掛掉的情況并重新調(diào)整消費(fèi)者的訂閱關(guān)系。

2.5 并發(fā)消費(fèi)與順序消費(fèi)

在消費(fèi)者客戶端消費(fèi)時(shí),有兩種訂閱消息的方式,分別是并發(fā)消費(fèi)和順序消費(fèi)。廣播模式不支持順序消費(fèi),僅有集群模式能使用順序消費(fèi)。

需要注意的是,這里所說的順序消費(fèi)指的是隊(duì)列維度的順序,即在消費(fèi)一個(gè)隊(duì)列時(shí),消費(fèi)消息的順序和消息發(fā)送的順序一致。如果一個(gè) Topic 有多個(gè)隊(duì)列, 是不可能達(dá)成 Topic 級(jí)別的順序消費(fèi)的,因?yàn)闊o法控制哪個(gè)隊(duì)列的消息被先消費(fèi)。Topic 只有一個(gè)隊(duì)列的情況下能夠?qū)崿F(xiàn) Topic 級(jí)別的順序消費(fèi)。

具體順序生產(chǎn)和消費(fèi)代碼見 官方文檔。

順序生產(chǎn)的方式為串行生產(chǎn),并在生產(chǎn)時(shí)指定隊(duì)列。

并發(fā)消費(fèi)的方式是調(diào)用消費(fèi)者的指定 MessageListenerConcurrently 作為消費(fèi)的回調(diào)類,順序消費(fèi)則使用 MessageListenerOrderly 類進(jìn)行回調(diào)。處理這兩種消費(fèi)方式的消費(fèi)服務(wù)也不同,分別是 ConsumeMessageConcurrentlyService ConsumeMessageOrderlyService 。

順序消費(fèi)的大致原理是依靠兩組鎖,一組在 Broker 端(Broker 鎖),鎖定隊(duì)列和消費(fèi)者的關(guān)系,保證同一時(shí)間只有一個(gè)消費(fèi)者在消費(fèi);在消費(fèi)者端也有一組鎖(消費(fèi)隊(duì)列鎖)以保證消費(fèi)的順序性。

2.6 消費(fèi)進(jìn)度保存和提交

消費(fèi)者消費(fèi)一批消息完成之后,需要保存消費(fèi)進(jìn)度。如果是集群消費(fèi)模式,還需要將消費(fèi)進(jìn)度讓其他消費(fèi)者知道,所以需要提交消費(fèi)進(jìn)度。這樣在消費(fèi)者重啟或隊(duì)列重平衡時(shí)可以根據(jù)消費(fèi)進(jìn)度繼續(xù)消費(fèi)。

不同模式下消費(fèi)進(jìn)度保存方式的不同:

  • 廣播模式:保存在消費(fèi)者本地。因?yàn)槊總€(gè)消費(fèi)者都需要消費(fèi)全量消息消息。在 LocalfileOffsetStore 當(dāng)中。
  • 集群模式:保存在 Broker,同時(shí)消費(fèi)者端緩存。因?yàn)橐粋€(gè) Topic 的消息只要被消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)即可,所以消息的消費(fèi)進(jìn)度需要統(tǒng)一保存。通過 RemoteBrokerOffsetStore 存儲(chǔ)。

集群模式下,消費(fèi)者端有定時(shí)任務(wù),定時(shí)將內(nèi)存中的消費(fèi)進(jìn)度提交到 Broker,Broker 也有定時(shí)任務(wù)將內(nèi)存中的消費(fèi)偏移量持久化到磁盤。此外,消費(fèi)者向 Broker 拉取消息時(shí)也會(huì)提交消費(fèi)偏移量。注意,消費(fèi)者線程池提交的偏移量是線程池消費(fèi)的這一批消息中偏移量最小的消息的偏移量。

  1. 消費(fèi)完一批消息后將消息消費(fèi)進(jìn)度存在本地內(nèi)存
  2. 消費(fèi)者中有一個(gè)定時(shí)線程,每 5s 將內(nèi)存中所有隊(duì)列的消費(fèi)偏移量提交到 Broker
  3. Broker 收到消費(fèi)進(jìn)度先緩存到內(nèi)存,有一個(gè)定時(shí)任務(wù)每隔 5s 將消息偏移量持久化到磁盤
  4. 消費(fèi)者向 Broker 拉取消息時(shí)也會(huì)將隊(duì)列的消息偏移量提交到 Broker

3. 消費(fèi)流程

這張圖是阿里云的文章講解消費(fèi)時(shí)用到的,能夠清晰地表示客戶端 Push 模式并發(fā)消費(fèi)流程。

img

從左上角第一個(gè)方框開始看

  1. 消費(fèi)者啟動(dòng)時(shí)喚醒重平衡服務(wù) RebalanceService,重平衡服務(wù)是客戶端開始消費(fèi)的起點(diǎn)。
  2. 重平衡服務(wù)會(huì)周期性(每 20s)執(zhí)行重平衡方法 doRebalance),查詢所有注冊(cè)的 Broker,根據(jù)注冊(cè)的 Broker 數(shù)量為自身分配負(fù)載的隊(duì)列 rebalanceByTopic()
  3. 分配完隊(duì)列后,會(huì)為每個(gè)分配到的新隊(duì)列創(chuàng)建一個(gè)消息拉取請(qǐng)求 pullRequest,這個(gè)拉取請(qǐng)求中保存一個(gè)處理隊(duì)列 processQueue,即圖中的紅黑樹(TreeMap),用來保存拉取到的消息。紅黑樹保存消息的順序。
  4. 消息拉取線程應(yīng)用生產(chǎn)-消費(fèi)模式,用一個(gè)線程從拉取請(qǐng)求隊(duì)列 pullRequestQueue 中彈出拉取請(qǐng)求,執(zhí)行拉取任務(wù),將拉取到的消息放入處理隊(duì)列。
  5. 拉取請(qǐng)求在一次拉取消息完成之后會(huì)復(fù)用,重新被放入拉取請(qǐng)求隊(duì)列 pullRequestQueue 中
  6. 拉取完成后,在 NettyClientPublicExecutorThreadPool 線程池異步處理結(jié)果,將拉取到的消息放入處理隊(duì)列,然后調(diào)用 consumeMessageService.submitConsumeRequest,將處理隊(duì)列和 多個(gè)消費(fèi)任務(wù)提交到消
  7. 費(fèi)線程池。每個(gè)消費(fèi)任務(wù)消費(fèi) 1 批消息(1 批默認(rèn)為 1 條)
  8. 每個(gè)消費(fèi)者都有一個(gè)消費(fèi)線程池 consumeMessageThreadPool ,默認(rèn)有 20 個(gè)消費(fèi)線程。
  9. 消費(fèi)線程池的每個(gè)消費(fèi)線程會(huì)嘗試從消費(fèi)任務(wù)隊(duì)列中獲取消費(fèi)請(qǐng)求,執(zhí)行消費(fèi)業(yè)務(wù)邏輯 listener.consumeMessage。
  10. 消費(fèi)完成后,如果消費(fèi)成功,則更新偏移量 updateOffset(先更新到內(nèi)存 offsetTable,定時(shí)上報(bào)到 Broker。Broker 端也先放到內(nèi)存,定時(shí)刷盤)。

到此這篇關(guān)于RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解的文章就介紹到這了,更多相關(guān)RocketMQ中的消費(fèi)者內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論