Java的RocketMq隊(duì)列之消息可靠性詳解
1. 消息的發(fā)送流程
一條消息從生產(chǎn)到被消費(fèi),將會經(jīng)歷三個階段:
- 生產(chǎn)階段,Producer 新建消息,然后通過網(wǎng)絡(luò)將消息投遞給 MQ Broker
- 存儲階段,消息將會存儲在 Broker 端磁盤中
- 消息階段, Consumer 將會從 Broker 拉取消息
以上任一階段都可能會丟失消息,我們只要找到這三個階段丟失消息原因,采用合理的辦法避免丟失,就可以徹底解決消息丟失的問題。
2. 生產(chǎn)階段
生產(chǎn)者(Producer) 通過網(wǎng)絡(luò)發(fā)送消息給 Broker,當(dāng) Broker 收到之后,將會返回確認(rèn)響應(yīng)信息給 Producer。所以生產(chǎn)者只要接收到返回的確認(rèn)響應(yīng),就代表消息在生產(chǎn)階段未丟失。
發(fā)送模式
可靠同步發(fā)送
- 原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包的通訊方式。
- 場景:此種方式應(yīng)用場景非常廣泛,例如重要通知郵件、報(bào)名短信通知、營銷短信系統(tǒng)等。
- 類似推拉的形式 發(fā)送 ->同步返回 ->發(fā)送 ->同步返回
可靠異步發(fā)送
- 原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包的通訊方式。 MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)器響應(yīng)即可返回,進(jìn)行第二條消息發(fā)送。發(fā)送方通過回調(diào)接口接收服務(wù)器響應(yīng),并對響應(yīng)結(jié)果進(jìn)行處理。
- 場景:異步發(fā)送一般用于鏈路耗時(shí)較長,對響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。 耗時(shí)比較長的 可以不需要同步返回給用戶的
單向(Oneway)發(fā)送
- 原理:單向(Oneway)發(fā)送特點(diǎn)為發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應(yīng)答。 此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級別。
- 場景:適用于某些耗時(shí)非常短,但對可靠性要求并不高的場景,例如日志收集。
RocketMQ 發(fā)送消息示例代碼如下:
DefaultMQProducer mqProducer=new DefaultMQProducer("test"); // 設(shè)置 nameSpace 地址 mqProducer.setNamesrvAddr("namesrvAddr"); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發(fā)送消息到一個Broker try { SendResult sendResult = mqProducer.send(msg); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
send 方法是一個同步操作,只要這個方法不拋出任何異常,就代表消息已經(jīng)發(fā)送成功。
消息發(fā)送成功僅代表消息已經(jīng)到了 Broker 端,Broker 在不同配置下,可能會返回不同響應(yīng)狀態(tài):
- SendStatus.SEND_OK
- SendStatus.FLUSH_DISK_TIMEOUT
- SendStatus.FLUSH_SLAVE_TIMEOUT
- SendStatus.SLAVE_NOT_AVAILABLE
引用官方狀態(tài)說明:
另外 RocketMQ 還提供異步的發(fā)送的方式,適合于鏈路耗時(shí)較長,對響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場景。
DefaultMQProducer mqProducer = new DefaultMQProducer("test"); // 設(shè)置 nameSpace 地址 mqProducer.setNamesrvAddr("127.0.0.1:9876"); mqProducer.setRetryTimesWhenSendFailed(5); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); try { // 異步發(fā)送消息到,主線程不會被阻塞,立刻會返回 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 消息發(fā)送成功, } @Override public void onException(Throwable e) { // 消息發(fā)送失敗,可以持久化這條數(shù)據(jù),后續(xù)進(jìn)行補(bǔ)償處理 } }); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
異步發(fā)送消息一定要注意重寫回調(diào)方法,在回調(diào)方法中檢查發(fā)送結(jié)果。
不管是同步還是異步的方式,都會碰到網(wǎng)絡(luò)問題導(dǎo)致發(fā)送失敗的情況。針對這種情況,我們可以設(shè)置合理的重試次數(shù),當(dāng)出現(xiàn)網(wǎng)絡(luò)問題,可以自動重試。設(shè)置方式如下:
// 同步發(fā)送消息重試次數(shù),默認(rèn)為 2 mqProducer.setRetryTimesWhenSendFailed(3); // 異步發(fā)送消息重試次數(shù),默認(rèn)為 2 mqProducer.setRetryTimesWhenSendAsyncFailed(3);
總結(jié)
producer消息發(fā)送方式雖然有3種,但為了減小丟失消息的可能性盡量采用同步的發(fā)送方式,producer同步等待broker響應(yīng)消息的發(fā)送結(jié)果,利用同步發(fā)送+重試機(jī)制+多個master節(jié)點(diǎn),盡可能減小消息丟失的可能性。
3. Broker 存儲階段
默認(rèn)情況下,消息只要到了 Broker 端,將會優(yōu)先保存到內(nèi)存中,然后立刻返回確認(rèn)響應(yīng)給生產(chǎn)者。隨后 Broker 定期批量的將一組消息從內(nèi)存異步刷入磁盤。
這種方式減少 I/O 次數(shù),可以取得更好的性能,但是如果發(fā)生機(jī)器掉電,異常宕機(jī)等情況,消息還未及時(shí)刷入磁盤,就會出現(xiàn)丟失消息的情況。
若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機(jī)制修改為同步刷盤方式,即消息存儲磁盤成功,才會返回響應(yīng)。
修改 Broker 端配置如下:
## 默認(rèn)情況為 ASYNC_FLUSH flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盤時(shí)間內(nèi)(默認(rèn)為 5s)完成刷盤,將會返回 SendStatus.FLUSH_DISK_TIMEOUT 狀態(tài)給生產(chǎn)者。
集群部署
為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丟失,消息還需要復(fù)制到 slave 節(jié)點(diǎn)。
默認(rèn)方式下,消息寫入 master 成功,就可以返回確認(rèn)響應(yīng)給生產(chǎn)者,接著消息將會異步復(fù)制到 slave 節(jié)點(diǎn)。
注:master 配置:flushDiskType = SYNC_FLUSH
此時(shí)若 master 突然宕機(jī)且不可恢復(fù),那么還未復(fù)制到 slave 的消息將會丟失。
為了進(jìn)一步提高消息的可靠性,我們可以采用同步的復(fù)制方式,master 節(jié)點(diǎn)將會同步等待 slave 節(jié)點(diǎn)復(fù)制完成,才會返回確認(rèn)響應(yīng)。
異步復(fù)制與同步復(fù)制區(qū)別:
- Sync Broker:生產(chǎn)者發(fā)送的每一條消息都至少同步復(fù)制到一個slave后才返回告訴生產(chǎn)者成功,即“同步雙寫”。
- Async Broker:生產(chǎn)者發(fā)送的每一條消息只要寫入master就返回告訴生產(chǎn)者成功。然后再“異步復(fù)制”到slave。
Broker master 節(jié)點(diǎn) 同步復(fù)制配置如下:
## 默認(rèn)為 ASYNC_MASTER brokerRole=SYNC_MASTER
如果 slave 節(jié)點(diǎn)未在指定時(shí)間內(nèi)同步返回響應(yīng),生產(chǎn)者將會收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回狀態(tài)。
總結(jié)
在broker端,消息丟失的可能性主要在于刷盤策略和同步機(jī)制。 RocketMQ默認(rèn)broker的刷盤策略為異步刷盤,如果有主從,同步策略也默認(rèn)的是異步同步,這樣子可以提高broker處理消息的效率,但是會有丟失的可能性。因此可以通過同步刷盤策略+同步slave策略+主從的方式解決丟失消息的可能。
結(jié)合生產(chǎn)階段與存儲階段,若需要嚴(yán)格保證消息不丟失,broker 需要采用如下配置:
## master 節(jié)點(diǎn)配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER ## slave 節(jié)點(diǎn)配置 brokerRole=slave flushDiskType = SYNC_FLUSH
同時(shí)這個過程我們還需要生產(chǎn)者配合,判斷返回狀態(tài)是否是 SendStatus.SEND_OK。若是其他狀態(tài),就需要考慮補(bǔ)償重試。
雖然上述配置提高消息的高可靠性,但是會降低性能,生產(chǎn)實(shí)踐中需要綜合選擇。
4. 消費(fèi)階段
從producer投遞消息到broker,即使前面這些過程保證了消息正常持久化,但如果consumer消費(fèi)消息沒有消費(fèi)到也不能理解為消息絕對的可靠。因此RockerMQ默認(rèn)提供了At least Once機(jī)制保證消息可靠消費(fèi)。
何為At least Once?
Consumer先pull 消息到本地,消費(fèi)完成后,才向服務(wù)器返回ack。
通常消費(fèi)消息的ack機(jī)制一般分為兩種思路:
1、先提交后消費(fèi);
2、先消費(fèi),消費(fèi)成功后再提交;
思路一可以解決重復(fù)消費(fèi)的問題但是會丟失消息,因此Rocket默認(rèn)實(shí)現(xiàn)的是思路二,由各自consumer業(yè)務(wù)方保證冪等來解決重復(fù)消費(fèi)問題。
消費(fèi)者從 broker 拉取消息,然后執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。一旦執(zhí)行成功,將會返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 狀態(tài)給 Broker。
如果 Broker 未收到消費(fèi)確認(rèn)響應(yīng)或收到其他狀態(tài),消費(fèi)者下次還會再次拉取到該條消息,進(jìn)行重試。這樣的方式有效避免了消費(fèi)者消費(fèi)過程發(fā)生異常,或者消息在網(wǎng)絡(luò)傳輸中丟失的情況。
消息消費(fèi)的代碼如下:
// 實(shí)例化消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); // 設(shè)置NameServer的地址 consumer.setNamesrvAddr("namesrvAddr"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費(fèi)的消息 consumer.subscribe("test_topic", "*"); // 注冊回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 執(zhí)行業(yè)務(wù)邏輯 // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費(fèi)者實(shí)例 consumer.start();
以上消費(fèi)消息過程的,我們需要注意返回消息狀態(tài)。只有當(dāng)業(yè)務(wù)邏輯真正執(zhí)行成功,我們才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否則我們需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重試。
5. 總結(jié)
最后我們還可以說出我們的思考,雖然提高消息可靠性,但是可能導(dǎo)致消息重發(fā),重復(fù)消費(fèi)。所以對于消費(fèi)客戶端,需要注意保證冪等性。
到此這篇關(guān)于Java的RocketMq隊(duì)列之消息可靠性詳解的文章就介紹到這了,更多相關(guān)RocketMq消息可靠性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis-plus自動填充不生效或自動填充數(shù)據(jù)為null原因及解決方案
本文主要介紹了Mybatis-plus自動填充不生效或自動填充數(shù)據(jù)為null原因及解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05詳解Java如何進(jìn)行Base64的編碼(Encode)與解碼(Decode)
這篇文章主要介紹了詳解Java如何進(jìn)行Base64的編碼(Encode)與解碼(Decode),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03SpringCloud使用Zookeeper作為配置中心的示例
這篇文章主要介紹了SpringCloud使用Zookeeper作為配置中心的示例,幫助大家更好的理解和學(xué)習(xí)使用SpringCloud,感興趣的朋友可以了解下2021-04-04