一篇文章帶你搞懂Java線(xiàn)程池實(shí)現(xiàn)原理
1. 為什么要使用線(xiàn)程池
使用線(xiàn)程池通常由以下兩個(gè)原因:
- 頻繁創(chuàng)建銷(xiāo)毀線(xiàn)程需要消耗系統(tǒng)資源,使用線(xiàn)程池可以復(fù)用線(xiàn)程。
- 使用線(xiàn)程池可以更容易管理線(xiàn)程,線(xiàn)程池可以動(dòng)態(tài)管理線(xiàn)程個(gè)數(shù)、具有阻塞隊(duì)列、定時(shí)周期執(zhí)行任務(wù)、環(huán)境隔離等。
2. 線(xiàn)程池的使用
/** * @author 一燈架構(gòu) * @apiNote 線(xiàn)程池示例 **/ public class ThreadPoolDemo { public static void main(String[] args) { // 1. 創(chuàng)建線(xiàn)程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 2. 往線(xiàn)程池中提交3個(gè)任務(wù) for (int i = 0; i < 3; i++) { threadPoolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 關(guān)注公眾號(hào):一燈架構(gòu)"); }); } // 3. 關(guān)閉線(xiàn)程池 threadPoolExecutor.shutdown(); } }
線(xiàn)程池的使用非常簡(jiǎn)單:
- 調(diào)用new ThreadPoolExecutor()構(gòu)造方法,指定核心參數(shù),創(chuàng)建線(xiàn)程池。
- 調(diào)用execute()方法提交Runnable任務(wù)
- 使用結(jié)束后,調(diào)用shutdown()方法,關(guān)閉線(xiàn)程池。
再看一下線(xiàn)程池構(gòu)造方法中核心參數(shù)的作用。
3. 線(xiàn)程池核心參數(shù)
線(xiàn)程池共有七大核心參數(shù):
參數(shù)名稱(chēng) | 參數(shù)含義 |
---|---|
int corePoolSize | 核心線(xiàn)程數(shù) |
int maximumPoolSize | 最大線(xiàn)程數(shù) |
long keepAliveTime | 線(xiàn)程存活時(shí)間 |
TimeUnit unit | 時(shí)間單位 |
BlockingQueue workQueue | 阻塞隊(duì)列 |
ThreadFactory threadFactory | 線(xiàn)程創(chuàng)建工廠(chǎng) |
RejectedExecutionHandler handler | 拒絕策略 |
1.corePoolSize 核心線(xiàn)程數(shù)
當(dāng)往線(xiàn)程池中提交任務(wù),會(huì)創(chuàng)建線(xiàn)程去處理任務(wù),直到線(xiàn)程數(shù)達(dá)到corePoolSize,才會(huì)往阻塞隊(duì)列中添加任務(wù)。默認(rèn)情況下,空閑的核心線(xiàn)程并不會(huì)被回收,除非配置了allowCoreThreadTimeOut=true。
2.maximumPoolSize 最大線(xiàn)程數(shù)
當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)達(dá)到corePoolSize,阻塞隊(duì)列又滿(mǎn)了之后,才會(huì)繼續(xù)創(chuàng)建線(xiàn)程,直到達(dá)到maximumPoolSize,另外空閑的非核心線(xiàn)程會(huì)被回收。
3.keepAliveTime 線(xiàn)程存活時(shí)間
非核心線(xiàn)程的空閑時(shí)間達(dá)到了keepAliveTime,將會(huì)被回收。
4.TimeUnit 時(shí)間單位
線(xiàn)程存活時(shí)間的單位,默認(rèn)是TimeUnit.MILLISECONDS(毫秒),可選擇的有:
- TimeUnit.NANOSECONDS(納秒)
- TimeUnit.MICROSECONDS(微秒)
- TimeUnit.MILLISECONDS(毫秒)
- TimeUnit.SECONDS(秒)
- TimeUnit.MINUTES(分鐘)
- TimeUnit.HOURS(小時(shí))
- TimeUnit.DAYS(天)
5.workQueue 阻塞隊(duì)列
當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)達(dá)到corePoolSize,再提交的任務(wù)就會(huì)放到阻塞隊(duì)列的等待,默認(rèn)使用的是LinkedBlockingQueue,可選擇的有:
- LinkedBlockingQueue(基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列)
- ArrayBlockingQueue(基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列)
- SynchronousQueue(只有一個(gè)元素的阻塞隊(duì)列)
- PriorityBlockingQueue(實(shí)現(xiàn)了優(yōu)先級(jí)的阻塞隊(duì)列)
- DelayQueue(實(shí)現(xiàn)了延遲功能的阻塞隊(duì)列)
6.threadFactory 線(xiàn)程創(chuàng)建工廠(chǎng)
用來(lái)創(chuàng)建線(xiàn)程的工廠(chǎng),默認(rèn)的是Executors.defaultThreadFactory(),可選擇的還有Executors.privilegedThreadFactory()實(shí)現(xiàn)了線(xiàn)程優(yōu)先級(jí)。當(dāng)然也可以自定義線(xiàn)程創(chuàng)建工廠(chǎng),創(chuàng)建線(xiàn)程的時(shí)候最好指定線(xiàn)程名稱(chēng),便于排查問(wèn)題。
7.RejectedExecutionHandler 拒絕策略
當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)達(dá)到maximumPoolSize,阻塞隊(duì)列也滿(mǎn)了之后,再往線(xiàn)程池中提交任務(wù),就會(huì)觸發(fā)執(zhí)行拒絕策略,默認(rèn)的是AbortPolicy(直接終止,拋出異常),可選擇的有:
- AbortPolicy(直接終止,拋出異常)
- DiscardPolicy(默默丟棄,不拋出異常)
- DiscardOldestPolicy(丟棄隊(duì)列中最舊的任務(wù),執(zhí)行當(dāng)前任務(wù))
- CallerRunsPolicy(返回給調(diào)用者執(zhí)行)
4. 線(xiàn)程池工作原理
線(xiàn)程池的工作原理,簡(jiǎn)單理解如下:
- 當(dāng)往線(xiàn)程池中提交任務(wù)的時(shí)候,會(huì)先判斷線(xiàn)程池中線(xiàn)程數(shù)是否核心線(xiàn)程數(shù),如果小于,會(huì)創(chuàng)建核心線(xiàn)程并執(zhí)行任務(wù)。
- 如果線(xiàn)程數(shù)大于核心線(xiàn)程數(shù),會(huì)判斷阻塞隊(duì)列是否已滿(mǎn),如果沒(méi)有滿(mǎn),會(huì)把任務(wù)添加到阻塞隊(duì)列中等待調(diào)度執(zhí)行。
- 如果阻塞隊(duì)列已滿(mǎn),會(huì)判斷線(xiàn)程數(shù)是否小于最大線(xiàn)程數(shù),如果小于,會(huì)繼續(xù)創(chuàng)建最大線(xiàn)程數(shù)并執(zhí)行任務(wù)。
- 如果線(xiàn)程數(shù)大于最大線(xiàn)程數(shù),會(huì)執(zhí)行拒絕策略,然后結(jié)束。
5. 線(xiàn)程池源碼剖析
5.1 線(xiàn)程池的屬性
public class ThreadPoolExecutor extends AbstractExecutorService { // 線(xiàn)程池的控制狀態(tài),Integer長(zhǎng)度是32位,前3位用來(lái)存儲(chǔ)線(xiàn)程池狀態(tài),后29位用來(lái)存儲(chǔ)線(xiàn)程數(shù)量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 線(xiàn)程個(gè)數(shù)所占的位數(shù) private static final int COUNT_BITS = Integer.SIZE - 3; // 線(xiàn)程池的最大容量,2^29-1,約5億個(gè)線(xiàn)程 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 獨(dú)占鎖,用來(lái)控制多線(xiàn)程下的并發(fā)操作 private final ReentrantLock mainLock = new ReentrantLock(); // 工作線(xiàn)程的集合 private final HashSet<Worker> workers = new HashSet<>(); // 等待條件,用來(lái)響應(yīng)中斷 private final Condition termination = mainLock.newCondition(); // 是否允許回收核心線(xiàn)程 private volatile boolean allowCoreThreadTimeOut; // 線(xiàn)程數(shù)的歷史峰值 private int largestPoolSize; /** * 以下是線(xiàn)程池的七大核心參數(shù) */ private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile long keepAliveTime; private final BlockingQueue<Runnable> workQueue; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; }
線(xiàn)程池的控制狀態(tài)ctl用來(lái)存儲(chǔ)線(xiàn)程池狀態(tài)和線(xiàn)程個(gè)數(shù),前3位用來(lái)存儲(chǔ)線(xiàn)程池狀態(tài),后29位用來(lái)存儲(chǔ)線(xiàn)程數(shù)量。
設(shè)計(jì)者多聰明,用一個(gè)變量存儲(chǔ)了兩塊內(nèi)容。
5.2 線(xiàn)程池狀態(tài)
線(xiàn)程池共有5種狀態(tài):
狀態(tài)名稱(chēng) | 狀態(tài)含義 | 狀態(tài)作用 |
---|---|---|
RUNNING | 運(yùn)行中 | 線(xiàn)程池創(chuàng)建后默認(rèn)狀態(tài),接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù)。 |
SHUTDOWN | 已關(guān)閉 | 調(diào)用shutdown方法后處于該狀態(tài),不再接收新任務(wù),處理阻塞隊(duì)列中任務(wù)。 |
STOP | 已停止 | 調(diào)用shutdownNow方法后處于該狀態(tài),不再新任務(wù),并中斷所有線(xiàn)程,丟棄阻塞隊(duì)列中所有任務(wù)。 |
TIDYING | 處理中 | 所有任務(wù)已完成,所有工作線(xiàn)程都已回收,等待調(diào)用terminated方法。 |
TERMINATED | 已終止 | 調(diào)用terminated方法后處于該狀態(tài),線(xiàn)程池的最終狀態(tài)。 |
5.3 execute源碼
看一下往線(xiàn)程池中提交任務(wù)的源碼,這是線(xiàn)程池的核心邏輯:
// 往線(xiàn)程池中提交任務(wù) public void execute(Runnable command) { // 1. 判斷提交的任務(wù)是否為null if (command == null) throw new NullPointerException(); int c = ctl.get(); // 2. 判斷線(xiàn)程數(shù)是否小于核心線(xiàn)程數(shù) if (workerCountOf(c) < corePoolSize) { // 3. 把任務(wù)包裝成worker,添加到worker集合中 if (addWorker(command, true)) return; c = ctl.get(); } // 4. 判斷如果線(xiàn)程數(shù)不小于corePoolSize,并且可以添加到阻塞隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { // 5. 重新檢查線(xiàn)程池狀態(tài),如果線(xiàn)程池不是運(yùn)行狀態(tài),就移除剛才添加的任務(wù),并執(zhí)行拒絕策略 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); // 6. 判斷如果線(xiàn)程數(shù)是0,就創(chuàng)建非核心線(xiàn)程(任務(wù)是null,會(huì)從阻塞隊(duì)列中拉取任務(wù)) else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 7. 如果添加阻塞隊(duì)列失敗,就創(chuàng)建一個(gè)Worker else if (!addWorker(command, false)) // 8. 如果創(chuàng)建Worker失敗說(shuō)明已經(jīng)達(dá)到最大線(xiàn)程數(shù)了,則執(zhí)行拒絕策略 reject(command); }
execute方法的邏輯也很簡(jiǎn)單,最終就是調(diào)用addWorker方法,把任務(wù)添加到worker集合中,再看一下addWorker方法的源碼:
// 添加worker private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 檢查是否允許提交任務(wù) if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 2. 使用死循環(huán)保證添加線(xiàn)程成功 for (; ; ) { int wc = workerCountOf(c); // 3. 校驗(yàn)線(xiàn)程數(shù)是否超過(guò)容量限制 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 4. 使用CAS修改線(xiàn)程數(shù) if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 5. 如果線(xiàn)程池狀態(tài)變了,則從頭再來(lái) if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 6. 把任務(wù)和新線(xiàn)程包裝成一個(gè)worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 7. 加鎖,控制并發(fā) final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 8. 再次校驗(yàn)線(xiàn)程池狀態(tài)是否異常 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 9. 如果線(xiàn)程已經(jīng)啟動(dòng),就拋出異常 if (t.isAlive()) throw new IllegalThreadStateException(); // 10. 添加到worker集合中 workers.add(w); int s = workers.size(); // 11. 記錄線(xiàn)程數(shù)歷史峰值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 12. 啟動(dòng)線(xiàn)程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
方法雖然很長(zhǎng),但是邏輯很清晰。就是把任務(wù)和線(xiàn)程包裝成worker,添加到worker集合,并啟動(dòng)線(xiàn)程。
5.4 worker源碼
再看一下worker類(lèi)的結(jié)構(gòu):
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工作線(xiàn)程 final Thread thread; // 任務(wù) Runnable firstTask; // 創(chuàng)建worker,并創(chuàng)建一個(gè)新線(xiàn)程(用來(lái)執(zhí)行任務(wù)) Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } }
5.5 runWorker源碼
再看一下run方法的源碼:
// 線(xiàn)程執(zhí)行入口 public void run() { runWorker(this); } // 線(xiàn)程運(yùn)行核心方法 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 1. 如果當(dāng)前worker中任務(wù)是null,就從阻塞隊(duì)列中獲取任務(wù) while (task != null || (task = getTask()) != null) { // 加鎖,保證thread不被其他線(xiàn)程中斷(除非線(xiàn)程池被中斷) w.lock(); // 2. 校驗(yàn)線(xiàn)程池狀態(tài),是否需要中斷當(dāng)前線(xiàn)程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 3. 執(zhí)行run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 解鎖 w.unlock(); } } completedAbruptly = false; } finally { // 4. 從worker集合刪除當(dāng)前worker processWorkerExit(w, completedAbruptly); } }
runWorker方法邏輯也很簡(jiǎn)單,就是不斷從阻塞隊(duì)列中拉取任務(wù)并執(zhí)行。
再看一下從阻塞隊(duì)列中拉取任務(wù)的邏輯:
// 從阻塞隊(duì)列中拉取任務(wù) private Runnable getTask() { boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 如果線(xiàn)程池已經(jīng)停了,或者阻塞隊(duì)列是空,就回收當(dāng)前線(xiàn)程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 2. 再次判斷是否需要回收線(xiàn)程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 3. 從阻塞隊(duì)列中拉取任務(wù) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
以上就是一篇文章帶你搞懂Java線(xiàn)程池實(shí)現(xiàn)原理的詳細(xì)內(nèi)容,更多關(guān)于Java線(xiàn)程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java實(shí)現(xiàn)手寫(xiě)乞丐版線(xiàn)程池的示例代碼
- Java實(shí)現(xiàn)手寫(xiě)一個(gè)線(xiàn)程池的示例代碼
- 一文了解Java?線(xiàn)程池的正確使用姿勢(shì)
- Java手寫(xiě)線(xiàn)程池之向JDK線(xiàn)程池進(jìn)發(fā)
- Java線(xiàn)程池源碼的深度解析
- 詳解Java線(xiàn)程池如何統(tǒng)計(jì)線(xiàn)程空閑時(shí)間
- Java中的異步與線(xiàn)程池解讀
- 詳解Java線(xiàn)程池隊(duì)列中的延遲隊(duì)列DelayQueue
- 一文帶你弄懂Java中線(xiàn)程池的原理
- java 線(xiàn)程池的實(shí)現(xiàn)原理、優(yōu)點(diǎn)與風(fēng)險(xiǎn)、以及4種線(xiàn)程池實(shí)現(xiàn)
相關(guān)文章
Spring Bean的初始化和銷(xiāo)毀實(shí)例詳解
這篇文章主要介紹了Spring Bean的初始化和銷(xiāo)毀,結(jié)合實(shí)例形式詳細(xì)分析了Spring Bean的初始化和銷(xiāo)毀相關(guān)配置、使用方法及操作注意事項(xiàng),需要的朋友可以參考下2019-11-11java對(duì)象與json對(duì)象間的相互轉(zhuǎn)換的方法
本篇文章主要介紹了java對(duì)象與json對(duì)象間的相互轉(zhuǎn)換的方法,詳細(xì)介紹了json字符串和java對(duì)象相互轉(zhuǎn)換,有興趣的可以了解一下2017-01-01spring中定時(shí)任務(wù)taskScheduler的詳細(xì)介紹
這篇文章主要介紹了spring中定時(shí)任務(wù)taskScheduler的相關(guān)資料,文中通過(guò)示例代碼介紹的很詳細(xì),相信對(duì)大家具有一定的參考價(jià)值,有需要的朋友們下面來(lái)一起看看吧。2017-02-02實(shí)現(xiàn)java文章點(diǎn)擊量記錄實(shí)例
這篇文章主要為大家介紹了實(shí)現(xiàn)java文章點(diǎn)擊量記錄實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10