Java線程池之線程復(fù)用原理全面解析
什么是線程復(fù)用
在Java中,我們正常創(chuàng)建線程執(zhí)行任務(wù),一般都是一條線程綁定一個Runnable執(zhí)行任務(wù)。
而Runnable實際只是一個普通接口,真正要執(zhí)行,則還是利用了Thread類的run方法。
這個rurn方法由native本地方法start0進行調(diào)用。
我們看Thread類的run方法實現(xiàn)
/* What will be run. */
private Runnable target;
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
if (target != null) {
target.run();
}
}很明顯,Thread類的run方法就是使用構(gòu)造Thread類傳入來的Runnable對象,執(zhí)行Runnable的run方法。
這樣可以很好的將任務(wù)和Thread類解耦,如果繼承Thread類再去重寫run方法當(dāng)然也是可以,但卻耦合了,并且Java是單繼承,所以繼承Thread類這種方式通常不會使用,沒有任何好處。
現(xiàn)在問題是,一個線程只能執(zhí)行一個Runnable對象,那么這條線程它就是不能復(fù)用的,完成任務(wù)它就該Terminated了。
如果系統(tǒng)任務(wù)很多,頻繁創(chuàng)建線程帶來的開銷大,線程數(shù)量不可控導(dǎo)致系統(tǒng)處于一種不安全的狀況,系統(tǒng)隨時可能被大量線程搞跨,于是線程池就出現(xiàn)了。
線程池要解決的問題就是用少量線程處理更多的任務(wù),這樣一來,線程池首先要實現(xiàn)的就是線程復(fù)用。
不能說還是一條線程只處理一個Runnable任務(wù),而是一條線程處理無數(shù)Runnable任務(wù)。
最容易想到的方案就是將Runnable對象放到隊列中,在Thread類的run方法中不斷從隊列中拉取任務(wù)執(zhí)行,這樣一來就實現(xiàn)了線程復(fù)用。
當(dāng)然,實際線程池也差不多是這么干的,下面我們詳細看一下線程池實現(xiàn)線程復(fù)用的原理。
線程池處理任務(wù)的過程
在線程池原理解析中有詳述線程池創(chuàng)建線程及處理任務(wù)的過程。
這里再次簡單看一下流程圖以方便理解下面的線程復(fù)用原理解析。

線程復(fù)用原理解析
線程處理任務(wù)過程源碼解析
首先我們看看線程池是怎么使用的
import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author kangming.ning
* @date 2023-02-24 16:27
* @since 1.0
**/
public class CustomThreadPool1 {
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNamePrefix("線程池-").build();
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) throws InterruptedException {
Runnable r = () -> {
System.out.println(Thread.currentThread().getName() + " is running");
};
for (int i = 0; i < 35; i++) {
Thread.sleep(1000);
threadPoolExecutor.submit(r);
}
}
}可見,threadPoolExecutor的sumit方法就是用來提交任務(wù)的,于是,從這個方法開始分析源碼,把源碼的關(guān)注點放在線程復(fù)用部分。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}第一句只是用來包裝一下有返回值的任務(wù),不必關(guān)注,重點看execute(ftask)這句。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}代碼量不多,信息量極大。
注釋的內(nèi)容就是在解釋線程池執(zhí)行任務(wù)的處理過程,這個看上面的流程圖即可。
任務(wù)如果為空直接拋空指針異常。
下面看第一個if語句
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}如果worker數(shù)量少于核心線程數(shù),則通過addWorker(command, true)方法添加一個worker。這里要注意,線程池把每一條線程都封裝成了Worker的實例。
addWorker方法的作用是在線程池中創(chuàng)建一個線程并執(zhí)行第一個參數(shù)傳入的任務(wù),它的第二個參數(shù)是個boolean值,如果傳入 true 則代表增加線程時判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程,大于等于則不增加;如果傳入false則使用maximumPoolSize來判斷是否增加新線程。
接下來看下面第二個if的代碼
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}執(zhí)行到這里說明核心線程數(shù)已滿或者說addWorker失敗了。此時先檢查線程池是否為運行狀態(tài),是的話直接把任務(wù)放隊列,這跟上面的流程圖是一致的,核心線程數(shù)滿則放隊列。
當(dāng)然當(dāng)任務(wù)提交成功后還是會重新檢查線程池的狀態(tài),如果線程池沒在跑則會移除任務(wù)并且執(zhí)行拒絕策略。
再看里面的else if分支
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);進入else if分支說明線程池是在運行的,這里是檢查一下是否有線程可供使用,雖說上面已經(jīng)檢查過目前的線程數(shù)已大于核心線程數(shù),但不排除核心線程數(shù)設(shè)置為0 這種情況,這樣一來,任務(wù)添加后缺沒線程去執(zhí)行,這種情況是不允許的。
再往下看最后一句else if代碼
else if (!addWorker(command, false))
reject(command);能執(zhí)行到這里,說明要么是線程池不在運行中,要么就是核心線程和隊列都滿了,此時需要開啟線程池的后備力量,嘗試添加非核心線程直到線程數(shù)達到最大線程數(shù)限制,注意到addWorker方法第二個參數(shù)傳了false,正是添加線程時使用最大線程數(shù)限制來判斷是否添加新線程。
假設(shè)添加失敗意味著最大線程數(shù)也達到了最大值并且沒空閑線程去執(zhí)行當(dāng)前任務(wù),此時執(zhí)行reject拒絕策略。
線程復(fù)用源碼解析
通過上面的解析我們可以看到,添加線程以執(zhí)行任務(wù)的核心方法是addWorker,大概看一下Worker的代碼
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}可見,Worker對Thread進行了封裝,它本身也是一個Runnable對象,內(nèi)部的Thread對象則是真正用來執(zhí)行任務(wù)的線程對象。
因此添加Worker實則就是在線程池中添加運行任務(wù)的線程,可以看出在Worker的構(gòu)造函數(shù)中新建了一條線程并且把引用賦值給了thread對象。
而在上面的addWorker方法中start了這條線程,而這條線程的Runnable對象正是Worker對象自身。
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}既然addWorker方法執(zhí)行了線程的start方法,因此Worker類里面的run方法將被系統(tǒng)調(diào)度
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}里面只有一個runWorker方法,并且把Worker對象傳了進去,明顯,runWorker是實現(xiàn)線程復(fù)用的方法 。
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}代碼不多,注釋很多,核心意思就是這是一個死循環(huán),不斷從隊列獲取任務(wù)進行執(zhí)行。
通過上面代碼可以清晰的看出,一開始將firstTask賦值給task Runnable對象,然后下面有個while死循環(huán),不斷的從隊列獲取task進行執(zhí)行,里面的核心邏輯就是task.run(),Runnable對象的run方法由這條Worker線程像調(diào)用普通方法一樣的調(diào)用,這個就是線程復(fù)用的原理。
將Runnable對象放隊列,然后在一個主循環(huán)里面不斷從隊列里獲取任務(wù)進行執(zhí)行。
最后看一下getTask方法
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}可見,里面就是從隊列里面獲取一個Runnable對象進行返回而已。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot controller 增加指定前綴的兩種實現(xiàn)方法
這篇文章主要介紹了springboot controller 增加指定前綴的兩種實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
Springboot中的Validation參數(shù)校驗詳解
這篇文章主要介紹了Springboot中的Validation參數(shù)校驗詳解,Springboot參數(shù)校驗是一種常用的驗證機制,在傳遞參數(shù)時進行校驗,以確保參數(shù)的有效性和正確性,該機制可以幫助開發(fā)者在代碼實現(xiàn)前就避免一些常見的錯誤,需要的朋友可以參考下2023-10-10
分布式開發(fā)醫(yī)療掛號系統(tǒng)數(shù)據(jù)字典模塊前后端實現(xiàn)
這篇文章主要為大家介紹了分布式開發(fā)醫(yī)療掛號系統(tǒng)數(shù)據(jù)字典模塊前后端實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04
淺談SpringMVC之視圖解析器(ViewResolver)
本篇文章主要介紹了淺談SpringMVC之視圖解析器(ViewResolver),具有一定的參考價值,有興趣的可以了解一下2017-08-08
使用SpringBoot創(chuàng)建一個RESTful API的詳細步驟
使用 Java 的 Spring Boot 創(chuàng)建 RESTful API 可以滿足多種開發(fā)場景,它提供了快速開發(fā)、易于配置、可擴展、可維護的優(yōu)點,尤其適合現(xiàn)代軟件開發(fā)的需求,幫助你快速構(gòu)建出高性能的后端服務(wù),需要的朋友可以參考下2025-01-01
Java中synchronized關(guān)鍵字修飾方法同步的用法詳解
synchronized可以用來同步靜態(tài)和非靜態(tài)方法,下面就具體來看一下Java中synchronized關(guān)鍵字修飾方法同步的用法詳解:2016-06-06

