SpringBoot+RocketMQ實現(xiàn)延遲消息的示例代碼
下面將詳細介紹如何在SpringBoot中使用RocketMQ實現(xiàn)延遲消息,包括基于延遲級別和基于具體時間兩種方式的完整實現(xiàn)。
一、延遲消息概述
RocketMQ提供了兩種類型的延遲消息機制:
- 延遲消息:消息發(fā)送后延遲指定的時間長度再被消費
- 定時消息:消息在指定的具體時間點被消費
這兩種機制在訂單超時取消、會議提醒、定時任務調度等場景中有廣泛應用。
二、環(huán)境準備
1. 添加Maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2. 配置文件設置
在application.yml中配置RocketMQ連接信息:
rocketmq:
name-server: localhost:9876
producer:
group: delay-message-producer-group
三、延遲級別機制實現(xiàn)
1. 默認延遲級別
RocketMQ默認提供18個延遲級別,定義在MessageStoreConfig類中:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
對應關系:
- level=1: 延遲1秒
- level=2: 延遲5秒
- level=3: 延遲10秒
- level=4: 延遲30秒
- level=5: 延遲1分鐘
- level=6: 延遲2分鐘
- ...以此類推
- level=18: 延遲2小時
2. 基于延遲級別的生產者實現(xiàn)
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class DelayLevelProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送基于延遲級別的消息
* @param topic 主題
* @param tag 標簽
* @param message 消息內容
* @param delayLevel 延遲級別(1-18)
*/
public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
// 創(chuàng)建消息
Message<String> springMessage = MessageBuilder.withPayload(message).build();
// 發(fā)送延遲消息
SendResult sendResult = rocketMQTemplate.syncSend(
topic + ":" + tag,
springMessage,
3000, // 超時時間
delayLevel // 延遲級別
);
System.out.println("延遲級別消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送訂單超時取消消息(延遲15分鐘)
*/
public void sendOrderTimeoutMessage(String orderId) {
String message = "訂單超時取消: " + orderId;
// 15分鐘對應level=14(根據默認配置)
sendMessageByDelayLevel("OrderTopic", "Timeout", message, 14);
}
}
四、基于具體時間的延遲消息實現(xiàn)
1. 定時消息生產者
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class ScheduledMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送延遲指定毫秒數的消息
*/
public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
// 計算投遞時間
long deliverTimeMs = System.currentTimeMillis() + delayMs;
// 創(chuàng)建消息并設置投遞時間
Message<String> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
.setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
System.out.println("延遲毫秒消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送指定時間點投遞的消息
*/
public void sendMessageAtTime(String topic, String message, Date deliverTime) {
long deliverTimeMs = deliverTime.getTime();
// 創(chuàng)建消息并設置投遞時間
Message<String> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
System.out.println("定時投遞消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送10秒后投遞的消息
*/
public void sendTenSecondsLaterMessage(String topic, String message) {
sendMessageWithDelayMs(topic, message, 10000L);
}
}
五、消費者實現(xiàn)
延遲消息的消費者與普通消息消費者相同,無需特殊配置:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "delay-message-consumer-group",
selectorExpression = "Timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("[" + now + "] 接收到訂單超時消息: " + message);
// 處理訂單取消邏輯
processOrderCancellation(message);
}
private void processOrderCancellation(String message) {
// 提取訂單ID
String orderId = message.substring(message.indexOf(":") + 2);
System.out.println("執(zhí)行訂單取消操作,訂單ID: " + orderId);
// 這里可以調用訂單服務進行取消操作
}
}
六、Controller層實現(xiàn)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("/api/delay")
public class DelayMessageController {
@Autowired
private DelayLevelProducer delayLevelProducer;
@Autowired
private ScheduledMessageProducer scheduledMessageProducer;
/**
* 發(fā)送基于延遲級別的消息
*/
@PostMapping("/level")
public String sendByDelayLevel(
@RequestParam String topic,
@RequestParam String tag,
@RequestParam String message,
@RequestParam(defaultValue = "3") int delayLevel) {
delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
return "延遲級別消息發(fā)送成功,延遲級別: " + delayLevel;
}
/**
* 發(fā)送訂單超時取消消息
*/
@PostMapping("/order/timeout")
public String sendOrderTimeout(@RequestParam String orderId) {
delayLevelProducer.sendOrderTimeoutMessage(orderId);
return "訂單超時取消消息已發(fā)送,訂單ID: " + orderId;
}
/**
* 發(fā)送延遲指定毫秒的消息
*/
@PostMapping("/milliseconds")
public String sendByDelayMs(
@RequestParam String topic,
@RequestParam String message,
@RequestParam long delayMs) {
scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
return "延遲毫秒消息發(fā)送成功,延遲: " + delayMs + "ms";
}
/**
* 發(fā)送指定時間點的消息
*/
@PostMapping("/scheduled")
public String sendScheduled(
@RequestParam String topic,
@RequestParam String message,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date deliverTime) {
scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
return "定時消息發(fā)送成功,投遞時間: " + deliverTime;
}
}
七、自定義延遲級別配置
在Broker的配置文件中可以自定義延遲級別:
# 在broker.conf文件中添加 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h
重啟Broker使其生效。注意,修改延遲級別后,所有使用延遲級別的消息都會使用新的配置。
八、兩種實現(xiàn)方式對比
| 特性 | 基于延遲級別 | 基于具體時間 |
|---|---|---|
| 靈活性 | 較低,只能使用預定義級別 | 高,可以精確到毫秒 |
| 適用版本 | 全版本支持 | RocketMQ 5.x及以上版本完整支持 |
| 使用場景 | 固定延遲時間的場景 | 需要精確控制投遞時間的場景 |
| 配置復雜度 | 簡單,無需額外配置 | 可能需要在Broker端開啟相關功能 |
九、使用注意事項
延遲精度:
- 延遲消息的投遞時間不是完全精確的,有一定誤差
- 在高并發(fā)場景下,誤差可能會增大
版本兼容性:
- 基于具體時間的延遲消息在RocketMQ 5.x版本支持更完善
- 在低版本中可能需要使用延遲級別機制
性能考慮:
- 大量延遲消息可能會增加Broker的負擔
- 對于長時間延遲的消息,考慮使用其他方案(如定時任務+消息隊列組合)
消息可靠性:
- 延遲消息同樣支持持久化,確保Broker重啟后不會丟失
- 建議開啟消息確認機制確保消息可靠投遞
十、測試示例
發(fā)送訂單超時取消消息(延遲15分鐘):
POST /api/delay/order/timeout?orderId=ORDER123456
發(fā)送10秒后投遞的消息:
POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
發(fā)送指定時間點的消息:
POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
通過以上配置和代碼,您可以在SpringBoot項目中輕松實現(xiàn)基于RocketMQ的延遲消息功能,滿足各種定時任務和延遲處理的業(yè)務需求。
到此這篇關于SpringBoot+RocketMQ實現(xiàn)延遲消息的示例代碼的文章就介紹到這了,更多相關SpringBoot RocketMQ 延遲內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

