RocketMQ-延遲消息的處理流程介紹
概述
RocketMQ 支持發(fā)送延遲消息,但不支持任意時間的延遲消息的設置,僅支持內置預設值的延遲時間間隔的延遲消息;
預設值的延遲時間間隔為:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息創(chuàng)建的時候,調用 setDelayTimeLevel(int level) 方法設置延遲時間;
broker在接收到延遲消息的時候會把對應延遲級別的消息先存儲到對應的延遲隊列中,等延遲消息時間到達時,會把消息重新存儲到對應的topic的queue里面。
Broker處理延遲消息
CommitLog.putMessage()
//獲取消息的sysflag final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //非事務消息 或 已commit事務消息 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery 判斷消息是否設置延遲 if (msg.getDelayTimeLevel() > 0) { //判斷延遲級別是否大于最大級別,如果大于最大值,則將延遲級別設置為最大級 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //延遲消息的topic為 SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //獲取延遲級別,一個延遲級別對應一個Queue queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息原始的topic,queueid保存到消息的property中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }
1、判斷消息類型,如果是非事務消息、已commit事務消息,才能處理延遲消息
2、判斷消息是否設置延遲級別,如果延遲級別大于0,則該消息為延遲消息
3、判斷延遲級別是否大于最大級別,如果大于最大值,則將延遲級別設置為最大級
4、延遲消息的topic為 SCHEDULE_TOPIC_XXXX
5、獲取延遲級別,一個延遲級別對應一個Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
啟動延遲消息定時任務
ScheduleMessageService.start()
延遲消息投遞
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。