Java多線程之scheduledThreadPool的方法解析
scheduledThreadPool
我們對java中定時任務(wù)實現(xiàn)可能會有以下疑問:
怎樣做到每個任務(wù)延遲指定時間執(zhí)行?
內(nèi)部使用了什么數(shù)據(jù)結(jié)構(gòu)保存延遲任務(wù)?
延遲任務(wù)放入scheduledThreadPool時機并不固定,怎么保證按延遲時間順序執(zhí)行?
構(gòu)造器
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
corePoolSize就是我們傳過了的參數(shù),maximumPoolSize是Integer.MAX_VALUE,所以最大線程是無窮大,非核心線程成活時間是0,所以非核心線程執(zhí)行完firstTask之后如果poll任務(wù)沒拿到任務(wù)則會直接銷毀。queue是DelayedWorkQueue。但通過后面的分析可以知道,最大線程數(shù)是不起作用的,最多會起核心線程數(shù)的數(shù)量
schedule(Runnable command,long delay, TimeUnit unit)方法
public ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit){ if(command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t =decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
- 通過decorateTask方法獲取到RunnableScheduledFuture(實際上是ScheduledFutureTask對象),并把delay時間變成了時間戳
- 執(zhí)行delayedExecute方法
delayedExecute方法
private voiddelayedExecute(RunnableScheduledFuture<?> task){ if(isShutdown()) reject(task); else{ super.getQueue().add(task); if(isShutdown()&& !canRunInCurrentRunState(task.isPeriodic())&& remove(task)) task.cancel(false); else ensurePrestart(); } }
- 使用queue.add方法把task放入queue
- 執(zhí)行ensurePrestart方法
offer方法
public boolean offer(Runnable x){ if(x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if(i >= queue.length) grow(); size = i +1; if(i ==0){ queue[0]= e; setIndex(e,0); }else{ siftUp(i, e); } if(queue[0]== e){ leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
- DelayedWorkQueue底層使用的是RunnableScheduledFuture的數(shù)組,初始化容量是16,之后擴容是以1.5倍進行。
- 在offer元素整個過程中使用ReentrantLock進行加鎖,所以DelayedWorkQueue是一個線程安全的隊列。然后使用了condition來實現(xiàn)阻塞的功能,當(dāng)poll沒有元素時會使用await進行等待,當(dāng)offer的是數(shù)組的第一個元素時會signal,這個signal的設(shè)計是排序的點睛之筆,設(shè)計的非常巧妙,這塊需要offer和take方法一起來看,在take方法時會拿第一個元素來判斷delay的時間,如果時間沒到會使用await休眠delay時間,但此時如果有delay時間更短的任務(wù)放入queue中,此時需要take的任務(wù)就不是之前的那個任務(wù)了,就要重新執(zhí)行邏輯獲取這個最新delay的任務(wù),這樣才能做到任務(wù)的正確執(zhí)行。
- 在offer元素時會使用siftUp方法來保證數(shù)組中元素是按delay時間從小到大排列,但要注意的是數(shù)組前半部分肯定都是排了delay最小的任務(wù),但后半部分不一定是有序的
ensurePrestart()方法
voidensurePrestart(){ int wc =workerCountOf(ctl.get()); if(wc < corePoolSize) addWorker(null, true); elseif(wc ==0) addWorker(null, false); }
這個比較簡單,addWorker方法之前我們也分析過了,需要注意的是這里的firstTask默認(rèn)是空的,所以工作線程會直接從queue中拿任務(wù)。這有個比較奇怪的else if,感覺應(yīng)該永遠不用執(zhí)行,因為wc==0肯定已經(jīng)被if條件攔截了,也就是只能起核心線程數(shù)。最大線程數(shù)永遠不會起作用
poll方法
public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; 加鎖 lock.lockInterruptibly(); try { 自旋 for(;;){ 拿到queue中的第一個元素,如果是空則awaitNanos時間,等待時間過后如果queue中還是沒有元素則返回null。 RunnableScheduledFuture<?> first = queue[0]; if(first == null){ if(nanos <=0) return null; else nanos = available.awaitNanos(nanos); }else{ 拿到第一個任務(wù)的delay時間,如果到了delay時間則返回finishPoll方法的結(jié)果 long delay = first.getDelay(NANOSECONDS); if(delay <=0) returnfinishPoll(first); 如果傳入的nanos小于等于0則返回null if(nanos <=0) return null; first = null;// don't retain ref while waiting 如果等待時間還不夠或前一個需要執(zhí)行的任務(wù)還在執(zhí)行,則當(dāng)前線程直接等待 if(nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else{否則當(dāng)前線程可以執(zhí)行(leader線程),但需要awaitNanos delay的時間才能執(zhí)行 Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { 當(dāng)?shù)却龝r間到了之后就 leader = null說明此時可以返回finishPoll方法的結(jié)果 if(leader == thisThread) leader = null; } } } } } finally { if(leader == null && queue[0]!= null) available.signal(); lock.unlock(); } }
- DelayedWorkQueue的poll方法也是使用reentrantLock來保證線程安全,然后使用condition.awaitNanos來達到等待特定時間的效果,這里使用leader線程保證了排在第一位的任務(wù)只有一個工作線程獲取到,其他工作線程進行排隊等待,在獲取到第一個任務(wù)的工作線程delay時間到了之后會take到這個任務(wù)并signal排隊的第一個工作線程繼續(xù)獲取下一個任務(wù),周而復(fù)始。
- 在使用finishPoll方法返回delay時間到了的任務(wù)時會用siftDown對queue后半部分的任務(wù)進行排序,因為之前offer時使用siftUp方法只對queue前半部分進行了排序
- 回到ScheduledThreadPool線程池,keepAliveTime是0,所以當(dāng)first任務(wù)的delay時間還沒有到時會直接返回null,然后非核心工作線程就會直接銷毀,之后的代碼都不會執(zhí)行,而核心線程則執(zhí)行的take方法,take方法才會進入下面這段邏輯
if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } }
scheduleAtFixedRate方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
- 這個方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,這邊多了個period變量保存執(zhí)行周期值,outerTask引用了自身的對象,然后也是使用delayExecute方法把任務(wù)放入了queue中,此時任務(wù)的delay是initialDelay,所以會在initialDelay時間之后出隊然后執(zhí)行
- 由于現(xiàn)在工作線程中的task是ScheduledFutureTask,所以工作線程調(diào)用的task.run方法是ScheduledFutureTask.run方法
ScheduledFutureTask.run方法
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
1.判斷是不是周期執(zhí)行的任務(wù),之前的schedule方法的period是0,所以會執(zhí)行super.run();然后執(zhí)行傳入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,則會執(zhí)行super.runAndReset();方法,執(zhí)行傳入的runnable中的run方法之后執(zhí)行setNextRunTime();
重新設(shè)置delay時間(initialDelay+period),然后把任務(wù)又放入queue中
scheduleWithFixedDelay方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;
這個方法幾乎跟scheduleAtFixedRate方法一模一樣,區(qū)別在于period是個負數(shù),通過之前我們對scheduleAtFixedRate方法的分析,period這個參數(shù)在算周期執(zhí)行間隔時會用到,也就是setNextRunTime方法
setNextRunTime方法
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
當(dāng)period大于0時,也就是scheduleAtFixedRate執(zhí)行時,是直接在之前的time加上了period,而scheduleWithFixedDelay方法執(zhí)行時,是用triggerTime方法在當(dāng)前時間加上了periode,不同的計算方式的區(qū)別在于,scheduleAtFixedRate不會管任務(wù)的執(zhí)行時間,我只要保證任務(wù)固定頻率執(zhí)行就好了,所以他是幾乎精確的period時間執(zhí)行,而scheduleWithFixedDelay是在任務(wù)之后的時間+period時間來確定下一次任務(wù)執(zhí)行的時間,所以任務(wù)執(zhí)行的頻率相對來說不固定
到此這篇關(guān)于Java多線程之scheduledThreadPool的方法解析的文章就介紹到這了,更多相關(guān)scheduledThreadPool的方法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能
這篇文章主要介紹了Spring Boot2.3.0及以上版本的Liveness和Readiness功能示例深入解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10Spring Boot 簡單使用EhCache緩存框架的方法
本篇文章主要介紹了Spring Boot 簡單使用EhCache緩存框架的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-07-07MyBatis動態(tài)<if>標(biāo)簽使用避坑指南
這篇文章主要為大家介紹了MyBatis動態(tài)<if>標(biāo)簽使用避坑指南,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-03-03MyBatis-Plus實現(xiàn)公共字段自動填充功能詳解
在開發(fā)中經(jīng)常遇到多個實體類有共同的屬性字段,這些字段屬于公共字段,也就是很多表中都有這些字段,能不能對于這些公共字段在某個地方統(tǒng)一處理,來簡化開發(fā)呢?MyBatis-Plus就提供了這一功能,本文就來為大家詳細講講2022-08-08Spring內(nèi)置任務(wù)調(diào)度如何實現(xiàn)添加、取消與重置詳解
任務(wù)調(diào)度是我們?nèi)粘i_發(fā)中經(jīng)常會碰到的,下面這篇文章主要給大家介紹了關(guān)于Spring內(nèi)置任務(wù)調(diào)度如何實現(xiàn)添加、取消與重置的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10SpringBoot四大神器之Actuator的使用小結(jié)
這篇文章主要介紹了SpringBoot四大神器之Actuator的使用小結(jié),詳細的介紹了Actuator的使用和端點的使用,有興趣的可以了解一下2017-11-11