Atomikos + MybatisPlus解決多數(shù)據(jù)源事務一致性問題解決
多數(shù)據(jù)源事務
在實際項目的開發(fā)過程中,我們經(jīng)常會遇到在同一個項目或微服務中牽涉到使用兩個或多個數(shù)據(jù)源的,由于每個數(shù)據(jù)源需要使用不同的事務管理器,而每個事務管理器管理不同的數(shù)據(jù)源每個數(shù)據(jù)源只能保證單個數(shù)據(jù)源內(nèi)的事物一致性.所以在使用多個數(shù)據(jù)源的同時帶來的常見問題就是多數(shù)據(jù)源的事務一致性問題.本文通過利用一種常見的分布式事物管理器atomikos來解決此類問題.
Atomikos
Atomikos 是一個Java事務管理解決方案,用于處理分布式事務。它提供了一個可靠和可擴展的事務管理器,可以協(xié)調(diào)多個資源(如數(shù)據(jù)庫、消息隊列等)之間的事務操作,以保證分布式系統(tǒng)的數(shù)據(jù)一致性與隔離性。
Atomikos 提供了以下主要功能和特點:
分布式事務協(xié)調(diào):Atomikos 使用兩階段提交(Two-Phase Commit)協(xié)議來確保分布式事務的一致性。它充當協(xié)調(diào)者角色,與參與者(各個資源管理器)進行協(xié)作并決定是否提交或回滾事務。
事務原子性:Atomikos 確保在分布式環(huán)境中進行的事務操作以原子方式執(zhí)行。如果其中任何一個資源的操作失敗,Atomikos 將自動回滾整個事務,確保數(shù)據(jù)的一致性。
分布式數(shù)據(jù)源和連接池:Atomikos 提供了分布式數(shù)據(jù)源和連接池,用于管理多個數(shù)據(jù)庫連接和資源。它能夠有效地管理連接和提供高性能和可伸縮性。
事務隔離級別:Atomikos 支持不同的事務隔離級別,包括讀未提交(Read Uncommitted)、讀已提交(Read Committed)、可重復讀(Repeatable Read)和串行化(Serializable)。
高可靠性和擴展性:Atomikos 可以在分布式環(huán)境中進行集群部署,提供高可靠性和擴展性。多個 Atomikos 事務管理器可以一起工作,以保證負載均衡和容錯。
引入相關依賴
<dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- atomikos 依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- mybatis plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.3.1</version> </dependency> <!-- mysql 驅動包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.26</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
application.yml配置
# 服務端口號 server: port: 8069 # 多數(shù)據(jù)源配置 spring: datasource: properties: user_db: url: jdbc:mysql://192.168.1.18:3306/user_db user: root password: xxxxxxx data_db: url: jdbc:mysql://192.168.1.18:3306/data_db user: root password: xxxxxxx
上述配置中使用spring.datasource.properties配置多個數(shù)據(jù)源,在配置類中會使用一個Map<String,Map<String,String>>讀取多數(shù)據(jù)源的每個配置項.
多數(shù)據(jù)源配置類:DataSourceConfiguration
給配置類負責創(chuàng)建多個數(shù)據(jù)源以及對應的SqlSessionFactory.代碼如下:
package personal.gltm.demo.config; import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.mysql.cj.jdbc.MysqlXADataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Primary; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; import java.util.LinkedHashMap; import java.util.Map; @Configuration @ConfigurationProperties("spring.datasource") public class DataSourceConfiguration { private Map<String,Map<String,String>> properties; // spring.datasource.properties private Map<String,DataSource> sourceMap; // 用來保存數(shù)據(jù)源信息 public Map<String, Map<String,String>> getProperties() { return properties; } public void setProperties(Map<String, Map<String,String>> properties) { this.properties = properties; } /** * 添加user_db數(shù)據(jù)源 * @return */ @Bean("user_db") @DependsOn({"datasourceMap"}) public DataSource userDatasource(){ return sourceMap.get("user_db"); } /** * 添加user_db SqlSessionFactory * @return * @throws Exception */ @Bean("user_db_sql_session_factory") @DependsOn("user_db") public SqlSessionFactory userDbSqlSessionFactory() throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(this.sourceMap.get("user_db")); // 設置mapper位置 bean.setTypeAliasesPackage("personal.gltm.demo.mapper.user"); // 設置mapper.xml文件的路徑 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:user/mapper/*.xml")); return bean.getObject(); } /** * 添加 data_db 數(shù)據(jù)源 * @return */ @Bean("data_db") @DependsOn({"datasourceMap"}) public DataSource dataDatasource(){ return sourceMap.get("data_db"); } /** * 添加data_db SqlSessionFactory * @return * @throws Exception */ @Bean("data_db_sql_session_factory") @DependsOn("data_db") public SqlSessionFactory dataDbSqlSessionFactory() throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(this.sourceMap.get("data_db")); // 設置mapper位置 bean.setTypeAliasesPackage("personal.gltm.demo.mapper.data"); // 設置mapper.xml文件的路徑 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:data/mapper/*.xml")); return bean.getObject(); } /** * 創(chuàng)建XA 數(shù)據(jù)源并添加到sourceMap中 * @param dataSourceConfiguration * @return */ @Bean("datasourceMap") @Primary public Map<String, DataSource> datasourceMap(DataSourceConfiguration dataSourceConfiguration){ Map<String, DataSource> map = new LinkedHashMap<>(); for(Map.Entry<String,Map<String, String>> entry:dataSourceConfiguration.properties.entrySet()){ // 讀取每個數(shù)據(jù)源的配置創(chuàng)建datasource MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUrl(entry.getValue().get("url")); dataSource.setUser(entry.getValue().get("user")); dataSource.setPassword(entry.getValue().get("password")); // 創(chuàng)建atomikosDataSource數(shù)據(jù)源 AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean(); atomikosDataSource.setMaxPoolSize(10); atomikosDataSource.setMinPoolSize(5); atomikosDataSource.setBeanName(entry.getKey()); // 設置bean的名稱 atomikosDataSource.setXaDataSource(dataSource); // 設置Xa數(shù)據(jù)源 atomikosDataSource.setTestQuery("select now()"); map.put(entry.getKey(), atomikosDataSource); } this.sourceMap = map; return map; } }
上述配置和代碼中我們創(chuàng)建了兩個mysql數(shù)據(jù)源分別是user_db和data_db.并且在兩個數(shù)據(jù)源中分別創(chuàng)建了兩個不同的數(shù)據(jù)表:user_db.t_user{id:bigint,user_name:varchar}和data_db. t_data{id:bigint,context:varchar}
多數(shù)據(jù)源事務管理器配置:AtomikosConfig
package personal.gltm.demo.config; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import lombok.SneakyThrows; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScans; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.jta.JtaTransactionManager; import javax.transaction.SystemException; import javax.transaction.TransactionManager; import javax.transaction.UserTransaction; @Configuration @EnableTransactionManagement @MapperScans({@MapperScan(basePackages={"personal.gltm.demo.mapper.user"},sqlSessionFactoryRef = "user_db_sql_session_factory"), @MapperScan(basePackages = {"personal.gltm.demo.mapper.data"},sqlSessionFactoryRef = "data_db_sql_session_factory")}) public class AtomikosConfig { // 用于在應用程序中執(zhí)行事務的控制操作。 @Bean(name = "userTransaction") @SneakyThrows(Exception.class) public UserTransaction userTransaction() throws SystemException { final UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(1000); return userTransactionImp; } // 用于管理和控制分布式事務的整個生命周期。 @Bean(name = "atomikosTransactionManager") @SneakyThrows(Exception.class) public TransactionManager atomikosTransactionManager() { final UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } // JtaTransactionManager 的主要作用是管理和協(xié)調(diào)分布式事務,它支持使用 JTA 來處理分布式事務,與 JTA 兼容的事務管理器進行交互。 @Bean(name = "transactionManager") @SneakyThrows(Throwable.class) public PlatformTransactionManager transactionManager( @Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) throws SystemException { return new JtaTransactionManager(userTransaction(), atomikosTransactionManager()); } }
上述代碼需要使用@MapperScans注解綁定sqlsessionfactory的掃描包.
驗證測試
添加測試類:
package personal.gltm.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import personal.gltm.demo.model.Data; import personal.gltm.demo.model.User; import personal.gltm.demo.service.DataService; import personal.gltm.demo.service.UserService; import java.math.BigDecimal; @RequestMapping("/test") @RestController public class TestController { @Autowired private UserService userService; @Autowired private DataService dataService; @GetMapping("/test/{u-id}/{d-id}") @Transactional(rollbackFor = Throwable.class) public String Test(@PathVariable("u-id")BigDecimal uId,@PathVariable("d-id") BigDecimal dId){ User user = new User(); user.setId(uId); user.setUserName("sihong" + Math.floor(Math.random() * 20)); userService.insert(user); Data data = new Data(); data.setId(dId); data.setContext("mine mine mine ..... " + System.nanoTime()); dataService.insert(data); return "success"; } }
測試類中,我們添加了一個Test方法,接收兩個參數(shù):userId和dataId,然后把這兩個參數(shù)作為主鍵分別插入到user_db.t_user和data_db.t_data中.如果主鍵沖突則其中一個會報錯.如果要保證多數(shù)據(jù)源事物一致性另一個事物也必須回滾.
在瀏覽器中先調(diào)用:http://localhost/test/test/1/1 分別在user_db.t_user 和data_db.t_data中插入主鍵為1 的數(shù)據(jù).再調(diào)用http://localhost/test/test/1/2 時會向數(shù)據(jù)庫插入主鍵分別為1和2的數(shù)據(jù),但是t_user中會存在主鍵沖突.整個事務回滾t_data 中也不會插入數(shù)據(jù).
到此這篇關于Atomikos + MybatisPlus解決多數(shù)據(jù)源事務一致性問題解決的文章就介紹到這了,更多相關Atomikos MybatisPlus多數(shù)據(jù)源事務一致性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解Java ScheduledThreadPoolExecutor的踩坑與解決方法
最近項目上反饋某個重要的定時任務突然不執(zhí)行了,很頭疼,開發(fā)環(huán)境和測試環(huán)境都沒有出現(xiàn)過這個問題。定時任務采用的是ScheduledThreadPoolExecutor,后來一看代碼發(fā)現(xiàn)踩了一個大坑。本文就來和大家聊聊這次的踩坑記錄與解決方法,需要的可以參考一下2022-10-10Java實現(xiàn)短信驗證碼和國際短信群發(fā)功能的示例
本篇文章主要介紹了Java實現(xiàn)短信驗證碼和國際短信群發(fā)功能的示例,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-02-02