亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中RocketMQ使用方法詳解

 更新時間:2025年02月24日 09:18:28   作者:莫凡的博客  
這篇文章主要介紹了RocketMQ和Kafka在SpringBoot中的使用方法,以及如何保證消息隊列的順序性、可靠性以及冪等性,文中通過代碼介紹的非常詳細,需要的朋友可以參考下

在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中間件,它們的使用方法有一些相似之處,也有各自的特點。

一、RocketMQ 在 Spring Boot 中的使用

  • 引入依賴

    • 在項目的pom.xml文件中添加 RocketMQ 的依賴。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    
  • 配置 RocketMQ

    • application.propertiesapplication.yml文件中配置 RocketMQ 的相關參數(shù),如 namesrvAddr(NameServer 地址)等。
    rocketmq.name-server=127.0.0.1:9876
    
  • 生產(chǎn)者

    • 創(chuàng)建一個生產(chǎn)者類,使用@Resource注入RocketMQTemplate。
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RocketMQProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendMessage(String topic, String message) {
            rocketMQTemplate.convertAndSend(topic, message);
        }
    }
    
  • 消費者

    • 創(chuàng)建一個消費者類,使用@RocketMQMessageListener注解指定監(jiān)聽的主題和消費組。
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

二、Kafka 在 Spring Boot 中的使用

  • 引入依賴

    • pom.xml文件中添加 Kafka 的依賴。
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.12</version>
    </dependency>
    
  • 配置 Kafka

    • application.propertiesapplication.yml文件中配置 Kafka 的相關參數(shù),如 bootstrapServers(Kafka 服務器地址)等。
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  • 生產(chǎn)者

    • 創(chuàng)建一個生產(chǎn)者類,使用@Resource注入KafkaTemplate
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    
  • 消費者

    • 創(chuàng)建一個消費者類,使用@KafkaListener注解指定監(jiān)聽的主題和消費組。
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = "your_topic", groupId = "your_consumer_group")
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

總的來說,RocketMQ 和 Kafka 在 Spring Boot 中的使用都比較方便,具體選擇哪種消息中間件可以根據(jù)項目的實際需求來決定。RocketMQ 在一些場景下可能具有高吞吐量、低延遲等優(yōu)勢,而 Kafka 則在大規(guī)模分布式系統(tǒng)中被廣泛應用,具有高可靠性和可擴展性。

三、如何保證消息隊列順序性

1、發(fā)送端保證順序性

  • 合理設計業(yè)務

    • 確保具有順序性要求的消息被發(fā)送到同一個主題(Topic)的同一個隊列(Queue)中。比如,將同一類業(yè)務的消息按照特定規(guī)則進行分類,使得它們都進入相同的隊列。
    • 一個業(yè)務場景的消息盡量由一個發(fā)送端來發(fā)送消息,避免多個發(fā)送端發(fā)送可能導致的亂序。
  • 使用同步發(fā)送

    • 在發(fā)送消息時,使用同步發(fā)送方式send(Message msg, long timeout),確保消息成功發(fā)送后再進行下一個消息的發(fā)送。這樣可以避免異步發(fā)送可能導致的消息亂序情況。

2、消費端保證順序性

  • 單線程消費

    • 消費者在消費消息時,采用單線程的方式進行消費。這樣可以確保同一隊列中的消息按照發(fā)送的順序被依次處理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

    在實際應用中,可以將消費邏輯放在一個單獨的方法中,然后在這個方法中進行順序處理,確保消息的順序性。

  • 避免并發(fā)處理

    • 確保在消費消息的過程中,不會出現(xiàn)并發(fā)處理的情況。比如,不要在消費消息的同時啟動其他異步任務或者多線程處理,以免破壞消息的順序性。

3、設置隊列數(shù)量

  • 控制隊列數(shù)量
    • 如果業(yè)務對消息順序性要求非常嚴格,可以考慮減少主題下的隊列數(shù)量。通常情況下,一個主題可以包含多個隊列,消息會被隨機分發(fā)到不同的隊列中。如果隊列數(shù)量較少,那么消息更有可能被發(fā)送到同一個隊列中,從而更容易保證順序性。

通過以上方法,可以在一定程度上保證 RocketMQ 消息的順序性。但需要注意的是,保證消息順序性可能會犧牲一定的性能和吞吐量,因此需要根據(jù)實際業(yè)務需求進行權衡和選擇。

四、如何確保消息隊列的可靠性

1、發(fā)送端

  • 同步發(fā)送與確認

    • 使用同步發(fā)送方式send(Message msg, long timeout),該方法會等待消息發(fā)送成功的確認,確保消息被正確地發(fā)送到 Broker。如果發(fā)送失敗或超時,可以進行重試或其他錯誤處理操作。
    try {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {
        System.out.println("Failed to send message: " + e.getMessage());
        // 進行重試或其他錯誤處理
    }
    
  • 事務消息

    • 對于一些需要保證事務一致性的場景,可以使用 RocketMQ 的事務消息機制。發(fā)送事務消息分為兩個階段,首先發(fā)送半事務消息,然后執(zhí)行本地事務,根據(jù)本地事務的結果決定提交或回滾事務消息。
    @Service
    public class TransactionProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendTransactionMessage() {
            TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null);
            System.out.println("Transaction message sent: " + result);
        }
    }
    

2、Broker 端

  • 持久化存儲

    • RocketMQ 支持消息的持久化存儲,可以將消息存儲在磁盤上,以防止消息丟失。通過配置broker.conf文件中的flushDiskType參數(shù),可以選擇同步刷盤或異步刷盤方式。同步刷盤可以保證消息在寫入磁盤后才返回成功響應,但會影響性能;異步刷盤可以提高性能,但在系統(tǒng)故障時可能會丟失部分未刷盤的消息。
  • 高可用部署

    • 部署多主多從的 RocketMQ 集群,當主節(jié)點出現(xiàn)故障時,從節(jié)點可以自動切換為主節(jié)點,保證消息服務的可用性。同時,可以配置主從同步方式,確保消息在主從節(jié)點之間的可靠同步。

3、消費端

  • 消費確認

    • 消費者在成功處理消息后,需要向 Broker 發(fā)送消費確認??梢酝ㄟ^設置consumeModeCONSUME_PASSIVELY(被動消費模式),并在處理完消息后手動調(diào)用acknowledge()方法進行確認。如果消費失敗,可以選擇重試或者將消息發(fā)送到死信隊列進行后續(xù)處理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            try {
                // 處理消息
                System.out.println("Received message: " + message);
                // 確認消費成功
                getRocketMQListenerContainer().acknowledge();
            } catch (Exception e) {
                System.out.println("Failed to process message: " + e.getMessage());
                // 可以選擇重試或者發(fā)送到死信隊列
            }
        }
    }
    
  • 重試機制

    • 配置消費者的重試次數(shù)和重試時間間隔,當消費失敗時,RocketMQ 會自動進行重試??梢栽?code>application.properties或application.yml中配置rocketmq.retry.timesrocketmq.retry.interval參數(shù)來控制重試策略。

通過以上措施,可以在不同階段保證 RocketMQ 消息的可靠性,確保消息在生產(chǎn)、存儲和消費過程中不會丟失或出現(xiàn)錯誤。

五、保證消息處理的冪等性

在 RocketMQ 中,可以通過以下幾種方式來保證消息處理的冪等性:

1、業(yè)務層面設計

  • 使用唯一標識

    • 在業(yè)務中為每條消息生成一個唯一的標識,比如使用業(yè)務流水號、訂單號等作為消息的唯一標識。在消費消息時,先根據(jù)這個唯一標識判斷該消息是否已經(jīng)被處理過。如果已經(jīng)處理過,則直接忽略該消息。
    • 例如在電商系統(tǒng)中,訂單創(chuàng)建的消息可以使用訂單號作為唯一標識。消費者在處理消息時,先查詢數(shù)據(jù)庫中是否存在該訂單號對應的處理記錄,如果存在則說明該消息已經(jīng)被處理過,不再重復處理。
    @Service
    public class OrderProcessingService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void processOrderMessage(String orderId) {
            boolean isProcessed = isOrderProcessed(orderId);
            if (isProcessed) {
                return;
            }
            // 處理訂單邏輯
            System.out.println("Processing order: " + orderId);
            markOrderAsProcessed(orderId);
        }
    
        private boolean isOrderProcessed(String orderId) {
            int count = jdbcTemplate.queryForObject(
                    "SELECT COUNT(*) FROM processed_orders WHERE order_id =?",
                    Integer.class, orderId);
            return count > 0;
        }
    
        private void markOrderAsProcessed(String orderId) {
            jdbcTemplate.update(
                    "INSERT INTO processed_orders (order_id) VALUES (?)",
                    orderId);
        }
    }
    
  • 利用數(shù)據(jù)庫約束

    • 可以在數(shù)據(jù)庫中使用唯一索引、主鍵約束等方式來保證業(yè)務數(shù)據(jù)的唯一性。在處理消息時,如果違反了這些約束,則說明該消息已經(jīng)被處理過,不再重復處理。
    • 比如在用戶注冊的場景中,可以在數(shù)據(jù)庫的用戶表中使用用戶名或郵箱作為唯一索引。當消費用戶注冊的消息時,嘗試插入用戶數(shù)據(jù),如果插入失?。ㄒ驗檫`反唯一索引約束),則說明該用戶已經(jīng)注冊過,不再重復處理。
    @Service
    public class UserRegistrationService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void registerUser(String username, String password) {
            try {
                jdbcTemplate.update(
                        "INSERT INTO users (username, password) VALUES (?,?)",
                        username, password);
            } catch (DataIntegrityViolationException e) {
                // 處理插入失敗的情況,可能是用戶已存在
                System.out.println("User already exists: " + username);
            }
        }
    }
    

2、技術層面實現(xiàn)

  • 分布式鎖
    • 可以使用分布式鎖來保證同一時間只有一個消費者實例在處理特定的消息。在處理消息之前,先獲取分布式鎖,如果獲取成功則處理消息,處理完成后釋放鎖。如果獲取鎖失敗,則說明該消息正在被其他實例處理,當前實例可以選擇等待或者直接忽略該消息。
    • 可以使用 Redis 或 Zookeeper 等實現(xiàn)分布式鎖。以 Redis 為例,可以使用 SETNX 命令來實現(xiàn)分布式鎖。
    @Service
    public class MessageProcessingService {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public void processMessage(String messageId) {
            String lockKey = "message_lock_" + messageId;
            boolean locked = tryLock(lockKey);
            if (!locked) {
                return;
            }
            try {
                boolean isProcessed = isMessageProcessed(messageId);
                if (isProcessed) {
                    return;
                }
                // 處理消息邏輯
                System.out.println("Processing message: " + messageId);
                markMessageAsProcessed(messageId);
            } finally {
                releaseLock(lockKey);
            }
        }
    
        private boolean tryLock(String key) {
            return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30));
        }
    
        private void releaseLock(String key) {
            redisTemplate.delete(key);
        }
    
        private boolean isMessageProcessed(String messageId) {
            // 判斷消息是否已處理的邏輯
            return false;
        }
    
        private void markMessageAsProcessed(String messageId) {
            // 標記消息已處理的邏輯
        }
    }
    

通過以上方法,可以有效地保證 RocketMQ 消息處理的冪等性,避免因重復消費消息而導致的業(yè)務數(shù)據(jù)不一致問題。

總結

到此這篇關于Java中RocketMQ使用方法的文章就介紹到這了,更多相關Java RocketMQ使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 微信小程序與Java后端接口交互

    微信小程序與Java后端接口交互

    本文主要介紹了微信小程序與Java后端接口交互,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • 解決IDEA報錯,無效的源發(fā)行版 無效的目標發(fā)行版:22問題

    解決IDEA報錯,無效的源發(fā)行版 無效的目標發(fā)行版:22問題

    在項目編譯過程中,可能會出現(xiàn)“無效的源發(fā)行版”或“無效的目標發(fā)行版”的報錯信息,原因通常是編譯使用的JDK版本與項目設置的發(fā)布版本不一致,解決這類問題的辦法是統(tǒng)一JDK版本,具體操作為:在IDE的項目設置中(如File->ProjectStructure->ProjectSettings)
    2024-10-10
  • mybatis一對多方式實現(xiàn)批量插入

    mybatis一對多方式實現(xiàn)批量插入

    這篇文章主要介紹了mybatis一對多方式實現(xiàn)批量插入,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • 501 Command "HELO" requires an argument問題的解決方法

    501 Command "HELO" requires an argument問題的解決方法

    換一個windows服務器,發(fā)現(xiàn)就沒這樣的問題,僅在一臺Linux服務器上可以重現(xiàn),直觀感覺就是這臺Linux服務器某些配置有問題
    2013-08-08
  • Mybatis中特殊SQL的執(zhí)行

    Mybatis中特殊SQL的執(zhí)行

    這篇文章主要介紹了Mybatis中特殊SQL的執(zhí)行,介紹內(nèi)容包括模糊查詢、批量刪除、動態(tài)設置表名、添加功能獲取自增的主鍵等相關資料,需要的小伙伴可以參考一下
    2022-04-04
  • Java去掉字符串最后一個逗號的方法

    Java去掉字符串最后一個逗號的方法

    Java中去掉字符串的最后一個逗號有多種實現(xiàn)方法,不同的方法適用于不同的場景,本文通過實例代碼介紹Java去掉字符串最后一個逗號的相關知識,感興趣的朋友一起看看吧
    2023-12-12
  • Java中ArrayList類詳細介紹

    Java中ArrayList類詳細介紹

    這篇文章主要介紹了Java中ArrayList類詳細介紹的相關資料,需要的朋友可以參考下
    2017-04-04
  • MyBatis-plus中的模糊查詢解讀

    MyBatis-plus中的模糊查詢解讀

    這篇文章主要介紹了MyBatis-plus中的模糊查詢解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • springBoot的日志文件詳解

    springBoot的日志文件詳解

    日志是程序的重要組成部分,主要可以用來定位和排查問題,在程序中進行自定義日志輸出的時候,也通常是借助于SLF4J框架來輸出日志,本文給大家分享springBoot的日志文件相關知識,感興趣的朋友一起看看吧
    2024-06-06
  • SpringBoot中的統(tǒng)一異常處理詳細解析

    SpringBoot中的統(tǒng)一異常處理詳細解析

    這篇文章主要介紹了SpringBoot中的統(tǒng)一異常處理詳細解析,該注解可以把異常處理器應用到所有控制器,而不是單個控制器,借助該注解,我們可以實現(xiàn):在獨立的某個地方,比如單獨一個類,定義一套對各種異常的處理機制,需要的朋友可以參考下
    2024-01-01

最新評論