多數(shù)據(jù)源如何實現(xiàn)事務(wù)管理
一. 原理-編程式事務(wù)管理
想直接看實現(xiàn)的朋友,這部分可以直接跳過
1.核心接口
Spring 中對事務(wù)的處理,涉及到三個核心接口:
- PlatformTransactionManager
- TransactionDefinition
- TransactionStatus
這三個核心類是Spring處理事務(wù)的核心類。
1.1 PlatformTransactionManager
public interface PlatformTransactionManager { TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException; void commit(TransactionStatus var1) throws TransactionException; void rollback(TransactionStatus var1) throws TransactionException; }
可以看到 PlatformTransactionManager 中定義了基本的事務(wù)操作方法,這些事務(wù)操作方法都是平臺無關(guān)的,具體的實現(xiàn)都是由不同的子類來實現(xiàn)的。PlatformTransactionManager 中主要有如下三個方法:
getTransaction()
- getTransaction() 是根據(jù)傳入的 TransactionDefinition 獲取一個事務(wù)對象,TransactionDefinition 中定義了一些事務(wù)的基本規(guī)則,例如傳播性、隔離級別等。
commit()
- commit() 方法用來提交事務(wù)。
rollback()
- rollback() 方法用來回滾事務(wù)。
1.2 TransactionDefinition
可以看到一共有五個方法:
- getIsolationLevel(),獲取事務(wù)的隔離級別
- getName(),獲取事務(wù)的名稱
- getPropagationBehavio(),獲取事務(wù)的傳播性
- getTimeout(),獲取事務(wù)的超時時間
- isReadOnly(),獲取事務(wù)是否是只讀事務(wù)
我們可以從這些個方法中, 很直觀的感受到這個類的作用:設(shè)置事務(wù)的屬性。接下來,我重點結(jié)束幾個屬性的意義。
隔離級別 IsolationLevel
常用狀態(tài)分析:
ReadUncommitted
- 表示:未提交讀。當(dāng)事務(wù)A更新某條數(shù)據(jù)的時候,不容許其他事務(wù)來更新該數(shù)據(jù),但可以進行讀取操作
ReadCommitted
- 表示:提交讀。當(dāng)事務(wù)A更新數(shù)據(jù)時,不容許其他事務(wù)進行任何的操作包括讀取,但事務(wù)A讀取時,其他事務(wù)可以進行讀取、更新。
- 在Read Committed隔離級別下,一個事務(wù)只能讀取到已經(jīng)提交的數(shù)據(jù)。當(dāng)多個事務(wù)同時讀取同一數(shù)據(jù)時,如果有其他事務(wù)正在對該數(shù)據(jù)進行修改但尚未提交,則讀取操作將等待,直到修改完成并提交后才能讀取到最新的數(shù)據(jù)。
- 這意味著在Read Committed級別下,事務(wù)讀取的數(shù)據(jù)是實時更新的,但可能會出現(xiàn)不一致的讀取結(jié)果。
RepeatableRead
- 表示:重復(fù)讀。當(dāng)事務(wù)A更新數(shù)據(jù)時,不容許其他事務(wù)進行任何的操作,但是當(dāng)事務(wù)A進行讀取的時候,其他事務(wù)只能讀取,不能更新。
- 在Repeatable Read隔離級別下,一個事務(wù)在開始讀取數(shù)據(jù)后,任何其他事務(wù)對該數(shù)據(jù)的修改都不會被讀取到,即使這些修改已經(jīng)提交。事務(wù)在整個過程中都能看到一致的數(shù)據(jù)快照。
- 這意味著在Repeatable Read級別下,事務(wù)讀取的數(shù)據(jù)是一致的,不會受到其他并發(fā)事務(wù)的修改的影響,確保了事務(wù)的獨立性和穩(wěn)定性。
Serializable
- 表示:序列化。
- 最嚴(yán)格的隔離級別,當(dāng)然并發(fā)性也是最差的,事務(wù)必須依次進行。
讀取現(xiàn)象:
通過一些現(xiàn)象,可以反映出隔離級別的效果。這些現(xiàn)象有:
- 更新丟失(lost update):當(dāng)系統(tǒng)允許兩個事務(wù)同時更新同一數(shù)據(jù)時,發(fā)生更新丟失。
臟讀(dirty read):當(dāng)一個事務(wù)讀取另一個事務(wù)尚未提交的修改時,產(chǎn)生臟讀。
比如:事務(wù)B執(zhí)行過程中修改了數(shù)據(jù)X,在未提交前,事務(wù)A讀取了X,而事務(wù)B卻回滾了,這樣事務(wù)A就形成了臟讀。
不重復(fù)讀(nonrepeatable read):同一查詢在同一事務(wù)中多次進行,由于其他提交事務(wù)所做的修改或刪除,每次返回不同的結(jié)果集,此時發(fā)生非重復(fù)讀。(A transaction rereads data it has previously read and finds that another committed transaction has modified or deleted the data. )。
比如:事務(wù)A首先讀取了一條數(shù)據(jù),然后執(zhí)行邏輯的時候,事務(wù)B將這條數(shù)據(jù)改變了,然后事務(wù)A再次讀取的時候,發(fā)現(xiàn)數(shù)據(jù)不匹配了,就是所謂的不可重復(fù)讀了。
幻讀(phantom read):同一查詢在同一事務(wù)中多次進行,由于其他提交事務(wù)所做的插入操作,每次返回不同的結(jié)果集,此時發(fā)生幻像讀。(A transaction reexecutes a query returning a set of rows that satisfies a search condition and finds that another committed transaction has inserted additional rows that satisfy the condition. )。
比如:事務(wù)A首先根據(jù)條件索引得到N條數(shù)據(jù),然后事務(wù)B改變了這N條數(shù)據(jù)之外的M條或者增添了M條符合事務(wù)A搜索條件的數(shù)據(jù),導(dǎo)致事務(wù)A再次搜索發(fā)現(xiàn)有N+M條數(shù)據(jù)了,就產(chǎn)生了幻讀。
不可重復(fù)讀和幻讀比較:
兩者有些相似,但是前者針對的是update或delete,后者針對的insert。
- 為什么會出現(xiàn)“臟讀”?因為沒有“select”操作沒有規(guī)矩。
- 為什么會出現(xiàn)“不可重復(fù)讀”?因為“update”操作沒有規(guī)矩。
- 為什么會出現(xiàn)“幻讀”?因為“insert”和“delete”操作沒有規(guī)矩。
隔離級別與讀取現(xiàn)象:
隔離級別 | 臟讀 Dirty Read | 不可重復(fù)讀取 NonRepeatable Read | 幻讀 Phantom Read |
---|---|---|---|
未授權(quán)讀取/未提交讀 read uncommitted | 可能發(fā)生 | 可能發(fā)生 | 可能發(fā)生 |
授權(quán)讀取/提交讀 read committed | - | 可能發(fā)生 | 可能發(fā)生 |
重復(fù)讀 read repeatable | - | - | 可能發(fā)生 |
序列化 serializable | - | - | - |
常見數(shù)據(jù)庫默認(rèn)隔離級別:
數(shù)據(jù)庫 | 默認(rèn)隔離級別 |
---|---|
Oracle | read committed |
SqlServer | read committed |
MySQL(InnoDB) | Read-Repeatable |
傳播性 Propagation
- 1、PROPAGATION_REQUIRED:如果當(dāng)前沒有事務(wù),就創(chuàng)建一個新事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),該設(shè)置是最常用的設(shè)置。
- 2、PROPAGATION_NESTED:如果當(dāng)前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù),則執(zhí)行與PROPAGATION_REQUIRED類似的操作
- 3、PROPAGATION_SUPPORTS:支持當(dāng)前事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),如果當(dāng)前不存在事務(wù),就以非事務(wù)執(zhí)行。‘
- 4、PROPAGATION_MANDATORY:支持當(dāng)前事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),如果當(dāng)前不存在事務(wù),就拋出異常。
- 5、PROPAGATION_REQUIRES_NEW:支持當(dāng)前事務(wù),創(chuàng)建新事務(wù),無論當(dāng)前存不存在事務(wù),都創(chuàng)建新事務(wù)。
- 6、PROPAGATION_NOT_SUPPORTED:以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起。
- 7、PROPAGATION_NEVER:以非事務(wù)方式執(zhí)行,如果當(dāng)前存在事務(wù),則拋出異常。
1.3 TransactionStatus
- isNewTransaction() 方法獲取當(dāng)前事務(wù)是否是一個新事務(wù)。
- hasSavepoint() 方法判斷是否存在 savePoint(),即是否已創(chuàng)建為基于保存點的嵌套事務(wù)。此方法主要用于診斷目的,與isNewTransaction()一起使用。對于自定義保存點的編程處理,請使用SavepointManager提供的操作。
- setRollbackOnly() 方法設(shè)置事務(wù)必須回滾。
- isRollbackOnly() 方法獲取事務(wù)只能回滾。
- flush() 方法將底層會話中的修改刷新到數(shù)據(jù)庫,一般用于 Hibernate/JPA 的會話,對如 JDBC 類型的事務(wù)無任何影響。
- isCompleted() 方法用來獲取是一個事務(wù)是否結(jié)束,即是否已提交或回滾。
表示事務(wù)的狀態(tài)。
事務(wù)代碼可以使用它來檢索狀態(tài)信息,并以編程方式請求回滾(而不是引發(fā)導(dǎo)致隱式回滾的異常)。
二、實現(xiàn)跨數(shù)據(jù)源事務(wù)
先說一下兩階段提交:首先多個數(shù)據(jù)源的事務(wù)分別都開起來,然后各事務(wù)分別去執(zhí)行對應(yīng)的sql(此所謂第一階段提交),最后如果都成功就把事務(wù)全部提交,只要有一個失敗就把事務(wù)都回滾——此所謂第二階段提交。
Transactional注解只能指定一個數(shù)據(jù)源的事務(wù)管理器。我們重新定義一個,讓它支持指定多個數(shù)據(jù)源的事務(wù)管理器,然后我們在使用了這個注解的方法前后進行所謂的兩階段協(xié)議,而這可以通過AOP來完成。所以,代碼如下:
定義注解
/** * 多數(shù)據(jù)源事務(wù)注解 * */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface MultiDataSourceTransactional { /** * 事務(wù)管理器數(shù)組 */ String[] transactionManagers(); }
定義切面
我們使用Spring的Aspect來完成切面。
先來回顧一下它的切入點
- @Before: 標(biāo)識一個前置增強方法,相當(dāng)于BeforeAdvice的功能。
- @After: 后置增強,不管是拋出異?;蛘哒M顺龆紩?zhí)行。
- @AfterReturning: 后置增強,似于AfterReturningAdvice, 方法正常退出時執(zhí)行。
- @AfterThrowing: 異常拋出增強,相當(dāng)于ThrowsAdvice。
- @Around: 環(huán)繞增強,相當(dāng)于MethodInterceptor。
咋一看,@Around是可以的:ProceedingJoinPoint的proceed方法是執(zhí)行目標(biāo)方法,在它前面聲明事務(wù),try…catch…一下如果有異常就回滾沒異常就提交。不過,最開始用這個的時候,好像發(fā)現(xiàn)有點問題,具體記不住了,大家可以試一下。
因為當(dāng)時工期緊沒仔細研究,就采用了下面這種
@Before + @AfterReturning + @AfterThrowing組合,看名字和功能簡直是完美契合?。〉怯幸粋€問題,不同方法怎么共享那個事務(wù)呢?成員變量?對,沒錯。但是又有線程安全問題咋辦?ThreadLocal幫你解決(_)。
/** * 編程式事務(wù) 基本過程: * 1.TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); * 首先通過transactionManager.getTransaction方法獲取事務(wù)對象status, * 2.transactionManager.commit(status); * 然后在try塊中執(zhí)行數(shù)據(jù)庫操作,最后通過transactionManager.commit(status)提交事務(wù)。 * 3.transactionManager.rollback(status); * 如果在數(shù)據(jù)庫操作中發(fā)生了異常,則會通過transactionManager.rollback(status)回滾事務(wù)。 * */ @Component @Aspect public class MultiDataSourceTransactionAspect implements ApplicationContextAware { /** * 線程本地變量:為什么使用棧?※為了達到后進先出的效果※ */ private static final ThreadLocal<Stack<Pair<DataSourceTransactionManager, TransactionStatus>>> THREAD_LOCAL = new ThreadLocal<>(); /** * 事務(wù)聲明 */ private DefaultTransactionDefinition def = new DefaultTransactionDefinition(); { // 非只讀模式 def.setReadOnly(false); //事務(wù)隔離級別 采用數(shù)據(jù)庫默認(rèn)的 def.setIsolationLevel(DefaultTransactionDefinition.ISOLATION_DEFAULT); //事務(wù)傳播行為 - 創(chuàng)建一個新的事務(wù),并在該事務(wù)中執(zhí)行;如果當(dāng)前存在事務(wù),則將當(dāng)前事務(wù)掛起。 def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW); } /** * implements ApplicationContextAware ==> 獲取spring容器 */ private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * 設(shè)置切入點 */ @Pointcut("@annotation(com.example.mutidatasource.dataSource.multiDataSourceTransaction.MultiDataSourceTransactional)") public void pointcut(){} /** * 聲明事務(wù) * * 冗余了,pointcut() 、 @annotation(transactional) 語義一致 * 但是 不加上這個@annotation(transactional) 報錯, before() 方法期望有一個 MultiDataSourceTransactional 類型的參數(shù), * 雖然pointcut() 確定了有這個注解,但是編輯器不知道啊 */ @Before("pointcut() && @annotation(transactional)") public void before(MultiDataSourceTransactional transactional){ //根據(jù)設(shè)置的事務(wù)名稱按順序聲明,并放到ThreadLocal里 String[] TransactionalNames = transactional.value(); Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = new Stack<>(); for (String transactionalName : TransactionalNames) { // 從容器中獲取 數(shù)據(jù)庫事務(wù)管理器 DataSourceTransactionManager manager = applicationContext.getBean(transactionalName, DataSourceTransactionManager.class); TransactionStatus status = manager.getTransaction(def); pairStack.push(new Pair<>(manager,status)); } THREAD_LOCAL.set(pairStack); } /** * 提交事務(wù) * * @AfterReturning 和 @After 都是 Spring AOP 框架提供的注解,用于在方法執(zhí)行后執(zhí)行某些操作。但是,它們之間有一些關(guān)鍵的區(qū)別: * * @AfterReturning 僅在方法正常返回時執(zhí)行,而 @After 總是在方法執(zhí)行后執(zhí)行,無論方法是否正常返回。 * @AfterReturning 可以訪問方法的返回值,而 @After 則不能。 * @AfterReturning 可以通過 returning 屬性指定要訪問的返回值的名稱,而 @After 則不能。 */ @AfterReturning("pointcut()") public void afterReturning(){ // ※棧頂彈出(后進先出) Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = THREAD_LOCAL.get(); while (!pairStack.empty()) { Pair<DataSourceTransactionManager, TransactionStatus> pair = pairStack.pop(); // 提交事務(wù) transactionManager.commit(status); pair.getKey().commit(pair.getValue()); } THREAD_LOCAL.remove(); } /** * 回滾事務(wù) * * */ @AfterThrowing("pointcut()") public void afterThrowing(){ // ※棧頂彈出(后進先出) Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = THREAD_LOCAL.get(); while (!pairStack.empty()) { Pair<DataSourceTransactionManager, TransactionStatus> pair = pairStack.pop(); // 提交事務(wù) transactionManager.commit(status); pair.getKey().rollback(pair.getValue()); } THREAD_LOCAL.remove(); } }
使用
/** * 測試多數(shù)據(jù)源事務(wù) */ @MultiDataSourceTransactional(transactionManagers={"ATransactionManager","BTransactionManager"}) public void testTransaction() { }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring?Boot?整合RocketMq實現(xiàn)消息過濾功能
這篇文章主要介紹了Spring?Boot?整合RocketMq實現(xiàn)消息過濾,本文講解了RocketMQ實現(xiàn)消息過濾,針對不同的業(yè)務(wù)場景選擇合適的方案即可,需要的朋友可以參考下2022-06-06SpringCloud Ribbon負(fù)載均衡工具使用
Ribbon是Netflix的組件之一,負(fù)責(zé)注冊中心的負(fù)載均衡,有助于控制HTTP和TCP客戶端行為。Spring?Cloud?Netflix?Ribbon一般配合Ribbon進行使用,利用在Eureka中讀取的服務(wù)信息,在調(diào)用服務(wù)節(jié)點時合理進行負(fù)載2023-02-02java.lang.UnsupportedOperationException分析及解決辦法
日常開發(fā)中我遇到j(luò)ava.lang.UnsupportedOperationException:異常兩次了,下面這篇文章主要給對大家介紹了關(guān)于java.lang.UnsupportedOperationException分析及解決辦法,需要的朋友可以參考下2024-03-03java中如何實現(xiàn) zip rar 7z 壓縮包解壓
這篇文章主要介紹了java中如何實現(xiàn) zip rar 7z 壓縮包解壓問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07