java高并發(fā)ThreadPoolExecutor類解析線程池執(zhí)行流程
摘要
ThreadPoolExecutor是Java線程池中最核心的類之一,它能夠保證線程池按照正常的業(yè)務(wù)邏輯執(zhí)行任務(wù),并通過原子方式更新線程池每個階段的狀態(tài)。
今天,我們通過ThreadPoolExecutor類的源碼深度解析線程池執(zhí)行任務(wù)的核心流程,小伙伴們最好是打開IDEA,按照冰河說的步驟,調(diào)試下ThreadPoolExecutor類的源碼,這樣會理解的更加深刻,好了,開始今天的主題。
核心邏輯概述
ThreadPoolExecutor是Java線程池中最核心的類之一,它能夠保證線程池按照正常的業(yè)務(wù)邏輯執(zhí)行任務(wù),并通過原子方式更新線程池每個階段的狀態(tài)。
ThreadPoolExecutor類中存在一個workers工作線程集合,用戶可以向線程池中添加需要執(zhí)行的任務(wù),workers集合中的工作線程可以直接執(zhí)行任務(wù),或者從任務(wù)隊列中獲取任務(wù)后執(zhí)行。ThreadPoolExecutor類中提供了整個線程池從創(chuàng)建到執(zhí)行任務(wù),再到消亡的整個流程方法。本文,就結(jié)合ThreadPoolExecutor類的源碼深度分析線程池執(zhí)行任務(wù)的整體流程。
在ThreadPoolExecutor類中,線程池的邏輯主要體現(xiàn)在execute(Runnable)方法,addWorker(Runnable, boolean)方法,addWorkerFailed(Worker)方法和拒絕策略上,接下來,我們就深入分析這幾個核心方法。
execute(Runnable)方法
execute(Runnable)方法的作用是提交Runnable類型的任務(wù)到線程池中。我們先看下execute(Runnable)方法的源碼,如下所示。
public void execute(Runnable command) { //如果提交的任務(wù)為空,則拋出空指針異常 if (command == null) throw new NullPointerException(); //獲取線程池的狀態(tài)和線程池中線程的數(shù)量 int c = ctl.get(); //線程池中的線程數(shù)量小于corePoolSize的值 if (workerCountOf(c) < corePoolSize) { //重新開啟線程執(zhí)行任務(wù) if (addWorker(command, true)) return; c = ctl.get(); } //如果線程池處于RUNNING狀態(tài),則將任務(wù)添加到阻塞隊列中 if (isRunning(c) && workQueue.offer(command)) { //再次獲取線程池的狀態(tài)和線程池中線程的數(shù)量,用于二次檢查 int recheck = ctl.get(); //如果線程池沒有未處于RUNNING狀態(tài),從隊列中刪除任務(wù) if (! isRunning(recheck) && remove(command)) //執(zhí)行拒絕策略 reject(command); //如果線程池為空,則向線程池中添加一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //任務(wù)隊列已滿,則新增worker線程,如果新增線程失敗,則執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command); }
整個任務(wù)的執(zhí)行流程,我們可以簡化成下圖所示。
接下來,我們拆解execute(Runnable)方法,具體分析execute(Runnable)方法的執(zhí)行邏輯。
(1)線程池中的線程數(shù)是否小于corePoolSize核心線程數(shù),如果小于corePoolSize核心線程數(shù),則向workers工作線程集合中添加一個核心線程執(zhí)行任務(wù)。代碼如下所示。
//線程池中的線程數(shù)量小于corePoolSize的值 if (workerCountOf(c) < corePoolSize) { //重新開啟線程執(zhí)行任務(wù) if (addWorker(command, true)) return; c = ctl.get(); }
(2)如果線程池中的線程數(shù)量大于corePoolSize核心線程數(shù),則判斷當(dāng)前線程池是否處于RUNNING狀態(tài),如果處于RUNNING狀態(tài),則添加任務(wù)到待執(zhí)行的任務(wù)隊列中。注意:這里向任務(wù)隊列添加任務(wù)時,需要判斷線程池是否處于RUNNING狀態(tài),只有線程池處于RUNNING狀態(tài)時,才能向任務(wù)隊列添加新任務(wù)。否則,會執(zhí)行拒絕策略。代碼如下所示。
if (isRunning(c) && workQueue.offer(command))
(3)向任務(wù)隊列中添加任務(wù)成功,由于其他線程可能會修改線程池的狀態(tài),所以這里需要對線程池進行二次檢查,如果當(dāng)前線程池的狀態(tài)不再是RUNNING狀態(tài),則需要將添加的任務(wù)從任務(wù)隊列中移除,執(zhí)行后續(xù)的拒絕策略。如果當(dāng)前線程池仍然處于RUNNING狀態(tài),則判斷線程池是否為空,如果線程池中不存在任何線程,則新建一個線程添加到線程池中,如下所示。
//再次獲取線程池的狀態(tài)和線程池中線程的數(shù)量,用于二次檢查 int recheck = ctl.get(); //如果線程池沒有未處于RUNNING狀態(tài),從隊列中刪除任務(wù) if (! isRunning(recheck) && remove(command)) //執(zhí)行拒絕策略 reject(command); //如果線程池為空,則向線程池中添加一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false);
(4)如果在步驟(3)中向任務(wù)隊列中添加任務(wù)失敗,則嘗試開啟新的線程執(zhí)行任務(wù)。此時,如果線程池中的線程數(shù)量已經(jīng)大于線程池中的最大線程數(shù)maximumPoolSize,則不能再啟動新線程。此時,表示線程池中的任務(wù)隊列已滿,并且線程池中的線程已滿,需要執(zhí)行拒絕策略,代碼如下所示。
//任務(wù)隊列已滿,則新增worker線程,如果新增線程失敗,則執(zhí)行拒絕策略 else if (!addWorker(command, false)) reject(command);
這里,我們將execute(Runnable)方法拆解,結(jié)合流程圖來理解線程池中任務(wù)的執(zhí)行流程就比較簡單了。可以這么說,execute(Runnable)方法的邏輯基本上就是一般線程池的執(zhí)行邏輯,理解了execute(Runnable)方法,就基本理解了線程池的執(zhí)行邏輯。
注意:有關(guān)ScheduledThreadPoolExecutor類和ForkJoinPool類執(zhí)行線程池的邏輯,在【高并發(fā)專題】系列文章中的后文中會詳細說明,理解了這些類的執(zhí)行邏輯,就基本全面掌握了線程池的執(zhí)行流程。
在分析execute(Runnable)方法的源碼時,我們發(fā)現(xiàn)execute(Runnable)方法中多處調(diào)用了addWorker(Runnable, boolean)方法,接下來,我們就一起分析下addWorker(Runnable, boolean)方法的邏輯。
addWorker(Runnable, boolean)方法
總體上,addWorker(Runnable, boolean)方法可以分為三部分,第一部分是使用CAS安全的向線程池中添加工作線程;第二部分是創(chuàng)建新的工作線程;第三部分則是將任務(wù)通過安全的并發(fā)方式添加到workers中,并啟動工作線程執(zhí)行任務(wù)。
接下來,我們看下addWorker(Runnable, boolean)方法的源碼,如下所示。
private boolean addWorker(Runnable firstTask, boolean core) { //標記重試的標識 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 檢查隊列是否在某些特定的條件下為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //下面循環(huán)的主要作用為通過CAS方式增加線程的個數(shù) for (;;) { //獲取線程池中的線程數(shù)量 int wc = workerCountOf(c); //如果線程池中的線程數(shù)量超出限制,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通過CAS方式向線程池新增線程數(shù)量 if (compareAndIncrementWorkerCount(c)) //通過CAS方式保證只有一個線程執(zhí)行成功,跳出最外層循環(huán) break retry; //重新獲取ctl的值 c = ctl.get(); //如果CAS操作失敗了,則需要在內(nèi)循環(huán)中重新嘗試通過CAS新增線程數(shù)量 if (runStateOf(c) != rs) continue retry; } } //跳出最外層for循環(huán),說明通過CAS新增線程數(shù)量成功 //此時創(chuàng)建新的工作線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //將執(zhí)行的任務(wù)封裝成worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //獨占鎖,保證操作workers時的同步 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //此處需要重新檢查線程池狀態(tài) //原因是在獲得鎖之前可能其他的線程改變了線程池的狀態(tài) int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //向worker中添加新任務(wù) workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //將是否添加了新任務(wù)的標識設(shè)置為true workerAdded = true; } } finally { //釋放獨占鎖 mainLock.unlock(); } //添加新任成功,則啟動線程執(zhí)行任務(wù) if (workerAdded) { t.start(); //將任務(wù)是否已經(jīng)啟動的標識設(shè)置為true workerStarted = true; } } } finally { //如果任務(wù)未啟動或啟動失敗,則調(diào)用addWorkerFailed(Worker)方法 if (! workerStarted) addWorkerFailed(w); } //返回是否啟動任務(wù)的標識 return workerStarted; }
乍一看,addWorker(Runnable, boolean)方法還蠻長的,這里,我們還是將addWorker(Runnable, boolean)方法進行拆解。
(1)檢查任務(wù)隊列是否在某些特定的條件下為空,代碼如下所示。
// 檢查隊列是否在某些特定的條件下為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
(2)在通過步驟(1)的校驗后,則進入內(nèi)層for循環(huán),在內(nèi)層for循環(huán)中通過CAS來增加線程池中的線程數(shù)量,如果CAS操作成功,則直接退出雙重for循環(huán)。如果CAS操作失敗,則查看當(dāng)前線程池的狀態(tài)是否發(fā)生了變化,如果線程池的狀態(tài)發(fā)生了變化,則通過continue關(guān)鍵字重新通過外層for循環(huán)校驗任務(wù)隊列,檢驗通過再次執(zhí)行內(nèi)層for循環(huán)的CAS操作。如果線程池的狀態(tài)沒有發(fā)生變化,此時上一次CAS操作失敗了,則繼續(xù)嘗試CAS操作。代碼如下所示。
for (;;) { //獲取線程池中的線程數(shù)量 int wc = workerCountOf(c); //如果線程池中的線程數(shù)量超出限制,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通過CAS方式向線程池新增線程數(shù)量 if (compareAndIncrementWorkerCount(c)) //通過CAS方式保證只有一個線程執(zhí)行成功,跳出最外層循環(huán) break retry; //重新獲取ctl的值 c = ctl.get(); //如果CAS操作失敗了,則需要在內(nèi)循環(huán)中重新嘗試通過CAS新增線程數(shù)量 if (runStateOf(c) != rs) continue retry; }
(3)CAS操作成功后,表示向線程池中成功添加了工作線程,此時,還沒有線程去執(zhí)行任務(wù)。使用全局的獨占鎖mainLock來將新增的工作線程Worker對象安全的添加到workers中。
總體邏輯就是:創(chuàng)建新的Worker對象,并獲取Worker對象中的執(zhí)行線程,如果線程不為空,則獲取獨占鎖,獲取鎖成功后,再次檢查線線程的狀態(tài),這是避免在獲取獨占鎖之前其他線程修改了線程池的狀態(tài),或者關(guān)閉了線程池。如果線程池關(guān)閉,則需要釋放鎖。否則將新增加的線程添加到工作集合中,釋放鎖并啟動線程執(zhí)行任務(wù)。將是否啟動線程的標識設(shè)置為true。最后,判斷線程是否啟動,如果沒有啟動,則調(diào)用addWorkerFailed(Worker)方法。最終返回線程是否起送的標識。
//跳出最外層for循環(huán),說明通過CAS新增線程數(shù)量成功 //此時創(chuàng)建新的工作線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //將執(zhí)行的任務(wù)封裝成worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //獨占鎖,保證操作workers時的同步 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //此處需要重新檢查線程池狀態(tài) //原因是在獲得鎖之前可能其他的線程改變了線程池的狀態(tài) int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //向worker中添加新任務(wù) workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //將是否添加了新任務(wù)的標識設(shè)置為true workerAdded = true; } } finally { //釋放獨占鎖 mainLock.unlock(); } //添加新任成功,則啟動線程執(zhí)行任務(wù) if (workerAdded) { t.start(); //將任務(wù)是否已經(jīng)啟動的標識設(shè)置為true workerStarted = true; } } } finally { //如果任務(wù)未啟動或啟動失敗,則調(diào)用addWorkerFailed(Worker)方法 if (! workerStarted) addWorkerFailed(w); } //返回是否啟動任務(wù)的標識 return workerStarted;
addWorkerFailed(Worker)方法
在addWorker(Runnable, boolean)方法中,如果添加工作線程失敗或者工作線程啟動失敗時,則會調(diào)用addWorkerFailed(Worker)方法,下面我們就來看看addWorkerFailed(Worker)方法的實現(xiàn),如下所示。
private void addWorkerFailed(Worker w) { //獲取獨占鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //如果Worker任務(wù)不為空 if (w != null) //將任務(wù)從workers集合中移除 workers.remove(w); //通過CAS將任務(wù)數(shù)量減1 decrementWorkerCount(); tryTerminate(); } finally { //釋放鎖 mainLock.unlock(); } }
addWorkerFailed(Worker)方法的邏輯就比較簡單了,獲取獨占鎖,將任務(wù)從workers中移除,并且通過CAS將任務(wù)的數(shù)量減1,最后釋放鎖。
拒絕策略
我們在分析execute(Runnable)方法時,線程池會在適當(dāng)?shù)臅r候調(diào)用reject(Runnable)方法來執(zhí)行相應(yīng)的拒絕策略,我們看下reject(Runnable)方法的實現(xiàn),如下所示。
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
通過代碼,我們發(fā)現(xiàn)調(diào)用的是handler的rejectedExecution方法,handler又是個什么鬼,我們繼續(xù)跟進代碼,如下所示。
private volatile RejectedExecutionHandler handler;
再看看RejectedExecutionHandler是個啥類型,如下所示。
package java.util.concurrent; public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
可以發(fā)現(xiàn)RejectedExecutionHandler是個接口,定義了一個rejectedExecution(Runnable, ThreadPoolExecutor)方法。既然RejectedExecutionHandler是個接口,那我們就看看有哪些類實現(xiàn)了RejectedExecutionHandler接口。
看到這里,我們發(fā)現(xiàn)RejectedExecutionHandler接口的實現(xiàn)類正是線程池默認提供的四種拒絕策略的實現(xiàn)類。
至于reject(Runnable)方法中具體會執(zhí)行哪個類的拒絕策略,是根據(jù)創(chuàng)建線程池時傳遞的參數(shù)決定的。如果沒有傳遞拒絕策略,則默認會執(zhí)行AbortPolicy類的拒絕策略。否則會執(zhí)行傳遞的類的拒絕策略。
在創(chuàng)建線程池時,除了能夠傳遞JDK默認提供的拒絕策略外,還可以傳遞自定義的拒絕策略。如果想使用自定義的拒絕策略,則只需要實現(xiàn)RejectedExecutionHandler接口,并重寫rejectedExecution(Runnable, ThreadPoolExecutor)方法即可。例如,下面的代碼。
public class CustomPolicy implements RejectedExecutionHandler { public CustomPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { System.out.println("使用調(diào)用者所在的線程來執(zhí)行任務(wù)") r.run(); } } }
使用如下方式創(chuàng)建線程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new CustomPolicy());
至此,線程池執(zhí)行任務(wù)的整體核心邏輯分析結(jié)束,更多關(guān)于java ThreadPoolExecutor類解析線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實戰(zhàn)個人博客系統(tǒng)的實現(xiàn)流程
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+springboot+mybatis+redis+vue+elementui+Mysql實現(xiàn)一個個人博客系統(tǒng),大家可以在過程中查缺補漏,提升水平2022-01-01詳解spring+springmvc+mybatis整合注解
本篇文章主要介紹了詳解spring+springmvc+mybatis整合注解,詳細的介紹了ssm框架的使用,具有一定的參考價值,有興趣的可以了解一下2017-04-04關(guān)于jd-gui啟動報This?program?requires?Java?1.8+的錯誤問題及解決方法
最近,在Mac使用上JD-GUI啟動時總是報錯,接下來通過本文給大家介紹關(guān)于jd-gui啟動報this?program?requires?Java?1.8+的錯誤問題及解決方法,需要的朋友可以參考下2022-05-05SpringBoot中實現(xiàn)訂單30分鐘自動取消的項目實踐
現(xiàn)在電子商務(wù)平臺上訂單創(chuàng)建成功,等待支付,一般會給30分鐘的時間,本文主要介紹了SpringBoot中實現(xiàn)訂單30分鐘自動取消的項目實踐,具有一定的參考價值,感興趣的可以了解一下2023-10-10