Atomikos + MybatisPlus解決多數(shù)據(jù)源事務(wù)一致性問題解決
多數(shù)據(jù)源事務(wù)
在實(shí)際項(xiàng)目的開發(fā)過程中,我們經(jīng)常會(huì)遇到在同一個(gè)項(xiàng)目或微服務(wù)中牽涉到使用兩個(gè)或多個(gè)數(shù)據(jù)源的,由于每個(gè)數(shù)據(jù)源需要使用不同的事務(wù)管理器,而每個(gè)事務(wù)管理器管理不同的數(shù)據(jù)源每個(gè)數(shù)據(jù)源只能保證單個(gè)數(shù)據(jù)源內(nèi)的事物一致性.所以在使用多個(gè)數(shù)據(jù)源的同時(shí)帶來的常見問題就是多數(shù)據(jù)源的事務(wù)一致性問題.本文通過利用一種常見的分布式事物管理器atomikos來解決此類問題.
Atomikos
Atomikos 是一個(gè)Java事務(wù)管理解決方案,用于處理分布式事務(wù)。它提供了一個(gè)可靠和可擴(kuò)展的事務(wù)管理器,可以協(xié)調(diào)多個(gè)資源(如數(shù)據(jù)庫、消息隊(duì)列等)之間的事務(wù)操作,以保證分布式系統(tǒng)的數(shù)據(jù)一致性與隔離性。
Atomikos 提供了以下主要功能和特點(diǎn):
分布式事務(wù)協(xié)調(diào):Atomikos 使用兩階段提交(Two-Phase Commit)協(xié)議來確保分布式事務(wù)的一致性。它充當(dāng)協(xié)調(diào)者角色,與參與者(各個(gè)資源管理器)進(jìn)行協(xié)作并決定是否提交或回滾事務(wù)。
事務(wù)原子性:Atomikos 確保在分布式環(huán)境中進(jìn)行的事務(wù)操作以原子方式執(zhí)行。如果其中任何一個(gè)資源的操作失敗,Atomikos 將自動(dòng)回滾整個(gè)事務(wù),確保數(shù)據(jù)的一致性。
分布式數(shù)據(jù)源和連接池:Atomikos 提供了分布式數(shù)據(jù)源和連接池,用于管理多個(gè)數(shù)據(jù)庫連接和資源。它能夠有效地管理連接和提供高性能和可伸縮性。
事務(wù)隔離級(jí)別:Atomikos 支持不同的事務(wù)隔離級(jí)別,包括讀未提交(Read Uncommitted)、讀已提交(Read Committed)、可重復(fù)讀(Repeatable Read)和串行化(Serializable)。
高可靠性和擴(kuò)展性:Atomikos 可以在分布式環(huán)境中進(jìn)行集群部署,提供高可靠性和擴(kuò)展性。多個(gè) Atomikos 事務(wù)管理器可以一起工作,以保證負(fù)載均衡和容錯(cuò)。
引入相關(guān)依賴
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- atomikos 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.1</version>
</dependency>
<!-- mysql 驅(qū)動(dòng)包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>application.yml配置
# 服務(wù)端口號(hào)
server:
port:
8069
# 多數(shù)據(jù)源配置
spring:
datasource:
properties:
user_db:
url: jdbc:mysql://192.168.1.18:3306/user_db
user: root
password: xxxxxxx
data_db:
url: jdbc:mysql://192.168.1.18:3306/data_db
user: root
password: xxxxxxx
上述配置中使用spring.datasource.properties配置多個(gè)數(shù)據(jù)源,在配置類中會(huì)使用一個(gè)Map<String,Map<String,String>>讀取多數(shù)據(jù)源的每個(gè)配置項(xiàng).
多數(shù)據(jù)源配置類:DataSourceConfiguration
給配置類負(fù)責(zé)創(chuàng)建多個(gè)數(shù)據(jù)源以及對(duì)應(yīng)的SqlSessionFactory.代碼如下:
package personal.gltm.demo.config;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.Map;
@Configuration
@ConfigurationProperties("spring.datasource")
public class DataSourceConfiguration {
private Map<String,Map<String,String>> properties; // spring.datasource.properties
private Map<String,DataSource> sourceMap; // 用來保存數(shù)據(jù)源信息
public Map<String, Map<String,String>> getProperties() {
return properties;
}
public void setProperties(Map<String, Map<String,String>> properties) {
this.properties = properties;
}
/**
* 添加user_db數(shù)據(jù)源
* @return
*/
@Bean("user_db")
@DependsOn({"datasourceMap"})
public DataSource userDatasource(){
return sourceMap.get("user_db");
}
/**
* 添加user_db SqlSessionFactory
* @return
* @throws Exception
*/
@Bean("user_db_sql_session_factory")
@DependsOn("user_db")
public SqlSessionFactory userDbSqlSessionFactory() throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(this.sourceMap.get("user_db"));
// 設(shè)置mapper位置
bean.setTypeAliasesPackage("personal.gltm.demo.mapper.user");
// 設(shè)置mapper.xml文件的路徑
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:user/mapper/*.xml"));
return bean.getObject();
}
/**
* 添加 data_db 數(shù)據(jù)源
* @return
*/
@Bean("data_db")
@DependsOn({"datasourceMap"})
public DataSource dataDatasource(){
return sourceMap.get("data_db");
}
/**
* 添加data_db SqlSessionFactory
* @return
* @throws Exception
*/
@Bean("data_db_sql_session_factory")
@DependsOn("data_db")
public SqlSessionFactory dataDbSqlSessionFactory() throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(this.sourceMap.get("data_db"));
// 設(shè)置mapper位置
bean.setTypeAliasesPackage("personal.gltm.demo.mapper.data");
// 設(shè)置mapper.xml文件的路徑
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:data/mapper/*.xml"));
return bean.getObject();
}
/**
* 創(chuàng)建XA 數(shù)據(jù)源并添加到sourceMap中
* @param dataSourceConfiguration
* @return
*/
@Bean("datasourceMap")
@Primary
public Map<String, DataSource> datasourceMap(DataSourceConfiguration dataSourceConfiguration){
Map<String, DataSource> map = new LinkedHashMap<>();
for(Map.Entry<String,Map<String, String>> entry:dataSourceConfiguration.properties.entrySet()){
// 讀取每個(gè)數(shù)據(jù)源的配置創(chuàng)建datasource
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUrl(entry.getValue().get("url"));
dataSource.setUser(entry.getValue().get("user"));
dataSource.setPassword(entry.getValue().get("password"));
// 創(chuàng)建atomikosDataSource數(shù)據(jù)源
AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
atomikosDataSource.setMaxPoolSize(10);
atomikosDataSource.setMinPoolSize(5);
atomikosDataSource.setBeanName(entry.getKey()); // 設(shè)置bean的名稱
atomikosDataSource.setXaDataSource(dataSource); // 設(shè)置Xa數(shù)據(jù)源
atomikosDataSource.setTestQuery("select now()");
map.put(entry.getKey(), atomikosDataSource);
}
this.sourceMap = map;
return map;
}
}
上述配置和代碼中我們創(chuàng)建了兩個(gè)mysql數(shù)據(jù)源分別是user_db和data_db.并且在兩個(gè)數(shù)據(jù)源中分別創(chuàng)建了兩個(gè)不同的數(shù)據(jù)表:user_db.t_user{id:bigint,user_name:varchar}和data_db. t_data{id:bigint,context:varchar}
多數(shù)據(jù)源事務(wù)管理器配置:AtomikosConfig
package personal.gltm.demo.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import lombok.SneakyThrows;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.annotation.MapperScans;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
@Configuration
@EnableTransactionManagement
@MapperScans({@MapperScan(basePackages={"personal.gltm.demo.mapper.user"},sqlSessionFactoryRef = "user_db_sql_session_factory"),
@MapperScan(basePackages = {"personal.gltm.demo.mapper.data"},sqlSessionFactoryRef = "data_db_sql_session_factory")})
public class AtomikosConfig {
// 用于在應(yīng)用程序中執(zhí)行事務(wù)的控制操作。
@Bean(name = "userTransaction")
@SneakyThrows(Exception.class)
public UserTransaction userTransaction() throws SystemException {
final UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(1000);
return userTransactionImp;
}
// 用于管理和控制分布式事務(wù)的整個(gè)生命周期。
@Bean(name = "atomikosTransactionManager")
@SneakyThrows(Exception.class)
public TransactionManager atomikosTransactionManager() {
final UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
// JtaTransactionManager 的主要作用是管理和協(xié)調(diào)分布式事務(wù),它支持使用 JTA 來處理分布式事務(wù),與 JTA 兼容的事務(wù)管理器進(jìn)行交互。
@Bean(name = "transactionManager")
@SneakyThrows(Throwable.class)
public PlatformTransactionManager transactionManager(
@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager,
@Qualifier("userTransaction") UserTransaction userTransaction) throws SystemException {
return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
}
}上述代碼需要使用@MapperScans注解綁定sqlsessionfactory的掃描包.
驗(yàn)證測(cè)試
添加測(cè)試類:
package personal.gltm.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import personal.gltm.demo.model.Data;
import personal.gltm.demo.model.User;
import personal.gltm.demo.service.DataService;
import personal.gltm.demo.service.UserService;
import java.math.BigDecimal;
@RequestMapping("/test")
@RestController
public class TestController {
@Autowired
private UserService userService;
@Autowired
private DataService dataService;
@GetMapping("/test/{u-id}/{d-id}")
@Transactional(rollbackFor = Throwable.class)
public String Test(@PathVariable("u-id")BigDecimal uId,@PathVariable("d-id") BigDecimal dId){
User user = new User();
user.setId(uId);
user.setUserName("sihong" + Math.floor(Math.random() * 20));
userService.insert(user);
Data data = new Data();
data.setId(dId);
data.setContext("mine mine mine ..... " + System.nanoTime());
dataService.insert(data);
return "success";
}
}
測(cè)試類中,我們添加了一個(gè)Test方法,接收兩個(gè)參數(shù):userId和dataId,然后把這兩個(gè)參數(shù)作為主鍵分別插入到user_db.t_user和data_db.t_data中.如果主鍵沖突則其中一個(gè)會(huì)報(bào)錯(cuò).如果要保證多數(shù)據(jù)源事物一致性另一個(gè)事物也必須回滾.
在瀏覽器中先調(diào)用:http://localhost/test/test/1/1 分別在user_db.t_user 和data_db.t_data中插入主鍵為1 的數(shù)據(jù).再調(diào)用http://localhost/test/test/1/2 時(shí)會(huì)向數(shù)據(jù)庫插入主鍵分別為1和2的數(shù)據(jù),但是t_user中會(huì)存在主鍵沖突.整個(gè)事務(wù)回滾t_data 中也不會(huì)插入數(shù)據(jù).
到此這篇關(guān)于Atomikos + MybatisPlus解決多數(shù)據(jù)源事務(wù)一致性問題解決的文章就介紹到這了,更多相關(guān)Atomikos MybatisPlus多數(shù)據(jù)源事務(wù)一致性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud如何引用xxjob定時(shí)任務(wù)
Spring?Cloud?本身不直接支持?XXL-JOB?這樣的定時(shí)任務(wù)框架,如果你想在?Spring?Cloud?應(yīng)用中集成?XXL-JOB,你需要手動(dòng)進(jìn)行配置,本文給大家介紹SpringCloud如何引用xxjob定時(shí)任務(wù),感興趣的朋友一起看看吧2024-04-04
詳細(xì)談?wù)凧ava中l(wèi)ong和double的原子性
原子性是指一個(gè)操作或多個(gè)操作要么全部執(zhí)行,且執(zhí)行的過程不會(huì)被任何因素打斷,要么就都不執(zhí)行,下面這篇文章主要給大家介紹了關(guān)于Java中l(wèi)ong和double原子性的相關(guān)資料,需要的朋友可以參考下2021-08-08
Java設(shè)計(jì)模式之監(jiān)聽器模式實(shí)例詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之監(jiān)聽器模式,結(jié)合實(shí)例形式較為詳細(xì)的分析了java設(shè)計(jì)模式中監(jiān)聽器模式的概念、原理及相關(guān)實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2018-02-02
詳解Java ScheduledThreadPoolExecutor的踩坑與解決方法
最近項(xiàng)目上反饋某個(gè)重要的定時(shí)任務(wù)突然不執(zhí)行了,很頭疼,開發(fā)環(huán)境和測(cè)試環(huán)境都沒有出現(xiàn)過這個(gè)問題。定時(shí)任務(wù)采用的是ScheduledThreadPoolExecutor,后來一看代碼發(fā)現(xiàn)踩了一個(gè)大坑。本文就來和大家聊聊這次的踩坑記錄與解決方法,需要的可以參考一下2022-10-10
Java實(shí)現(xiàn)短信驗(yàn)證碼和國際短信群發(fā)功能的示例
本篇文章主要介紹了Java實(shí)現(xiàn)短信驗(yàn)證碼和國際短信群發(fā)功能的示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-02-02
SpringBoot工程下Lombok的應(yīng)用教程詳解
這篇文章主要給大家介紹了關(guān)于SpringBoot工程下Lombok應(yīng)用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11

