Java線程池框架核心代碼解析
前言
多線程編程中,為每個(gè)任務(wù)分配一個(gè)線程是不現(xiàn)實(shí)的,線程創(chuàng)建的開銷和資源消耗都是很高的。線程池應(yīng)運(yùn)而生,成為我們管理線程的利器。Java 通過Executor接口,提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程和執(zhí)行過程解耦開來,并用Runnable表示任務(wù)。
下面,我們來分析一下 Java 線程池框架的實(shí)現(xiàn)ThreadPoolExecutor。
下面的分析基于JDK1.7
生命周期
ThreadPoolExecutor中,使用CAPACITY的高3位來表示運(yùn)行狀態(tài),分別是:
1.RUNNING:接收新任務(wù),并且處理任務(wù)隊(duì)列中的任務(wù)
2.SHUTDOWN:不接收新任務(wù),但是處理任務(wù)隊(duì)列的任務(wù)
3.STOP:不接收新任務(wù),不出來任務(wù)隊(duì)列,同時(shí)中斷所有進(jìn)行中的任務(wù)
4.TIDYING:所有任務(wù)已經(jīng)被終止,工作線程數(shù)量為 0,到達(dá)該狀態(tài)會執(zhí)行terminated()
5.TERMINATED:terminated()執(zhí)行完畢
狀態(tài)轉(zhuǎn)換圖
ThreadPoolExecutor中用原子類來表示狀態(tài)位
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
線程池模型
核心參數(shù)
corePoolSize:最小存活的工作線程數(shù)量(如果設(shè)置allowCoreThreadTimeOut,那么該值為 0)
maximumPoolSize:最大的線程數(shù)量,受限于CAPACITY
keepAliveTime:對應(yīng)線程的存活時(shí)間,時(shí)間單位由TimeUnit指定
workQueue:工作隊(duì)列,存儲待執(zhí)行的任務(wù)
RejectExecutionHandler:拒絕策略,線程池滿后會觸發(fā)
線程池的最大容量:CAPACITY中的前三位用作標(biāo)志位,也就是說工作線程的最大容量為(2^29)-1
四種模型
CachedThreadPool:一個(gè)可緩存的線程池,如果線程池的當(dāng)前規(guī)模超過了處理需求時(shí),那么將回收空閑的線程,當(dāng)需求增加時(shí),則可以添加新的線程,線程池的規(guī)模不存在任何的限制。
FixedThreadPool:一個(gè)固定大小的線程池,提交一個(gè)任務(wù)時(shí)就創(chuàng)建一個(gè)線程,直到達(dá)到線程池的最大數(shù)量,這時(shí)線程池的大小將不再變化。
SingleThreadPool:一個(gè)單線程的線程池,它只有一個(gè)工作線程來執(zhí)行任務(wù),可以確保按照任務(wù)在隊(duì)列中的順序來串行執(zhí)行,如果這個(gè)線程異常結(jié)束將創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)。
ScheduledThreadPool:一個(gè)固定大小的線程池,并且以延遲或者定時(shí)的方式來執(zhí)行任務(wù),類似于Timer。
執(zhí)行任務(wù) execute
核心邏輯:
1.當(dāng)前線程數(shù)量 < corePoolSize,直接開啟新的核心線程執(zhí)行任務(wù)addWorker(command, true)
2.當(dāng)前線程數(shù)量 >= corePoolSize,且任務(wù)加入工作隊(duì)列成功
1).檢查線程池當(dāng)前狀態(tài)是否處于RUNNING
2).如果否,則拒絕該任務(wù)
3).如果是,判斷當(dāng)前線程數(shù)量是否為 0,如果為 0,就增加一個(gè)工作線程。
3.開啟普通線程執(zhí)行任務(wù)addWorker(command, false),開啟失敗就拒絕該任務(wù)
從上面的分析可以總結(jié)出線程池運(yùn)行的四個(gè)階段:
1).poolSize < corePoolSize 且隊(duì)列為空,此時(shí)會新建線程來處理提交的任務(wù)
2).poolSize == corePoolSize,此時(shí)提交的任務(wù)進(jìn)入工作隊(duì)列,工作線程從隊(duì)列中獲取任務(wù)執(zhí)行,此時(shí)隊(duì)列不為空且未滿。
3).poolSize == corePoolSize,并且隊(duì)列已滿,此時(shí)也會新建線程來處理提交的任務(wù),但是poolSize < maxPoolSize
4).poolSize == maxPoolSize,并且隊(duì)列已滿,此時(shí)會觸發(fā)拒絕策略
拒絕策略
前面我們提到任務(wù)無法執(zhí)行會被拒絕,RejectedExecutionHandler是處理被拒絕任務(wù)的接口。下面是四種拒絕策略。
AbortPolicy:默認(rèn)策略,終止任務(wù),拋出RejectedException
CallerRunsPolicy:在調(diào)用者線程執(zhí)行當(dāng)前任務(wù),不拋異常
DiscardPolicy: 拋棄策略,直接丟棄任務(wù),不拋異常
DiscardOldersPolicy:拋棄最老的任務(wù),執(zhí)行當(dāng)前任務(wù),不拋異常
線程池中的 Worker
Worker繼承了AbstractQueuedSynchronizer和Runnable,前者給Worker提供鎖的功能,后者執(zhí)行工作線程的主要方法runWorker(Worker w)(從任務(wù)隊(duì)列撈任務(wù)執(zhí)行)。Worker 引用存在workers集合里面,用mainLock守護(hù)。
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
核心函數(shù) runWorker
下面是簡化的邏輯,注意:每個(gè)工作線程的run都執(zhí)行下面的函數(shù)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { w.lock(); beforeExecute(wt, task); task.run(); afterExecute(task, thrown); w.unlock(); } processWorkerExit(w, completedAbruptly); }
1.從getTask()中獲取任務(wù)
2.鎖住 worker
3.執(zhí)行beforeExecute(wt, task),這是ThreadPoolExecutor提供給子類的擴(kuò)展方法
4.運(yùn)行任務(wù),如果該worker有配置了首次任務(wù),則先執(zhí)行首次任務(wù)且只執(zhí)行一次。
5.執(zhí)行afterExecute(task, thrown);
6.解鎖 worker
7.如果獲取到的任務(wù)為 null,關(guān)閉 worker
獲取任務(wù) getTask
線程池內(nèi)部的任務(wù)隊(duì)列是一個(gè)阻塞隊(duì)列,具體實(shí)現(xiàn)在構(gòu)造時(shí)傳入。
private final BlockingQueue<Runnable> workQueue;
getTask()從任務(wù)隊(duì)列中獲取任務(wù),支持阻塞和超時(shí)等待任務(wù),四種情況會導(dǎo)致返回null,讓worker關(guān)閉。
1.現(xiàn)有的線程數(shù)量超過最大線程數(shù)量
2.線程池處于STOP狀態(tài)
3.線程池處于SHUTDOWN狀態(tài)且工作隊(duì)列為空
4.線程等待任務(wù)超時(shí),且線程數(shù)量超過保留線程數(shù)量
核心邏輯:根據(jù)timed在阻塞隊(duì)列上超時(shí)等待或者阻塞等待任務(wù),等待任務(wù)超時(shí)會導(dǎo)致工作線程被關(guān)閉。
timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
在以下兩種情況下等待任務(wù)會超時(shí):
1.允許核心線程等待超時(shí),即allowCoreThreadTimeOut(true)
2.當(dāng)前線程是普通線程,此時(shí)wc > corePoolSize
工作隊(duì)列使用的是BlockingQueue,這里就不展開了,后面再寫一篇詳細(xì)的分析。
總結(jié)
ThreadPoolExecutor基于生產(chǎn)者-消費(fèi)者模式,提交任務(wù)的操作相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者。
Executors提供了四種基于ThreadPoolExecutor構(gòu)造線程池模型的方法,除此之外,我們還可以直接繼承ThreadPoolExecutor,重寫beforeExecute和afterExecute方法來定制線程池任務(wù)執(zhí)行過程。
使用有界隊(duì)列還是無界隊(duì)列需要根據(jù)具體情況考慮,工作隊(duì)列的大小和線程的數(shù)量也是需要好好考慮的。
拒絕策略推薦使用CallerRunsPolicy,該策略不會拋棄任務(wù),也不會拋出異常,而是將任務(wù)回退到調(diào)用者線程中執(zhí)行。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
最簡單的MyBatis Plus的多表聯(lián)接、分頁查詢實(shí)現(xiàn)方法
這篇文章主要介紹了最簡單的MyBatis Plus的多表聯(lián)接、分頁查詢實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java如何避免死鎖和競態(tài)條件的實(shí)現(xiàn)
本文主要介紹了Java如何避免死鎖和競態(tài)條件的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05Java基于ServletContextListener實(shí)現(xiàn)UDP監(jiān)聽
這篇文章主要介紹了Java基于ServletContextListener實(shí)現(xiàn)UDP監(jiān)聽,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12Java設(shè)計(jì)模式之抽象工廠模式實(shí)例詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之抽象工廠模式,結(jié)合實(shí)例形式分析了抽象工廠模式的概念、功能、定義與使用方法,需要的朋友可以參考下2017-09-09SpringBoot項(xiàng)目配置明文密碼泄露問題的處理方式
這篇文章主要介紹了SpringBoot項(xiàng)目配置明文密碼泄露問題的處理方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06