SpringBoot中實(shí)現(xiàn)異步調(diào)用@Async詳解
為什么要用異步框架,它解決什么問(wèn)題?
在SpringBoot的日常開發(fā)中,一般都是同步調(diào)用的。但實(shí)際中有很多場(chǎng)景非常適合使用異步來(lái)處理,如:注冊(cè)新用戶,送100個(gè)積分;或下單成功,發(fā)送push消息等等。
就拿注冊(cè)新用戶這個(gè)用例來(lái)說(shuō),為什么要異步處理?
- 第一個(gè)原因:容錯(cuò)性、健壯性,如果送積分出現(xiàn)異常,不能因?yàn)樗头e分而導(dǎo)致用戶注冊(cè)失??;因?yàn)橛脩糇?cè)是主要功能,送積分是次要功能,即使送積分異常也要提示用戶注冊(cè)成功,然后后面在針對(duì)積分異常做補(bǔ)償處理。
- 第二個(gè)原因:提升性能,例如注冊(cè)用戶花了20毫秒,送積分花費(fèi)50毫秒,如果用同步的話,總耗時(shí)70毫秒,用異步的話,無(wú)需等待積分,故耗時(shí)20毫秒。
故,異步能解決2個(gè)問(wèn)題,性能和容錯(cuò)性。
SpringBoot如何實(shí)現(xiàn)異步調(diào)用?
對(duì)于異步方法調(diào)用,從Spring3開始提供了@Async注解,我們只需要在方法上標(biāo)注此注解,此方法即可實(shí)現(xiàn)異步調(diào)用。
當(dāng)然,我們還需要一個(gè)配置類,通過(guò)Enable模塊驅(qū)動(dòng)注解@EnableAsync來(lái)開啟異步功能。
實(shí)現(xiàn)異步調(diào)用
第一步:新建配置類,開啟@Async功能支持
使用@EnableAsync來(lái)開啟異步任務(wù)支持,@EnableAsync注解可以直接放在SpringBoot啟動(dòng)類上,也可以單獨(dú)放在其他配置類上。我們這里選擇使用單獨(dú)的配置類AsyncConfiguration。
至于為什么使用線程池,后面會(huì)講到。
import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置 */ @Slf4j @Configuration @EnableAsync // 可放在啟動(dòng)類上或單獨(dú)的配置類 public class AsyncConfiguration implements AsyncConfigurer { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊(duì)列 taskExecutor.setQueueCapacity(50); //設(shè)置線程的空閑時(shí)間,當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 指定默認(rèn)線程池 * The {@link Executor} instance to be used when processing async method invocations. */ @Override public Executor getAsyncExecutor() { return executor(); } /** * The {@link AsyncUncaughtExceptionHandler} instance to be used * when an exception is thrown during an asynchronous method execution * with {@code void} return type. */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)送未知錯(cuò)誤, 執(zhí)行方法:{}", method.getName(), ex); } }
第二步:在方法上標(biāo)記異步調(diào)用
增加一個(gè)Component類,用來(lái)進(jìn)行業(yè)務(wù)處理,同時(shí)添加@Async注解,代表該方法為異步處理。
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * 描述:異步方法調(diào)用 */ @Slf4j @Component public class AsyncTask { @SneakyThrows @Async // 在異步配置類設(shè)置了默認(rèn)線程池則不需要再指定線程池名稱 // @Async("asyncPoolTaskExecutor") public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時(shí) {} ms" , t2-t1); } @SneakyThrows @Async // @Async("otherPoolTaskExecutor") // 其他線程池的名稱 public void doTask2() { long t1 = System.currentTimeMillis(); Thread.sleep(3000); long t2 = System.currentTimeMillis(); log.info("task2方法耗時(shí) {} ms" , t2-t1); } }
第三步:在Controller中進(jìn)行異步方法調(diào)用
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @RequestMapping("/async") public class AsyncController { @Autowired private AsyncTask asyncTask; @RequestMapping("/task") public void task() throws InterruptedException { long t1 = System.currentTimeMillis(); asyncTask.doTask1(); asyncTask.doTask2(); Thread.sleep(1000); long t2 = System.currentTimeMillis(); log.info("main方法耗時(shí){} ms", t2-t1); } }
通過(guò)訪問(wèn)//localhost:8080/async/task查看控制臺(tái)日志:
[2021-12-01 20:21:36.036] [INFO] [http-nio-8080-exec-1] - task(AsyncController.java:27) - main方法耗時(shí) 1009 ms
[2021-12-01 20:21:37.037] [INFO] [async-1] - doTask1(AsyncTask.java:23) - task1方法耗時(shí) 2004 ms
[2021-12-01 20:21:38.038] [INFO] [async-2] - doTask2(AsyncTask.java:32) - task2方法耗時(shí) 3003 ms
通過(guò)日志可以看到:主線程不需要等待異步方法執(zhí)行完成,減少了響應(yīng)時(shí)間,提高了接口性能。
通過(guò)上面三步我們就可以在SpringBoot中使用異步方法來(lái)提高我們接口性能了。
為什么要給@Async自定義線程池?
使用@Async注解,在默認(rèn)情況下用的是SimpleAsyncTaskExecutor線程池,該線程池不是真正意義上的線程池。
使用此線程池?zé)o法實(shí)現(xiàn)線程重用,每次調(diào)用都會(huì)新建一條線程。若系統(tǒng)中不斷的創(chuàng)建線程,最終會(huì)導(dǎo)致系統(tǒng)占用內(nèi)存過(guò)高,引發(fā)OutOfMemoryError錯(cuò)誤,關(guān)鍵代碼如下:
public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task; //判斷是否開啟限流,默認(rèn)為否 if (this.isThrottleActive() && startTimeout > 0L) { //執(zhí)行前置操作,進(jìn)行限流 this.concurrencyThrottle.beforeAccess(); this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse)); } else { //未限流的情況,執(zhí)行線程任務(wù) this.doExecute(taskToUse); } } protected void doExecute(Runnable task) { //不斷創(chuàng)建線程 Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task); thread.start(); } //創(chuàng)建線程 public Thread createThread(Runnable runnable) { //指定線程名,task-1,task-2... Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName()); thread.setPriority(this.getThreadPriority()); thread.setDaemon(this.isDaemon()); return thread; }
我們也可以直接通過(guò)上面的控制臺(tái)日志觀察,每次打印的線程名都是[task-1]、[task-2]、[task-3]、[task-4]…遞增的。
正因如此,所以我們?cè)谑褂肧pring中的@Async異步框架時(shí)一定要自定義線程池,替代默認(rèn)的SimpleAsyncTaskExecutor。
Spring提供了多種線程池
- SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
- SyncTaskExecutor:這個(gè)類沒(méi)有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地
- ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類
- ThreadPoolTaskScheduler:可以使用cron表達(dá)式
- ThreadPoolTaskExecutor:最常使用,推薦。其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝
為@Async實(shí)現(xiàn)一個(gè)自定義線程池
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置2 */ @Configuration @EnableAsync // 可放在啟動(dòng)類上或單獨(dú)的配置類 public class AsyncConfiguration2 { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊(duì)列 taskExecutor.setQueueCapacity(50); //許的空閑時(shí)間,當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } }
配置自定義線程池以后我們就可以大膽的使用@Async提供的異步處理能力了。
多個(gè)線程池處理
在現(xiàn)實(shí)的互聯(lián)網(wǎng)項(xiàng)目開發(fā)中,針對(duì)高并發(fā)的請(qǐng)求,一般的做法是高并發(fā)接口單獨(dú)線程池隔離處理。
假設(shè)現(xiàn)在2個(gè)高并發(fā)接口:一個(gè)是修改用戶信息接口,刷新用戶redis緩存;一個(gè)是下訂單接口,發(fā)送app push信息。往往會(huì)根據(jù)接口特征定義兩個(gè)線程池,這時(shí)候我們?cè)谑褂聾Async時(shí)就需要通過(guò)指定線程池名稱進(jìn)行區(qū)分。
為@Async指定線程池名字
@SneakyThrows @Async("asyncPoolTaskExecutor") public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時(shí) {} ms" , t2-t1); }
當(dāng)系統(tǒng)存在多個(gè)線程池時(shí),我們也可以配置一個(gè)默認(rèn)線程池,對(duì)于非默認(rèn)的異步任務(wù)再通過(guò)@Async(“otherTaskExecutor”)來(lái)指定線程池名稱。
配置默認(rèn)線程池
可以修改配置類讓其實(shí)現(xiàn)AsyncConfigurer,并重寫getAsyncExecutor()方法,指定默認(rèn)線程池:
import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置 */ @Slf4j @Configuration @EnableAsync // 可放在啟動(dòng)類上或單獨(dú)的配置類 public class AsyncConfiguration implements AsyncConfigurer { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊(duì)列 taskExecutor.setQueueCapacity(50); //設(shè)置線程的空閑時(shí)間,當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 指定默認(rèn)線程池 * The {@link Executor} instance to be used when processing async method invocations. */ @Override public Executor getAsyncExecutor() { return executor(); } /** * The {@link AsyncUncaughtExceptionHandler} instance to be used * when an exception is thrown during an asynchronous method execution * with {@code void} return type. */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)送未知錯(cuò)誤, 執(zhí)行方法:{}", method.getName(), ex); } }
如下,doTask1()方法使用默認(rèn)使用線程池asyncPoolTaskExecutor,doTask2()使用線程池otherTaskExecutor,非常靈活。
@SneakyThrows @Async public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時(shí) {} ms" , t2-t1); } @SneakyThrows @Async("otherTaskExecutor") public void doTask2() { long t1 = System.currentTimeMillis(); Thread.sleep(3000); long t2 = System.currentTimeMillis(); log.info("task2方法耗時(shí) {} ms" , t2-t1); }
使用@Async注解可能會(huì)導(dǎo)致的問(wèn)題
如果serviceA、serviceB對(duì)象之間相互依賴,serviceA和serviceB總一個(gè)一個(gè)會(huì)先實(shí)例化,而serviceA或serviceB里面使用了@Async注解,會(huì)導(dǎo)致循環(huán)依賴異常:org.springframework.beans.factory.BeanCurrentlyInCreationException
在springboot中,以上報(bào)錯(cuò)被捕捉,拋出的異常是: The dependencies of some of the beans in the application context form a cycle
原因
我們知道,spring三級(jí)緩存一定程度上解決了循環(huán)依賴問(wèn)題。A對(duì)象在實(shí)例化之后,屬性賦值【opulateBean(beanName, mbd, instanceWrapper)】執(zhí)行之前,將ObjectFactory添加至三級(jí)緩存中,從而使得在B對(duì)象實(shí)例化后的屬性賦值過(guò)程中,能從三級(jí)緩存拿到ObjectFactory,調(diào)用getObject()方法拿到A的引用,B由此能順利完成初始化并加入到IOC容器。此時(shí)A對(duì)象完成屬性賦值之后,將會(huì)執(zhí)行初始化【initializeBean(beanName, exposedObject, mbd)方法】,重點(diǎn)是@Async注解的處理正是在這地方完成的,其對(duì)應(yīng)的后置處理器AsyncAnnotationBeanPostProcessor,在postProcessAfterInitialization方法中將返回代理對(duì)象,此代理對(duì)象與B中持有的A對(duì)象引用不同,導(dǎo)致了以上報(bào)錯(cuò)。
解決辦法
1.在A類上加@Lazy,保證A對(duì)象實(shí)例化晚于B對(duì)象
2.不使用@Async注解,通過(guò)自定義異步工具類發(fā)起異步線程(線程池)
3.不要讓@Async的Bean參與循環(huán)依賴
到此這篇關(guān)于SpringBoot中實(shí)現(xiàn)異步調(diào)用@Async詳解的文章就介紹到這了,更多相關(guān)SpringBoot異步調(diào)用@Async內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
intellij idea中安裝、配置mybatis插件Free Mybatis plugin的教程詳解
這篇文章主要介紹了intellij idea中安裝、配置mybatis插件Free Mybatis plugin的教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09Spring security用戶URL權(quán)限FilterSecurityInterceptor使用解析
這篇文章主要介紹了Spring security用戶URL權(quán)限FilterSecurityInterceptor使用解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12java鏈表數(shù)據(jù)結(jié)構(gòu)LinkedList插入刪除元素時(shí)間復(fù)雜度面試精講
這篇文章主要為大家介紹了java LinkedList插入和刪除元素的時(shí)間復(fù)雜度面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10SpringBoot整合Quartz及異步調(diào)用的案例
Quartz是一個(gè)完全由java編寫的開源作業(yè)調(diào)度框架、它的簡(jiǎn)單易用受到業(yè)內(nèi)人士的一致好評(píng),這篇文章主要介紹了SpringBoot整合Quartz及異步調(diào)用,需要的朋友可以參考下2023-03-03SpringBoot使用Sharding-JDBC實(shí)現(xiàn)數(shù)據(jù)分片和讀寫分離的方法
本文主要介紹了SpringBoot使用Sharding-JDBC實(shí)現(xiàn)數(shù)據(jù)分片和讀寫分離,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10