RocketMQ順序消息的原理與特點
一、什么是順序消息
消息有序指的是,消費者端消費消息時,需按照消息的發(fā)送順序來消費,即先發(fā)送的消息,需要先消費(FIFO)。
舉個容易理解的例子:通常創(chuàng)建訂單后,會經(jīng)歷一系列的操作:【訂單創(chuàng)建 -> 訂單支付 -> 訂單發(fā)貨 -> 訂單配送 -> 訂單完成】。在創(chuàng)建完訂單后,會發(fā)送五條消息到MQ Broker中,消費的時候要按照【訂單創(chuàng)建 -> 訂單支付 -> 訂單發(fā)貨 -> 訂單配送 -> 訂單完成】這個順序去消費,這樣的訂單才是有效的。
RocketMQ采用局部順序一致性的機制,實現(xiàn)了單個隊列中消息的有序性,使用FIFO順序提供有序消息。簡而言之,我們的消息要保證有序,就必須把一組消息存放在同一個隊列,然后由Consumer進行逐一消費。
但是如果碰到高并發(fā)的情況,消息不就會阻塞了嗎?
RocketMQ給的解決方案是按照業(yè)務(wù)去劃分不同的隊列,然后并行消費,提高消息的處理速度的同時避免消息堆積。
RocketMQ可以嚴格的保證消息有序,可以分為分區(qū)有序或者全局有序。
- 全局有序:全局順序時使用一個queue;
- 分區(qū)有序:局部順序時多個queue并行消費;
二、順序消息的原理
在默認的情況下,消息發(fā)送會采取Round Robin輪詢方式把消息發(fā)送到不同的queue;而消費消息的時候從多個queue上拉取消息,這種情況發(fā)送和消費是不能保證順序的。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區(qū)有序,即相對每個queue,消息都是有序的。
三、全局順序消息
前面介紹到,全局順序消息的話,我們需要將所有消息都發(fā)送到同一個隊列,然后消費者端也訂閱同一個隊列,這樣就能實現(xiàn)順序消費消息的功能。下面通過一個示例說明如何實現(xiàn)全局順序消息。
(1)、生產(chǎn)者發(fā)送消息
public class OrderMQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException { // 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqProducer.setNamesrvAddr("10.0.90.86:9876"); // 啟動消息生產(chǎn)者 mqProducer.start(); for (int i = 0; i < 5; i++) { // 創(chuàng)建消息,并指定Topic(主題),Tag(標簽)和消息內(nèi)容 Message message = new Message("GLOBAL_ORDER_TOPIC", "", ("全局有序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 實現(xiàn)MessageQueueSelector,重寫select方法,保證消息都進入同一個隊列 // send方法的第一個參數(shù): 需要發(fā)送的消息Message // send方法的第二個參數(shù): 消息隊列選擇器MessageQueueSelector // send方法的第三個參數(shù): 消息將要進入的隊列下標,這里我們指定消息都發(fā)送到下標為1的隊列 SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() { @Override // select方法第一個參數(shù): 指該Topic下有的隊列集合 // 第二個參數(shù): 發(fā)送的消息 // 第三個參數(shù): 消息將要進入的隊列下標,它與send方法的第三個參數(shù)相同 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get((Integer) arg); } }, 1); System.out.println("sendResult = " + sendResult); } // 如果不再發(fā)送消息,關(guān)閉Producer實例 mqProducer.shutdown(); } }
(2)、消費者消費消息
public class OrderMQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.86:9876"); // 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("GLOBAL_ORDER_TOPIC", "*"); /** * 與普通消費一樣需要注冊消息監(jiān)聽器,但是傳入的不再是MessageListenerConcurrently * 而是需要傳入MessageListenerOrderly的實現(xiàn)子類,并重寫consumeMessage方法。 */ // 順序消費同一個隊列的消息 mqPushConsumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); for (MessageExt msg : msgs) { System.out.println("消費線程=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", 消息內(nèi)容:" + new String(msg.getBody())); } // 標記該消息已經(jīng)被成功消費 return ConsumeOrderlyStatus.SUCCESS; } }); // 啟動消費者實例 mqPushConsumer.start(); } }
(3)、啟動生產(chǎn)者
如下,可看到,消息成功發(fā)送到Broker中,并且可以看到,5條消息選擇的queueId都是1。
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71A70000, offsetMsgId=0A005A5600002A9F00000000000076AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D20001, offsetMsgId=0A005A5600002A9F000000000000776B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D80002, offsetMsgId=0A005A5600002A9F000000000000782B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71DE0003, offsetMsgId=0A005A5600002A9F00000000000078EB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=3] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71E60004, offsetMsgId=0A005A5600002A9F00000000000079AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=4]
(4)、啟動消費者
如下,可看到,消費者也是按照發(fā)送消息的順序消費消息。
消費線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息0
消費線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息1
消費線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息2
消費線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息3
消費線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息4
四、局部順序消息
下面用訂單進行分區(qū)有序的示例。一個訂單創(chuàng)建完成后,訂單的狀態(tài)流轉(zhuǎn)大概是:【訂單創(chuàng)建 -> 訂單支付 -> 訂單完成】,我們在創(chuàng)建MessageQueueSelector消息隊列選擇器的時候,需要根據(jù)業(yè)務(wù)唯一標識自定義隊列選擇算法,如本例中則可以使用orderId訂單號去選擇隊列。這樣的話,訂單號相同的消息會被先后發(fā)送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
大體過程如下圖:
(1)、生產(chǎn)者發(fā)送消息
public class OrderMQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException { // 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqProducer.setNamesrvAddr("10.0.90.86:9876"); // 啟動消息生產(chǎn)者 mqProducer.start(); List<Order> orderList = getOrderList(); for (int i = 0; i < orderList.size(); i++) { String body = "【" + orderList.get(i) + "】訂單狀態(tài)變更消息"; // 創(chuàng)建消息,并指定Topic(主題),Tag(標簽)和消息內(nèi)容 Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET)); // MessageQueueSelector: 消息隊列選擇器,根據(jù)業(yè)務(wù)唯一標識自定義隊列選擇算法 /** * msg:消息對象 * selector:消息隊列的選擇器 * arg:選擇隊列的業(yè)務(wù)標識,如本例中的orderId */ SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() { /** * @param mqs 隊列集合 * @param msg 消息對象 * @param arg 業(yè)務(wù)標識的參數(shù),對應(yīng)send()方法傳入的第三個參數(shù)arg * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //根據(jù)arg(實際上是訂單id)選擇消息發(fā)送的隊列 long index = (Long) arg % mqs.size(); return mqs.get((int) index); } //mqProducer.send()方法第三個參數(shù), 會傳遞到select()方法的arg參數(shù) }, orderList.get(i).getOrderId()); System.out.println(String.format("消息發(fā)送狀態(tài):%s, orderId:%s, queueId:%d, body:%s", sendResult.getSendStatus(), orderList.get(i).getOrderId(), sendResult.getMessageQueue().getQueueId(), body)); } // 如果不再發(fā)送消息,關(guān)閉Producer實例 mqProducer.shutdown(); } /** * 訂單狀態(tài)變更流程: ORDER_CREATE(訂單創(chuàng)建) -> ORDER_PAYED(訂單已支付) -> ORDER_COMPLETE(訂單完成) */ public static List<Order> getOrderList() { List<Order> orderList = new ArrayList<>(); Order orderDemo = new Order(); orderDemo.setOrderId(1L); orderDemo.setOrderStatus("ORDER_CREATE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(2L); orderDemo.setOrderStatus("ORDER_CREATE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(1L); orderDemo.setOrderStatus("ORDER_PAYED"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(2L); orderDemo.setOrderStatus("ORDER_PAYED"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(2L); orderDemo.setOrderStatus("ORDER_COMPLETE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(3L); orderDemo.setOrderStatus("ORDER_CREATE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(4L); orderDemo.setOrderStatus("ORDER_CREATE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(3L); orderDemo.setOrderStatus("ORDER_PAYED"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(1L); orderDemo.setOrderStatus("ORDER_COMPLETE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(3L); orderDemo.setOrderStatus("ORDER_COMPLETE"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(4L); orderDemo.setOrderStatus("ORDER_PAYED"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(4L); orderDemo.setOrderStatus("ORDER_COMPLETE"); orderList.add(orderDemo); return orderList; } } public class Order implements Serializable { /** * 訂單ID */ private Long orderId; /** * 訂單狀態(tài) */ private String orderStatus; public Long getOrderId() { return orderId; } public void setOrderId(Long orderId) { this.orderId = orderId; } public String getOrderStatus() { return orderStatus; } public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; } @Override public String toString() { return "Order{" + "orderId=" + orderId + ", orderStatus='" + orderStatus + '\'' + '}'; } }
(2)、消費者消費消息
public class OrderMQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.86:9876"); // 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*"); // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 // 注意:順序消息注冊的是MessageListenerOrderly監(jiān)聽器 mqPushConsumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) { consumeOrderlyContext.setAutoCommit(true); for (MessageExt msg : msgList) { // 每個queue有唯一的consume線程來消費, 訂單對每個queue都是分區(qū)有序 System.out.println("消費線程=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", 消息內(nèi)容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 標記該消息已經(jīng)被成功消費 return ConsumeOrderlyStatus.SUCCESS; } }); // 啟動消費者實例 mqPushConsumer.start(); } }
(3)、啟動生產(chǎn)者
消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
(4)、啟動消費者
消費線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
從上面的結(jié)果,我們可以看出來實現(xiàn)了分區(qū)有序,即一個線程只完成唯一標識的訂單消息。
五、順序消息缺陷
- 消費順序消息的并行度依賴于隊列的數(shù)量 ;
- 隊列熱點問題,個別隊列由于哈希不均導(dǎo)致消息過多,消費速度跟不上,產(chǎn)生消息堆積問題;
- 遇到消息失敗的消息,無法跳過,當(dāng)前隊列消費暫停;
到此這篇關(guān)于RocketMQ順序消息的原理與特點的文章就介紹到這了,更多相關(guān)RocketMQ順序消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中基于Nacos實現(xiàn)Sentinel規(guī)則持久化詳解
這篇文章主要介紹了Java中基于Nacos實現(xiàn)Sentinel規(guī)則持久化詳解,Sentinel Dashboard中添加的規(guī)則數(shù)據(jù)存儲在內(nèi)存,微服務(wù)停掉規(guī)則數(shù)據(jù)就消失,在?產(chǎn)環(huán)境下不合適,我們可以將Sentinel規(guī)則數(shù)據(jù)持久化到Nacos配置中?,讓微服務(wù)從Nacos獲取規(guī)則數(shù)據(jù),需要的朋友可以參考下2023-09-09java數(shù)組算法例題代碼詳解(冒泡排序,選擇排序,找最大值、最小值,添加、刪除元素等)
這篇文章主要介紹了java數(shù)組算法例題代碼詳解(冒泡排序,選擇排序,找最大值、最小值,添加、刪除元素等),本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05基于Java語言在窗體上實現(xiàn)飛機大戰(zhàn)小游戲的完整步驟
這篇文章主要給大家介紹了基于Java語言在窗體上實現(xiàn)飛機大戰(zhàn)小游戲的完整步驟,文中通過圖文以及實例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2022-02-02