Java中的Timer與TimerTask源碼及使用解析
一. Timer
在Java中,經(jīng)常使用Timer來定時(shí)調(diào)度任務(wù)。
Timer 調(diào)度任務(wù)有一次性調(diào)度和循環(huán)調(diào)度,循環(huán)調(diào)度有分為固定速率調(diào)度(fixRate)和固定時(shí)延調(diào)度(fixDelay)。固定速率就好比你今天加班到很晚,但是到了第二天還必須準(zhǔn)點(diǎn)到公司上班,如果你一不小心加班到了第二天早上 9 點(diǎn),你就連休息的時(shí)間都沒有了。而固定時(shí)延的意思是你必須睡夠 8 個(gè)小時(shí)再過來上班,如果你加班到凌晨 6 點(diǎn),那就可以下午過來上班了。固定速率強(qiáng)調(diào)準(zhǔn)點(diǎn),固定時(shí)延強(qiáng)調(diào)間隔。
Timer timer = new Timer(); TimerTask timerTask = new TimerTask() { @Override public void run() { // TODO Auto-generated method stub } }; //延遲1秒后開始調(diào)度任務(wù) timer.schedule(timerTask, 1000); //延遲1秒,固定延遲1秒周期性調(diào)度 timer.schedule(timerTask, 1000, 1000); //延遲1秒,固定速率1秒周期性調(diào)度 timer.scheduleAtFixedRate(timerTask, 1000, 1000);
如果你有一個(gè)任務(wù)必須每天準(zhǔn)點(diǎn)調(diào)度,那就應(yīng)該使用固定速率調(diào)度,并且要確保每個(gè)任務(wù)執(zhí)行時(shí)間不要太長,千萬別超過了第二天這個(gè)點(diǎn)。如果你有一個(gè)任務(wù)需要每隔幾分鐘跑一次,那就使用固定時(shí)延調(diào)度,它不是很在乎你的單個(gè)任務(wù)要跑多長時(shí)間。
二. 內(nèi)部源碼
Timer 類里包含一個(gè)任務(wù)隊(duì)列和一個(gè)異步輪訓(xùn)線程。任務(wù)隊(duì)列里容納了所有待執(zhí)行的任務(wù),所有的任務(wù)將會在這一個(gè)異步線程里執(zhí)行,切記任務(wù)的執(zhí)行代碼不可以拋出異常,否則會導(dǎo)致 Timer 線程掛掉,所有的任務(wù)都沒得執(zhí)行了。單個(gè)任務(wù)也不易執(zhí)行時(shí)間太長,否則會影響任務(wù)調(diào)度在時(shí)間上的精準(zhǔn)性。比如你一個(gè)任務(wù)跑了太久,其它等著調(diào)度的任務(wù)就一直處于饑餓狀態(tài)得不到調(diào)度。所有任務(wù)的執(zhí)行都是這單一的 TimerThread 線程。
/** * The timer task queue. This data structure is shared with the timer * thread. The timer produces tasks, via its various schedule calls, * and the timer thread consumes, executing timer tasks as appropriate, * and removing them from the queue when they're obsolete. */ private final TaskQueue queue = new TaskQueue(); //調(diào)度隊(duì)列 /** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); //輪詢線程
Timer 的任務(wù)隊(duì)列 TaskQueue 是一個(gè)特殊的隊(duì)列,它內(nèi)部是一個(gè)數(shù)組。這個(gè)數(shù)組會按照待執(zhí)行時(shí)間進(jìn)行堆排序,堆頂元素總是待執(zhí)行時(shí)間最小的任務(wù)。輪訓(xùn)線程會每次輪訓(xùn)出時(shí)間點(diǎn)最近的并且到點(diǎn)的任務(wù)來執(zhí)行。數(shù)組會自動擴(kuò)容,如果任務(wù)非常多。
private TimerTask[] queue = new TimerTask[128]; //增加調(diào)度任務(wù)時(shí),如果長度超過隊(duì)列長度,則擴(kuò)容為原先2倍 void add(TimerTask task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { //任意線程均可修改調(diào)度隊(duì)列,因此會導(dǎo)致現(xiàn)場不安全,所以這邊用同步。 if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { //調(diào)度隊(duì)列增加任務(wù)時(shí),其他線程也可以修改該任務(wù),導(dǎo)致線程不安全,所以這邊用同步 if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); //增加到調(diào)度隊(duì)列中 if (queue.getMin() == task) queue.notify(); } }
三. 任務(wù)狀態(tài)
TimerTask 有 4 個(gè)狀態(tài),VIRGIN 是默認(rèn)狀態(tài),剛剛實(shí)例化還沒有被調(diào)度。SCHEDULED 表示已經(jīng)將任務(wù)塞進(jìn) TaskQueue 等待被執(zhí)行。EXECUTED 表示任務(wù)已經(jīng)執(zhí)行完成。CANCELLED 表示任務(wù)被取消了,還沒來得及執(zhí)行就被人為取消了。
/** * The state of this task, chosen from the constants below. */ int state = VIRGIN; //默認(rèn)狀態(tài)是VIRGIN /** * This task has not yet been scheduled. */ static final int VIRGIN = 0; /** * This task is scheduled for execution. If it is a non-repeating task, * it has not yet been executed. */ static final int SCHEDULED = 1; /** * This non-repeating task has already executed (or is currently * executing) and has not been cancelled. */ static final int EXECUTED = 2; /** * This task has been cancelled (with a call to TimerTask.cancel). */ static final int CANCELLED = 3; /** * Next execution time for this task in the format returned by * System.currentTimeMillis, assuming this task is scheduled for execution. * For repeating tasks, this field is updated prior to each task execution. */ long nextExecutionTime; //下次調(diào)度執(zhí)行時(shí)間 /** * Period in milliseconds for repeating tasks. A positive value indicates * fixed-rate execution. A negative value indicates fixed-delay execution. * A value of 0 indicates a non-repeating task. */ long period = 0; //間隔
對于一個(gè)循環(huán)任務(wù)來說,它不存在 EXECUTED 狀態(tài),因?yàn)樗看蝿倓倛?zhí)行完成,就被重新調(diào)度了。EXECUTED 狀態(tài)僅僅存在于一次性任務(wù),而且這個(gè)狀態(tài)其實(shí)并不是表示任務(wù)已經(jīng)執(zhí)行完成,它是指已經(jīng)從任務(wù)隊(duì)列里摘出來了,馬上就要執(zhí)行。
任務(wù)間隔字段 period 比較特殊,當(dāng)使用固定速率時(shí),period 為正值,當(dāng)使用固定間隔時(shí),period 為負(fù)值,當(dāng)任務(wù)是一次性時(shí),period 為零。
public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void scheduleAtFixedRate(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); }
對于固定速率來說,如果任務(wù)執(zhí)行時(shí)間太長超出了間隔,那么它可能會持續(xù)霸占任務(wù)隊(duì)列,因?yàn)樗恼{(diào)度時(shí)間將總是低于 currentTime,排在堆頂,每次輪訓(xùn)取出來的都是它。運(yùn)行完畢后,重新調(diào)度這個(gè)任務(wù),它的時(shí)間依舊趕不上。持續(xù)下去你會看到這個(gè)任務(wù)的調(diào)度時(shí)間遠(yuǎn)遠(yuǎn)落后于當(dāng)前時(shí)間,而其它任務(wù)可能會徹底餓死。這就是為什么一定要特別注意固定速率的循環(huán)任務(wù)運(yùn)行時(shí)間不宜過長。
四. 任務(wù)鎖
Timer 的任務(wù)支持取消操作,取消任務(wù)的線程和執(zhí)行任務(wù)的線程極有可能不是一個(gè)線程。有可能任務(wù)正在執(zhí)行中,結(jié)果另一個(gè)線程表示要取消任務(wù)。這時(shí)候 Timer 是如何處理的呢?在 TimerTask 類里看到了一把鎖。當(dāng)任務(wù)屬性需要修改的時(shí)候,都會加鎖。
public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }
在任務(wù)運(yùn)行之前會檢查任務(wù)是不是已經(jīng)被取消了,如果取消了,就從隊(duì)列中移除。一旦任務(wù)開始運(yùn)行 run(),對于單次任務(wù)來說它就無法被取消了,而循環(huán)任務(wù)將不會繼續(xù)下次調(diào)度。如果任務(wù)沒有機(jī)會得到執(zhí)行(時(shí)間設(shè)置的太長),那么即使這個(gè)任務(wù)被取消了,它也會一直持續(xù)躺在任務(wù)隊(duì)列中。設(shè)想如果你調(diào)度了一系列久遠(yuǎn)的任務(wù),然后都取消了,這可能會成為一個(gè)內(nèi)存泄露點(diǎn)。所以 Timer 還單獨(dú)提供了一個(gè) purge() 方法可以一次性清空所有的已取消的任務(wù)。
public int purge() { int result = 0; synchronized(queue) { for (int i = queue.size(); i > 0; i--) { if (queue.get(i).state == TimerTask.CANCELLED) { queue.quickRemove(i); result++; } } if (result != 0) queue.heapify(); } return result; }
五. 任務(wù)隊(duì)列為空
任務(wù)隊(duì)列里沒有任務(wù)了,調(diào)度線程必須按一定的策略進(jìn)行睡眠。它需要睡眠一直到最先執(zhí)行的任務(wù)到點(diǎn)時(shí)立即醒來,所以睡眠截止時(shí)間就是第一個(gè)任務(wù)將要執(zhí)行的時(shí)間。同時(shí)在睡覺的時(shí)候,有可能會有新的任務(wù)被添加進(jìn)來,它的調(diào)度時(shí)間可能會更加提前,所以當(dāng)有新的任務(wù)到來時(shí)需要可以喚醒正在睡眠的線程。
private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } }
代碼中的 wait() 方法就是調(diào)用了 Object.wait() 來進(jìn)行睡眠。當(dāng)有新任務(wù)進(jìn)來了,發(fā)現(xiàn)這個(gè)新任務(wù)的運(yùn)行時(shí)間是最早的,那就調(diào)用 notify() 方法喚醒輪訓(xùn)線程。
六. Timer終止
Timer 提供了 cancel() 方法清空隊(duì)列,停止調(diào)度器,不允許有任何新任務(wù)進(jìn)來。它會將 newTasksMayBeScheduled 字段設(shè)置為 false 表示 Timer 即將終止。
public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) //調(diào)度器已經(jīng)停止則拋出異常 throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } }
我們還注意到 Timer.cancel() 方法會喚醒輪訓(xùn)線程,為的是可以立即停止輪訓(xùn)。不過如果任務(wù)正在執(zhí)行中,這之后 cancel() 就必須等到任務(wù)執(zhí)行完畢才可以停止。
到此這篇關(guān)于Java中的Timer與TimerTask源碼及使用解析的文章就介紹到這了,更多相關(guān)Java中的Timer與TimerTask內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例
本篇文章主要介紹了Java 處理圖片與base64 編碼的相互轉(zhuǎn)換的示例,具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08Java 中 String,StringBuffer 和 StringBuilder 的區(qū)別及用法
這篇文章主要介紹了Java 中 String,StringBuffer 和 StringBuilder 的區(qū)別及用法的相關(guān)資料,需要的朋友可以參考下2017-03-03Spring的事件機(jī)制知識點(diǎn)詳解及實(shí)例分析
在本篇內(nèi)容里小編給大家分享的是一篇關(guān)于Spring的事件機(jī)制知識點(diǎn)詳解及實(shí)例分析,有需要的朋友么可以參考下。2021-12-12詳解Spring Data操作Redis數(shù)據(jù)庫
Redis是一種NOSQL數(shù)據(jù)庫,Key-Value形式對數(shù)據(jù)進(jìn)行存儲,其中數(shù)據(jù)可以以內(nèi)存形式存在,也可以持久化到文件系統(tǒng)。Spring data對Redis進(jìn)行了很好的封裝,用起來也是十分的得心應(yīng)手,接下來通過本文給大家分享Spring Data操作Redis數(shù)據(jù)庫,需要的朋友參考下2017-03-03java圖的深度優(yōu)先遍歷實(shí)現(xiàn)隨機(jī)生成迷宮
這篇文章主要為大家詳細(xì)介紹了java圖的深度優(yōu)先遍歷實(shí)現(xiàn)隨機(jī)生成迷宮,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01springboot實(shí)現(xiàn)maven多模塊和打包部署
本文主要介紹了springboot實(shí)現(xiàn)maven多模塊和打包部署,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04MyBatis的注解使用、ORM層優(yōu)化方式(懶加載和緩存)
這篇文章主要介紹了MyBatis的注解使用、ORM層優(yōu)化方式(懶加載和緩存),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10