亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

一文吃透消息隊列RocketMQ實現(xiàn)消費冪等原理

 更新時間:2024年01月15日 09:30:59   作者:勇哥Java實戰(zhàn)  
這篇文章主要介紹了消息隊列RocketMQ實現(xiàn)消費冪等的全面講解,幫助大家吃透RocketMQ消息隊列消費冪等,更好的的應(yīng)用與工作實踐,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

基礎(chǔ)概念

這篇文章,我們聊聊消息隊列中非常重要的最佳實踐之一:消費冪等。

消費冪等是指:當(dāng)出現(xiàn) RocketMQ 消費者對某條消息重復(fù)消費的情況時,重復(fù)消費的結(jié)果與消費一次的結(jié)果是相同的,并且多次消費并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負面影響。

例如,在支付場景下,消費者消費扣款消息,對一筆訂單執(zhí)行扣款操作,扣款金額為100元。

如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞,消費者重復(fù)消費了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次,扣費100元,且用戶的扣款記錄中對應(yīng)的訂單只有一條扣款流水,不會多次扣除費用。那么這次扣款操作是符合要求的,整個消費過程實現(xiàn)了消費冪等。

適用場景

RocketMQ 消息重復(fù)的場景如下:

  • 發(fā)送時消息重復(fù)

    當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機,導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗。

    如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內(nèi)容相同但 Message ID 不同的消息。

  • 投遞時消息重復(fù)

    消息消費的場景下,消息已投遞到消費者并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷。為了保證消息至少被消費一次,Broker 服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息。

  • 負載均衡時消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動、Broker 重啟以及消費者應(yīng)用重啟)

    Broker 端或客戶端重啟、擴容或縮容時,會觸發(fā) Rebalance ,此時消費者可能會收到少量重復(fù)消息。

業(yè)務(wù)唯一標識

因為不同的 Message ID 對應(yīng)的消息內(nèi)容可能相同,有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。

最好的方式是以業(yè)務(wù)唯一標識作為冪等處理的關(guān)鍵依據(jù),消息必須攜帶業(yè)務(wù)唯一標識

消息攜帶業(yè)務(wù)唯一標識一般來講有兩種方式:

  • 消息 Key 存放業(yè)務(wù)唯一標識
Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 訂單編號
SendResult sendResult = producer.send(message);      
  • 消息 body 存放業(yè)務(wù)唯一標識
Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               (JSON.toJSONString(orderDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 訂單編號
SendResult sendResult = producer.send(message);      

消費者收到消息時,從消息中獲取訂單號來實現(xiàn)消息冪等 :

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt message : msgs) {
            // 方法1: 根據(jù)業(yè)務(wù)唯一標識的Key做冪等處理
            String orderId = message.getKeys();
            // 方法2: 從消息body體重解析出訂單號
            String orderJSON = new String(messageExt.getBody(), "UTF-8");
            OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
            String orderId = orderPO.getId();
            // TODO 業(yè)務(wù)處理邏輯
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

冪等策略

1 業(yè)務(wù)狀態(tài)機判斷

為了保證冪等,一定要做業(yè)務(wù)邏輯判斷,筆者認為這是保證冪等的首要條件。

筆者曾經(jīng)服務(wù)于神州專車,乘客在用戶端點擊立即叫車,訂單服務(wù)創(chuàng)建訂單,首先保存到數(shù)據(jù)庫后,然后將訂單信息同步保存到緩存中。

在訂單的載客生命周期里,訂單的修改操作先修改緩存,然后發(fā)送消息到> MetaQ ,訂單落盤服務(wù)消費消息,并判斷訂單信息是否正常(比如有無亂序),若訂單數(shù)據(jù)無誤,則存儲到數(shù)據(jù)庫中。

訂單狀態(tài)機按順序分別是:創(chuàng)建、已分配司機司機已出發(fā)、司機已到達、司機已接到乘客已到達。

這種設(shè)計是為了快速提升系統(tǒng)性能,由于網(wǎng)絡(luò)問題有非常小的概率,消費者會收到亂序的消息。

當(dāng)訂單狀態(tài)是司機已到達時,消費者可能會收到司機已出發(fā)的消息,也就是先發(fā)的消息因為網(wǎng)絡(luò)原因被延遲消費了。

此時,消費者需要判斷當(dāng)前的專車訂單狀態(tài)機,保存最合理的訂單數(shù)據(jù),就可以忽略舊的消息,打印相關(guān)日志即可。

2 全局處理標識

1 數(shù)據(jù)庫去重表

數(shù)據(jù)庫去重表有兩個要點 :

  • 操作之前先從去重表中通過唯一業(yè)務(wù)標識查詢記錄是否存在,若不存在,則進行后續(xù)消費流程 ;
  • 為了避免并發(fā)場景,去重表需要包含業(yè)務(wù)唯一鍵 uniqueKey , 這樣就算并發(fā)插入也不可能插入多條,插入失敗后,拋異常。

舉一個電商場景的例子:用戶購物車結(jié)算時,系統(tǒng)會創(chuàng)建支付訂單。用戶支付成功后支付訂單的狀態(tài)會由未支付修改為支付成功,然后系統(tǒng)給用戶增加積分。

我們可以使用 RocketMQ 事務(wù)消息的方案,該方案能夠發(fā)揮 MQ 的優(yōu)勢:異步解耦,以及事務(wù)的最終一致性的特性。

在消費監(jiān)聽器邏輯里,冪等非常重要 。積分表 SQL 如下:

CREATE TABLE `t_points` (
  `id` bigint(20) NOT NULL COMMENT '主鍵',
  `user_id` bigint(20) NOT NULL COMMENT '用戶id',
  `order_id` bigint(20) NOT NULL COMMENT '訂單編號',
  `points` int(4) NOT NULL COMMENT '積分',
  `remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '備注',
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '訂單唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

當(dāng)收到訂單信息后,首先判斷該訂單是否有積分記錄,若沒有記錄,才插入積分記錄。

就算出現(xiàn)極端并發(fā)場景下,訂單編號也是唯一鍵,數(shù)據(jù)庫中也必然不會存在相同訂單的多條積分記錄。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
            String orderJSON = new String(messageExt.getBody(), "UTF-8");
            logger.info("orderJSON:" + orderJSON);
            OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
            // 首先查詢是否處理完成
            PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
            if (pointsPO == null) {
                Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
                pointsPO = new PointsPO();
                pointsPO.setId(id);
                pointsPO.setOrderId(orderPO.getId());
                pointsPO.setUserId(orderPO.getUserId());
                // 添加積分數(shù) 30
                pointsPO.setPoints(30);
                pointsPO.setCreateTime(new Date());
                pointsPO.setRemarks("添加積分數(shù) 30");
                pointsMapper.insert(pointsPO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

2 Redis處理標志位

在消費者接收到消息后,首先判斷 Redis 中是否存在該業(yè)務(wù)主鍵的標志位,若存在標志位,則認為消費成功,否則,則執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完成后,在緩存中添加標志位。

public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String bizKey = messageExt.getKeys(); // 唯一業(yè)務(wù)主鍵
           //1. 判斷是否存在標志
           if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
         			continue;
       		 }
         	 //2. 執(zhí)行業(yè)務(wù)邏輯
           //TODO do business
           //3. 設(shè)置標志位
           redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3 分布式鎖

僅僅有業(yè)務(wù)邏輯判斷是不夠的,為了應(yīng)對并發(fā)場景,我們可以使用分布式鎖

分布式鎖一般有三種方案:

  • 數(shù)據(jù)庫樂觀鎖
  • 數(shù)據(jù)庫悲觀鎖
  • Redis 鎖

1 數(shù)據(jù)庫樂觀鎖

數(shù)據(jù)樂觀鎖假設(shè)認為數(shù)據(jù)一般情況下不會造成沖突,所以在數(shù)據(jù)進行提交更新的時候,才會正式對數(shù)據(jù)的沖突與否進行檢測,如果發(fā)現(xiàn)沖突了,則讓返回用戶錯誤的信息,讓用戶決定如何去做。

由于樂觀鎖沒有了鎖等待,提高了吞吐量,所以樂觀鎖適合讀多寫少的場景。

實現(xiàn)樂觀鎖:一般是在數(shù)據(jù)表中加上一個數(shù)據(jù)版本號 version 字段,表示數(shù)據(jù)被修改的次數(shù),當(dāng)數(shù)據(jù)被修改時,version 值會加一。

當(dāng)線程 A 要更新數(shù)據(jù)值時,在讀取數(shù)據(jù)的同時也會讀取version值,在提交更新時,若剛才讀取到的 version 值為當(dāng)前數(shù)據(jù)庫中的 version 值相等時才更新,否則重試更新操作,直到更新成功。

步驟 1 : 查詢出條目數(shù)據(jù)

select version from my_table where id = #{id}

步驟 2 :修改條目數(shù)據(jù),傳遞版本參數(shù)

update  my_table set n = n + 1, version = version + 1 where id=#{id} and version = #{version};

從樂觀鎖的實現(xiàn)角度來講,樂觀鎖非常容易實現(xiàn),但它有兩個缺點:

  • 對業(yè)務(wù)的侵入性,添加版本字段;
  • 高并發(fā)場景下,只有一個線程可以修改成功,那么就會存在大量的失敗 。

消費端演示代碼如下:

public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long version = orderMapper.selectVersionByOrderId(orderPO.getId()); //版本
           orderPO.setVersion(version);
           // 對應(yīng) SQL:update t_order t set version = version + 1 , status = #{status} where id = #{id} 
           // and version = #{version}
           int affectedCount = orderMapper.updateOrder(orderPO);
           if(affectedCount == 0) {
              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

2 數(shù)據(jù)庫悲觀鎖

當(dāng)我們要對一個數(shù)據(jù)庫中的一條數(shù)據(jù)進行修改的時候,為了避免同時被其他人修改,最好的辦法就是直接對該數(shù)據(jù)進行加鎖以防止并發(fā)。

這種借助數(shù)據(jù)庫鎖機制在修改數(shù)據(jù)之前先鎖定,再修改的方式被稱之為悲觀并發(fā)控制(又名“悲觀鎖”,Pessimistic Concurrency Control,縮寫“PCC”)。

之所以叫做悲觀鎖,是因為這是一種對數(shù)據(jù)的修改抱有悲觀態(tài)度的并發(fā)控制方式。我們一般認為數(shù)據(jù)被并發(fā)修改的概率比較大,所以需要在修改之前先加鎖。

悲觀并發(fā)控制實際上是**“先取鎖再訪問”的保守策略**,為數(shù)據(jù)處理的安全提供了保證。

MySQL 悲觀鎖的使用方法如下:

begin;

-- 讀取數(shù)據(jù)并加鎖
select ... for update;

-- 修改數(shù)據(jù)
update ...;

commit;

例如,以下代碼將讀取 t_order 表中 id 為 1 的記錄,并將該記錄的 status 字段修改為 3

begin;

select * from t_order where id = 1 for update;

update t_order set status = '3' where id = 1;

commit;

如果 t_order 表中 id 為 1 的記錄正在被其他事務(wù)修改,則上述代碼會等待該記錄被釋放鎖后才能繼續(xù)執(zhí)行。

消費端演示代碼如下:

public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long orderId = orderPo.getId();
           //調(diào)用service的修改訂單信息,該方法事務(wù)加鎖, 當(dāng)修改訂單記錄時,該其他線程會等待該記錄被釋放才能繼續(xù)執(zhí)行
           orderService.updateOrderForUpdate(orderId ,orderPO);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3 Redis鎖

使用數(shù)據(jù)庫鎖是非常重的一個操作,我們可以使用更輕量級的 Redis 鎖來替換,因為 Redis 性能高,同時有非常豐富的生態(tài)(類庫)支持不同類型的分布式鎖。

我們選擇 Redisson 框架提供的分布式鎖功能,簡化的示例代碼如下:

public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String orderJSON = new String(messageExt.getBody(), "UTF-8");
           OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
           Long orderId = orderPo.getId();
           RLock lock = redissonClient.getLock("order-lock-" + orderId);
           rLock.lock(10, TimeUnit.SECONDS);
           // TODO 業(yè)務(wù)邏輯
           rLock.unlock();
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

總結(jié)

這篇文章,我們詳細剖析了如何實現(xiàn) RocketMQ 消費冪等。

1、消費冪等:當(dāng)出現(xiàn) RocketMQ 消費者對某條消息重復(fù)消費的情況時,重復(fù)消費的結(jié)果與消費一次的結(jié)果是相同的,并且多次消費并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負面影響。

2、適用場景:發(fā)送時消息重復(fù)、投遞時消息重復(fù)、負載均衡時消息重復(fù)

3、業(yè)務(wù)唯一標識:以業(yè)務(wù)唯一標識作為冪等處理的關(guān)鍵依據(jù),消息必須攜帶業(yè)務(wù)唯一標識。

4、冪等策略:業(yè)務(wù)邏輯代碼中需要判斷業(yè)務(wù)狀態(tài)機,同時根據(jù)實際條件選擇全局處理標識分布式鎖兩種方式處理。

以上就是一文吃透消息隊列RocketMQ實現(xiàn)消費冪等的詳細內(nèi)容,更多關(guān)于消息隊列RocketMQ消費冪等的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • spring boot使用thymeleaf模板的方法詳解

    spring boot使用thymeleaf模板的方法詳解

    thymeleaf 是新一代的模板引擎,在spring4.0中推薦使用thymeleaf來做前端模版引擎。下面這篇文章主要給大家介紹了關(guān)于spring boot使用thymeleaf模板的方法,文中通過示例代碼介紹的非常詳細,需要的朋友們下面來一起看看吧。
    2017-07-07
  • java string類的常用方法詳細介紹

    java string類的常用方法詳細介紹

    在開發(fā)過程中經(jīng)常會使用到j(luò)ava string類的方法,本文將以此問題進行詳細介紹
    2012-11-11
  • 淺析對Spring?aware接口理解

    淺析對Spring?aware接口理解

    通過aware接口可以獲取Spring容器相關(guān)信息,但這樣會與Spring容器耦合,這篇文章主要介紹了Spring?aware接口理解,需要的朋友可以參考下
    2022-08-08
  • 原生Java操作兔子隊列RabbitMQ

    原生Java操作兔子隊列RabbitMQ

    這篇文章主要介紹了原生Java操作兔子隊列RabbitMQ,MQ全稱為Message?Queue,即消息隊列,“消息隊列”是在消息的傳輸過程中保存消息的容器,需要的朋友可以參考下
    2023-05-05
  • SpringBoot文件上傳同時接收復(fù)雜參數(shù)的過程詳解

    SpringBoot文件上傳同時接收復(fù)雜參數(shù)的過程詳解

    這篇文章主要介紹了SpringBoot文件上傳同時,接收復(fù)雜參數(shù),本文通過示例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-12-12
  • JVM知識總結(jié)之垃圾收集算法

    JVM知識總結(jié)之垃圾收集算法

    本博客為讀書筆記,讀的是《深入理解Java虛擬機》一書,在看這個書的時候,最大的一個感受便是“當(dāng)初怎么就沒有好好學(xué)習(xí)操作系統(tǒng)呢,不然也不會有這么多看的云里霧里的地方了”,不過那都是過去的事了,學(xué)習(xí)最好的時刻便是現(xiàn)在,需要的朋友可以參考下
    2021-06-06
  • IDEA中make directory as的作用及說明

    IDEA中make directory as的作用及說明

    這篇文章主要介紹了IDEA中make directory as的作用及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-09-09
  • java交換排序之雞尾酒排序?qū)崿F(xiàn)方法

    java交換排序之雞尾酒排序?qū)崿F(xiàn)方法

    這篇文章主要介紹了java交換排序之雞尾酒排序?qū)崿F(xiàn)方法,實例分析了排序的原理與相關(guān)的實現(xiàn)技巧,需要的朋友可以參考下
    2015-02-02
  • SpringBoot整合MongoDB實現(xiàn)文檔存儲功能

    SpringBoot整合MongoDB實現(xiàn)文檔存儲功能

    MongoDB是可以應(yīng)用于各種規(guī)模的企業(yè)、各個行業(yè)以及各類應(yīng)用程序的開源數(shù)據(jù)庫,本文將結(jié)合MongoDB和SpringBoot實現(xiàn)文檔存儲功能,需要的可以參考下
    2024-12-12
  • maven的5種打包方式小結(jié)

    maven的5種打包方式小結(jié)

    本文主要介紹了maven的5種打包方式小結(jié),主要是幾種插件打包,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-06-06

最新評論