亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ保證消息的有序性的案例分享

 更新時(shí)間:2024年04月25日 08:50:22   作者:一只愛擼貓的程序猿  
Apache RocketMQ 是一個(gè)常用的開源消息中間件,它提供了強(qiáng)大的有序消息處理能力,這里我們會(huì)探討 RocketMQ 是如何保證消息的有序性的,包括其設(shè)計(jì)原理和相關(guān)的源碼實(shí)現(xiàn),需要的朋友可以參考下

在分布式系統(tǒng)中,消息隊(duì)列(MQ)的有序性是一個(gè)重要的特性,尤其是在需要保證事件順序執(zhí)行的業(yè)務(wù)場(chǎng)景下。Apache RocketMQ 是一個(gè)常用的開源消息中間件,它提供了強(qiáng)大的有序消息處理能力。這里我們會(huì)探討 RocketMQ 是如何保證消息的有序性的,包括其設(shè)計(jì)原理和相關(guān)的源碼實(shí)現(xiàn)。

簡(jiǎn)單原理

RocketMQ 有序消息的基本概念

RocketMQ 保證有序性的主要方法是通過順序消息來實(shí)現(xiàn)的。在 RocketMQ 中,順序消息分為全局順序和分區(qū)順序兩種:

  • 全局順序:指的是消息全局范圍內(nèi)的有序,也就是在所有的消息中,都是按照發(fā)送的順序來消費(fèi)。
  • 分區(qū)順序:指的是在同一個(gè)隊(duì)列(Queue)中的消息是有序的,而不同隊(duì)列間的消息并不保證有序。

RocketMQ 默認(rèn)使用分區(qū)順序,通過將同一個(gè) topic 下的消息分到同一個(gè)隊(duì)列(queue)中,來保證隊(duì)列內(nèi)的消息有序。

RocketMQ 有序消息的實(shí)現(xiàn)機(jī)制

消息發(fā)送

在發(fā)送端,RocketMQ 通過確保生產(chǎn)者向同一個(gè)隊(duì)列(Queue)發(fā)送消息來保證消息的有序性。生產(chǎn)者在發(fā)送消息時(shí)可以指定消息的 keys 或者其他屬性,RocketMQ 通過這些屬性計(jì)算消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列。

源碼示例(偽代碼):

public class Producer {
    public void sendMessages(List<Message> messages) {
        for (Message msg : messages) {
            int queueId = this.calculateQueueId(msg);
            msg.setQueueId(queueId);
            this.sendMessageToQueue(msg, queueId);
        }
    }

    private int calculateQueueId(Message msg) {
        // 使用 hash 算法基于 message key 計(jì)算隊(duì)列 ID
        return Math.abs(msg.getKey().hashCode()) % this.queueSize;
    }
}

消息消費(fèi)

在消費(fèi)端,RocketMQ 使用單線程消費(fèi)模式來保證同一個(gè)隊(duì)列的消息順序性。消費(fèi)者會(huì)固定分配到某個(gè)隊(duì)列,而且是單線程從該隊(duì)列拉取并處理消息,從而保證消息的有序處理。

源碼示例(偽代碼):

public class Consumer {
    public void consume() {
        while (true) {
            Message msg = this.pullMessage();
            this.processMessage(msg);
        }
    }
}

簡(jiǎn)單案例

在Spring Boot中使用RocketMQ來保證消息的隊(duì)列順序性,我們需要配置RocketMQ的客戶端和服務(wù)器端以支持順序消息。以下是一個(gè)基于RocketMQ和Spring Boot實(shí)現(xiàn)的消息順序發(fā)送和消費(fèi)的例子。這個(gè)場(chǎng)景假設(shè)我們需要在一個(gè)電商系統(tǒng)中處理訂單狀態(tài)更新,訂單狀態(tài)更新必須按照順序來處理,以避免狀態(tài)不一致。

步驟1: 添加依賴

首先,確保你的pom.xml中加入了RocketMQ的Spring Boot Starter依賴。

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

步驟2: 配置RocketMQ

application.ymlapplication.properties中配置RocketMQ的基礎(chǔ)屬性:

rocketmq:
  name-server: 127.0.0.1:9876  # 修改為你的NameServer地址
  producer:
    group: order_producer_group
    send-message-timeout: 3000
  consumer:
    group: order_consumer_group
    consume-thread-min: 1
    consume-thread-max: 1

步驟3: 生產(chǎn)者配置

創(chuàng)建一個(gè)生產(chǎn)者服務(wù),這個(gè)服務(wù)將訂單狀態(tài)更新作為順序消息發(fā)送。

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderStatusProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderStatusUpdate(String orderStatus, String orderId) {
        // 使用訂單ID作為key保證同一個(gè)訂單的更新在同一個(gè)隊(duì)列
        SendResult result = rocketMQTemplate.syncSendOrderly("order-topic", orderStatus, orderId);
        System.out.println("Message sent, result: " + result.getSendStatus());
    }
}

步驟4: 消費(fèi)者配置

創(chuàng)建一個(gè)消費(fèi)者服務(wù),這個(gè)服務(wù)將按順序消費(fèi)訂單狀態(tài)更新。

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order_consumer_group", consumeMode = ConsumeMode.ORDERLY)
public class OrderStatusConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received order update: " + message);
        // 處理訂單更新邏輯
        processOrderUpdate(message);
    }

    private void processOrderUpdate(String status) {
        // 實(shí)現(xiàn)訂單更新處理邏輯
        System.out.println("Processing order status update: " + status);
    }
}

步驟5: 測(cè)試消息順序

你可以通過編寫一個(gè)簡(jiǎn)單的測(cè)試來發(fā)送多個(gè)消息,并觀察消費(fèi)者是否按順序接收它們。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class OrderStatusTestRunner implements CommandLineRunner {

    @Autowired
    private OrderStatusProducer producer;

    @Override
    public void run(String... args) throws Exception {
        producer.sendOrderStatusUpdate("Order Created", "OrderId123");
        producer.sendOrderStatusUpdate("Payment Received", "OrderId123");
        producer.sendOrderStatusUpdate("Shipped", "OrderId123");
        producer.sendOrderStatusUpdate("Delivered", "OrderId123");
    }
}

通過這個(gè)設(shè)置,RocketMQ 和 Spring Boot 能夠保證同一個(gè)訂單的不同狀態(tài)更新是按照發(fā)送順序被處理的。這對(duì)于需要順序一致性的業(yè)務(wù)邏輯是非常重要的。

以上就是RocketMQ保證消息的有序性的案例分享的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息有序性的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 實(shí)例分析java開啟線程的方法

    實(shí)例分析java開啟線程的方法

    在本文里我們通過實(shí)例給大家講解了JAVA開啟線程的方法和相關(guān)知識(shí)點(diǎn),需要的朋友們跟著學(xué)習(xí)下。
    2019-03-03
  • Mybatis中Mapper映射文件使用詳解

    Mybatis中Mapper映射文件使用詳解

    這篇文章主要介紹了Mybatis中Mapper映射文件使用詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Eclipse+Java+Swing+Mysql實(shí)現(xiàn)電影購(gòu)票系統(tǒng)(詳細(xì)代碼)

    Eclipse+Java+Swing+Mysql實(shí)現(xiàn)電影購(gòu)票系統(tǒng)(詳細(xì)代碼)

    這篇文章主要介紹了Eclipse+Java+Swing+Mysql實(shí)現(xiàn)電影購(gòu)票系統(tǒng)并附詳細(xì)的代碼詳解,需要的小伙伴可以參考一下
    2022-01-01
  • 聊聊Java的switch為什么不支持long

    聊聊Java的switch為什么不支持long

    這篇文章主要介紹了Java的switch為什么不支持long,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java實(shí)現(xiàn)的DES加密解密工具類實(shí)例

    Java實(shí)現(xiàn)的DES加密解密工具類實(shí)例

    這篇文章主要介紹了Java實(shí)現(xiàn)的DES加密解密工具類,結(jié)合具體實(shí)例形式分析了Java實(shí)現(xiàn)的DES加密解密工具類定義與使用方法,需要的朋友可以參考下
    2017-09-09
  • 新版idea創(chuàng)建spring boot項(xiàng)目的詳細(xì)教程

    新版idea創(chuàng)建spring boot項(xiàng)目的詳細(xì)教程

    這篇文章給大家介紹了新版idea創(chuàng)建spring boot項(xiàng)目的詳細(xì)教程,本教程對(duì)新手小白友好,若根據(jù)教程創(chuàng)建出現(xiàn)問題導(dǎo)致失敗可下載我提供的源碼,在文章最后,本教程較新,文中通過圖文給大家介紹的非常詳細(xì),感興趣的朋友可以參考下
    2024-01-01
  • java swing實(shí)現(xiàn)電影購(gòu)票系統(tǒng)

    java swing實(shí)現(xiàn)電影購(gòu)票系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了java swing實(shí)現(xiàn)電影購(gòu)票系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • 詳解SpringSecurity中的Authentication信息與登錄流程

    詳解SpringSecurity中的Authentication信息與登錄流程

    這篇文章主要介紹了SpringSecurity中的Authentication信息與登錄流程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • idea配置maven環(huán)境時(shí)maven下載速度慢的解決方法

    idea配置maven環(huán)境時(shí)maven下載速度慢的解決方法

    我們?cè)趇dea配置maven環(huán)境的時(shí)候會(huì)發(fā)現(xiàn)maven更新慢的現(xiàn)象,解決辦法就是下載國(guó)內(nèi)的鏡像包,完美解決下載速度慢的問題,文中有詳細(xì)的具體操作方法,并通過圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2024-02-02
  • SpringBoot + Mybatis Plus 整合 Redis的詳細(xì)步驟

    SpringBoot + Mybatis Plus 整合 Redis的

    文章詳細(xì)介紹了Redis在用戶管理系統(tǒng)中的應(yīng)用,包括用戶信息緩存、Token存儲(chǔ)、接口限流、重復(fù)提交攔截和熱點(diǎn)數(shù)據(jù)預(yù)加載等場(chǎng)景,并提供了具體的實(shí)現(xiàn)方案和步驟,感興趣的朋友一起看看吧
    2025-03-03

最新評(píng)論