springboot一個自定義注解如何搞定多線程事務
springboot多線程(聲明式)的使用方法?
1、springboot提供了注解@Async來使用線程池,具體使用方法如下:
(1) 在啟動類(配置類)添加@EnableAsync來開啟線程池
(2) 在需要開啟子線程的方法上添加注解@Async
所以使用時需要配置自定義線程池,如下:
@Configuration @EnableAsync public class ThreadPoolTaskConfig { @Bean("threadPoolTaskExecutor")//自定義線程池名稱 public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //線程池創(chuàng)建的核心線程數,線程池維護線程的最少數量,即使沒有任務需要執(zhí)行,也會一直存活 executor.setCorePoolSize(16); //如果設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉 //executor.setAllowCoreThreadTimeOut(true); //阻塞隊列 當核心線程數達到最大時,新任務會放在隊列中排隊等待執(zhí)行 executor.setQueueCapacity(124); //最大線程池數量,當線程數>=corePoolSize,且任務隊列已滿時。線程池會創(chuàng)建新線程來處理任務 //任務隊列已滿時, 且當線程數=maxPoolSize,,線程池會拒絕處理任務而拋出異常 executor.setMaxPoolSize(64); //當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize //允許線程空閑時間30秒,當maxPoolSize的線程在空閑時間到達的時候銷毀 //如果allowCoreThreadTimeout=true,則會直到線程數量=0 executor.setKeepAliveSeconds(30); //spring 提供的 ThreadPoolTaskExecutor 線程池,是有setThreadNamePrefix() 方法的。 //jdk 提供的ThreadPoolExecutor 線程池是沒有 setThreadNamePrefix() 方法的 executor.setThreadNamePrefix("自定義線程池-"); // rejection-policy:拒絕策略:當線程數已經達到maxSize的時候,如何處理新任務 // CallerRunsPolicy():交由調用方線程運行,比如 main 線程;如果添加到線程池失敗,那么主線程會自己去執(zhí)行該任務,不會等待線程池中的線程去執(zhí)行, (個人推薦) // AbortPolicy():該策略是線程池的默認策略,如果線程池隊列滿了丟掉這個任務并且拋出RejectedExecutionException異常。 // DiscardPolicy():如果線程池隊列滿了,會直接丟掉這個任務并且不會有任何異常 // DiscardOldestPolicy():丟棄隊列中最老的任務,隊列滿了,會將最早進入隊列的任務刪掉騰出空間,再嘗試加入隊列 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //設置線程池關閉的時候等待所有任務都完成再繼續(xù)銷毀其他的Bean,這樣這些異步任務的銷毀就會先于Redis線程池的銷毀 executor.setWaitForTasksToCompleteOnShutdown(true); //設置線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住。 executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor; } }
開啟子線程方法: 在需要開啟線程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的參數為自定義線程池的名稱。
二、自定義注解實現多線程事務控制
1.自定義注解
本文是使用了兩個注解共同作用實現的,主線程當做協(xié)調者,各子線程作為參與者
package com.example.anno; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 多線程事務注解: 主事務 * * @author zlj * @since 2022/11/3 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface MainTransaction { int value();//子線程數量 } package com.example.anno; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 多線程事務注解: 子事務 * * @author zlj * @since 2022/11/3 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface SonTransaction { String value() default ""; }
解釋:
兩個注解都是用在方法上的,須配合@Transactional(rollbackFor = Exception.class)一起使用
- @MainTransaction注解 用在調用方,其參數為必填,參數值為本方法中調用的方法開啟的線程數,如:在這個方法中調用的方法中有2個方法用@Async注解開啟了子線程,則參數為@MainTransaction(2),另外如果未使用@MainTransaction注解,則直接已無多線程事務執(zhí)行(不影響方法的單線程事務)
- @SonTransaction注解 用在被調用方(開啟線程的方法),無需傳入參數
2.AOP內容
代碼如下:
package com.example.aop; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.example.anno.MainTransaction; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; /** * 多線程事務 * * @author zlj * @since 2022/11/3 */ @Aspect @Component public class TransactionAop { //用來存儲各線程計數器數據(每次執(zhí)行后會從map中刪除) private static final Map<String, Object> map = new HashMap<>(); @Resource private PlatformTransactionManager transactionManager; @Around("@annotation(mainTransaction)") public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable { //當前線程名稱 Thread thread = Thread.currentThread(); String threadName = thread.getName(); //初始化計數器 CountDownLatch mainDownLatch = new CountDownLatch(1); CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的參數, 為子線程的數量 // 用來記錄子線程的運行狀態(tài),只要有一個失敗就變?yōu)閠rue AtomicBoolean rollBackFlag = new AtomicBoolean(false); // 用來存每個子線程的異常,把每個線程的自定義異常向vector的首位置插入,其余異常向末位置插入,避免線程不安全,所以使用vector代替list Vector<Throwable> exceptionVector = new Vector<>(); map.put(threadName + "mainDownLatch", mainDownLatch); map.put(threadName + "sonDownLatch", sonDownLatch); map.put(threadName + "rollBackFlag", rollBackFlag); map.put(threadName + "exceptionVector", exceptionVector); try { joinPoint.proceed();//執(zhí)行方法 } catch (Throwable e) { exceptionVector.add(0, e); rollBackFlag.set(true);//子線程回滾 mainDownLatch.countDown();//放行所有子線程 } if (!rollBackFlag.get()) { try { // sonDownLatch等待,直到所有子線程執(zhí)行完插入操作,但此時還沒有提交事務 sonDownLatch.await(); mainDownLatch.countDown();// 根據rollBackFlag狀態(tài)放行子線程的await處,告知是回滾還是提交 } catch (Exception e) { rollBackFlag.set(true); exceptionVector.add(0, e); } } if (CollectionUtils.isNotEmpty(exceptionVector)) { map.remove(threadName + "mainDownLatch"); map.remove(threadName + "sonDownLatch"); map.remove(threadName + "rollBackFlag"); map.remove(threadName + "exceptionVector"); throw exceptionVector.get(0); } } @Around("@annotation(com.huigu.common.anno.SonTransaction)") public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable { Object[] args = joinPoint.getArgs(); Thread thread = (Thread) args[args.length - 1]; String threadName = thread.getName(); CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch"); if (mainDownLatch == null) { //主事務未加注解時, 直接執(zhí)行子事務 joinPoint.proceed();//這里最好的方式是:交由上面的thread來調用此方法,但我沒有找尋到對應api,只能直接放棄事務, 歡迎大神來優(yōu)化, 留言分享 return; } CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch"); AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag"); Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector"); //如果這時有一個子線程已經出錯,那當前線程不需要執(zhí)行 if (rollBackFlag.get()) { sonDownLatch.countDown(); return; } DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 開啟事務 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 設置事務隔離級別 TransactionStatus status = transactionManager.getTransaction(def); try { joinPoint.proceed();//執(zhí)行方法 sonDownLatch.countDown();// 對sonDownLatch-1 mainDownLatch.await();// 如果mainDownLatch不是0,線程會在此阻塞,直到mainDownLatch變?yōu)? // 如果能執(zhí)行到這一步說明所有子線程都已經執(zhí)行完畢判斷如果atomicBoolean是true就回滾false就提交 if (rollBackFlag.get()) { transactionManager.rollback(status); } else { transactionManager.commit(status); } } catch (Throwable e) { exceptionVector.add(0, e); // 回滾 transactionManager.rollback(status); // 并把狀態(tài)設置為true rollBackFlag.set(true); mainDownLatch.countDown(); sonDownLatch.countDown(); } } }
擴展說明: CountDownLatch是什么?
一個同步輔助類
- 創(chuàng)建對象時: 用給定的數字初始化 CountDownLatch
- countDown() 方法: 使計數減1
- await() 方法: 阻塞當前線程, 直至當前計數到達零。
本文中:
用 計數 1 初始化的 mainDownLatch 當作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。
用 子線程數量 初始化的 sonDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
3.注解使用Demo
任務方法:
package com.example.demo.service; import com.example.demo.anno.SonTransaction; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author zlj * @since 2022/11/14 */ @Service public class SonService { /** * 參數說明: 以下4個方法參數和此相同 * * @param args 業(yè)務中需要傳遞的參數 * @param thread 調用者的線程, 用于aop獲取參數, 不建議以方法重寫的方式簡略此參數, * 在調用者方法中可以以此參數為標識計算子線程的個數作為注解參數,避免線程參數計算錯誤導致鎖表 * 傳參時參數固定為: Thread.currentThread() */ @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod1(String args, Thread thread) { System.out.println(args + "開啟了線程"); } @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod2(String args1, String args2, Thread thread) { System.out.println(args1 + "和" + args2 + "開啟了線程"); } @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod3(String args, Thread thread) { System.out.println(args + "開啟了線程"); } //sonMethod4方法沒有使用線程池 @Transactional(rollbackFor = Exception.class) public void sonMethod4(String args) { System.out.println(args + "沒有開啟線程"); } }
調用方:
package com.example.demo.service; import com.example.demo.anno.MainTransaction; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zlj * @since 2022/11/14 */ @Service public class MainService { @Resource private SonService sonService; @MainTransaction(3)//調用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async開啟了線程, 所以參數為: 3 @Transactional(rollbackFor = Exception.class) public void test1() { sonService.sonMethod1("路飛", Thread.currentThread()); sonService.sonMethod2("索隆", "山治", Thread.currentThread()); sonService.sonMethod3("娜美", Thread.currentThread()); sonService.sonMethod4("羅賓"); } /* * 有的業(yè)務中存在if的多種可能, 每一種走向調用的方法(開啟線程的方法)數量如果不同, 這時可以選擇放棄使用@MainTransaction注解避免鎖表 * 這時候如果發(fā)生異常會導致多線程不能同時回滾, 可根據業(yè)務自己權衡是否使用 */ @Transactional(rollbackFor = Exception.class) public void test2() { sonService.sonMethod1("路飛", Thread.currentThread()); sonService.sonMethod2("索隆", "山治", Thread.currentThread()); sonService.sonMethod3("娜美", Thread.currentThread()); sonService.sonMethod4("羅賓"); } }
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Spring實戰(zhàn)之使用注解實現聲明式事務操作示例
這篇文章主要介紹了Spring實戰(zhàn)之使用注解實現聲明式事務操作,結合實例形式詳細分析了spring使用注解實現聲明式事務相關配置、接口實現與使用技巧,需要的朋友可以參考下2020-01-01Java?springBoot初步使用websocket的代碼示例
這篇文章主要介紹了Java?springBoot初步使用websocket的相關資料,WebSocket是一種實現實時雙向通信的協(xié)議,適用于需要實時通信的應用程序,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2025-03-03關于IDEA2020.1新建項目maven PKIX 報錯問題解決方法
這篇文章主要介紹了關于IDEA2020.1新建項目maven PKIX 報錯問題解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-06-06