亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringCloud+RocketMQ實現(xiàn)分布式事務的實踐

 更新時間:2021年10月28日 08:58:48   作者:黃青石  
分布式事務已經成為了我們的經常使用的。所以我們來一步一步的實現(xiàn)基于RocketMQ的分布式事務。感興趣的可以了解一下

  隨著互聯(lián)網(wǎng)公司的微服務越來越多,分布式事務已經成為了我們的經常使用的。所以我們來一步一步的實現(xiàn)基于RocketMQ的分布式事務。接下來,我們將要做的主題寫出來。

  • RocketMQ的分布式事務結構和說明
  • 搭建RocketMQ步驟
  • 事務場景,然后準備工程,運行代碼

一、RocketMQ的分布式事務結構和說明

  我們通過下圖來了解一下RocketMQ實現(xiàn)分布式事務的結構。采用半消息機制實現(xiàn)分布式事務,半消息顧名思義,就是發(fā)送方將消息發(fā)送到MQ中的Broker端,這個消息被標記為“暫不投遞”狀態(tài),這個時間訂閱方是不能收到這個消息的,當發(fā)送方將提交了Commit后,這個消息才可以被訂閱方收到。如果發(fā)送方提交了Rollback,那么訂閱方就不會收到以該消息。

  RocketMQ采用的是最終一致性事務,因為它使用了半消息機制,訂閱方可能收不到消息,但是最終狀態(tài)是能保持一致的。

  我們在講這個流程前,我們需要先看下圖的紅色部分,當發(fā)送方發(fā)給MQ的Commit/Rollback沒有收到的時候,這個時候需要啟用一個線程,從本地事務庫里邊查找到當前的狀態(tài),根據(jù)當前的狀態(tài)來決定是否重新發(fā)送Commit/Rollback,相當于重試。

  接下來,我們說一下整個執(zhí)行的過程。

發(fā)送方將半消息發(fā)送到MQ中,成功后MQ將消息保持好后將結果同步給發(fā)送方,即半消息發(fā)送成功。當MQ告訴發(fā)送方成功后,我們需要記錄下發(fā)送到MQ的相關的事情,比如發(fā)送的消息內容,發(fā)送時間,發(fā)送的相關ID,比如事務ID,都要做好相應記錄。當本地事務庫,也可以理解為日志庫,我們可以針對發(fā)生的問題進行追溯。本地庫記錄好事務后,發(fā)送方就可以將Commit/Rollback提交到MQ了。當MQ收到了Commit命令后,這個時候就會將該條消息發(fā)送給訂閱方。收到了Rollback后,那該條消息不會投遞給訂閱方,消息保存3天后刪除。

  整個過程就是這些,大家如果覺得有遺漏的,可以告訴我。

二、搭建RocketMQ

  我使用的是windos10,所以需要下載windows版本。

   進入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

   下載二進制文件壓縮包。如下圖:

  先解壓,然后進行環(huán)境變量配置,打開菜單,直接輸入環(huán)境變量。

    變量名:ROCKETMQ_HOME

    值:解壓的release的路徑,如D:\rocketmq-all-4.3.0-bin-release

  然后打開CMD命令,進入到解壓路徑中的bin目標,進行nameserver啟動。

start mqnameserv.cmd

  啟動后的效果為:

  再進行broker的啟動,啟動需要連到的nameserver為127.0.0.1:9876,打開自動創(chuàng)建Topic,這個時候當用到某個Topic沒有的情況下會自動創(chuàng)建一個。

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  這個時候Windows的rocketmq就下載好并且啟動成功了。

三、事務場景,然后準備工程,運行代碼

  因為我們使用的RocketMq是使用的分布式事務是最終一致性的,柔性的,所以我們使用的場景也要考慮到。我們用的場景就是下訂單充話費,用戶支付了訂單,那么直接給用戶的進行充值。購買的訂單不會立馬充值到手機的,需要等一會才到賬,這就是最終一致性。

  我們使用的如下代碼版本號,SpringCloud的版本號要和SpringBoot保持一致,否則會出現(xiàn)類找不到的情況。

SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)

  我們使用SpringCloud的幾個組件:

  Euerka Server :用于提供服務注冊的能力和發(fā)現(xiàn)

  Euerka Client : 用于進行服務注冊

  Feign:用于服務間的調用

  Config :用于進行配置

  接下來,我們創(chuàng)建需要進行配置的表。

DROP TABLE IF EXISTS `spring_cloud_config`;

CREATE TABLE `spring_cloud_config` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `akey` varchar(30) DEFAULT NULL,
  `avalue` varchar(128) DEFAULT NULL,
  `application` varchar(30) DEFAULT NULL,
  `aprofile` varchar(30) DEFAULT NULL,
  `label` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;



INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
    (2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
    (3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
    (4,'order_topic','order_topic','order-service','dev','dev'),
    (5,'order_topic','order_topic','product-service','dev','dev');

  我們構建了4個模塊,eureka模塊用于服務注冊和發(fā)現(xiàn),springcloud-config將數(shù)據(jù)庫創(chuàng)建的配置加載供其他工程使用。charge-order-service用于下充值訂單,phone-charge-service用于給手機進行充值業(yè)務。

  主要的流程為,通過charge-order-service產生了訂單,然后將訂單和數(shù)量到phone-charge-service進行二次確認看是否還有足夠的數(shù)量可以使用,如果數(shù)量不夠的話直接將事務進行rollback,這樣消息就不會到消費端。如果下過去的訂單號已經充值過了,那么該消息將會被直接丟掉,這也是消息端的冪等設計。

  接下來,我們看一下生產者以及事務的核心代碼。主要用于連接RocketMQ, 執(zhí)行本地事務,在本地事務中進行二次確認,根據(jù)結果進行Commit、Rollback。當連接RocketMQ出現(xiàn)問題的時候可能會收到UNKNOWN,這個時候會調用checkLocalTransaction()方法,用于檢查是否將消息發(fā)送給RocketMQ了。

package com.hqs.chargeorderservice.mqservice;

import com.alibaba.fastjson.JSONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;


/**
 *
 * @Description: 分布式事務RocketMQ 生產者
 */
@Slf4j
@Component
public class TransactionProducer {

    /**
     * 需要自定義事務監(jiān)聽器 用于 事務的二次確認 和 事務回查
     */
    private TransactionListener transactionListener ;

    /**
     * 這里的生產者和之前的不一樣
     */
    private TransactionMQProducer producer = null;

    /**
     * 官方建議自定義線程 給線程取自定義名稱 發(fā)現(xiàn)問題更好排查
     */
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }

    });

    public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
        transactionListener = new TransactionListenerImpl(produceOrderService);
        // 初始化 事務生產者
        producer = new TransactionMQProducer(jms.getOrderTopic());
        // 添加服務器地址
        producer.setNamesrvAddr(jms.getNameServer());
        // 添加事務監(jiān)聽器
        producer.setTransactionListener(transactionListener);
        // 添加自定義線程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在應用上下文,使用上下文監(jiān)聽器,進行關閉
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

/**
 * @Description: 自定義事務監(jiān)聽器
 */
@Slf4j
class TransactionListenerImpl implements TransactionListener {

    @Autowired
    private ProduceOrderService produceOrderService ;

    public TransactionListenerImpl( ProduceOrderService produceOrderService) {
        this.produceOrderService = produceOrderService;
    }

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事務開始執(zhí)行=============");
        String message = new String(msg.getBody());
        JSONObject jsonObject = JSONObject.parseObject(message);
        Integer productId = jsonObject.getInteger("productId");
        Integer total = jsonObject.getInteger("total");
        Integer tradeNo = jsonObject.getInteger("tradeNo");
        int userId = Integer.parseInt(arg.toString());
        //模擬執(zhí)行本地事務begin=======
        /**
         * 本地事務執(zhí)行會有三種可能
         * 1、commit 成功
         * 2、Rollback 失敗
         * 3、網(wǎng)絡等原因服務宕機收不到返回結果
         */
        log.info("本地事務執(zhí)行參數(shù),用戶id={},商品ID={},銷售庫存={}",userId,productId,total);
        int result = produceOrderService.save(userId, productId, total, tradeNo);
        //模擬執(zhí)行本地事務end========
        //TODO 實際開發(fā)下面不需要我們手動返回,而是根據(jù)本地事務執(zhí)行結果自動返回
        //1、二次確認消息,然后消費者可以消費
        if (result == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        //2、回滾消息,Broker端會刪除半消息
        if (result == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //3、Broker端會進行回查消息
        if (result == 2) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 只有上面接口返回 LocalTransactionState.UNKNOW 才會調用查接口被調用
     *
     * @param msg 消息
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        log.info("==========回查接口=========");
        String key = msg.getKeys();
        //TODO 1、必須根據(jù)key先去檢查本地事務消息是否完成。
        /**
         * 因為有種情況就是:上面本地事務執(zhí)行成功了,但是return LocalTransactionState.COMMIT_MESSAG的時候
         * 服務掛了,那么最終 Brock還未收到消息的二次確定,還是個半消息 ,所以當重新啟動的時候還是回調這個回調接口。
         * 如果不先查詢上面本地事務的執(zhí)行情況 直接在執(zhí)行本地事務,那么就相當于成功執(zhí)行了兩次本地事務了。
         */
        // TODO 2、這里返回要么commit 要么rollback。沒有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

  我們啟動一下程序。我們登錄localhost:7001注冊中心,看到3個服務都成功注冊了。

  然后我們進行下訂單調用接口,userId用戶編碼,productId產品號,total購買量,tradeNo交易訂單號

http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4

  charge-order-service顯示出來本地執(zhí)行事務的參數(shù)。

  phone-charge-service消費事務執(zhí)行的日志。

  當我們在創(chuàng)建charge-order-service和phone-charge-service的時候一定要注意,將工程里邊的application.properties變?yōu)閎ootstrap.yml,要不然程序啟動的時候會從8888找config的配置內容。因為SpringCloud里面有個“啟動上下文”,主要是用于加載遠端的配置,也就是加載ConfigServer里面的配置,默認加載順序為:加載bootstrap.*里面的配置 --> 鏈接configserver,加載遠程配置 --> 加載application.*里面的配置。

  好了,代碼也放到git地址了,https://github.com/stonehqs/springcloud-rocketmq-transaction,

到此這篇關于SpringCloud+RocketMQ實現(xiàn)分布式事務的實踐的文章就介紹到這了,更多相關SpringCloud RocketMQ分布式事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java 深入分析鏈表面試實例題目

    Java 深入分析鏈表面試實例題目

    鏈表是一種物理存儲單元上非連續(xù)、非順序的存儲結構,數(shù)據(jù)元素的邏輯順序是通過鏈表中的指針鏈接次序實現(xiàn)的,本篇帶你通過兩個實例題目來深入探索
    2022-03-03
  • Java中static的特點

    Java中static的特點

    本文主要介紹了Java中static的特點。具有很好的參考價值。下面跟著小編一起來看下吧
    2017-03-03
  • Java并發(fā)編程之Fork/Join框架詳解

    Java并發(fā)編程之Fork/Join框架詳解

    這篇文章主要介紹了Java并發(fā)編程之Fork/Join框架詳解,Fork/Join框架是Java7提供的一個用于并行執(zhí)行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,需要的朋友可以參考下
    2023-12-12
  • java Date獲取年月日時分秒的實現(xiàn)方法

    java Date獲取年月日時分秒的實現(xiàn)方法

    下面小編就為大家?guī)硪黄猨ava Date獲取年月日時分秒的實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-06-06
  • 詳解SpringBoot實現(xiàn)事件同步與異步監(jiān)聽

    詳解SpringBoot實現(xiàn)事件同步與異步監(jiān)聽

    這篇文章主要通過示例為大家詳細介紹了SpringBoot中的事件的用法和原理以及如何實現(xiàn)事件同步與異步監(jiān)聽,快跟隨小編一起學習學習吧
    2022-06-06
  • Map獲取鍵值,Map的幾種遍歷方法總結(推薦)

    Map獲取鍵值,Map的幾種遍歷方法總結(推薦)

    下面小編就為大家?guī)硪黄狹ap獲取鍵值,Map的幾種遍歷方法總結(推薦)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-04-04
  • Java讀取resources中資源文件路徑以及jar中文件無法讀取的解決

    Java讀取resources中資源文件路徑以及jar中文件無法讀取的解決

    這篇文章主要介紹了Java讀取resources中資源文件路徑以及jar中文件無法讀取的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • java 中ArrayList迭代的兩種實現(xiàn)方法

    java 中ArrayList迭代的兩種實現(xiàn)方法

    這篇文章主要介紹了java 中ArrayList迭代的兩種實現(xiàn)方法的相關資料,Iterator與for語句的結合,需要的朋友可以參考下
    2017-09-09
  • Java容器ArrayList知識點總結

    Java容器ArrayList知識點總結

    本篇文章給大家分享了Java容器ArrayList的相關知識點,對此有需要的朋友可以跟著學習參考下。
    2018-05-05
  • Java多線程Thread , Future , Callable , FutureTask的使用

    Java多線程Thread , Future , Callable ,

    本文主要介紹了Java多線程Thread , Future , Callable , FutureTask的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03

最新評論