java實(shí)現(xiàn)延遲/超時(shí)/定時(shí)問(wèn)題
java實(shí)現(xiàn)延遲/超時(shí)/定時(shí)
java 每間隔5秒執(zhí)行一次,一共執(zhí)行5次然后結(jié)束
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Main { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); AtomicInteger count = new AtomicInteger(0); Runnable task = () -> { System.out.println("Task executed at " + System.currentTimeMillis()); // 這里放置你的任務(wù)代碼 // ... // 當(dāng)執(zhí)行次數(shù)達(dá)到5次后,關(guān)閉線程池 if (count.getAndIncrement() == 4) { // 注意:計(jì)數(shù)從0開(kāi)始,所以這里是4 executor.shutdown(); } }; // 每隔5秒執(zhí)行一次 executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS); // 確保線程池結(jié)束,避免程序無(wú)法退出 try { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } } }
任務(wù)代碼執(zhí)行時(shí)間超過(guò)5秒會(huì)怎樣
在Java中,如果你使用ScheduledExecutorService.scheduleAtFixedRate()
方法,并且你的任務(wù)執(zhí)行時(shí)間超過(guò)了預(yù)定的周期(在這個(gè)例子中是5秒),那么下一次任務(wù)將在當(dāng)前任務(wù)完成后立即開(kāi)始執(zhí)行,而不是等待下一個(gè)完整周期。
也就是說(shuō),如果某個(gè)任務(wù)執(zhí)行了6秒,則下一次任務(wù)不會(huì)等到10秒(即上一個(gè)任務(wù)結(jié)束后的5秒)才開(kāi)始,而是在第一個(gè)任務(wù)完成之后立刻啟動(dòng)。因此,任務(wù)的執(zhí)行將不再嚴(yán)格遵循每隔5秒執(zhí)行一次的規(guī)律,而是盡量保持每?jī)纱稳蝿?wù)執(zhí)行之間的開(kāi)始時(shí)間間隔為5秒。
這可能導(dǎo)致連續(xù)的任務(wù)執(zhí)行時(shí)間重疊,尤其是在任務(wù)執(zhí)行時(shí)間不穩(wěn)定或持續(xù)較長(zhǎng)的情況下。如果你希望每次任務(wù)都至少間隔5秒,即使前一個(gè)任務(wù)超時(shí)也要等到下個(gè)周期再執(zhí)行,你應(yīng)該考慮使用ScheduledExecutorService.scheduleWithFixedDelay()
方法替代。
如果你想確保每次任務(wù)執(zhí)行之間至少有5秒的間隔,即使前一個(gè)任務(wù)超時(shí)也等待到下個(gè)周期再開(kāi)始執(zhí)行,你可以使用ScheduledExecutorService.scheduleWithFixedDelay()
方法替代。以下是修改后的代碼
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Main { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); AtomicInteger count = new AtomicInteger(0); Runnable task = () -> { long startTime = System.currentTimeMillis(); System.out.println("Task started at " + startTime); // 這里放置你的任務(wù)代碼 // ... // 假設(shè)這里是模擬耗時(shí)的任務(wù) try { Thread.sleep(7000); // 模擬任務(wù)執(zhí)行時(shí)間超過(guò)5秒的情況 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("Task ended at " + System.currentTimeMillis()); // 當(dāng)執(zhí)行次數(shù)達(dá)到5次后,取消任務(wù) if (count.getAndIncrement() == 4) { executor.shutdown(); } }; // 每隔5秒(即使前一次任務(wù)執(zhí)行超過(guò)5秒)開(kāi)始執(zhí)行下一次任務(wù) executor.scheduleWithFixedDelay(task, 0, 5, TimeUnit.SECONDS); // 確保線程池結(jié)束,避免程序無(wú)法退出 try { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } } }
在這個(gè)示例中,即使任務(wù)執(zhí)行時(shí)間超過(guò)了5秒,下一次任務(wù)也會(huì)在當(dāng)前任務(wù)結(jié)束后至少等待5秒才開(kāi)始執(zhí)行
executor.shutdownNow()
方法用于嘗試停止正在執(zhí)行的任務(wù),并取消尚未開(kāi)始執(zhí)行的任務(wù)。此方法會(huì)立即關(guān)閉線程池,同時(shí)返回一個(gè)包含所有已提交但尚未開(kāi)始執(zhí)行的任務(wù)列表。
調(diào)用 shutdownNow()
時(shí)會(huì)發(fā)生以下情況:
- 立即停止當(dāng)前正在執(zhí)行的任務(wù)(如果可能的話)。對(duì)于某些任務(wù)來(lái)說(shuō),這可能意味著中斷正在運(yùn)行的任務(wù)。因此,你的任務(wù)應(yīng)該能夠正確處理中斷請(qǐng)求(通過(guò)檢查
Thread.currentThread().isInterrupted()
)并盡可能快速且干凈地退出。 - 取消所有等待隊(duì)列中的未開(kāi)始執(zhí)行的任務(wù)。
- 調(diào)用
shutdownNow()
后,線程池將不再接受新的任務(wù)。
與之相對(duì)的是 executor.shutdown()
方法,它不會(huì)立即停止正在執(zhí)行的任務(wù),而是等待所有已提交的任務(wù)完成后才關(guān)閉線程池,且不再接受新任務(wù)。但是,shutdown()
方法并不會(huì)中斷正在執(zhí)行的任務(wù)。
scheduleAtFixedRate 和 scheduleWithFixedDelay 的區(qū)別
scheduleAtFixedRate():
- 此方法按照固定的頻率執(zhí)行任務(wù)。
- 即使前一次任務(wù)尚未完成(如果任務(wù)執(zhí)行時(shí)間超過(guò)了預(yù)定周期),下一次任務(wù)也會(huì)在上一次開(kāi)始執(zhí)行的時(shí)間基礎(chǔ)上加上固定周期后立即啟動(dòng)。
- 因此,如果任務(wù)執(zhí)行耗時(shí)不一致或較長(zhǎng),連續(xù)的任務(wù)可能會(huì)重疊執(zhí)行。
scheduleWithFixedDelay():
- 此方法確保每次任務(wù)執(zhí)行完成后,都會(huì)等待一個(gè)固定的延遲時(shí)間后再啟動(dòng)下一次任務(wù)。
- 即使前一次任務(wù)超時(shí),下一次任務(wù)也會(huì)在前一次任務(wù)結(jié)束時(shí)刻的基礎(chǔ)上加上指定的延遲時(shí)間才開(kāi)始。
- 這意味著,無(wú)論任務(wù)執(zhí)行所需時(shí)間如何,兩次任務(wù)執(zhí)行之間的間隔總是至少等于指定的延遲時(shí)間。
總結(jié)來(lái)說(shuō):
- 如果你希望任務(wù)按固定的時(shí)間間隔開(kāi)始,而不考慮每個(gè)任務(wù)的實(shí)際執(zhí)行時(shí)間,使用
scheduleAtFixedRate()
。 - 如果你希望每個(gè)任務(wù)結(jié)束后有一段固定的“冷靜期”,確保任何時(shí)間點(diǎn)相鄰兩次任務(wù)之間至少有一定的時(shí)間間隔,那么應(yīng)該使用
scheduleWithFixedDelay()
。
DelayQueue
DelayQueue是JDK提供的api,是一個(gè)延遲隊(duì)列
DelayQueue泛型參數(shù)得實(shí)現(xiàn)Delayed接口,Delayed繼承了Comparable接口。
getDelay
方法返回這個(gè)任務(wù)還剩多久時(shí)間可以執(zhí)行,小于0的時(shí)候說(shuō)明可以這個(gè)延遲任務(wù)到了執(zhí)行的時(shí)間了。compareTo
這個(gè)是對(duì)任務(wù)排序的,保證最先到延遲時(shí)間的任務(wù)排到隊(duì)列的頭。
demo
@Getter public class SanYouTask implements Delayed { private final String taskContent; private final Long triggerTime; public SanYouTask(String taskContent, Long delayTime) { this.taskContent = taskContent; this.triggerTime = System.currentTimeMillis() + delayTime * 1000; } @Override public long getDelay(TimeUnit unit) { return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return this.triggerTime.compareTo(((SanYouTask) o).triggerTime); } }
SanYouTask實(shí)現(xiàn)了Delayed接口,構(gòu)造參數(shù)
taskContent
:延遲任務(wù)的具體的內(nèi)容delayTime
:延遲時(shí)間,秒為單位
測(cè)試
@Slf4j public class DelayQueueDemo { public static void main(String[] args) { DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>(); new Thread(() -> { while (true) { try { SanYouTask sanYouTask = sanYouTaskDelayQueue.take(); log.info("獲取到延遲任務(wù):{}", sanYouTask.getTaskContent()); } catch (Exception e) { } } }).start(); log.info("提交延遲任務(wù)"); sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s", 5L)); sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s", 3L)); sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s", 8L)); } }
開(kāi)啟一個(gè)線程從DelayQueue中獲取任務(wù),然后提交了三個(gè)任務(wù),延遲時(shí)間分為別5s,3s,8s。
測(cè)試結(jié)果:
成功實(shí)現(xiàn)了延遲任務(wù)。
實(shí)現(xiàn)原理
offer
方法在提交任務(wù)的時(shí)候,會(huì)通過(guò)根據(jù)compareTo
的實(shí)現(xiàn)對(duì)任務(wù)進(jìn)行排序,將最先需要被執(zhí)行的任務(wù)放到隊(duì)列頭。take
方法獲取任務(wù)的時(shí)候,會(huì)拿到隊(duì)列頭部的元素,也就是隊(duì)列中最早需要被執(zhí)行的任務(wù),通過(guò)getDelay返回值判斷任務(wù)是否需要被立刻執(zhí)行,如果需要的話,就返回任務(wù),如果不需要就會(huì)等待這個(gè)任務(wù)到延遲時(shí)間的剩余時(shí)間,當(dāng)時(shí)間到了就會(huì)將任務(wù)返回。
Timer
Timer也是JDK提供的api
demo
@Slf4j public class TimerDemo { public static void main(String[] args) { Timer timer = new Timer(); log.info("提交延遲任務(wù)"); timer.schedule(new TimerTask() { @Override public void run() { log.info("執(zhí)行延遲任務(wù)"); } }, 5000); } }
通過(guò)schedule
提交一個(gè)延遲時(shí)間為5s的延遲任務(wù)
實(shí)現(xiàn)原理
提交的任務(wù)是一個(gè)TimerTask
public abstract class TimerTask implements Runnable { //忽略其它屬性 long nextExecutionTime; }
TimerTask內(nèi)部有一個(gè)nextExecutionTime
屬性,代表下一次任務(wù)執(zhí)行的時(shí)間,在提交任務(wù)的時(shí)候會(huì)計(jì)算出nextExecutionTime
值。
Timer內(nèi)部有一個(gè)TaskQueue對(duì)象,用來(lái)保存TimerTask任務(wù)的,會(huì)根據(jù)nextExecutionTime
來(lái)排序,保證能夠快速獲取到最早需要被執(zhí)行的延遲任務(wù)。
在Timer內(nèi)部還有一個(gè)執(zhí)行任務(wù)的線程TimerThread,這個(gè)線程就跟DelayQueue demo中開(kāi)啟的線程作用是一樣的,用來(lái)執(zhí)行到了延遲時(shí)間的任務(wù)。
所以總的來(lái)看,Timer有點(diǎn)像整體封裝了DelayQueue demo中的所有東西,讓用起來(lái)簡(jiǎn)單點(diǎn)。
雖然Timer用起來(lái)比較簡(jiǎn)單,但是在阿里規(guī)范中是不推薦使用的,主要是有以下幾點(diǎn)原因:
- Timer使用單線程來(lái)處理任務(wù),長(zhǎng)時(shí)間運(yùn)行的任務(wù)會(huì)導(dǎo)致其他任務(wù)的延時(shí)處理
- Timer沒(méi)有對(duì)運(yùn)行時(shí)異常進(jìn)行處理,一旦某個(gè)任務(wù)觸發(fā)運(yùn)行時(shí)異常,會(huì)導(dǎo)致整個(gè)Timer崩潰,不安全
ScheduledThreadPoolExecutor
由于Timer在使用上有一定的問(wèn)題,所以在JDK1.5版本的時(shí)候提供了ScheduledThreadPoolExecutor,這個(gè)跟Timer的作用差不多,并且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單線程和異常崩潰等問(wèn)題。
demo
@Slf4j public class ScheduledThreadPoolExecutorDemo { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy()); log.info("提交延遲任務(wù)"); executor.schedule(() -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS); } }
結(jié)果
實(shí)現(xiàn)原理
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了線程池,所以可以有很多個(gè)線程來(lái)執(zhí)行任務(wù)。
ScheduledThreadPoolExecutor在構(gòu)造的時(shí)候會(huì)傳入一個(gè)DelayedWorkQueue阻塞隊(duì)列,所以線程池內(nèi)部的阻塞隊(duì)列是DelayedWorkQueue。
在提交延遲任務(wù)的時(shí)候,任務(wù)會(huì)被封裝一個(gè)任務(wù)會(huì)被封裝成ScheduledFutureTask
對(duì)象,然后放到DelayedWorkQueue阻塞隊(duì)列中。
ScheduledFutureTask
實(shí)現(xiàn)了前面提到的Delayed接口,所以其實(shí)可以猜到DelayedWorkQueue會(huì)根據(jù)ScheduledFutureTask
對(duì)于Delayed接口的實(shí)現(xiàn)來(lái)排序,所以線程能夠獲取到最早到延遲時(shí)間的任務(wù)。
當(dāng)線程從DelayedWorkQueue中獲取到需要執(zhí)行的任務(wù)之后就會(huì)執(zhí)行任務(wù)。
監(jiān)聽(tīng)Redis過(guò)期key
在Redis中,有個(gè)發(fā)布訂閱的機(jī)制
生產(chǎn)者在消息發(fā)送時(shí)需要到指定發(fā)送到哪個(gè)channel上,消費(fèi)者訂閱這個(gè)channel就能獲取到消息。圖中channel理解成MQ中的topic。
并且在Redis中,有很多默認(rèn)的channel,只不過(guò)向這些channel發(fā)送消息的生產(chǎn)者不是我們寫(xiě)的代碼,而是Redis本身。這里面就有這么一個(gè)channel叫做__keyevent@<db>__:expired
,db是指Redis數(shù)據(jù)庫(kù)的序號(hào)。
當(dāng)某個(gè)Redis的key過(guò)期之后,Redis內(nèi)部會(huì)發(fā)布一個(gè)事件到__keyevent@<db>__:expired
這個(gè)channel上,只要監(jiān)聽(tīng)這個(gè)事件,那么就可以獲取到過(guò)期的key。
所以基于監(jiān)聽(tīng)Redis過(guò)期key實(shí)現(xiàn)延遲任務(wù)的原理如下:
- 將延遲任務(wù)作為key,過(guò)期時(shí)間設(shè)置為延遲時(shí)間
- 監(jiān)聽(tīng)
__keyevent@<db>__:expired
這個(gè)channel,那么一旦延遲任務(wù)到了過(guò)期時(shí)間(延遲時(shí)間),那么就可以獲取到這個(gè)任務(wù)
demo
Spring已經(jīng)實(shí)現(xiàn)了監(jiān)聽(tīng)__keyevent@*__:expired
這個(gè)channel這個(gè)功能,__keyevent@*__:expired
中的*
代表通配符的意思,監(jiān)聽(tīng)所有的數(shù)據(jù)庫(kù)。
所以demo寫(xiě)起來(lái)就很簡(jiǎn)單了,只需4步即可
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version> </dependency>
配置文件
spring: redis: host: 192.168.200.144 port: 6379
配置類(lèi)
@Configuration public class RedisConfiguration { @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); return redisMessageListenerContainer; } @Bean public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) { return new KeyExpirationEventMessageListener(redisMessageListenerContainer); } }
KeyExpirationEventMessageListener實(shí)現(xiàn)了對(duì)__keyevent@*__:expired
channel的監(jiān)聽(tīng)
當(dāng)KeyExpirationEventMessageListener收到Redis發(fā)布的過(guò)期Key的消息的時(shí)候,會(huì)發(fā)布RedisKeyExpiredEvent事件
所以我們只需要監(jiān)聽(tīng)RedisKeyExpiredEvent事件就可以拿到過(guò)期消息的Key,也就是延遲消息。
對(duì)RedisKeyExpiredEvent事件的監(jiān)聽(tīng)實(shí)現(xiàn)MyRedisKeyExpiredEventListener
@Component public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> { @Override public void onApplicationEvent(RedisKeyExpiredEvent event) { byte[] body = event.getSource(); System.out.println("獲取到延遲消息:" + new String(body)); } }
代碼寫(xiě)好,啟動(dòng)應(yīng)用
之后我直接通過(guò)Redis命令設(shè)置消息,消息的key為sanyou,值為task,值不重要,過(guò)期時(shí)間為5s
set sanyou task expire sanyou 5
成功獲取到延遲任務(wù)
雖然這種方式可以實(shí)現(xiàn)延遲任務(wù),但是這種方式坑比較多
任務(wù)存在延遲
Redis過(guò)期事件的發(fā)布不是指key到了過(guò)期時(shí)間就發(fā)布,而是key到了過(guò)期時(shí)間被清除之后才會(huì)發(fā)布事件。
而Redis過(guò)期key的兩種清除策略,就是面試八股文常背的兩種:
- 惰性清除。當(dāng)這個(gè)key過(guò)期之后,訪問(wèn)時(shí),這個(gè)Key才會(huì)被清除
- 定時(shí)清除。后臺(tái)會(huì)定期檢查一部分key,如果有key過(guò)期了,就會(huì)被清除
所以即使key到了過(guò)期時(shí)間,Redis也不一定會(huì)發(fā)送key過(guò)期事件,這就到導(dǎo)致雖然延遲任務(wù)到了延遲時(shí)間也可能獲取不到延遲任務(wù)。
丟消息太頻繁
Redis實(shí)現(xiàn)的發(fā)布訂閱模式,消息是沒(méi)有持久化機(jī)制,當(dāng)消息發(fā)布到某個(gè)channel之后,如果沒(méi)有客戶端訂閱這個(gè)channel,那么這個(gè)消息就丟了,并不會(huì)像MQ一樣進(jìn)行持久化,等有消費(fèi)者訂閱的時(shí)候再給消費(fèi)者消費(fèi)。
所以說(shuō),假設(shè)服務(wù)重啟期間,某個(gè)生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個(gè)channel,由于服務(wù)重啟,沒(méi)有監(jiān)聽(tīng)這個(gè)channel,那么這個(gè)消息自然就丟了。
消息消費(fèi)只有廣播模式
Redis的發(fā)布訂閱模式消息消費(fèi)只有廣播模式一種。
所謂的廣播模式就是多個(gè)消費(fèi)者訂閱同一個(gè)channel,那么每個(gè)消費(fèi)者都能消費(fèi)到發(fā)布到這個(gè)channel的所有消息。
如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個(gè)消費(fèi)者都可以同時(shí)收到sanyou這條消息。
所以,如果通過(guò)監(jiān)聽(tīng)channel來(lái)獲取延遲任務(wù),那么一旦服務(wù)實(shí)例有多個(gè)的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開(kāi)發(fā)量。
接收到所有key的某個(gè)事件
這個(gè)不屬于Redis發(fā)布訂閱模式的問(wèn)題,而是Redis本身事件通知的問(wèn)題。
當(dāng)監(jiān)聽(tīng)了__keyevent@<db>__:expired
的channel,那么所有的Redis的key只要發(fā)生了過(guò)期事件都會(huì)被通知給消費(fèi)者,不管這個(gè)key是不是消費(fèi)者想接收到的。
所以如果你只想消費(fèi)某一類(lèi)消息的key,那么還得自行加一些標(biāo)記,比如消息的key加個(gè)前綴,消費(fèi)的時(shí)候判斷一下帶前綴的key就是需要消費(fèi)的任務(wù)。
Redisson的RDelayedQueue
Redisson他是Redis的兒子(Redis son),基于Redis實(shí)現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實(shí)現(xiàn),但是除了實(shí)現(xiàn)Redis分布式鎖之外,它還實(shí)現(xiàn)了延遲隊(duì)列的功能。
demo
引入pom
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.1</version> </dependency
封裝了一個(gè)RedissonDelayQueue類(lèi)
@Component @Slf4j public class RedissonDelayQueue { private RedissonClient redissonClient; private RDelayedQueue<String> delayQueue; private RBlockingQueue<String> blockingQueue; @PostConstruct public void init() { initDelayQueue(); startDelayQueueConsumer(); } private void initDelayQueue() { Config config = new Config(); SingleServerConfig serverConfig = config.useSingleServer(); serverConfig.setAddress("redis://localhost:6379"); redissonClient = Redisson.create(config); blockingQueue = redissonClient.getBlockingQueue("SANYOU"); delayQueue = redissonClient.getDelayedQueue(blockingQueue); } private void startDelayQueueConsumer() { new Thread(() -> { while (true) { try { String task = blockingQueue.take(); log.info("接收到延遲任務(wù):{}", task); } catch (Exception e) { e.printStackTrace(); } } }, "SANYOU-Consumer").start(); } public void offerTask(String task, long seconds) { log.info("添加延遲任務(wù):{} 延遲時(shí)間:{}s", task, seconds); delayQueue.offer(task, seconds, TimeUnit.SECONDS); } }
這個(gè)類(lèi)在創(chuàng)建的時(shí)候會(huì)去初始化延遲隊(duì)列,創(chuàng)建一個(gè)RedissonClient對(duì)象,之后通過(guò)RedissonClient對(duì)象獲取到RDelayedQueue和RBlockingQueue對(duì)象,傳入的隊(duì)列名字叫SANYOU,這個(gè)名字無(wú)所謂。
當(dāng)延遲隊(duì)列創(chuàng)建之后,會(huì)開(kāi)啟一個(gè)延遲任務(wù)的消費(fèi)線程,這個(gè)線程會(huì)一直從RBlockingQueue中通過(guò)take方法阻塞獲取延遲任務(wù)。
添加任務(wù)的時(shí)候是通過(guò)RDelayedQueue的offer方法添加的。
controller類(lèi),通過(guò)接口添加任務(wù),延遲時(shí)間為5s
@RestController public class RedissonDelayQueueController { @Resource private RedissonDelayQueue redissonDelayQueue; @GetMapping("/add") public void addTask(@RequestParam("task") String task) { redissonDelayQueue.offerTask(task, 5); } }
啟動(dòng)項(xiàng)目,在瀏覽器輸入如下連接,添加任務(wù)
http://localhost:8080/add?task=sanyou
靜靜等待5s,成功獲取到任務(wù)。
實(shí)現(xiàn)原理
如下是Redisson延遲隊(duì)列的實(shí)現(xiàn)原理
SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時(shí)候會(huì)拼上前綴。
redisson_delay_queue_timeout:SANYOU
,sorted set數(shù)據(jù)類(lèi)型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時(shí)間戳(提交任務(wù)時(shí)的時(shí)間戳 + 延遲時(shí)間)來(lái)排序的,所以列表的最前面的第一個(gè)元素就是整個(gè)延遲隊(duì)列中最早要被執(zhí)行的任務(wù),這個(gè)概念很重要redisson_delay_queue:SANYOU
,list數(shù)據(jù)類(lèi)型,也是存放所有的任務(wù),但是研究下來(lái)發(fā)現(xiàn)好像沒(méi)什么用。。SANYOU
,list數(shù)據(jù)類(lèi)型,被稱為目標(biāo)隊(duì)列,這個(gè)里面存放的任務(wù)都是已經(jīng)到了延遲時(shí)間的,可以被消費(fèi)者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個(gè)目標(biāo)隊(duì)列中獲取到任務(wù)的redisson_delay_queue_channel:SANYOU
,是一個(gè)channel,用來(lái)通知客戶端開(kāi)啟一個(gè)延遲任務(wù)
任務(wù)提交的時(shí)候,Redisson會(huì)將任務(wù)放到redisson_delay_queue_timeout:SANYOU
中,分?jǐn)?shù)就是提交任務(wù)的時(shí)間戳+延遲時(shí)間,就是延遲任務(wù)的到期時(shí)間戳
Redisson客戶端內(nèi)部通過(guò)監(jiān)聽(tīng)redisson_delay_queue_channel:SANYOU
這個(gè)channel來(lái)提交一個(gè)延遲任務(wù),這個(gè)延遲任務(wù)能夠保證將redisson_delay_queue_timeout:SANYOU
中到了延遲時(shí)間的任務(wù)從redisson_delay_queue_timeout:SANYOU
中移除,存到SANYOU
這個(gè)目標(biāo)隊(duì)列中。
于是消費(fèi)者就可以從SANYOU
這個(gè)目標(biāo)隊(duì)列獲取到延遲任務(wù)了。
所以從這可以看出,Redisson的延遲任務(wù)的實(shí)現(xiàn)跟前面說(shuō)的MQ的實(shí)現(xiàn)都是殊途同歸,最開(kāi)始任務(wù)放到中間的一個(gè)地方,叫做redisson_delay_queue_timeout:SANYOU
,然后會(huì)開(kāi)啟一個(gè)類(lèi)似于定時(shí)任務(wù)的一個(gè)東西,去判斷這個(gè)中間地方的消息是否到了延遲時(shí)間,到了再放到最終的目標(biāo)的隊(duì)列供消費(fèi)者消費(fèi)。
Redisson的這種實(shí)現(xiàn)方式比監(jiān)聽(tīng)Redis過(guò)期key的實(shí)現(xiàn)方式更加可靠,因?yàn)橄⒍即嬖趌ist和sorted set數(shù)據(jù)類(lèi)型中,所以消息很少丟。
Hutool的SystemTimer
Hutool工具類(lèi)也提供了延遲任務(wù)的實(shí)現(xiàn)SystemTimer
demo
@Slf4j public class SystemTimerDemo { public static void main(String[] args) { SystemTimer systemTimer = new SystemTimer(); systemTimer.start(); log.info("提交延遲任務(wù)"); systemTimer.addTask(new TimerTask(() -> log.info("執(zhí)行延遲任務(wù)"), 5000)); } }
執(zhí)行結(jié)果
Hutool底層其實(shí)也用到了時(shí)間輪。
Quartz
quartz是一款開(kāi)源作業(yè)調(diào)度框架,基于quartz提供的api也可以實(shí)現(xiàn)延遲任務(wù)的功能。
demo
依賴
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.2</version> </dependency>
SanYouJob實(shí)現(xiàn)Job接口,當(dāng)任務(wù)到達(dá)執(zhí)行時(shí)間的時(shí)候會(huì)調(diào)用execute的實(shí)現(xiàn),從context可以獲取到任務(wù)的內(nèi)容
@Slf4j public class SanYouJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { JobDetail jobDetail = context.getJobDetail(); JobDataMap jobDataMap = jobDetail.getJobDataMap(); log.info("獲取到延遲任務(wù):{}", jobDataMap.get("delayTask")); } }
測(cè)試類(lèi)
public class QuartzDemo { public static void main(String[] args) throws SchedulerException, InterruptedException { // 1.創(chuàng)建Scheduler的工廠 SchedulerFactory sf = new StdSchedulerFactory(); // 2.從工廠中獲取調(diào)度器實(shí)例 Scheduler scheduler = sf.getScheduler(); // 6.啟動(dòng) 調(diào)度器 scheduler.start(); // 3.創(chuàng)建JobDetail,Job類(lèi)型就是上面說(shuō)的SanYouJob JobDetail jb = JobBuilder.newJob(SanYouJob.class) .usingJobData("delayTask", "這是一個(gè)延遲任務(wù)") .build(); // 4.創(chuàng)建Trigger Trigger t = TriggerBuilder.newTrigger() //任務(wù)的觸發(fā)時(shí)間就是延遲任務(wù)到的延遲時(shí)間 .startAt(DateUtil.offsetSecond(new Date(), 5)) .build(); // 5.注冊(cè)任務(wù)和定時(shí)器 log.info("提交延遲任務(wù)"); scheduler.scheduleJob(jb, t); } }
執(zhí)行結(jié)果:
實(shí)現(xiàn)原理
核心組件
Job
:表示一個(gè)任務(wù),execute方法的實(shí)現(xiàn)是對(duì)任務(wù)的執(zhí)行邏輯JobDetail
:任務(wù)的詳情,可以設(shè)置任務(wù)需要的參數(shù)等信息Trigger
:觸發(fā)器,是用來(lái)觸發(fā)業(yè)務(wù)的執(zhí)行,比如說(shuō)指定5s后觸發(fā)任務(wù),那么任務(wù)就會(huì)在5s后觸發(fā)Scheduler
:調(diào)度器,內(nèi)部可以注冊(cè)多個(gè)任務(wù)和對(duì)應(yīng)任務(wù)的觸發(fā)器,之后會(huì)調(diào)度任務(wù)的執(zhí)行
啟動(dòng)的時(shí)候會(huì)開(kāi)啟一個(gè)QuartzSchedulerThread調(diào)度線程,這個(gè)線程會(huì)去判斷任務(wù)是否到了執(zhí)行時(shí)間,到的話就將任務(wù)交給任務(wù)線程池去執(zhí)行。
無(wú)限輪詢延遲任務(wù)
無(wú)限輪詢的意思就是開(kāi)啟一個(gè)線程不停的去輪詢?nèi)蝿?wù),當(dāng)這些任務(wù)到達(dá)了延遲時(shí)間,那么就執(zhí)行任務(wù)。
demo
@Slf4j public class PollingTaskDemo { private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>(); public static void main(String[] args) { new Thread(() -> { while (true) { try { for (DelayTask delayTask : DELAY_TASK_LIST) { if (delayTask.triggerTime <= System.currentTimeMillis()) { log.info("處理延遲任務(wù):{}", delayTask.taskContent); DELAY_TASK_LIST.remove(delayTask); } } TimeUnit.MILLISECONDS.sleep(100); } catch (Exception e) { } } }).start(); log.info("提交延遲任務(wù)"); DELAY_TASK_LIST.add(new DelayTask("三友的java日記", 5L)); } @Getter @Setter public static class DelayTask { private final String taskContent; private final Long triggerTime; public DelayTask(String taskContent, Long delayTime) { this.taskContent = taskContent; this.triggerTime = System.currentTimeMillis() + delayTime * 1000; } } }
任務(wù)可以存在數(shù)據(jù)庫(kù)又或者是內(nèi)存,看具體的需求,這里我為了簡(jiǎn)單就放在內(nèi)存里了。
執(zhí)行結(jié)果:
這種操作簡(jiǎn)單,但是就是效率低下,每次都得遍歷所有的任務(wù)。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java WSDL接口webService實(shí)現(xiàn)方式
這篇文章主要為大家詳細(xì)介紹了java WSDL接口webService實(shí)現(xiàn)方式的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實(shí)例
這篇文章主要介紹了java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實(shí)例的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10SpringMVC post請(qǐng)求中文亂碼問(wèn)題解決
這篇文章主要介紹了SpringMVC post請(qǐng)求中文亂碼問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12Java中將MultipartFile和File互轉(zhuǎn)的方法詳解
我們?cè)陂_(kāi)發(fā)過(guò)程中經(jīng)常需要接收前端傳來(lái)的文件,通常需要處理MultipartFile格式的文件,今天來(lái)介紹一下MultipartFile和File怎么進(jìn)行優(yōu)雅的互轉(zhuǎn),需要的朋友可以參考下2023-10-10java實(shí)現(xiàn)文本框和文本區(qū)的輸入輸出
這篇文章主要介紹了java實(shí)現(xiàn)文本框和文本區(qū)的輸入輸出的方法和具體示例,有需要的小伙伴可以參考下。2015-06-06java 全角半角字符轉(zhuǎn)換的方法實(shí)例
這篇文章主要介紹了java 全角半角字符轉(zhuǎn)換的方法,大家參考使用吧2013-11-11Mybatis如何通過(guò)接口實(shí)現(xiàn)sql執(zhí)行原理解析
為了簡(jiǎn)化MyBatis的使用,MyBatis提供了接口方式自動(dòng)化生成調(diào)用過(guò)程,可以大大簡(jiǎn)化MyBatis的開(kāi)發(fā),下面這篇文章主要給大家介紹了關(guān)于Mybatis如何通過(guò)接口實(shí)現(xiàn)sql執(zhí)行原理解析的相關(guān)資料,需要的朋友可以參考下2023-01-01