SpringBoot Event 事件如何實現(xiàn)異步延遲執(zhí)行
SpringBoot Event 事件實現(xiàn)異步延遲執(zhí)行
Spring的事件(Application Event)非常好用,雖然有一點會出現(xiàn)代碼污染,但是在做不使用其他框架來做異步的情況先,還是非常方便的。
使用它只需要三樣?xùn)|西
- 自定義事件:繼承 ApplicationEvent,創(chuàng)建一個你想傳的數(shù)據(jù)的對象,會在監(jiān)聽器那邊收到該對象。
- 定義監(jiān)聽器,實現(xiàn) ApplicationListener 或者通過 @EventListener 注解到方法上,兩種方式都行,但是推薦使用@EventListener,只要參數(shù)是你寫的繼承ApplicationEvent的對象,就會自動找到執(zhí)行方法。
- 定義發(fā)布者,通過 ApplicationEventPublisher,自帶的bean,不需要單獨聲明,直接@Autowired就能使用,主要只需要publishEvent方法。
但是有時候我需要做延時執(zhí)行,自帶的功能缺不支持,但是我發(fā)現(xiàn)ApplicationEvent對象里面有兩個成員變量,source和timestamp,構(gòu)造函數(shù)(@since 5.3.8)也提供了同時注入這兩個變量數(shù)據(jù)。
? ?/**
?? ? * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp}
?? ? * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}.
?? ? * <p>This constructor is typically used in testing scenarios.
?? ? * @param source the object on which the event initially occurred or with
?? ? * which the event is associated (never {@code null})
?? ? * @param clock a clock which will provide the timestamp
?? ? * @since 5.3.8
?? ? * @see #ApplicationEvent(Object)
?? ? */
?? ?public ApplicationEvent(Object source, Clock clock) {
?? ??? ?super(source);
?? ??? ?this.timestamp = clock.millis();
?? ?}但是,看了說明timestamp只是標(biāo)志執(zhí)行的時間,并不是為了延遲執(zhí)行,可惜了。
于是查了一些資料,找到j(luò)ava.util.concurrent.DelayQueue對象,JDK自帶了延遲的隊列對象,我們可以考慮利用自帶的timestamp和延遲隊列DelayQueue結(jié)合一起來實現(xiàn),具體DelayQueue的使用請自行查詢,非常的簡單。
首先,繼承的ApplicationEvent重新實現(xiàn)一下。
不單單要繼承ApplicationEvent,還需要實現(xiàn)Delayed,主要是因為DelayQueue隊列中必須是Delayed的實現(xiàn)類
import java.time.Clock;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
?
import org.springframework.context.ApplicationEvent;
?
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
?
@Data
@EqualsAndHashCode(callSuper = false)
public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed {
?
?? ?private static final long serialVersionUID = 1L;
?
?? ?public ApplicationDelayedEvent(Object source) {
?? ??? ?this(source, 0L);
?? ?}
?
?? ?public ApplicationDelayedEvent(Object source, long delaySeconds) {
?? ??? ?super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds)));
?? ?}
?
?? ?@Override
?? ?public int compareTo(Delayed o) {
?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大
?? ??? ?long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
?? ??? ?return (int) delta;
?? ?}
?
?? ?@Override
?? ?public long getDelay(TimeUnit unit) {
?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大,負(fù)數(shù)也會認(rèn)為到時間了
?? ??? ?long millis = this.getTimestamp();
?? ??? ?long currentTimeMillis = System.currentTimeMillis();
?? ??? ?long sourceDuration = millis - currentTimeMillis;
?? ??? ?return unit.convert(sourceDuration, unit);
?? ?}
}多了兩個必須實現(xiàn)的方法,compareTo是排序,應(yīng)該是隊列中的順序。
getDelay是主要的方法,目的是歸0的時候會從DelayQueue釋放出來,當(dāng)然那必須是NANOSECONDS級別的,我使用MILLISECONDS,就會出現(xiàn)負(fù)數(shù),但也是可以的,也能釋放出來。
另一個需要改的就是發(fā)布者,所以重新寫一個ApplicationDelayEventPublisher
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
?
import lombok.extern.slf4j.Slf4j;
?
@Slf4j
@Component
public class ApplicationDelayEventPublisher implements ApplicationRunner {
?
?? ?// ApplicationDelayedEvent需要import進來
?? ?private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>();
?
?? ?@Autowired
?? ?private ApplicationEventPublisher eventPublisher;
?
?? ?@Autowired
? ? @Qualifier("watchTaskExecutor")
?? ?private ThreadPoolTaskExecutor poolTaskExecutor;
?
?? ?public void publishEvent(ApplicationDelayedEvent event) {
?? ??? ?boolean result = delayQueue.offer(event);
?? ??? ?log.info("加入延遲隊列。。。。{}", result);
?? ?}
?
?? ?@Override
?? ?public void run(ApplicationArguments args) throws Exception {
?? ??? ?poolTaskExecutor.execute(() -> watchThread());
?? ?}
?
?? ?private void watchThread() {
?? ??? ?while (true) {
?? ??? ??? ?try {
?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程。。。。");
?? ??? ??? ??? ?ApplicationDelayedEvent event = this.delayQueue.take();
?? ??? ??? ??? ?log.info("接收到延時任務(wù)執(zhí)行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
?? ??? ??? ??? ?eventPublisher.publishEvent(event);
?? ??? ??? ?} catch (InterruptedException e) {
?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程關(guān)閉");
?? ??? ??? ??? ?this.delayQueue.clear();
?? ??? ??? ??? ?break;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}需要實現(xiàn)ApplicationRunner作為Spring boot的啟動時候運行的bean,目的就是開啟監(jiān)聽線程,有事件到了執(zhí)行時間take方法會得到數(shù)據(jù),然后調(diào)用Spring原生的事件發(fā)布。
另外特別說明的就是監(jiān)聽線程不能隨便創(chuàng)建,脫離了Spring容器的線程池會造成關(guān)閉服務(wù)的時候造成無法關(guān)閉的現(xiàn)象,所以建議還是自定義一個ThreadPoolTaskExecutor
? ? @Bean
?? ?public ThreadPoolTaskExecutor watchTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(1);
?? ??? ?executor.setMaxPoolSize(1);
?? ??? ?executor.setQueueCapacity(1);
?? ??? ?executor.setKeepAliveSeconds(60);
?? ??? ?executor.setThreadNamePrefix("watch_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}最后就是接收事件,跟傳統(tǒng)的接收是一樣的,異步只需要在配置類上加上@EnableAsync注解就行了,然后在監(jiān)聽的方法上加@Async
import java.util.concurrent.ThreadPoolExecutor;
?
import javax.annotation.PostConstruct;
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
?
import lombok.extern.slf4j.Slf4j;
?
@Slf4j
@Configuration
@EnableAsync
@ConditionalOnClass(ApplicationDelayEventPublisher.class)
public class DelayEventConfiguration {
?
?? ?@PostConstruct
?? ?public void init() {
?? ??? ?log.info("延遲Spring事件模塊啟動中。。。");
?? ?}
? ??
? ? // 不能和監(jiān)聽線程放到一個線程池,不然無法執(zhí)行
?? ?@Bean
?? ?public ThreadPoolTaskExecutor poolTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(3);
?? ??? ?executor.setMaxPoolSize(50);
?? ??? ?executor.setQueueCapacity(10000);
?? ??? ?executor.setKeepAliveSeconds(30);
?? ??? ?executor.setThreadNamePrefix("my_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}
? ??
? ? @Bean
?? ?public ThreadPoolTaskExecutor watchTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(1);
?? ??? ?executor.setMaxPoolSize(1);
?? ??? ?executor.setQueueCapacity(1);
?? ??? ?executor.setKeepAliveSeconds(60);
?? ??? ?executor.setThreadNamePrefix("watch_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}
}
?? ?@Async("poolTaskExecutor")
?? ?@EventListener
?? ?public void listenDelayEvent(ApplicationDelayedEvent event) {
?? ??? ?log.info("收到執(zhí)行事件:{}", event.getSource());
?? ?}總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄
這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
將java程序打成jar包在cmd命令行下執(zhí)行的方法
這篇文章主要給大家介紹了關(guān)于將java程序打成jar包在cmd命令行下執(zhí)行的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-01-01
REST架構(gòu)及RESTful應(yīng)用程序簡介
MyBatis中基于別名typeAliases的設(shè)置

