SpringBoot Event 事件如何實現(xiàn)異步延遲執(zhí)行
SpringBoot Event 事件實現(xiàn)異步延遲執(zhí)行
Spring的事件(Application Event)非常好用,雖然有一點會出現(xiàn)代碼污染,但是在做不使用其他框架來做異步的情況先,還是非常方便的。
使用它只需要三樣?xùn)|西
- 自定義事件:繼承 ApplicationEvent,創(chuàng)建一個你想傳的數(shù)據(jù)的對象,會在監(jiān)聽器那邊收到該對象。
- 定義監(jiān)聽器,實現(xiàn) ApplicationListener 或者通過 @EventListener 注解到方法上,兩種方式都行,但是推薦使用@EventListener,只要參數(shù)是你寫的繼承ApplicationEvent的對象,就會自動找到執(zhí)行方法。
- 定義發(fā)布者,通過 ApplicationEventPublisher,自帶的bean,不需要單獨聲明,直接@Autowired就能使用,主要只需要publishEvent方法。
但是有時候我需要做延時執(zhí)行,自帶的功能缺不支持,但是我發(fā)現(xiàn)ApplicationEvent對象里面有兩個成員變量,source和timestamp,構(gòu)造函數(shù)(@since 5.3.8)也提供了同時注入這兩個變量數(shù)據(jù)。
? ?/** ?? ? * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp} ?? ? * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}. ?? ? * <p>This constructor is typically used in testing scenarios. ?? ? * @param source the object on which the event initially occurred or with ?? ? * which the event is associated (never {@code null}) ?? ? * @param clock a clock which will provide the timestamp ?? ? * @since 5.3.8 ?? ? * @see #ApplicationEvent(Object) ?? ? */ ?? ?public ApplicationEvent(Object source, Clock clock) { ?? ??? ?super(source); ?? ??? ?this.timestamp = clock.millis(); ?? ?}
但是,看了說明timestamp只是標(biāo)志執(zhí)行的時間,并不是為了延遲執(zhí)行,可惜了。
于是查了一些資料,找到j(luò)ava.util.concurrent.DelayQueue對象,JDK自帶了延遲的隊列對象,我們可以考慮利用自帶的timestamp和延遲隊列DelayQueue結(jié)合一起來實現(xiàn),具體DelayQueue的使用請自行查詢,非常的簡單。
首先,繼承的ApplicationEvent重新實現(xiàn)一下。
不單單要繼承ApplicationEvent,還需要實現(xiàn)Delayed,主要是因為DelayQueue隊列中必須是Delayed的實現(xiàn)類
import java.time.Clock; import java.time.Duration; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; ? import org.springframework.context.ApplicationEvent; ? import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; ? @Data @EqualsAndHashCode(callSuper = false) public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed { ? ?? ?private static final long serialVersionUID = 1L; ? ?? ?public ApplicationDelayedEvent(Object source) { ?? ??? ?this(source, 0L); ?? ?} ? ?? ?public ApplicationDelayedEvent(Object source, long delaySeconds) { ?? ??? ?super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds))); ?? ?} ? ?? ?@Override ?? ?public int compareTo(Delayed o) { ?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大 ?? ??? ?long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); ?? ??? ?return (int) delta; ?? ?} ? ?? ?@Override ?? ?public long getDelay(TimeUnit unit) { ?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大,負數(shù)也會認為到時間了 ?? ??? ?long millis = this.getTimestamp(); ?? ??? ?long currentTimeMillis = System.currentTimeMillis(); ?? ??? ?long sourceDuration = millis - currentTimeMillis; ?? ??? ?return unit.convert(sourceDuration, unit); ?? ?} }
多了兩個必須實現(xiàn)的方法,compareTo是排序,應(yīng)該是隊列中的順序。
getDelay是主要的方法,目的是歸0的時候會從DelayQueue釋放出來,當(dāng)然那必須是NANOSECONDS級別的,我使用MILLISECONDS,就會出現(xiàn)負數(shù),但也是可以的,也能釋放出來。
另一個需要改的就是發(fā)布者,所以重新寫一個ApplicationDelayEventPublisher
import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; ? import lombok.extern.slf4j.Slf4j; ? @Slf4j @Component public class ApplicationDelayEventPublisher implements ApplicationRunner { ? ?? ?// ApplicationDelayedEvent需要import進來 ?? ?private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>(); ? ?? ?@Autowired ?? ?private ApplicationEventPublisher eventPublisher; ? ?? ?@Autowired ? ? @Qualifier("watchTaskExecutor") ?? ?private ThreadPoolTaskExecutor poolTaskExecutor; ? ?? ?public void publishEvent(ApplicationDelayedEvent event) { ?? ??? ?boolean result = delayQueue.offer(event); ?? ??? ?log.info("加入延遲隊列。。。。{}", result); ?? ?} ? ?? ?@Override ?? ?public void run(ApplicationArguments args) throws Exception { ?? ??? ?poolTaskExecutor.execute(() -> watchThread()); ?? ?} ? ?? ?private void watchThread() { ?? ??? ?while (true) { ?? ??? ??? ?try { ?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程。。。。"); ?? ??? ??? ??? ?ApplicationDelayedEvent event = this.delayQueue.take(); ?? ??? ??? ??? ?log.info("接收到延時任務(wù)執(zhí)行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); ?? ??? ??? ??? ?eventPublisher.publishEvent(event); ?? ??? ??? ?} catch (InterruptedException e) { ?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程關(guān)閉"); ?? ??? ??? ??? ?this.delayQueue.clear(); ?? ??? ??? ??? ?break; ?? ??? ??? ?} ?? ??? ?} ?? ?} }
需要實現(xiàn)ApplicationRunner作為Spring boot的啟動時候運行的bean,目的就是開啟監(jiān)聽線程,有事件到了執(zhí)行時間take方法會得到數(shù)據(jù),然后調(diào)用Spring原生的事件發(fā)布。
另外特別說明的就是監(jiān)聽線程不能隨便創(chuàng)建,脫離了Spring容器的線程池會造成關(guān)閉服務(wù)的時候造成無法關(guān)閉的現(xiàn)象,所以建議還是自定義一個ThreadPoolTaskExecutor
? ? @Bean ?? ?public ThreadPoolTaskExecutor watchTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(1); ?? ??? ?executor.setMaxPoolSize(1); ?? ??? ?executor.setQueueCapacity(1); ?? ??? ?executor.setKeepAliveSeconds(60); ?? ??? ?executor.setThreadNamePrefix("watch_task_"); ? ?? ??? ?// 線程池對拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?}
最后就是接收事件,跟傳統(tǒng)的接收是一樣的,異步只需要在配置類上加上@EnableAsync注解就行了,然后在監(jiān)聽的方法上加@Async
import java.util.concurrent.ThreadPoolExecutor; ? import javax.annotation.PostConstruct; ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; ? import lombok.extern.slf4j.Slf4j; ? @Slf4j @Configuration @EnableAsync @ConditionalOnClass(ApplicationDelayEventPublisher.class) public class DelayEventConfiguration { ? ?? ?@PostConstruct ?? ?public void init() { ?? ??? ?log.info("延遲Spring事件模塊啟動中。。。"); ?? ?} ? ?? ? ? // 不能和監(jiān)聽線程放到一個線程池,不然無法執(zhí)行 ?? ?@Bean ?? ?public ThreadPoolTaskExecutor poolTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(3); ?? ??? ?executor.setMaxPoolSize(50); ?? ??? ?executor.setQueueCapacity(10000); ?? ??? ?executor.setKeepAliveSeconds(30); ?? ??? ?executor.setThreadNamePrefix("my_task_"); ? ?? ??? ?// 線程池對拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?} ? ?? ? ? @Bean ?? ?public ThreadPoolTaskExecutor watchTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(1); ?? ??? ?executor.setMaxPoolSize(1); ?? ??? ?executor.setQueueCapacity(1); ?? ??? ?executor.setKeepAliveSeconds(60); ?? ??? ?executor.setThreadNamePrefix("watch_task_"); ? ?? ??? ?// 線程池對拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?} } ?? ?@Async("poolTaskExecutor") ?? ?@EventListener ?? ?public void listenDelayEvent(ApplicationDelayedEvent event) { ?? ??? ?log.info("收到執(zhí)行事件:{}", event.getSource()); ?? ?}
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄
這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01將java程序打成jar包在cmd命令行下執(zhí)行的方法
這篇文章主要給大家介紹了關(guān)于將java程序打成jar包在cmd命令行下執(zhí)行的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-01-01

REST架構(gòu)及RESTful應(yīng)用程序簡介

MyBatis中基于別名typeAliases的設(shè)置