詳解全局事務注解@GlobalTransactional的識別
一、聲明式全局事務
在Seata
示例工程中,能看到@GlobalTransactional
,如下方法示例:
@GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " + xid); boolean stockResult = reduceAccount(accountId,stockId, quantity); if (!stockResult) { throw new RuntimeException("賬號服務調用失敗,事務回滾!"); } Long orderId = createOrder(accountId, stockId, quantity); if (orderId == null || orderId <= 0) { throw new RuntimeException("訂單服務調用失敗,事務回滾!"); } return true; }
purchase
方法上加上此注解,即表示此方法內(nèi)的reduceAccount
和createOrder
兩個微服務調用也將加入到分布式事務中,即扣除賬戶余額與創(chuàng)建訂單將具有分布式事務的數(shù)據(jù)一致性保障能力。
了解 Spring 注解事務實現(xiàn)的話,應該也能推測出,Seata 的事務能力也可能是基于 Spring 的 AOP 機制,給標注了@GlobalTransactional
的方法做 AOP 增加,織入額外的邏輯以完成分布式事務的能力,偽代碼大致如下:
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); try { tx.begin(xxx); ... purchase(xxx)//給purchase增加全局事務處理能力 ... tx.commit(); } catch (Exception exx) { tx.rollback(); throw exx; }
二、@GlobalTransactional 注解如何被識別?
2.1 運行環(huán)境
1)引入seata-spring-boot-starter
模塊
<dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>${seata.version}</version> </dependency>
在spring.factories
中有自動裝配類SeataAutoConfiguration
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ ... io.seata.spring.boot.autoconfigure.SeataAutoConfiguration ...
此類負責處理全局事務掃描及設置,其中就有@GlobalTransactional
。
2)條件要求:
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) @AutoConfigureAfter({SeataCoreAutoConfiguration.class}) public class SeataAutoConfiguration
從類的自動裝備條件,可看出要滿足 2 個條件:
@ConditionalOnProperty
,表明若要生效須具備以下配置條件:
seata.enabled = true
@AutoConfigureAfter
,表示從 bean 的加載順序來看,要求是在SeataCoreAutoConfiguration
之后,SeataCoreAutoConfiguration
又是什么呢?
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) @ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties") @Configuration(proxyBeanMethods = false) public class SeataCoreAutoConfiguration { @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER) @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER}) public SpringApplicationContextProvider springApplicationContextProvider() { return new SpringApplicationContextProvider(); } }
從源碼可知,也很清晰,大概有幾個功能點:
- 同樣要求
seata.enabled = true
- 另外會掃描"io.seata.spring.boot.autoconfigure.properties"
- 會提供一個
SpringApplicationContextProvider
,基于 Spring 的程序中,很常見這種方式,用于提供ApplicationContext
,通常用于方便獲取 bean 和添加 bean 等。
public class SpringApplicationContextProvider implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { System.setProperty("file.listener.enabled", "false"); ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext); } }
2.2 功能-注入事務執(zhí)行失敗處理器FailureHandler
注入一個FailureHandler
,其默認實現(xiàn)DefaultFailureHandlerImpl
中只會打印錯誤日志,建議重寫,異常發(fā)生時及時告知使用者。
2.3 功能-構建全局事務掃描器GlobalTransactionScanner
構建全局事務掃描器GlobalTransactionScanner
,注入到容器中,其內(nèi)部做 2 件事情
- 會初始化
TM
、RM
客戶端 - 掃描
Bean
,對添加了全局事務注解的類(@GlobalTransactional
、@GlobalLock
、@TwoPhaseBusinessAction
)生成代理對象,做AOP
增強,添加對應的攔截器(攔截器內(nèi)補充分布式事務的能力)
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
Spring 中 Bean 的關鍵初始化過程:
實例化 -> 屬性注入 -> postProcessBeforeInitialization -> afterPropertiesSet/init 方法 -> postProcessAfterInitialization
從以下堆棧可以看出,AbstractAutoProxyCreator
在 bean 初始化完成之后創(chuàng)建它的代理 AOP 代理,通過wrapIfNecessary
判斷是否該 bean 是否存在全局事務注解,如果有則需要增強,添加相應的攔截器。
wrapIfNecessary:269, GlobalTransactionScanner (io.seata.spring.annotation) postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
先看一個關鍵方法existsAnnotation
,當掃描到 bean 之后,獲取 bean 的原始類型,然后通過此方法原始類型的類或者方法中是否有@GlobalTransactional
或@GlobalLock
注解
private boolean existsAnnotation(Class<?>[] classes) { if (CollectionUtils.isNotEmpty(classes)) { for (Class<?> clazz : classes) { if (clazz == null) { continue; } //判斷類上是否有注解@GlobalTransactional GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class); if (trxAnno != null) { return true; } Method[] methods = clazz.getMethods(); for (Method method : methods) { //判斷方法上是否有注解@GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class); if (trxAnno != null) { return true; } //判斷方法上是否有注解@GlobalLock GlobalLock lockAnno = method.getAnnotation(GlobalLock.class); if (lockAnno != null) { return true; } } } } return false; }
wrapIfNecessary
方法中實現(xiàn)了事務注解識別的核心邏輯:
/** * The following will be scanned, and added corresponding interceptor: * * TM: * @see io.seata.spring.annotation.GlobalTransactional // TM annotation * Corresponding interceptor: * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler * * GlobalLock: * @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation * Corresponding interceptor: * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler * * TCC mode: * @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface * @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method * @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser * Corresponding interceptor: * @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode */ @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // do checkers // seata提供的有擴展邏輯用于輔助判斷是否需要增強 if (!doCheckers(bean, beanName)) { return bean; } try { synchronized (PROXYED_SET) { //如果已被代理,則跳過該Bean ,PROXYED_SET是一個Set<String> 集合 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; //check TCC proxy //判斷是否TCC模式,如果是TCC,則添加TCC 攔截器 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // init tcc fence clean task if enable useTccFence TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { //非TCC模式 // 查詢Bean的 Class 類型 Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); //判斷是否有GlobalTransactional或者GlobalLock注解,如果沒有就不會代理,直接返回bean if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } // 初始化GlobalTransactionalInterceptor if (globalTransactionalInterceptor == null) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); //添加 禁用全局事務的監(jiān)聽器 ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName()); // 判斷是否已經(jīng)是AOP代理類,如果不是,則執(zhí)行父類的wrapIfNecessary if (!AopUtils.isAopProxy(bean)) { bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { // 給代理對象添加攔截器 AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); int pos; for (Advisor avr : advisor) { // Find the position based on the advisor's order, and add to advisors by pos pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } //標識該bean已經(jīng)代理過,本方法入口處有判斷 PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) { throw new RuntimeException(exx); } }
Seata 提供的有擴展邏輯用于輔助判斷是否需要增強,這里的擴展點有好幾個,這些擴展是用于安全保障,使用者可以按需采用。
三、小結:
本篇梳理了引入seata-spring-boot-starter
模塊后,其內(nèi)部會通過的自動裝配機制會在SeataAutoConfiguration
類中,掃描具有@GlobalTransactional
全局事務注解的類和方法的 bean,并對這類 bean 添加GlobalTransactionalInterceptor
,進行 AOP 增強,加入分布式事務的能力,增強后的功能復雜,下篇繼續(xù),更多關于全局事務注解@GlobalTransactional的資料請關注腳本之家其它相關文章!
相關文章
為什么wait和notify必須放在synchronized中使用
這篇文章主要介紹了為什么wait和notify必須放在synchronized中使用,文章圍繞主題的相關問題展開詳細介紹,具有一定的參考價值,需要的小伙伴可以參考以參考一下2022-05-05jdk中keytool的使用以及如何提取jks文件中的公鑰和私鑰
JKS文件由公鑰和密鑰構成利用Java?Keytool工具生成的文件,它是由公鑰和密鑰構成的,下面這篇文章主要給大家介紹了關于jdk中keytool的使用以及如何提取jks文件中公鑰和私鑰的相關資料,需要的朋友可以參考下2024-03-03使用多個servlet時Spring security需要指明路由匹配策略問題
這篇文章主要介紹了使用多個servlet時Spring security需要指明路由匹配策略問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn)
這篇文章主要介紹了Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04