Java中定時任務(wù)的全方位場景實現(xiàn)思路分析
定時任務(wù)場景
在開發(fā)過程中,根據(jù)需求和業(yè)務(wù)的不同經(jīng)常會有很多場景需要用到不同特性的定時任務(wù),針對這些場景,這里提供不同的一個實現(xiàn)思路。定時任務(wù)可能需要的特性如下:
- 多線程執(zhí)行:即一個定時任務(wù)是需要多線程去跑的,因為一個線程太慢了
- 分布式執(zhí)行:在多線程的基礎(chǔ)上,用多臺機(jī)器的算力去執(zhí)行一個定時任務(wù)
- 動態(tài)時間的定時任務(wù):即定時任務(wù)的開始時間是不確定的
- 連續(xù)上下文多線程定時任務(wù):在使用多線程執(zhí)行定時任務(wù)的時候,嚴(yán)格按照任務(wù)的順序來執(zhí)行,即任務(wù)ABC,多線程執(zhí)行完A,然后再執(zhí)行B和C,不能存在A還有沒跑完的線程,B已經(jīng)開始的情況。
- 可暫停繼續(xù)的定時任務(wù):即一個任務(wù)執(zhí)行了一半,可以暫停后繼續(xù)運(yùn)行。
針對以上的幾種特性,這里講一講相對簡單的實現(xiàn)方式
不同定時任務(wù)特性對應(yīng)的實現(xiàn)方式
多線程執(zhí)行
多線程執(zhí)行相對來說是個比較簡單的需求,只需要定時任務(wù)觸發(fā)的時候,使用線程池去執(zhí)行任務(wù),在springboot中,只需要使用@Async注解就行了。
//定時任務(wù)代碼 @Scheduled(fixedDelay = 60000) public void doSomeThing() { servive.doA(); } ------------------------------------------- //執(zhí)行定時任務(wù)邏輯代碼 @Service public class Service(){ @Async("A-config") public void doA(){ //這里執(zhí)行邏輯省略 } } ------------------------------------------- @Bean("updateById") public Executor updateById() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心線程數(shù):線程池創(chuàng)建時候初始化的線程數(shù) executor.setCorePoolSize(64); // 最大線程數(shù):線程池最大的線程數(shù),只有在緩沖隊列滿了之后才會申請超過核心線程數(shù)的線程 executor.setMaxPoolSize(128); // 緩沖隊列:用來緩沖執(zhí)行任務(wù)的隊列 executor.setQueueCapacity(500); // 允許線程的空閑時間60秒:當(dāng)超過了核心線程之外的線程在空閑時間到達(dá)之后會被銷毀 executor.setKeepAliveSeconds(60); // 線程池名的前綴:設(shè)置好了之后可以方便我們定位處理任務(wù)所在的線程池 executor.setThreadNamePrefix("do-somthing-"); // 緩沖隊列滿了之后的拒絕策略:這里調(diào)用主線程執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.initialize(); return executor; }
分布式執(zhí)行
一般分布式定時任務(wù)主要是兩個問題,一個是觸發(fā),一個是分發(fā)。觸發(fā)需要考慮到只能有一個觸發(fā)成功,需要根據(jù)定時任務(wù)的間隔,指定合理的競爭策略,比如每日定時任務(wù),那就用當(dāng)日凌晨當(dāng)key,利用redis的setnx原子操作來競爭。 然后就是分發(fā)問題,一般任務(wù)觸發(fā)后,利用多臺機(jī)分布式執(zhí)行定時任務(wù),由一臺機(jī)觸發(fā)后,把任務(wù)分發(fā)到中間件中,然后多臺機(jī)器的服務(wù)去消費(fèi)這些任務(wù),這個中間件一般是隊列或者redis或者數(shù)據(jù)庫。
動態(tài)時間的定時任務(wù)
當(dāng)需要在一個不確定的時間去執(zhí)行任務(wù),比如我需要定時任務(wù)開啟一場折扣活動,而這個開啟時間取決于業(yè)務(wù)的其他條件出發(fā),這個開始時間會在某一個業(yè)務(wù)時刻存入數(shù)據(jù)庫或者reids,那就需要一個任務(wù)觸發(fā)檢測器了,可以跑一個1s為間隔的定時任務(wù),每秒鐘都檢查一次這個任務(wù)是否需要執(zhí)行或者是否到了執(zhí)行時間,如果需要執(zhí)行就立刻執(zhí)行相應(yīng)的任務(wù)邏輯。
連續(xù)上下文多線程定時任務(wù)
實際業(yè)務(wù)場景經(jīng)常會有這種情況,我需要連續(xù)的執(zhí)行A->B->C這三個任務(wù),一定要按嚴(yán)格的順序執(zhí)行,如果我給這三個任務(wù)分別設(shè)置三個執(zhí)行時間,那后續(xù)任務(wù)如果超時或者執(zhí)行時間超過預(yù)期,就有可能出現(xiàn)B任務(wù)在A沒結(jié)束就執(zhí)行。所以只能按同一個定時任務(wù)觸發(fā)來執(zhí)行。而這時候我又需要多線程來提高效率。所以這個場景的重點(diǎn)在于,子任務(wù)結(jié)束的判定也就是線程執(zhí)行完畢的判斷。這里有種很簡單的實現(xiàn)方式。
實現(xiàn)這個場景的基本思路就是,在執(zhí)行任務(wù)A時候創(chuàng)建線程池去執(zhí)行A任務(wù)邏輯,然后分發(fā)完之后,shutdown線程池,調(diào)用線程池的awaitTermination方法去阻塞等待線程所有執(zhí)行完畢。這個方法結(jié)束后,自然所有A任務(wù)就執(zhí)行完了,銷毀線程池后執(zhí)行任務(wù)B,再創(chuàng)建新的線程池來執(zhí)行任務(wù)B,以此類推,這樣既用了多線程執(zhí)行速度的優(yōu)點(diǎn)又保證了任務(wù)的執(zhí)行順序。當(dāng)然分布式用隊列和消費(fèi)者來做的話,就更簡單,只需要監(jiān)控隊列的已消費(fèi)和待發(fā)送以及消費(fèi)完成數(shù)量,就知道任務(wù)是否完全結(jié)束。以下是多線程上下文任務(wù)的一個實現(xiàn)代碼
//創(chuàng)建線程池A,這里線程池改造過可以指定名字 NamedThreadPoolExecutor APool = new NamedThreadPoolExecutor( 32, 128, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy(), this.getClass().getSimpleName()); ------------------------------------------------------------- //這里拿到任務(wù)后,把任務(wù)都丟到線程池里去執(zhí)行 List<Object> taskA = getTask(); for(Object task:taskA){ APool.execute(new CommonRunable(task)); } //這里調(diào)用shutdown和awaitTermination,并設(shè)置最大超時,在方法執(zhí)行完后就表示任務(wù)A已經(jīng)完全執(zhí)行完了 APool.shutdown(); APool.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS); //然后這里A已經(jīng)全部執(zhí)行完了,開始執(zhí)行B NamedThreadPoolExecutor BPool = new NamedThreadPoolExecutor( 32, 128, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy(), this.getClass().getSimpleName()); List<Object> taskB = getTask(); for(Object task:taskB){ BPool.execute(new CommonRunable(task)); } //這里調(diào)用shutdown和awaitTermination,并設(shè)置最大超時,在方法執(zhí)行完后就表示任務(wù)B已經(jīng)完全執(zhí)行完了,繼續(xù)執(zhí)行后面的任務(wù) BPool.shutdown(); BPool.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS); //繼續(xù)執(zhí)行C任務(wù) ...
可暫停并重新開始的定時任務(wù)
實現(xiàn)一個可暫停并重新開始的定時任務(wù),首先得考慮場景,如果這個暫停是主動暫停,那需要把這個任務(wù)執(zhí)行的進(jìn)度和一些中間數(shù)據(jù)存入中間件或者內(nèi)存,然后再次開始的時候讀取這些中間數(shù)據(jù)就行了。如果這個暫停的場景包含被動暫停,比如任務(wù)中斷,或者進(jìn)程掛了,那就需要在任務(wù)一開始就把中間數(shù)據(jù)和執(zhí)行進(jìn)度都使用中間件存儲,再次開始的時候會直接從中間件讀取任務(wù)。一般來說,這里的中間件會選擇redis,像java中一些list或者set數(shù)據(jù)都是很方便存儲和讀取的。
到此這篇關(guān)于Java中定時任務(wù)的全方位場景實現(xiàn)思路分析的文章就介紹到這了,更多相關(guān)Java定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java 異步編程實踐_動力節(jié)點(diǎn)Java學(xué)院整理
異步編程提供了一個非阻塞的,事件驅(qū)動的編程模型。下面通過本文給大家介紹Java 異步編程實踐,感興趣的的朋友一起看看吧2017-05-05java:程序包c(diǎn)om.xxx.xxx不存在報錯萬能解決辦法
這篇文章主要給大家介紹了關(guān)于java:程序包c(diǎn)om.xxx.xxx不存在報錯萬能解決辦法,這個問題曾逼瘋初學(xué)者的我,不過弄清楚原理后就很簡單了,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-12-12SpringBoot后端接收參數(shù)優(yōu)化代碼示例(統(tǒng)一處理前端參數(shù))
使用Spring Boot開發(fā)API的時候,讀取請求參數(shù)是服務(wù)端編碼中最基本的一項操作,下面這篇文章主要給大家介紹了關(guān)于SpringBoot后端接收參數(shù)優(yōu)化(統(tǒng)一處理前端參數(shù))的相關(guān)資料,需要的朋友可以參考下2024-07-07mybatis攔截器實現(xiàn)通用權(quán)限字段添加的方法
這篇文章主要給大家介紹了關(guān)于mybatis攔截器實現(xiàn)通用權(quán)限字段添加的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09SpringBoot 整合Redisson重寫cacheName支持多參數(shù)的案例代碼
這篇文章主要介紹了SpringBoot 整合Redisson重寫cacheName支持多參數(shù),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01