spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)
spring event 事件異步處理(發(fā)布,監(jiān)聽,異步處理)
// 定義事件
public class EventDemo extends ApplicationEvent {
private String supplierCode;
private String productCode;
public EventDemo(Object source, String supplierCode, String productCode) {
super(source);
this.supplierCode = supplierCode;
this.productCode = productCode;
}
public String getSupplierCode() {
return supplierCode;
}
public String getProductCode() {
return productCode;
}
}// 發(fā)布事件
@Component
public class EventDemoPublish {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publish(String message) {
EventDemo demo = new EventDemo(this, message);
applicationEventPublisher.publishEvent(demo);
System.out.println("發(fā)布事件執(zhí)行結(jié)束");
}
}// 監(jiān)聽事件
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {
@Override
public void onApplicationEvent(EventDemo event) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("事件監(jiān)聽開始...... " + "商家編碼:" + event.getSupplierCode() + ",商品編碼:" + event.getProductCode());
}
}<!--定義事件異步處理-->
<bean id="commonTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 線程池維持處于Keep-alive狀態(tài)的線程數(shù)量。如果設(shè)置了allowCoreThreadTimeOut為true,該值可能為0。
并發(fā)線程數(shù),想達(dá)到真正的并發(fā)效果,最好對(duì)應(yīng)CPU的線程數(shù)及核心數(shù) -->
<property name="corePoolSize" value="2" />
<!-- 最大線程池容量 -->
<property name="maxPoolSize" value="2" />
<!-- 超過最大線程池容量后,允許的線程隊(duì)列數(shù) -->
<property name="queueCapacity" value="2" />
<!-- 線程池維護(hù)線程所允許的空閑時(shí)間 .單位毫秒,默認(rèn)為60s,超過這個(gè)時(shí)間后會(huì)將大于corePoolSize的線程關(guān)閉,保持corePoolSize的個(gè)數(shù) -->
<property name="keepAliveSeconds" value="1000" />
<!-- 允許核心線程超時(shí): false(默認(rèn)值)不允許超時(shí),哪怕空閑;true則使用keepAliveSeconds來控制等待超時(shí)時(shí)間,最終corePoolSize的個(gè)數(shù)可能為0 -->
<property name="allowCoreThreadTimeOut" value="true" />
<!-- 線程池對(duì)拒絕任務(wù)(無線程可用)的處理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中 -->
<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
</property>
</bean>
<!--名字必須是applicationEventMulticaster,因?yàn)锳bstractApplicationContext默認(rèn)找個(gè)-->
<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<!--注入任務(wù)執(zhí)行器 這樣就實(shí)現(xiàn)了異步調(diào)用-->
<property name="taskExecutor" ref="commonTaskExecutor"></property>
</bean>spring事件之異步線程執(zhí)行
Spring 不僅為我們提供了IOC , AOP功能外,還在這個(gè)基礎(chǔ)上提供了許多的功能,我們用的最多的可能就是Spring MVC了吧,但是讓我們來看下spring-context包,其中包含了緩存、調(diào)度、校驗(yàn)功能等等

這里主要想介紹一下Spring提供的觀察者模式實(shí)現(xiàn)(事件發(fā)布監(jiān)聽)及異步方法執(zhí)行,這些功能也都是基于AOP實(shí)現(xiàn)的
Spring 事件
觀察者模式大家都了解,它可以解耦各個(gè)功能,但是自己實(shí)現(xiàn)的話比較麻煩,Spring為我們提供了一種事件發(fā)布機(jī)制,可以按需要發(fā)布事件,之后由監(jiān)聽此事件的類或方法來執(zhí)行各自對(duì)應(yīng)的功能,代碼互相不影響,以后修改訂單后續(xù)的邏輯時(shí)不會(huì)影響到訂單創(chuàng)建,有點(diǎn)類似于使用MQ的感覺~
比如在配置中心apollo項(xiàng)目中,在portal創(chuàng)建了app后會(huì)發(fā)送app創(chuàng)建事件,監(jiān)聽此事件的邏輯處將此消息同步到各個(gè)環(huán)境的admin sevice中,大家有興趣可以看下相關(guān)代碼
現(xiàn)在我們來看看具體如何使用:假設(shè)一個(gè)下單場(chǎng)景,訂單創(chuàng)建成功后可能有一些后續(xù)邏輯要處理,但是和創(chuàng)建訂單本身沒有關(guān)系,此時(shí)就可以在創(chuàng)建訂單完成后,發(fā)送一個(gè)消息,又相應(yīng)部分的代碼進(jìn)行監(jiān)聽處理,避免代碼耦合到一起
首先創(chuàng)建對(duì)應(yīng)的事件
import org.springframework.context.ApplicationEvent;
public class CreatedOrderEvent extends ApplicationEvent {
private final String orderSn;
public CreatedOrderEvent(Object source, String orderSn) {
super(source);
this.orderSn = orderSn;
}
public String getOrderSn() {
return this.orderSn;
}
}現(xiàn)在還需要一個(gè)事件發(fā)布者和監(jiān)聽者,創(chuàng)建一下
發(fā)布
import org.springframework.context.ApplicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));
監(jiān)聽的多種實(shí)現(xiàn)
1:注解實(shí)現(xiàn) @EventListener
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderEventListener {
@EventListener
public void orderEventListener(CreatedOrderEvent event) {
}
}2:代碼實(shí)現(xiàn)
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
@Slf4j
@Component
public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> {
@Override
public void onApplicationEvent(CreatedOrderEvent event) {
}
}簡(jiǎn)單的事件發(fā)布就完成了,其中的其他復(fù)雜邏輯由Spring替我們處理了
這里我們要注意一點(diǎn):發(fā)布和監(jiān)聽后處理的邏輯是在一個(gè)線程中執(zhí)行的,不是異步執(zhí)行
異步方法
有時(shí)候我們?yōu)榱颂岣唔憫?yīng)速度,有些方法可以異步去執(zhí)行,一般情況下我們可能是手動(dòng)將方法調(diào)用提交到線程池中去執(zhí)行,但是Spring 為我們提供了簡(jiǎn)化的寫法,在開啟了異步情況下,不用修改代碼,只使用注解即可完成此功能
這時(shí)只需要在要異步執(zhí)行的方法上添加@Async注解即可異步執(zhí)行;@EnableAsync 啟動(dòng)異步線程, 如
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableAsync
public class OrderEventListener {
@Async
@EventListener
public void orderEventListener(CreatedOrderEvent event) {
}
}在使用@Async會(huì)有一些問題建議看各位看下相關(guān)文檔及源碼
我們通過Spring事件同步線程改為異步線程,默認(rèn)的線程池是不復(fù)用線程
我覺得這是這個(gè)注解最坑的地方,沒有之一!我們來看看它默認(rèn)使用的線程池是哪個(gè),在前文的源碼分析中,我們可以看到?jīng)Q定要使用線程池的方法是
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
其源碼如下:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 可以在@Async注解中配置線程池的名字
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
// 獲取默認(rèn)的線程池
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}最終會(huì)調(diào)用到
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
這個(gè)方法中
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}可以看到,它默認(rèn)使用的線程池是SimpleAsyncTaskExecutor。我們不看這個(gè)類的源碼,只看它上面的文檔注釋,如下:

主要說了三點(diǎn)
- 為每個(gè)任務(wù)新起一個(gè)線程
- 默認(rèn)線程數(shù)不做限制
- 不復(fù)用線程
就這三點(diǎn),你還敢用嗎?只要你的任務(wù)耗時(shí)長(zhǎng)一點(diǎn),說不定服務(wù)器就給你來個(gè)OOM。
解決方案
最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法
1.在之前的源碼分析中,我們可以知道,可以通過AsyncConfigurer來配置使用的線程池
如下:
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 異步線程池配置
*/
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(200);
executor.setKeepAliveSeconds(5 * 60);
executor.setQueueCapacity(1000);
// 自定義實(shí)現(xiàn)拒絕策略
executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("當(dāng)前任務(wù)線程池隊(duì)列已滿."));
// 或者選擇已經(jīng)定義好的其中一種拒絕策略
// 丟棄任務(wù)并拋出RejectedExecutionException異常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 丟棄任務(wù),但是不拋出異常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 由調(diào)用線程處理該任務(wù)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 線程名稱前綴
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)生未知異常.", ex);
}
/**
* 增加日志MDC
*/
public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
/**
* Gets context for task *
*
* @return context for task
*/
private Map<String, String> getContextForTask() {
return MDC.getCopyOfContextMap();
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.)
* all delegate to this.
*/
@Override
public void execute(@NonNull Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@NonNull
@Override
public Future<?> submit(@NonNull Runnable task) {
return super.submit(wrap(task, getContextForTask()));
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
return super.submit(wrap(task, getContextForTask()));
}
/**
* Wrap callable
*
* @param <T> parameter
* @param task task
* @param context context
* @return the callable
*/
private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return task.call();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
/**
* Wrap runnable
*
* @param runnable runnable
* @param context context
* @return the runnable
*/
private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
}
}該方式實(shí)現(xiàn)線程的復(fù)用以及,子線程繼承父線程全鏈路traceId,方便定位問題
2.直接在@Async注解中配置要使用的線程池的名稱
@Async(value = "自定義線程名")
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringBoot使用ApplicationEvent&Listener完成業(yè)務(wù)解耦
- 基于Spring Boot應(yīng)用ApplicationEvent案例場(chǎng)景
- 詳解SpringBoot實(shí)現(xiàn)ApplicationEvent事件的監(jiān)聽與發(fā)布
- SpringBoot中ApplicationEvent和ApplicationListener用法小結(jié)
- SpringBoot Event 事件如何實(shí)現(xiàn)異步延遲執(zhí)行
- Event?Sourcing事件溯源模式優(yōu)化業(yè)務(wù)系統(tǒng)
相關(guān)文章
idea雙擊圖標(biāo)打不開,無反應(yīng)的解決
這篇文章主要介紹了idea雙擊圖標(biāo)打不開,無反應(yīng)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
微信小程序調(diào)用微信登陸獲取openid及java做為服務(wù)端示例
這篇文章主要介紹了微信小程序調(diào)用微信登陸獲取openid及java做為服務(wù)端示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01
servlet3新特性_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了servlet3新特性的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-07-07
Spring Boot整合RabbitMQ開發(fā)實(shí)戰(zhàn)詳解
這篇文章主要介紹了Spring Boot整合RabbitMQ開發(fā)實(shí)戰(zhàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-02-02
Spring配置多個(gè)數(shù)據(jù)源并實(shí)現(xiàn)數(shù)據(jù)源的動(dòng)態(tài)切換功能
這篇文章主要介紹了Spring配置多個(gè)數(shù)據(jù)源并實(shí)現(xiàn)數(shù)據(jù)源的動(dòng)態(tài)切換功能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01
spring cloud 阿波羅 apollo 本地開發(fā)環(huán)境搭建過程
Apollo(阿波羅)是攜程框架部門研發(fā)的配置管理平臺(tái),能夠集中化管理應(yīng)用不同環(huán)境、不同集群的配置,配置修改后能夠?qū)崟r(shí)推送到應(yīng)用端,并且具備規(guī)范的權(quán)限、流程治理等特性2018-01-01
解決IDEA集成Docker插件后出現(xiàn)日志亂碼的問題
這篇文章主要介紹了解決IDEA集成Docker插件后出現(xiàn)日志亂碼的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-11-11

