基于rocketmq的有序消費模式和并發(fā)消費模式的區(qū)別說明
rocketmq消費者注冊監(jiān)聽有兩種模式
有序消費MessageListenerOrderly和并發(fā)消費MessageListenerConcurrently,這兩種模式返回值不同。
MessageListenerOrderly
正確消費返回
ConsumeOrderlyStatus.SUCCESS
稍后消費返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently
正確消費返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消費返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
顧名思義,有序消費模式是按照消息的順序進行消費,但是除此之外,在實踐過程中我發(fā)現(xiàn)和并發(fā)消費模式還有很大的區(qū)別的。
第一,速度,下面我打算用實驗來探究一下。
使用mq發(fā)送消息,消費者使用有序消費模式消費,具體的業(yè)務(wù)是阻塞100ms
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
logger.info("==========CONSUME_START===========");
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();//在第一次消費時初始化
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
消費100條消息

速度挺快的,為了讓結(jié)果更準確,將消息加到1000條
消費1000條消息

可以看到每一條消息平均耗時25ms,然而業(yè)務(wù)是阻塞100ms,這說明有序消費模式和同步消費可能并不是一回事,那如果不阻塞代碼我們再來看一下結(jié)果

不阻塞過后速度明顯提高了,那么我阻塞300ms會怎么樣呢?

時間相比阻塞100ms多了2倍
接下來我們測試并發(fā)消費模式
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
基于上次的經(jīng)驗,同樣測試三種情況,消費1000條不阻塞,消費1000條阻塞100ms,消費1000條阻塞300ms
消費1000條不阻塞的情況

和有序消費模式差不多,快個一兩秒。
消費1000條阻塞100ms

竟然比不阻塞的情況更快,可能是誤差把
消費1000條阻塞300ms

速度稍慢,但是還是比有序消費快得多。
結(jié)論是并發(fā)消費的消費速度要比有序消費更快。
另一個區(qū)別是消費失敗時的處理不同,有序消費模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消費者會立馬消費這條消息,而使用并發(fā)消費模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要過好幾秒甚至十幾秒才會再次消費。
我是在只有一條消息的情況下測試的。更重要的區(qū)別是,
返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不會增加消息的消費次數(shù),mq消息有個默認最大消費次數(shù)16,消費次數(shù)到了以后,這條消息會進入死信隊列,這個最大消費次數(shù)是可以在mqadmin中設(shè)置的。
mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3
我測試后發(fā)現(xiàn),并發(fā)模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個消息到達最大消費次數(shù)之后就不會再出現(xiàn)了。這說明有序消費模式可能并沒有這個機制,這意味著你再有序消費模式下拋出固定異常,那么這條異常信息將會被永遠消費,并且很可能會影響之后正常的消息。下面依然做個試驗
Map<String, Integer> map = new HashMap<>();//保存消息錯誤消費次數(shù)
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
if(1 == 1)
throw new Exception();
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
MessageExt msg = msgs.get(0);
if(map.containsKey(msg.getKeys())) {//消息每消費一次,加1
map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
}else {
map.put(msg.getKeys(), 1);
}
logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
發(fā)送了十條消息

可以看到雖然我發(fā)了十條消息,但是一直在消費同樣四條消息,這可能跟消息broker有默認四條隊列有關(guān)系。同時從時間可以看到,消費失敗后,會馬上拉這條信息。
至于并發(fā)消費模式則不會無限消費,而且消費失敗后不會馬上再消費。具體的就不嘗試了。
結(jié)論是有序消費模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯誤消費次數(shù),只要消費次數(shù)達到一定次數(shù),那么就直接返回ConsumeOrderlyStatus.SUCCESS。
突然想到之前測試有序消費模式MessageListenerOrderly的時候為什么1000條消息阻塞100ms耗時25000ms了,因為有序消費模式是同時拉取四條隊列消息的,這就對上了。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot使用Validator進行參數(shù)校驗實戰(zhàn)教程(自定義校驗,分組校驗)
這篇文章主要介紹了SpringBoot使用Validator進行參數(shù)校驗(自定義校驗,分組校驗)的實戰(zhàn)教程,本文通過示例代碼給大家介紹的非常詳細,需要的朋友參考下吧2023-07-07
Java設(shè)計模式之簡單工廠 工廠方法 抽象工廠深度總結(jié)
設(shè)計模式(Design Pattern)是前輩們對代碼開發(fā)經(jīng)驗的總結(jié),是解決特定問題的一系列套路。它不是語法規(guī)定,而是一套用來提高代碼可復(fù)用性、可維護性、可讀性、穩(wěn)健性以及安全性的解決方案2021-09-09
淺談springboot一個service內(nèi)組件的加載順序
這篇文章主要介紹了springboot一個service內(nèi)組件的加載順序,具有很好的參考價值,希望對大家有所幫助。以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家2021-08-08
spring-core組件詳解——PropertyResolver屬性解決器
這篇文章主要介紹了spring-core組件詳解——PropertyResolver屬性解決器,需要的朋友可以參考下2016-05-05

