亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot+RocketMQ實現(xiàn)延遲消息的示例代碼

 更新時間:2025年10月24日 11:08:29   作者:匆匆忙忙游刃有余  
本文主要介紹了SpringBoot+RocketMQ實現(xiàn)延遲消息案例詳解,包括基于延遲級別和基于具體時間兩種方式的完整實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

下面將詳細介紹如何在SpringBoot中使用RocketMQ實現(xiàn)延遲消息,包括基于延遲級別和基于具體時間兩種方式的完整實現(xiàn)。

一、延遲消息概述

RocketMQ提供了兩種類型的延遲消息機制:

  1. 延遲消息:消息發(fā)送后延遲指定的時間長度再被消費
  2. 定時消息:消息在指定的具體時間點被消費

這兩種機制在訂單超時取消、會議提醒、定時任務調度等場景中有廣泛應用。

二、環(huán)境準備

1. 添加Maven依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件設置

application.yml中配置RocketMQ連接信息:

rocketmq:
  name-server: localhost:9876
  producer:
    group: delay-message-producer-group

三、延遲級別機制實現(xiàn)

1. 默認延遲級別

RocketMQ默認提供18個延遲級別,定義在MessageStoreConfig類中:

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

對應關系:

  • level=1: 延遲1秒
  • level=2: 延遲5秒
  • level=3: 延遲10秒
  • level=4: 延遲30秒
  • level=5: 延遲1分鐘
  • level=6: 延遲2分鐘
  • ...以此類推
  • level=18: 延遲2小時

2. 基于延遲級別的生產者實現(xiàn)

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class DelayLevelProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 發(fā)送基于延遲級別的消息
     * @param topic 主題
     * @param tag 標簽
     * @param message 消息內容
     * @param delayLevel 延遲級別(1-18)
     */
    public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
        // 創(chuàng)建消息
        Message<String> springMessage = MessageBuilder.withPayload(message).build();
        
        // 發(fā)送延遲消息
        SendResult sendResult = rocketMQTemplate.syncSend(
            topic + ":" + tag, 
            springMessage, 
            3000, // 超時時間
            delayLevel // 延遲級別
        );
        
        System.out.println("延遲級別消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送訂單超時取消消息(延遲15分鐘)
     */
    public void sendOrderTimeoutMessage(String orderId) {
        String message = "訂單超時取消: " + orderId;
        // 15分鐘對應level=14(根據默認配置)
        sendMessageByDelayLevel("OrderTopic", "Timeout", message, 14);
    }
}

四、基于具體時間的延遲消息實現(xiàn)

1. 定時消息生產者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 發(fā)送延遲指定毫秒數的消息
     */
    public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
        // 計算投遞時間
        long deliverTimeMs = System.currentTimeMillis() + delayMs;
        
        // 創(chuàng)建消息并設置投遞時間
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("延遲毫秒消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送指定時間點投遞的消息
     */
    public void sendMessageAtTime(String topic, String message, Date deliverTime) {
        long deliverTimeMs = deliverTime.getTime();
        
        // 創(chuàng)建消息并設置投遞時間
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("定時投遞消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送10秒后投遞的消息
     */
    public void sendTenSecondsLaterMessage(String topic, String message) {
        sendMessageWithDelayMs(topic, message, 10000L);
    }
}

五、消費者實現(xiàn)

延遲消息的消費者與普通消息消費者相同,無需特殊配置:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(
    topic = "OrderTopic",
    consumerGroup = "delay-message-consumer-group",
    selectorExpression = "Timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println("[" + now + "] 接收到訂單超時消息: " + message);
        
        // 處理訂單取消邏輯
        processOrderCancellation(message);
    }
    
    private void processOrderCancellation(String message) {
        // 提取訂單ID
        String orderId = message.substring(message.indexOf(":") + 2);
        System.out.println("執(zhí)行訂單取消操作,訂單ID: " + orderId);
        // 這里可以調用訂單服務進行取消操作
    }
}

六、Controller層實現(xiàn)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/api/delay")
public class DelayMessageController {
    
    @Autowired
    private DelayLevelProducer delayLevelProducer;
    
    @Autowired
    private ScheduledMessageProducer scheduledMessageProducer;
    
    /**
     * 發(fā)送基于延遲級別的消息
     */
    @PostMapping("/level")
    public String sendByDelayLevel(
            @RequestParam String topic,
            @RequestParam String tag,
            @RequestParam String message,
            @RequestParam(defaultValue = "3") int delayLevel) {
        
        delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
        return "延遲級別消息發(fā)送成功,延遲級別: " + delayLevel;
    }
    
    /**
     * 發(fā)送訂單超時取消消息
     */
    @PostMapping("/order/timeout")
    public String sendOrderTimeout(@RequestParam String orderId) {
        delayLevelProducer.sendOrderTimeoutMessage(orderId);
        return "訂單超時取消消息已發(fā)送,訂單ID: " + orderId;
    }
    
    /**
     * 發(fā)送延遲指定毫秒的消息
     */
    @PostMapping("/milliseconds")
    public String sendByDelayMs(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam long delayMs) {
        
        scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
        return "延遲毫秒消息發(fā)送成功,延遲: " + delayMs + "ms";
    }
    
    /**
     * 發(fā)送指定時間點的消息
     */
    @PostMapping("/scheduled")
    public String sendScheduled(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date deliverTime) {
        
        scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
        return "定時消息發(fā)送成功,投遞時間: " + deliverTime;
    }
}

七、自定義延遲級別配置

在Broker的配置文件中可以自定義延遲級別:

# 在broker.conf文件中添加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h

重啟Broker使其生效。注意,修改延遲級別后,所有使用延遲級別的消息都會使用新的配置。

八、兩種實現(xiàn)方式對比

特性基于延遲級別基于具體時間
靈活性較低,只能使用預定義級別高,可以精確到毫秒
適用版本全版本支持RocketMQ 5.x及以上版本完整支持
使用場景固定延遲時間的場景需要精確控制投遞時間的場景
配置復雜度簡單,無需額外配置可能需要在Broker端開啟相關功能

九、使用注意事項

  1. 延遲精度

    • 延遲消息的投遞時間不是完全精確的,有一定誤差
    • 在高并發(fā)場景下,誤差可能會增大
  2. 版本兼容性

    • 基于具體時間的延遲消息在RocketMQ 5.x版本支持更完善
    • 在低版本中可能需要使用延遲級別機制
  3. 性能考慮

    • 大量延遲消息可能會增加Broker的負擔
    • 對于長時間延遲的消息,考慮使用其他方案(如定時任務+消息隊列組合)
  4. 消息可靠性

    • 延遲消息同樣支持持久化,確保Broker重啟后不會丟失
    • 建議開啟消息確認機制確保消息可靠投遞

十、測試示例

  1. 發(fā)送訂單超時取消消息(延遲15分鐘):

    POST /api/delay/order/timeout?orderId=ORDER123456
    
  2. 發(fā)送10秒后投遞的消息:

    POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
    
  3. 發(fā)送指定時間點的消息:

    POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
    

通過以上配置和代碼,您可以在SpringBoot項目中輕松實現(xiàn)基于RocketMQ的延遲消息功能,滿足各種定時任務和延遲處理的業(yè)務需求。

到此這篇關于SpringBoot+RocketMQ實現(xiàn)延遲消息的示例代碼的文章就介紹到這了,更多相關SpringBoot RocketMQ 延遲內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 關于Java?中?Future?的?get?方法超時問題

    關于Java?中?Future?的?get?方法超時問題

    這篇文章主要介紹了Java?中?Future?的?get?方法超時,最常見的理解就是,“超時以后,當前線程繼續(xù)執(zhí)行,線程池里的對應線程中斷”,真的是這樣嗎?本文給大家詳細介紹,需要的朋友參考下吧
    2022-06-06
  • RabbitMQ工作模式中的發(fā)布確認模式示例詳解

    RabbitMQ工作模式中的發(fā)布確認模式示例詳解

    發(fā)布確認模式用于確保消息已經被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化,本文通過實例代碼給大家介紹RabbitMQ工作模式之發(fā)布確認模式,感興趣的朋友一起看看吧
    2025-05-05
  • mybatis中返回主鍵一直為1的問題

    mybatis中返回主鍵一直為1的問題

    這篇文章主要介紹了mybatis中返回主鍵一直為1的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 詳解JAVA的封裝

    詳解JAVA的封裝

    Java面向對象的三大特性:封裝、繼承、多態(tài)。下面對三大特性之一封裝進行了總結,需要的朋友可以參考下
    2017-04-04
  • 定時任務@Scheduled用法及其參數使用

    定時任務@Scheduled用法及其參數使用

    這篇文章主要介紹了定時任務@Scheduled用法及其參數使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 關于Struts2文件上傳與自定義攔截器

    關于Struts2文件上傳與自定義攔截器

    本篇文章,小編將為大家介紹關于Struts2文件上傳與自定義攔截器,有需要的朋友可以參考一下
    2013-04-04
  • java實現(xiàn)計算器加法小程序(圖形化界面)

    java實現(xiàn)計算器加法小程序(圖形化界面)

    這篇文章主要介紹了Java實現(xiàn)圖形化界面的計算器加法小程序,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-05-05
  • Java包裝類之自動裝箱與拆箱

    Java包裝類之自動裝箱與拆箱

    這篇文章主要介紹了Java包裝類之自動裝箱與拆箱,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • springboot如何使用MybatisPlus

    springboot如何使用MybatisPlus

    MyBatisPlus是一個強大的數據庫操作框架,其代碼生成器可以快速生成實體類、映射文件等,本文介紹了如何導入MyBatisPlus相關依賴,創(chuàng)建代碼生成器,并配置數據庫信息以逆向生成代碼,感興趣的朋友跟隨小編一起看看吧
    2024-09-09
  • MyBatis-Plus 使用枚舉自動關聯(lián)注入

    MyBatis-Plus 使用枚舉自動關聯(lián)注入

    本文主要介紹了MyBatis-Plus 使用枚舉自動關聯(lián)注入,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-06-06

最新評論