亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot Event 事件如何實現(xiàn)異步延遲執(zhí)行

 更新時間:2023年02月14日 16:39:14   作者:peak_paradise  
這篇文章主要介紹了Spring Boot Event 事件如何實現(xiàn)異步延遲執(zhí)行問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

SpringBoot Event 事件實現(xiàn)異步延遲執(zhí)行

Spring的事件(Application Event)非常好用,雖然有一點會出現(xiàn)代碼污染,但是在做不使用其他框架來做異步的情況先,還是非常方便的。

使用它只需要三樣?xùn)|西

  • 自定義事件:繼承 ApplicationEvent,創(chuàng)建一個你想傳的數(shù)據(jù)的對象,會在監(jiān)聽器那邊收到該對象。
  • 定義監(jiān)聽器,實現(xiàn) ApplicationListener 或者通過 @EventListener 注解到方法上,兩種方式都行,但是推薦使用@EventListener,只要參數(shù)是你寫的繼承ApplicationEvent的對象,就會自動找到執(zhí)行方法。
  • 定義發(fā)布者,通過 ApplicationEventPublisher,自帶的bean,不需要單獨聲明,直接@Autowired就能使用,主要只需要publishEvent方法。

但是有時候我需要做延時執(zhí)行,自帶的功能缺不支持,但是我發(fā)現(xiàn)ApplicationEvent對象里面有兩個成員變量,source和timestamp,構(gòu)造函數(shù)(@since 5.3.8)也提供了同時注入這兩個變量數(shù)據(jù)。

? ?/**
?? ? * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp}
?? ? * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}.
?? ? * <p>This constructor is typically used in testing scenarios.
?? ? * @param source the object on which the event initially occurred or with
?? ? * which the event is associated (never {@code null})
?? ? * @param clock a clock which will provide the timestamp
?? ? * @since 5.3.8
?? ? * @see #ApplicationEvent(Object)
?? ? */
?? ?public ApplicationEvent(Object source, Clock clock) {
?? ??? ?super(source);
?? ??? ?this.timestamp = clock.millis();
?? ?}

但是,看了說明timestamp只是標(biāo)志執(zhí)行的時間,并不是為了延遲執(zhí)行,可惜了。

于是查了一些資料,找到j(luò)ava.util.concurrent.DelayQueue對象,JDK自帶了延遲的隊列對象,我們可以考慮利用自帶的timestamp和延遲隊列DelayQueue結(jié)合一起來實現(xiàn),具體DelayQueue的使用請自行查詢,非常的簡單。

首先,繼承的ApplicationEvent重新實現(xiàn)一下。

不單單要繼承ApplicationEvent,還需要實現(xiàn)Delayed,主要是因為DelayQueue隊列中必須是Delayed的實現(xiàn)類

import java.time.Clock;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
?
import org.springframework.context.ApplicationEvent;
?
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
?
@Data
@EqualsAndHashCode(callSuper = false)
public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed {
?
?? ?private static final long serialVersionUID = 1L;
?
?? ?public ApplicationDelayedEvent(Object source) {
?? ??? ?this(source, 0L);
?? ?}
?
?? ?public ApplicationDelayedEvent(Object source, long delaySeconds) {
?? ??? ?super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds)));
?? ?}
?
?? ?@Override
?? ?public int compareTo(Delayed o) {
?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大
?? ??? ?long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
?? ??? ?return (int) delta;
?? ?}
?
?? ?@Override
?? ?public long getDelay(TimeUnit unit) {
?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大,負數(shù)也會認為到時間了
?? ??? ?long millis = this.getTimestamp();
?? ??? ?long currentTimeMillis = System.currentTimeMillis();
?? ??? ?long sourceDuration = millis - currentTimeMillis;
?? ??? ?return unit.convert(sourceDuration, unit);
?? ?}
}

多了兩個必須實現(xiàn)的方法,compareTo是排序,應(yīng)該是隊列中的順序。

getDelay是主要的方法,目的是歸0的時候會從DelayQueue釋放出來,當(dāng)然那必須是NANOSECONDS級別的,我使用MILLISECONDS,就會出現(xiàn)負數(shù),但也是可以的,也能釋放出來。

另一個需要改的就是發(fā)布者,所以重新寫一個ApplicationDelayEventPublisher

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
?
import lombok.extern.slf4j.Slf4j;
?
@Slf4j
@Component
public class ApplicationDelayEventPublisher implements ApplicationRunner {
?
?? ?// ApplicationDelayedEvent需要import進來
?? ?private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>();
?
?? ?@Autowired
?? ?private ApplicationEventPublisher eventPublisher;
?
?? ?@Autowired
? ? @Qualifier("watchTaskExecutor")
?? ?private ThreadPoolTaskExecutor poolTaskExecutor;
?
?? ?public void publishEvent(ApplicationDelayedEvent event) {
?? ??? ?boolean result = delayQueue.offer(event);
?? ??? ?log.info("加入延遲隊列。。。。{}", result);
?? ?}
?
?? ?@Override
?? ?public void run(ApplicationArguments args) throws Exception {
?? ??? ?poolTaskExecutor.execute(() -> watchThread());
?? ?}
?
?? ?private void watchThread() {
?? ??? ?while (true) {
?? ??? ??? ?try {
?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程。。。。");
?? ??? ??? ??? ?ApplicationDelayedEvent event = this.delayQueue.take();
?? ??? ??? ??? ?log.info("接收到延時任務(wù)執(zhí)行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
?? ??? ??? ??? ?eventPublisher.publishEvent(event);
?? ??? ??? ?} catch (InterruptedException e) {
?? ??? ??? ??? ?log.info("啟動延時任務(wù)的監(jiān)聽線程關(guān)閉");
?? ??? ??? ??? ?this.delayQueue.clear();
?? ??? ??? ??? ?break;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}

需要實現(xiàn)ApplicationRunner作為Spring boot的啟動時候運行的bean,目的就是開啟監(jiān)聽線程,有事件到了執(zhí)行時間take方法會得到數(shù)據(jù),然后調(diào)用Spring原生的事件發(fā)布。

另外特別說明的就是監(jiān)聽線程不能隨便創(chuàng)建,脫離了Spring容器的線程池會造成關(guān)閉服務(wù)的時候造成無法關(guān)閉的現(xiàn)象,所以建議還是自定義一個ThreadPoolTaskExecutor

? ? @Bean
?? ?public ThreadPoolTaskExecutor watchTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(1);
?? ??? ?executor.setMaxPoolSize(1);
?? ??? ?executor.setQueueCapacity(1);
?? ??? ?executor.setKeepAliveSeconds(60);
?? ??? ?executor.setThreadNamePrefix("watch_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}

最后就是接收事件,跟傳統(tǒng)的接收是一樣的,異步只需要在配置類上加上@EnableAsync注解就行了,然后在監(jiān)聽的方法上加@Async

import java.util.concurrent.ThreadPoolExecutor;
?
import javax.annotation.PostConstruct;
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
?
import lombok.extern.slf4j.Slf4j;
?
@Slf4j
@Configuration
@EnableAsync
@ConditionalOnClass(ApplicationDelayEventPublisher.class)
public class DelayEventConfiguration {
?
?? ?@PostConstruct
?? ?public void init() {
?? ??? ?log.info("延遲Spring事件模塊啟動中。。。");
?? ?}
? ??
? ? // 不能和監(jiān)聽線程放到一個線程池,不然無法執(zhí)行
?? ?@Bean
?? ?public ThreadPoolTaskExecutor poolTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(3);
?? ??? ?executor.setMaxPoolSize(50);
?? ??? ?executor.setQueueCapacity(10000);
?? ??? ?executor.setKeepAliveSeconds(30);
?? ??? ?executor.setThreadNamePrefix("my_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}
? ??
? ? @Bean
?? ?public ThreadPoolTaskExecutor watchTaskExecutor() {
?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
?? ??? ?executor.setCorePoolSize(1);
?? ??? ?executor.setMaxPoolSize(1);
?? ??? ?executor.setQueueCapacity(1);
?? ??? ?executor.setKeepAliveSeconds(60);
?? ??? ?executor.setThreadNamePrefix("watch_task_");
?
?? ??? ?// 線程池對拒絕任務(wù)的處理策略
//?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
//?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
//?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面。
//?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。
?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
?? ??? ?// 初始化
?? ??? ?executor.initialize();
?? ??? ?return executor;
?? ?}
}

?? ?@Async("poolTaskExecutor")
?? ?@EventListener
?? ?public void listenDelayEvent(ApplicationDelayedEvent event) {
?? ??? ?log.info("收到執(zhí)行事件:{}", event.getSource());
?? ?}

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • REST架構(gòu)及RESTful應(yīng)用程序簡介

    REST架構(gòu)及RESTful應(yīng)用程序簡介

    這篇文章主要為大家介紹了REST架構(gòu)及RESTful的應(yīng)用程序簡介,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03
  • java實現(xiàn)pdf按頁轉(zhuǎn)換為圖片

    java實現(xiàn)pdf按頁轉(zhuǎn)換為圖片

    這篇文章主要為大家詳細介紹了java實現(xiàn)pdf按頁轉(zhuǎn)換為圖片,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-12-12
  • SpringBoot調(diào)用外部接口的幾種方式

    SpringBoot調(diào)用外部接口的幾種方式

    SpringBoot應(yīng)用中,調(diào)用外部接口是微服務(wù)架構(gòu)常見需求,本文主要介紹了SpringBoot調(diào)用外部接口的幾種方式,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-10-10
  • 員工管理系統(tǒng)java版

    員工管理系統(tǒng)java版

    這篇文章主要為大家詳細介紹了java版的員工管理系統(tǒng),,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • MyBatis中基于別名typeAliases的設(shè)置

    MyBatis中基于別名typeAliases的設(shè)置

    這篇文章主要介紹了MyBatis中基于別名typeAliases的設(shè)置,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 最新評論