亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Seata AT模式如何實(shí)現(xiàn)行鎖詳解

 更新時(shí)間:2022年11月10日 15:13:32   作者:夢(mèng)想實(shí)現(xiàn)家_Z  
這篇文章主要為大家介紹了Seata AT模式如何實(shí)現(xiàn)行鎖詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

我們?cè)诤芏嗖┛椭卸加邪l(fā)現(xiàn),Seata AT模式里面的全局鎖其實(shí)是行鎖,這也是Seata AT模式和XA模式在鎖粒度上的最大區(qū)別。我們可以在官網(wǎng)看到這樣一個(gè)例子:

兩個(gè)全局事務(wù) tx1 和 tx2,分別對(duì) a 表的 m 字段進(jìn)行更新操作,m 的初始值 1000。

tx1 先開(kāi)始,開(kāi)啟本地事務(wù),拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務(wù)提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 后開(kāi)始,開(kāi)啟本地事務(wù),拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務(wù)提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。

tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務(wù)。

如果 tx1 的二階段全局回滾,則 tx1 需要重新獲取該數(shù)據(jù)的本地鎖,進(jìn)行反向補(bǔ)償?shù)母虏僮?,?shí)現(xiàn)分支的回滾。

此時(shí),如果 tx2 仍在等待該數(shù)據(jù)的 全局鎖,同時(shí)持有本地鎖,則 tx1 的分支回滾會(huì)失敗。分支的回滾會(huì)一直重試,直到 tx2 的 全局鎖 等鎖超時(shí),放棄 全局鎖 并回滾本地事務(wù)釋放本地鎖,tx1 的分支回滾最終成功。

因?yàn)檎麄€(gè)過(guò)程 全局鎖 在 tx1 結(jié)束前一直是被 tx1 持有的,所以不會(huì)發(fā)生 臟寫 的問(wèn)題。

那么你知道Seata AT模式是如何實(shí)現(xiàn)行鎖的嘛?為了搞明白AT模式到底是怎么獲取全局鎖的,我們深入源碼來(lái)看看。

如何加鎖

為了證實(shí)全局鎖就是我們所說(shuō)的行鎖,經(jīng)過(guò)一番尋找,我在BaseTransactionalExecutor類中的prepareUndoLog()方法中找到了這樣一段代碼:

TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
    connectionProxy.appendLockKey(lockKeys);
    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}
  • 如果是刪除的SQL,那么通過(guò)beforeImage生成行鎖標(biāo)記,否則通過(guò)afterImage生成行鎖標(biāo)記;

比如表名wallet_tbl,里面有一個(gè)主鍵id值為1,那么最終生成的lockKeyswallet_tbl:1,如果有多行記錄id值分別為1、2、3,那么最終生成的lockKeyswallet_tbl:1,2,3;多個(gè)主鍵索引的話使用_連接。所以我們可以總結(jié)出lockKeys的生成規(guī)則為:tableName:1_A,2_B,3_C,1、2、3A、BC分別為主鍵索引的值。

此時(shí)還沒(méi)有真正地拿到鎖,只是生成一個(gè)鎖的標(biāo)記。真正地上鎖需要查看ConnectionProxy.register()方法:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
    context.setBranchId(branchId);
}

branchRegister()方法就是RMTC進(jìn)行分支注冊(cè),同時(shí)會(huì)申請(qǐng)行鎖。那么獲取行鎖的核心代碼應(yīng)該就是在TC端了,我們順著branchRegister()邏輯一路找到BranchSession.lock()

   public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (this.getBranchType().equals(BranchType.AT)) {
            // 只有AT模式需要獲取行鎖
            return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
        }
        return true;
    }

下面就要真正地開(kāi)始進(jìn)入LockerManager來(lái)申請(qǐng)鎖了:

@Override
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            // no lock
            return true;
        }
        // get locks of branch
        // 將lockKey解析成多行RowLock
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
    }

這里做了一步將lockKey解析成多行RowLock,根據(jù)上面的tableName:1_A,2_B,3_C規(guī)則,最終解析成3個(gè)RowLock對(duì)象:{tableName,1_A},{tableName,2_B},{tableName,3_C}

最終我們追蹤到最后一個(gè)關(guān)鍵方法LockStoreDataBaseDAO.acquireLock()

@Override
    public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        Set<String> dbExistedRowKeys = new HashSet<>();
        boolean originalAutoCommit = true;
        // 如果有多行鎖,那么先去重
        if (lockDOs.size() > 1) {
            lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
        }
        try {
            conn = lockStoreDataSource.getConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            List<LockDO> unrepeatedLockDOs = lockDOs;
?
            //check lock
            if (!skipCheckLock) {
?
                boolean canLock = true;
                // 查詢是否已經(jīng)存在行鎖
                // "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc"
                // in里面最多限制1000個(gè)
                String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
                ps = conn.prepareStatement(checkLockSQL);
                for (int i = 0; i < lockDOs.size(); i++) {
                    ps.setString(i + 1, lockDOs.get(i).getRowKey());
                }
                rs = ps.executeQuery();
                String currentXID = lockDOs.get(0).getXid();
                boolean failFast = false;
                while (rs.next()) {
                    String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                    // 如果發(fā)現(xiàn)有其他分布式事務(wù)和當(dāng)前申請(qǐng)行鎖的數(shù)據(jù)一致,那么加鎖失敗
                    if (!StringUtils.equals(dbXID, currentXID)) {
                        if (LOGGER.isInfoEnabled()) {
                            String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                            String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                            long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                            LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                        }
                        if (!autoCommit) {
                            int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                            if (status == LockStatus.Rollbacking.getCode()) {
                                failFast = true;
                            }
                        }
                        // 加鎖失敗
                        canLock = false;
                        break;
                    }
?
                    dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
                }
                // 加鎖失敗,回滾拋異常
                if (!canLock) {
                    conn.rollback();
                    if (failFast) {
                        throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                    }
                    return false;
                }
                // 如果是同一個(gè)分布式事務(wù)中申請(qǐng)行鎖,那么剔除重復(fù)的鎖數(shù)據(jù)
                if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                    unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                            .collect(Collectors.toList());
                }
                // 如果剔除后不需要再補(bǔ)充行鎖,那么直接返回申請(qǐng)成功
                if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                    conn.rollback();
                    return true;
                }
            }
?
            // 申請(qǐng)行鎖,分1行和多行兩種情況
            if (unrepeatedLockDOs.size() == 1) {
                LockDO lockDO = unrepeatedLockDOs.get(0);
                if (!doAcquireLock(conn, lockDO)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                    }
                    conn.rollback();
                    return false;
                }
            } else {
                if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                            unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                    }
                    conn.rollback();
                    return false;
                }
            }
            conn.commit();
            return true;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(rs, ps);
            if (conn != null) {
                try {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }
    }

1.先通過(guò)查詢語(yǔ)句檢查是否存在鎖沖突,鎖沖突的話,就直接失敗拋異常;

2.不存在鎖沖突,檢查是否鎖重入,重入的話,補(bǔ)充行鎖;

3.添加行鎖;

檢查鎖沖突的SQL語(yǔ)句如下:

select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc

添加行鎖SQL語(yǔ)句如下:

insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)

為什么是行鎖

根據(jù)上面加鎖的邏輯,我們發(fā)現(xiàn)一直比較的都是row_key這個(gè)主鍵,那么為什么row_key代表的是行鎖呢?這個(gè)問(wèn)題就要回到row_key是如何產(chǎn)生的:

protected LockDO convertToLockDO(RowLock rowLock) {
        LockDO lockDO = new LockDO();
        lockDO.setBranchId(rowLock.getBranchId());
        lockDO.setPk(rowLock.getPk());
        lockDO.setResourceId(rowLock.getResourceId());
        // row_key的生成
        lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
        lockDO.setXid(rowLock.getXid());
        lockDO.setTransactionId(rowLock.getTransactionId());
        lockDO.setTableName(rowLock.getTableName());
        return lockDO;
    }

根據(jù)上面代碼,我們很清楚地了解到,row_key是由resource_id、tableNamepk這三個(gè)字段連接生成的,也就意味著row_key是代表表里面的具體一行數(shù)據(jù),也就是我們的行記錄,所以我們確信AT模式的全局鎖其實(shí)就是行鎖。

以上就是Seata AT模式如何實(shí)現(xiàn)行鎖詳解的詳細(xì)內(nèi)容,更多關(guān)于Seata AT模式實(shí)現(xiàn)行鎖的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Mybatis-plus+通用mapper(tk.mybatis)的使用

    Mybatis-plus+通用mapper(tk.mybatis)的使用

    本文主要介紹了Mybatis-plus+通用mapper(tk.mybatis)的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧<BR>
    2024-03-03
  • 基于SpringBoot和PostGIS的某國(guó)基地可視化實(shí)戰(zhàn)

    基于SpringBoot和PostGIS的某國(guó)基地可視化實(shí)戰(zhàn)

    本文以Java開(kāi)發(fā)語(yǔ)言為例,使用SpringBoot框架來(lái)進(jìn)行后臺(tái)開(kāi)發(fā),詳細(xì)講解如何使用Leaflet對(duì)PostGIS的全球基地信息進(jìn)行Web可視化,最后分享Web可視化結(jié)果,感興趣的朋友跟隨小編一起看看吧
    2024-08-08
  • java上界通配符(? extends Type)的使用

    java上界通配符(? extends Type)的使用

    在Java中,? extends Type是一個(gè)上界通配符,本文主要介紹了java上界通配符(? extends Type)的使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-01-01
  • JAVA線程同步實(shí)例教程

    JAVA線程同步實(shí)例教程

    這篇文章主要介紹了JAVA線程同步實(shí)例教程,在Java程序設(shè)計(jì)中有著非常廣泛的應(yīng)用,需要的朋友可以參考下
    2014-08-08
  • Eclipse中改變默認(rèn)的workspace的方法及說(shuō)明詳解

    Eclipse中改變默認(rèn)的workspace的方法及說(shuō)明詳解

    eclipse中改變默然的workspace的方法有哪幾種呢?接下來(lái)腳本之家小編給大家介紹Eclipse中改變默認(rèn)的workspace的方法及說(shuō)明,對(duì)eclipse改變workspace相關(guān)知識(shí)感興趣的朋友一起學(xué)習(xí)吧
    2016-04-04
  • java 線程池keepAliveTime的含義說(shuō)明

    java 線程池keepAliveTime的含義說(shuō)明

    這篇文章主要介紹了java 線程池keepAliveTime的含義說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Spring MVC的項(xiàng)目準(zhǔn)備和連接建立方法

    Spring MVC的項(xiàng)目準(zhǔn)備和連接建立方法

    SpringWebMVC是基于Servlet API的Web框架,屬于Spring框架的一部分,主要用于簡(jiǎn)化Web應(yīng)用程序的開(kāi)發(fā),SpringMVC通過(guò)控制器接收請(qǐng)求,使用模型處理數(shù)據(jù),并通過(guò)視圖展示結(jié)果,感興趣的朋友跟隨小編一起看看吧
    2024-10-10
  • java異步調(diào)用的4種實(shí)現(xiàn)方法

    java異步調(diào)用的4種實(shí)現(xiàn)方法

    日常開(kāi)發(fā)中,會(huì)經(jīng)常遇到說(shuō),前臺(tái)調(diào)服務(wù),本文主要介紹了java異步調(diào)用的4種實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • Java Calendar類的詳解及使用實(shí)例

    Java Calendar類的詳解及使用實(shí)例

    這篇文章主要介紹了Java Calendar類的詳解及使用實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-04-04
  • Java 批量刪除Word中的空白段落示例代碼

    Java 批量刪除Word中的空白段落示例代碼

    這篇文章主要介紹了Java 批量刪除Word中的空白段落,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-11-11

最新評(píng)論