Spring多數(shù)據(jù)源切換失敗,發(fā)現(xiàn)與事務相關問題
背景
一個方法里,A數(shù)據(jù)源需要進行查詢和更新操作,B數(shù)據(jù)源進行查詢操作。
詳細業(yè)務是查詢用戶基礎信息數(shù)據(jù)(A數(shù)據(jù)源)的時候,需要查詢登錄行為數(shù)據(jù)(B數(shù)據(jù)源),同時對用戶信息進行修改,所以該方法最后還需要更新用戶基礎信息數(shù)據(jù)(A數(shù)據(jù)源)。
原來的A實現(xiàn)思路
controller調(diào)用serviceImpl方法,在serviceImpl方法內(nèi)某一段代碼去切換B數(shù)據(jù)源查詢結果,根據(jù)查詢結果更新A數(shù)據(jù)源。
實現(xiàn)A思路的結果
表面上一切調(diào)試都顯示切換了B數(shù)據(jù)源,應該是還沒有深入源碼去debug、
應該某個地方拿的數(shù)據(jù)源連接還是默認的A數(shù)據(jù)源,導致報錯,提示沒有找到對應數(shù)據(jù)庫表,A數(shù)據(jù)源肯定沒有B數(shù)據(jù)源的表啊,郁悶。
后來的B實現(xiàn)思路
查詢帖子后發(fā)現(xiàn)事務的干預,導致數(shù)據(jù)源切換失敗,決定換個思路,把切換數(shù)據(jù)源的方法放在了controller,因為我是僅對一個數(shù)據(jù)源進行更新操作,另一個數(shù)據(jù)源只作查詢操作,此時整個事務其實只在A數(shù)據(jù)源進行,所以我就單獨把對A數(shù)據(jù)源的操作聲明為A方法,對B數(shù)據(jù)源的操作聲明為B方法,在controller先調(diào)用B方法獲取查詢結果,作為入?yún)⑷フ{(diào)用A方法,這樣就解決了數(shù)據(jù)源切換問題也解決了A方法的事務問題。
實現(xiàn)B思路的結果
達到了預期。
切換數(shù)據(jù)源成功,A數(shù)據(jù)源查詢、更新、事務都沒問題,B數(shù)據(jù)源查詢沒問題。
??注意:
此思路是把service的方法一分為二,在controller分別調(diào)用,只適用于對其中單一數(shù)據(jù)源作修改數(shù)據(jù)操作,并不適用于對多數(shù)據(jù)源同時進行修改數(shù)據(jù)操作,因為單數(shù)據(jù)源進行數(shù)據(jù)操作是普通數(shù)據(jù)源事務,并不復雜,就和我們平時使用@Transactional一樣。
但是如果你對多數(shù)據(jù)源進行修改數(shù)據(jù)操作的話!事情就變得復雜起來了,多數(shù)據(jù)源事務,可讓你頭疼的了,因為回滾非常麻煩,類似于分布式事務了,阿里的分布式事務有SEATA支撐,這個我了解,但是以后我再講這方面的,因為這個單體系統(tǒng)的多數(shù)據(jù)源事務還需要深入研究一下。
原因
由于默認使用的是主數(shù)據(jù)源master,只有在mapper接口方法上標注從數(shù)據(jù)源slave才會切換數(shù)據(jù)源過去,但是要注意事務(因為之前看一個帖子,說一個事物里,緩存了默認的數(shù)據(jù)庫連接,即使代碼里切換了數(shù)據(jù)源,重新去建立連接時候發(fā)現(xiàn)有緩存一個數(shù)據(jù)庫連接耶,直接拿這個,導致我們切換數(shù)據(jù)源失敗,因為拿的還是默認的數(shù)據(jù)庫連接。
配置文件
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver master: #主庫A type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/A?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=false&serverTimezone=GMT%2B8 username: root password: 123456 slave: #從庫B type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.1.12:3306/B?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=true&serverTimezone=GMT%2B8 username: root password: 123456
配置類(蠻多的,注意,請復制完整)
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import javax.sql.DataSource; import java.util.HashMap; // 主從數(shù)據(jù)源配置 @Configuration public class DataSourceConfiguration { DataSourceProperties masterDataSource = new DataSourceProperties(); DataSourceProperties slaveDataSource = new DataSourceProperties(); @Bean @ConfigurationProperties(prefix = "spring.datasource.master") public DataSource masterDataSource() { DruidDataSource druidDataSource = masterDataSource.setDataSource(DruidDataSourceBuilder.create().build()); return druidDataSource; } @Bean @ConfigurationProperties(prefix = "spring.datasource.slave") public DataSource slaveDataSource() { DruidDataSource druidDataSource = slaveDataSource.setDataSource(DruidDataSourceBuilder.create().build()); return druidDataSource; } @Bean public DataSource routeDataSource() { RoutingDataSource routingDataSource = new RoutingDataSource() {{ setDefaultTargetDataSource(masterDataSource()); setTargetDataSources(new HashMap<Object, Object>() {{ put(DbType.MASTER, masterDataSource()); put(DbType.SLAVE, slaveDataSource()); }}); }}; return routingDataSource; } @Bean @Primary public LazyConnectionDataSourceProxy lazyConnectionDataSourceProxy() { return new LazyConnectionDataSourceProxy(routeDataSource()); } }
import com.alibaba.druid.pool.DruidDataSource; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * 數(shù)據(jù)源配置文件 */ @Setter @Configuration @ConfigurationProperties(prefix = "spring.datasource.druid") public class DataSourceProperties { private int initialSize = 10; private int minIdle = 10; private int maxActive = 50; private int maxWait; private int timeBetweenEvictionRunsMillis = 300000; private int minEvictableIdleTimeMillis = 60000; private int maxEvictableIdleTimeMillis = 7200000; private String validationQuery = "SELECT 1 FROM DUAL"; private boolean testWhileIdle = true; private boolean testOnBorrow = true; private boolean testOnReturn = true; public DruidDataSource setDataSource(DruidDataSource datasource) { /** 配置初始化大小、最小、最大 */ datasource.setInitialSize(initialSize); //優(yōu)先級:application的spring.datasource.master.initialSize > application的spring.datasource.druid.initialSize > datasource.setInitialSize(20)和datasource.setInitialSize(initialSize) datasource.setMaxActive(maxActive); datasource.setMinIdle(minIdle); /** 配置獲取連接等待超時的時間 */ // datasource.setMaxWait(maxWait); /** 配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒 */ datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); /** 配置一個連接在池中最小、最大生存的時間,單位是毫秒 */ datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis); /** * 用來檢測連接是否有效的sql,要求是一個查詢語句,常用select 'x'。如果validationQuery為null,testOnBorrow、testOnReturn、testWhileIdle都不會起作用。 */ datasource.setValidationQuery(validationQuery); /** 建議配置為true,不影響性能,并且保證安全性。申請連接的時候檢測,如果空閑時間大于timeBetweenEvictionRunsMillis,執(zhí)行validationQuery檢測連接是否有效。 */ datasource.setTestWhileIdle(testWhileIdle); /** 申請連接時執(zhí)行validationQuery檢測連接是否有效,做了這個配置會降低性能。 */ datasource.setTestOnBorrow(testOnBorrow); /** 歸還連接時執(zhí)行validationQuery檢測連接是否有效,做了這個配置會降低性能。 */ datasource.setTestOnReturn(testOnReturn); return datasource; } }
import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; // aop環(huán)繞增強切換數(shù)據(jù)源 @Slf4j @Aspect @Component public class DBConnectionAOP { @Around("@annotation(connectToDB)") public Object proceed(ProceedingJoinPoint pjp, ConnectToDB connectToDB) throws Throwable { try { if (connectToDB.value().equals("MASTER")) { log.info("Master DB 配置"); DBContextHolder.setDbType(DbType.MASTER); } else if (connectToDB.value().equals("SLAVE")) { log.info("Slave DB 配置"); DBContextHolder.setDbType(DbType.SLAVE); } else { log.info("默認 DB 配置"); } Object result = pjp.proceed(); DBContextHolder.clearDbType(); return result; } finally { DBContextHolder.clearDbType(); } } }
public class DBContextHolder { private static final ThreadLocal<DbType> contextHolder = new ThreadLocal<DbType>(); public static void setDbType(DbType dbType) { if (dbType == null) { throw new NullPointerException(); } contextHolder.set(dbType); } public static DbType getDbType() { return (DbType) contextHolder.get(); } public static void clearDbType() { contextHolder.remove(); } }
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; // 動態(tài)數(shù)據(jù)源路由配置 public class RoutingDataSource extends AbstractRoutingDataSource { // 決定使用哪個數(shù)據(jù)源 @Override protected Object determineCurrentLookupKey() { return DBContextHolder.getDbType(); } }
// 數(shù)據(jù)源枚舉 public enum DbType { MASTER, SLAVE }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; // 數(shù)據(jù)源選擇注解 @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ConnectToDB { // 默認主數(shù)據(jù)庫 String value() default "primary"; }
異步線程池配置
如果有異步需求的話,可以借鑒,所以也放上來吧。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 異步線程池的配置類 */ @Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("注冊asyncServiceExecutor"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(50); //配置隊列大小 // Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default is Integer.MAX_VALUE. // Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance. executor.setQueueCapacity(100); // 設置允許的空閑時間(秒) executor.setKeepAliveSeconds(60); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務;CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * ThreadPoolTaskExecutor的子類,在父類的基礎上加入了日志信息,查看線程池的信息 */ public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class); @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor executor = getThreadPoolExecutor(); if (null == executor) { return; } logger.info("MaximumPoolSize:" + executor.getMaximumPoolSize() + ",CorePoolSize:" + executor.getCorePoolSize() + ",ThreadNamePrefix:" + this.getThreadNamePrefix() + ",prefix:" + prefix + ",TaskCount:" + executor.getTaskCount() + ",CompletedTaskCount:" + executor.getCompletedTaskCount() + ",ActiveCount:" + executor.getActiveCount() + ",PoolSize:" + executor.getPoolSize() + ",QueueSize:" + executor.getQueue().size() ); } }
A思路業(yè)務代碼(反面教材,請勿模仿)
controller
@PostMapping("queryUserData.do") @VerifyLogin public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException { String mobile = UserUtil.getMobile(); User user = userService.queryUserData(mobile, temp, channelId); return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user); }
@Slf4j @Service @Transactional(rollbackFor = Exception.class) public class UserServiceImpl implements UserService { @Autowired private UserLogService userLogService; @Override public User queryUserData(String mobile, String temp, String channelId) throws JsonProcessingException { // 切換slave從數(shù)據(jù)源查詢 Date loginTime = userLogService.validateLoginByToday(mobile); // 查詢當天登錄時間 Date date = new Date(); Integer userStatus = 0; // 以下所有操作都切換回master主數(shù)據(jù)源查詢和更新 User user = userMapper.selectUserByMobile(mobile); // 業(yè)務代碼..... userMapper.insert(user); } }
B思路業(yè)務代碼(正確!有相同業(yè)務場景的可以借鑒)
controller
@PostMapping("queryUserData.do") public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException { long start = System.currentTimeMillis(); String mobile = UserUtil.getMobile(); HashMap<String, Object> map = new HashMap<>(); // 切換slave從數(shù)據(jù)源查詢 Date loginTime = userLogService.validateLoginByToday(mobile); // 查詢用戶當天登錄時間 // 切換回master主數(shù)據(jù)源查詢和更新 User user = userService.queryUserData(mobile, loginTime, temp, channelId); return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user); }
service 只是貼出了從數(shù)據(jù)源查詢代碼,主數(shù)據(jù)源的代碼就和平常寫的一樣就行了。
@Service public class UserLogServiceImpl implements UserLogService { private UserBehaviorService userBehaviorService; public UserLogServiceImpl(UserBehaviorService userBehaviorService) { this.userBehaviorService = userBehaviorService; } @Override public Date validateLoginByToday(String mobile) { return userBehaviorService.validateLoginByToday(mobile); } }
@Slf4j @Service public class UserBehaviorServiceImpl implements UserBehaviorService { private UserBehaviorMapper userBehaviorMapper; public UserBehaviorServiceImpl(UserBehaviorMapper userBehaviorMapper) { this.userBehaviorMapper = userBehaviorMapper; } @Transactional @ConnectToDB(value = "SLAVE") // 這就是切換數(shù)據(jù)源最重要注解 @Override public Date validateLoginByToday(String mobile) { return userBehaviorMapper.validateLoginByToday(mobile, DateUtil.dateToString(new Date(), "yyyyMMdd")); } }
mapper(和平常一樣,沒有什么特別的)
@Mapper public interface UserBehaviorMapper { /** * 查詢用戶當天是否活躍 * @param mobile * @param dateStr * @return Integer */ Integer validateActive(@Param("mobile") String mobile, @Param("dateStr") String dateStr); }
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
IDEA新建springboot項目時未生成pom.xml文件的解決操作
這篇文章主要給大家介紹了關于IDEA新建springboot項目時未生成pom.xml文件的解決操作方法,文中通過實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2023-02-02SpringBoot整合Spring?Security過濾器鏈加載執(zhí)行流程源碼分析(最新推薦)
Spring?Boot?對于?Spring?Security?提供了自動化配置方案,可以使用更少的配置來使用?Spring?Security,這篇文章主要介紹了SpringBoot整合Spring?Security過濾器鏈加載執(zhí)行流程源碼分析,需要的朋友可以參考下2023-02-02解決eclipse啟動tomcat時不能加載web項目的問題
這篇文章主要介紹了解決eclipse啟動tomcat時不能加載web項目的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06