sharding-jdbc中的事務詳細解讀
序言
sharding-jdbc在分庫分表方面提供了很大的便利性
在使用DB的時候,通常都會涉及到事務這個概念,而在分庫分表的環(huán)境上再加上事務,就會使事情變得復雜起來。
本章試圖剖析sharding-jdbc在事務方面的解決思路。
傳統(tǒng)事務回顧
傳統(tǒng)的事務模型如下:
Connection conn = getConnection(); try{ Statement stmt1 = conn.parpareStatement(sql1); stmt1.executeUpdate(); Statement stmt2 = conn.parpareStatement(sql2); stmt2.executeUpdate(); conn.commit(); }catch(Exception e){ conn.rollback(); }
對于同一個連接,可以執(zhí)行多條sql語句,任何一條語句出現(xiàn)錯誤的時候,整個操作流程都可以回滾,從而達到事務的原子操作。
再來看最基本的spring事務操作:
class ServiceA(){ public void updateA(){...} } class ServiceB(){ public void updateB(){...} } @Transactional class ServiceC(){ public void updateC(){ serviceA.updateA(); serviceB.updateB(); } }
我們知道,當updateC執(zhí)行的時候,不管是updateA還是updateB出現(xiàn)了異常,updateC都可以整體回滾,達到原子操作的效果,其主要原因是updateA和updateB共享了同一個Connection,這是spring底層通過ThreadLocal緩存了Connection實現(xiàn)的。
以上介紹的這兩種情況都只是針對單庫單表的原子操作,事務的實現(xiàn)并不難理解,那么在跨庫的情況下,sharding-jdbc又是如何解決事務問題的呢?
shrading-jdbc之弱事務
在官方文檔中,針對弱事務有如下三點說明:
- 完全支持非跨庫事務,例如:僅分表,或分庫但是路由的結果在單庫中。
- 完全支持因邏輯異常導致的跨庫事務。例如:同一事務中,跨兩個庫更新。更新完畢后,拋出空指針,則兩個庫的內容都能回滾。
- 不支持因網(wǎng)絡、硬件異常導致的跨庫事務。例如:同一事務中,跨兩個庫更新,更新完畢后、未提交之前,第一個庫死機,則只有第二個庫數(shù)據(jù)提交。
為了理解以上幾點,我們來看看sharding-jdbc默認是如何處理事務的。
這是一個非常常見的處理模式,一個總連接處理了多條sql語句,最后一次性提交整個事務,每一條sql語句可能會分為多條子sql分庫分表去執(zhí)行,這意味著底層可能會關聯(lián)多個真正的數(shù)據(jù)庫連接,我們先來看看如果一切正常,commit會如何去處理。
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection { @Override public final void commit() throws SQLException { Collection<SQLException> exceptions = new LinkedList<>(); for (Connection each : cachedConnections.values()) { try { each.commit(); } catch (final SQLException ex) { exceptions.add(ex); } } throwSQLExceptionIfNecessary(exceptions); } }
引擎會遍歷底層所有真正的數(shù)據(jù)庫連接,一個個進行commit操作,如果任何一個出現(xiàn)了異常,直接捕獲異常,但是也只是捕獲而已,然后接著下一個連接的commit,這也就很好的說明了,如果在執(zhí)行任何一條sql語句出現(xiàn)了異常,整個操作是可以原子性回滾的,因為此時所有連接都不會執(zhí)行commit,但如果已經(jīng)到了commit這一步的話,如果有連接commit失敗了,是不會影響到其他連接的。
sharding-jdbc之柔性事務
sharding-jdbc的弱事務并不是完美的,有時可能會導致數(shù)據(jù)的一致性問題,所以針對某些特定的場景,又提出了柔性事務的概念。先來看一張官方的說明圖:
這里想表達兩個意思:
1. 對于sql的執(zhí)行,在執(zhí)行前記錄日志,如果執(zhí)行成功,把日志刪除,如果執(zhí)行失敗,重試一定次數(shù)(如果未達到最大嘗試次數(shù)便執(zhí)行成功了,一樣刪除日志)。
2. 異步任務不斷掃描執(zhí)行日志,如果重試次數(shù)未達到最大上限,嘗試重新執(zhí)行,如果執(zhí)行成功,刪除日志。
從上面兩點分析可以看出,由于采用的是重試的模式,也就是說同一條語句,是有可能被多次執(zhí)行的,所以官方提到了柔性事務的適用場景:
- 根據(jù)主鍵刪除數(shù)據(jù)。
- 更新記錄永久狀態(tài),如更新通知送達狀態(tài)。
而且它還有一定的限制: SQL需要滿足冪等性,具體為:
- INSERT語句要求必須包含主鍵,且不能是自增主鍵。
- UPDATE語句要求冪等,不能是UPDATE xxx SET x=x+1
- DELETE語句無要求。
在有了一個大概的了解之后,我們來更加深入的了解。
sharding-jdbc使用了google的EventBus事件模型,注冊了一個Listener,監(jiān)聽器對三種事件進行了處理,如下代碼所示:
switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); if (!isValidConnection(conn)) { bedSoftTransaction.getConnection().release(conn); conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); //TODO for batch event need split to 2-level records for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); }
以上代碼可以抽取為如下圖的描述:
監(jiān)聽器根據(jù)三種不同的事件類型對事務日志進行不同的操作。有監(jiān)聽 ,必然就有事件的投遞,那么引擎是什么時候產(chǎn)生這些事件的呢? 我們知道每一條sql語句拆分后有可能對應多條子sql語句,而每一條子sql語句是單獨執(zhí)行的,執(zhí)行是封裝在一個內部方法的:
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { synchronized (baseStatementUnit.getStatement().getConnection()) { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); List<AbstractExecutionEvent> events = new LinkedList<>(); if (parameterSets.isEmpty()) { events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList())); } for (List<Object> each : parameterSets) { events.add(getExecutionEvent(sqlType, baseStatementUnit, each)); } for (AbstractExecutionEvent event : events) { EventBusInstance.getInstance().post(event); } try { result = executeCallback.execute(baseStatementUnit); } catch (final SQLException ex) { for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE); each.setException(Optional.of(ex)); EventBusInstance.getInstance().post(each); ExecutorExceptionHandler.handleException(ex); } return null; } for (AbstractExecutionEvent each : events) { each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS); EventBusInstance.getInstance().post(each); } return result; } }
以上代碼可以簡化為如下流程:
其實執(zhí)行流程比較簡單,但還有兩個重要的細節(jié)這里沒有體現(xiàn):
- 當使用柔性事務的時候,需要創(chuàng)建事務管理器,并獲取事務對象,調用事務對象的begin開始一個事務,在這一步,會強制設置連接的autoCommit=true,這會導致所有的sql語句執(zhí)時后立即提交,想想如果能回滾,那柔性事務也就失去了意義。
- 當事務執(zhí)行begin時,會標記當前不拋出異常,這樣當執(zhí)行sql語句有異常時,會生成相應的EXECUTE_FAILURE事件,從而進行事務日志處理,而不是往外拋出異常,當事務結束時,調用事務對象的end方法,恢復異常的捕獲。
一個常見的代碼編寫模式如下(來自官方的demo)
private static void updateFailure(final DataSource dataSource) throws SQLException { String sql1 = "UPDATE t_order SET status='UPDATE_1' WHERE user_id=10 AND order_id=1000"; String sql2 = "UPDATE t_order SET not_existed_column=1 WHERE user_id=1 AND order_id=?"; String sql3 = "UPDATE t_order SET status='UPDATE_2' WHERE user_id=10 AND order_id=1000"; SoftTransactionManager transactionManager = new SoftTransactionManager(getSoftTransactionConfiguration(dataSource)); transactionManager.init(); BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery); Connection conn = null; try { conn = dataSource.getConnection(); transaction.begin(conn); PreparedStatement preparedStatement1 = conn.prepareStatement(sql1); PreparedStatement preparedStatement2 = conn.prepareStatement(sql2); preparedStatement2.setObject(1, 1000); PreparedStatement preparedStatement3 = conn.prepareStatement(sql3); preparedStatement1.executeUpdate(); preparedStatement2.executeUpdate(); preparedStatement3.executeUpdate(); } finally { transaction.end(); if (conn != null) { conn.close(); } } }
看到這個編寫模式,你一定會想,如果我使用MyBatis和spring,這一切能否整合起來,這個話題有興趣大家可以去嘗試。
總結
分布式事務處理起來有一定的難度,sharding-jdbc采用了簡單的弱事務模式和特殊場景下的柔性事務模式,沒有最好,只有更好,根據(jù)自身業(yè)務去選擇事務模式才是最重要的。
到此這篇關于sharding-jdbc中的事務詳細解讀的文章就介紹到這了,更多相關sharding-jdbc事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot文件上傳接口并發(fā)性能調優(yōu)
在一個項目現(xiàn)場,文件上傳接口(文件500K)QPS只有30,這個并發(fā)性能確實堪憂,此文記錄出坑過程,文中通過代碼示例講解的非常詳細,具有一定的參考價值,需要的朋友可以參考下2024-06-06spring boot創(chuàng)建和數(shù)據(jù)庫關聯(lián)模塊詳解
這篇文章主要給大家介紹了關于spring boot創(chuàng)建和數(shù)據(jù)庫關聯(lián)模塊的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10一篇文章帶你解決 IDEA 每次新建項目 maven home directory 總是改變的問題
這篇文章主要介紹了一篇文章帶你解決 IDEA 每次新建項目 maven home directory 總是改變的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09Java并發(fā)包工具類CountDownLatch的應用詳解
CountDownLatch是Java并發(fā)包中非常實用的一個工具類,它可以幫助我們實現(xiàn)線程之間的同步和協(xié)作。本文主要介紹了CountDownLatch的應用場景及最佳實踐,希望對大家有所幫助2023-04-04