Spring Boot集成Seata實(shí)現(xiàn)基于AT模式的分布式事務(wù)的解決方案
1.什么是Seata?
Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。
AT 模式
前提?
- 基于支持本地 ACID 事務(wù)的關(guān)系型數(shù)據(jù)庫(kù)。
- Java 應(yīng)用,通過(guò) JDBC 訪問(wèn)數(shù)據(jù)庫(kù)。
整體機(jī)制?
兩階段提交協(xié)議的演變:
- 一階段:業(yè)務(wù)數(shù)據(jù)和回滾日志記錄在同一個(gè)本地事務(wù)中提交,釋放本地鎖和連接資源。
- 二階段:
- 提交異步化,非??焖俚赝瓿?。
- 回滾通過(guò)一階段的回滾日志進(jìn)行反向補(bǔ)償。
寫隔離
- 一階段本地事務(wù)提交前,需要確保先拿到 全局鎖 。
- 拿不到 全局鎖 ,不能提交本地事務(wù)。
- 拿 全局鎖 的嘗試被限制在一定范圍內(nèi),超出范圍將放棄,并回滾本地事務(wù),釋放本地鎖。
以一個(gè)示例來(lái)說(shuō)明: 兩個(gè)全局事務(wù) tx1 和 tx2,分別對(duì) a 表的 m 字段進(jìn)行更新操作,m 的初始值 1000。 tx1 先開始,開啟本地事務(wù),拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務(wù)提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 后開始,開啟本地事務(wù),拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務(wù)提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。
tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務(wù)。
如果 tx1 的二階段全局回滾,則 tx1 需要重新獲取該數(shù)據(jù)的本地鎖,進(jìn)行反向補(bǔ)償?shù)母虏僮?,?shí)現(xiàn)分支的回滾。 此時(shí),如果 tx2 仍在等待該數(shù)據(jù)的 全局鎖,同時(shí)持有本地鎖,則 tx1 的分支回滾會(huì)失敗。分支的回滾會(huì)一直重試,直到 tx2 的 全局鎖 等鎖超時(shí),放棄 全局鎖 并回滾本地事務(wù)釋放本地鎖,tx1 的分支回滾最終成功。 因?yàn)檎麄€(gè)過(guò)程 全局鎖 在 tx1 結(jié)束前一直是被 tx1 持有的,所以不會(huì)發(fā)生 臟寫 的問(wèn)題。
讀隔離
在數(shù)據(jù)庫(kù)本地事務(wù)隔離級(jí)別 讀已提交(Read Committed) 或以上的基礎(chǔ)上,Seata(AT 模式)的默認(rèn)全局隔離級(jí)別是 讀未提交(Read Uncommitted) 。 如果應(yīng)用在特定場(chǎng)景下,必需要求全局的 讀已提交 ,目前 Seata 的方式是通過(guò) SELECT FOR UPDATE 語(yǔ)句的代理。
SELECT FOR UPDATE 語(yǔ)句的執(zhí)行會(huì)申請(qǐng) 全局鎖 ,如果 全局鎖 被其他事務(wù)持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語(yǔ)句的本地執(zhí)行)并重試。這個(gè)過(guò)程中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關(guān)數(shù)據(jù)是 已提交 的,才返回。
出于總體性能上的考慮,Seata 目前的方案并沒有對(duì)所有 SELECT 語(yǔ)句都進(jìn)行代理,僅針對(duì) FOR UPDATE 的 SELECT 語(yǔ)句。
具體例子相見:What Is Seata? | Apache Seata
2.環(huán)境搭建
安裝mysql
參見代碼倉(cāng)庫(kù)里面的mysql模塊里面的docker文件夾
install seta-server
version: "3.1" services: seata-server: image: seataio/seata-server:latest hostname: seata-server ports: - "7091:7091" - "8091:8091" environment: - SEATA_PORT=8091 - STORE_MODE=file
http://localhost:7091/#/Overview
default username and password is admin/admin
3.代碼工程
實(shí)驗(yàn)?zāi)繕?biāo)
訂單服務(wù)調(diào)用庫(kù)存服務(wù)和賬戶余額服務(wù)進(jìn)行相應(yīng)的扣減,并且最終生成訂單
seata-order
訂單服務(wù)
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seata</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>seata-order</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-http</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.8</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
controller
package com.et.seata.order.controller; import com.et.seata.order.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.util.HashMap; import java.util.Map; @RestController public class HelloWorldController { @Autowired private OrderService orderService; @PostMapping("/create") public Map<String, Object> createOrder(@RequestParam("userId") Long userId, @RequestParam("productId") Long productId, @RequestParam("price") Integer price) throws IOException { Map<String, Object> map = new HashMap<>(); map.put("msg", "HelloWorld"); map.put("reuslt", orderService.createOrder(userId,productId,price)); return map; } }
service
package com.et.seata.order.service; import com.alibaba.fastjson.JSONObject; import com.et.seata.order.dao.OrderDao; import com.et.seata.order.dto.OrderDO; import io.seata.core.context.RootContext; import io.seata.integration.http.DefaultHttpExecutor; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * @author liuhaihua * @version 1.0 * @ClassName OrderServiceImpl * @Description todo * @date 2024/08/08/ 13:53 */ @Slf4j @Service public class OrderServiceImpl implements OrderService{ @Autowired OrderDao orderDao; @Override @GlobalTransactional // <1> public Integer createOrder(Long userId, Long productId, Integer price) throws IOException { Integer amount = 1; // 購(gòu)買數(shù)量,暫時(shí)設(shè)置為 1。 log.info("[createOrder] 當(dāng)前 XID: {}", RootContext.getXID()); // <2> 扣減庫(kù)存 this.reduceStock(productId, amount); // <3> 扣減余額 this.reduceBalance(userId, price); // <4> 保存訂單 log.info("[createOrder] 保存訂單"); return this.saveOrder(userId,productId,price,amount); } private Integer saveOrder(Long userId, Long productId, Integer price,Integer amount){ // <4> 保存訂單 OrderDO order = new OrderDO(); order.setUserId(userId); order.setProductId(productId); order.setPayAmount(amount * price); orderDao.saveOrder(order); log.info("[createOrder] 保存訂單: {}", order.getId()); return order.getId(); } private void reduceStock(Long productId, Integer amount) throws IOException { // 參數(shù)拼接 JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId)) .fluentPut("amount", String.valueOf(amount)); // 執(zhí)行調(diào)用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock", params, HttpResponse.class); // 解析結(jié)果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除庫(kù)存失敗"); } } private void reduceBalance(Long userId, Integer price) throws IOException { // 參數(shù)拼接 JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId)) .fluentPut("price", String.valueOf(price)); // 執(zhí)行調(diào)用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance", params, HttpResponse.class); // 解析結(jié)果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除余額失敗"); } } }
application.yaml
server: port: 8081 # 端口 spring: application: name: order-service datasource: url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8 driver-class-name: com.mysql.jdbc.Driver username: root password: 123456 # Seata 配置項(xiàng),對(duì)應(yīng) SeataProperties 類 seata: application-id: ${spring.application.name} # Seata 應(yīng)用編號(hào),默認(rèn)為 ${spring.application.name} tx-service-group: ${spring.application.name}-group # Seata 事務(wù)組編號(hào),用于 TC 集群名 # 服務(wù)配置項(xiàng),對(duì)應(yīng) ServiceProperties 類 service: # 虛擬組和分組的映射 vgroup-mapping: order-service-group: default # 分組和 Seata 服務(wù)的映射 grouplist: default: 127.0.0.1:8091
seata-product
商品庫(kù)存服務(wù)
controller
package com.et.seata.product.controller; import com.et.seata.product.dto.ProductReduceStockDTO; import com.et.seata.product.service.ProductService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class ProductController { @Autowired ProductService productService; @PostMapping("/stock") public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) { log.info("[reduceStock] 收到減少庫(kù)存請(qǐng)求, 商品:{}, 價(jià)格:{}", productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); try { productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); // 正??鄢龓?kù)存,返回 true return true; } catch (Exception e) { // 失敗扣除庫(kù)存,返回 false return false; } } }
service
package com.et.seata.product.service; import com.et.seata.product.dao.ProductDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class ProductServiceImpl implements ProductService { @Autowired private ProductDao productDao; @Override @Transactional // <1> 開啟新事物 public void reduceStock(Long productId, Integer amount) throws Exception { log.info("[reduceStock] 當(dāng)前 XID: {}", RootContext.getXID()); // <2> 檢查庫(kù)存 checkStock(productId, amount); log.info("[reduceStock] 開始扣減 {} 庫(kù)存", productId); // <3> 扣減庫(kù)存 int updateCount = productDao.reduceStock(productId, amount); // 扣除成功 if (updateCount == 0) { log.warn("[reduceStock] 扣除 {} 庫(kù)存失敗", productId); throw new Exception("庫(kù)存不足"); } // 扣除失敗 log.info("[reduceStock] 扣除 {} 庫(kù)存成功", productId); } private void checkStock(Long productId, Integer requiredAmount) throws Exception { log.info("[checkStock] 檢查 {} 庫(kù)存", productId); Integer stock = productDao.getStock(productId); if (stock < requiredAmount) { log.warn("[checkStock] {} 庫(kù)存不足,當(dāng)前庫(kù)存: {}", productId, stock); throw new Exception("庫(kù)存不足"); } } }
dao
package com.et.seata.product.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface ProductDao { /** * 獲取庫(kù)存 * * @param productId 商品編號(hào) * @return 庫(kù)存 */ @Select("SELECT stock FROM product WHERE id = #{productId}") Integer getStock(@Param("productId") Long productId); /** * 扣減庫(kù)存 * * @param productId 商品編號(hào) * @param amount 扣減數(shù)量 * @return 影響記錄行數(shù) */ @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}") int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount); }
seata-balance
用戶余額服務(wù)
controller
package com.et.seata.balance.controller; import com.et.seata.balance.dto.AccountReduceBalanceDTO; import com.et.seata.balance.service.AccountService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController @Slf4j public class AccountController { @Autowired private AccountService accountService; @PostMapping("/balance") public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) { log.info("[reduceBalance] 收到減少余額請(qǐng)求, 用戶:{}, 金額:{}", accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); try { accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); // 正常扣除余額,返回 true return true; } catch (Exception e) { // 失敗扣除余額,返回 false return false; } } }
service
package com.et.seata.balance.service; import com.et.seata.balance.dao.AccountDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class AccountServiceImpl implements AccountService { @Autowired private AccountDao accountDao; @Override @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 開啟新事物 public void reduceBalance(Long userId, Integer price) throws Exception { log.info("[reduceBalance] 當(dāng)前 XID: {}", RootContext.getXID()); // <2> 檢查余額 checkBalance(userId, price); log.info("[reduceBalance] 開始扣減用戶 {} 余額", userId); // <3> 扣除余額 int updateCount = accountDao.reduceBalance(price); // 扣除成功 if (updateCount == 0) { log.warn("[reduceBalance] 扣除用戶 {} 余額失敗", userId); throw new Exception("余額不足"); } log.info("[reduceBalance] 扣除用戶 {} 余額成功", userId); } private void checkBalance(Long userId, Integer price) throws Exception { log.info("[checkBalance] 檢查用戶 {} 余額", userId); Integer balance = accountDao.getBalance(userId); if (balance < price) { log.warn("[checkBalance] 用戶 {} 余額不足,當(dāng)前余額:{}", userId, balance); throw new Exception("余額不足"); } } }
dao
package com.et.seata.balance.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface AccountDao { /** * 獲取賬戶余額 * * @param userId 用戶 ID * @return 賬戶余額 */ @Select("SELECT balance FROM account WHERE id = #{userId}") Integer getBalance(@Param("userId") Long userId); /** * 扣減余額 * * @param price 需要扣減的數(shù)目 * @return 影響記錄行數(shù) */ @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}") int reduceBalance(@Param("price") Integer price); }
以上只是一些關(guān)鍵代碼,所有代碼請(qǐng)參見下面代碼倉(cāng)庫(kù)
代碼倉(cāng)庫(kù)
https://github.com/Harries/springboot-demo
4.測(cè)試
- 啟動(dòng)seata-order服務(wù)
- 啟動(dòng)seata-product服務(wù)
- 啟動(dòng)seata-balance服務(wù)
?編輯可以看到控制臺(tái)輸出回滾日志
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
5.引用
到此這篇關(guān)于Spring Boot集成Seata實(shí)現(xiàn)基于AT模式的分布式事務(wù)的文章就介紹到這了,更多相關(guān)Spring Boot集成Seata分布式事務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Security基于JWT登錄認(rèn)證的項(xiàng)目實(shí)踐
JWT被用來(lái)在身份提供者和服務(wù)提供者間傳遞被認(rèn)證的用戶身份信息,本文主要介紹了Spring Security基于JWT登錄認(rèn)證的項(xiàng)目實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07maven自動(dòng)將源碼打包并發(fā)布的實(shí)現(xiàn)步驟
maven-source-plugin 提供項(xiàng)目自動(dòng)將源碼打包并發(fā)布的功能,在需要發(fā)布源碼項(xiàng)目的 pom.xml 文件中添加即可,本文就來(lái)介紹一下如何設(shè)置,感興趣的可以了解一下2023-11-11Java編程rabbitMQ實(shí)現(xiàn)消息的收發(fā)
RabbitMQ是一個(gè)在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),本文通過(guò)實(shí)例來(lái)給大家分享通過(guò)操作rabbitMQ實(shí)現(xiàn)消息的收發(fā),感興趣的朋友可以參考下。2017-09-09Spring Cloud Hystrix 服務(wù)容錯(cuò)保護(hù)的原理實(shí)現(xiàn)
這篇文章主要介紹了Spring Cloud Hystrix 服務(wù)容錯(cuò)保護(hù)的原理實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-05-05SpringBoot如何使用過(guò)濾器進(jìn)行XSS防御
想對(duì)全局的請(qǐng)求都進(jìn)行XSS防御可以使用servlet中的過(guò)濾器或者spring mvc中的攔截器,下面我們就來(lái)看看如何使用servlet中的過(guò)濾器進(jìn)行XSS防御吧2024-11-11Java C++題解leetcode886可能的二分法并查集染色法
這篇文章主要為大家介紹了Java C++題解leetcode886可能的二分法并查集染色法實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10