Java中線程池自定義實(shí)現(xiàn)詳解
前言
最初使用線程池的時候,網(wǎng)上的文章告訴我說線程池可以線程復(fù)用,提高線程的創(chuàng)建效率。從此我的腦海中便為線程池打上了一個標(biāo)簽——線程池可以做到線程的復(fù)用。但是我總以為線程的復(fù)用是指在創(chuàng)建出來的線程可以多次的更換run()方法的內(nèi)容,來達(dá)到線程復(fù)用的目的,于是我嘗試了一下.同一個線程調(diào)用多次,然后使run的內(nèi)容不一樣,但是我發(fā)現(xiàn)我錯了,一個線程第一次運(yùn)行是沒問題的,當(dāng)再次調(diào)用start方法是會拋出異常(java.lang.IllegalThreadStateException)。
線程為什么不能多次調(diào)用start方法
從源碼可以得知,調(diào)用start方法時,程序還會判斷當(dāng)前的線程狀態(tài)

這里又引申出另一個問題,線程到底有幾種狀態(tài)
年輕的時候背八股文時,只是說五種狀態(tài),這五種狀態(tài)也不知道是哪里來的,不知道有沒有人和我一樣,當(dāng)初只是知其然不知其所以然。貼出源碼來:
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW, // 新建
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE, // 運(yùn)行中
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED, // 阻塞
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING, // 等待
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING, // 定時等待
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED; // 結(jié)束狀態(tài)
}綜上,其實(shí)線程的狀態(tài)有六種:
- NEW 新建狀態(tài),一般通過Thread thread = new Thread(runable);此時的線程屬于新建狀態(tài)。
- RUNABLE 可運(yùn)行狀態(tài),當(dāng)調(diào)用start時,線程進(jìn)入RUNNABLE狀態(tài),該狀態(tài)其實(shí)還包含兩個狀態(tài),一種是被cpu選中正在運(yùn)行中,另一種是未被cpu選中,處于就緒狀態(tài)。
- BLOCKED 阻塞狀態(tài), 一般可以通過調(diào)用sleep()方法來進(jìn)入阻塞狀態(tài),此時線程沒有釋放鎖資源,sleep到期時,繼續(xù)進(jìn)入Runable狀態(tài)
- WAITING 等待狀態(tài), 一般可以通過調(diào)用wait()方法來進(jìn)入等待狀態(tài),此時釋放cpu,cpu去干其他事情,需要調(diào)用noitfy方法喚醒,喚醒后的線程為RUNABLE狀態(tài)。
- TIMED_WAIRING 定時等待, 一般可以通過wait(long)方法進(jìn)入定時等待?;旧贤琖AITING.
- TERMINATED 結(jié)束狀態(tài),RUNCABLE運(yùn)行正常結(jié)束的線程的狀態(tài)就是TERMINATED
可以看出八股文不能亂背,之前傻呵呵背的八股文很有可能是錯誤的,比如線程的運(yùn)行中狀態(tài)(RUNNING),其實(shí)這個狀態(tài)根本不存在,RUNABLE狀態(tài)就已經(jīng)包含了RUNNNING狀態(tài)了。
再回到標(biāo)題的問題,為什么不能多次調(diào)用start方法,原因其實(shí)源碼的注釋上已經(jīng)說明了,
/** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */
0狀態(tài)對應(yīng)的是NEW,也就是說只有新建狀態(tài)的線程才能調(diào)用start方法,其他狀態(tài)的線程調(diào)用就會拋出異常,而一般第二次調(diào)用時,線程狀態(tài)肯定不是new狀態(tài)了。因此不可以多次調(diào)用。
線程池到底是如何復(fù)用的
經(jīng)過多次的反復(fù)調(diào)試,原理其實(shí)很簡單,比如以下代碼:
public void testThreadPool() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue(3));
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i=0; i<5; i++) {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
ThreadUtils.doSleep(10000L);
System.out.println(Thread.currentThread().getName() + "--運(yùn)行");
}
});
}
threadPoolExecutor.shutdown();
}
其中循環(huán)往threadPoolExecutor中添加的是自定義的業(yè)務(wù)任務(wù)。而真正去運(yùn)行任務(wù)的是線程池中新建的一個線程。因此這里的復(fù)用指的是線程池創(chuàng)建出來得這個線程,這個線程并不會銷毀,而是循環(huán)去隊(duì)列中獲取任務(wù)。千萬不可理解為線程池復(fù)用的線程是使用者自定義的那個業(yè)務(wù)任務(wù)。具體的復(fù)用最核心的代碼就是下面這段:
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}這段代碼是runworker中的一段代碼,線程就是通過循環(huán)去獲取隊(duì)列中的任務(wù)來達(dá)到線程復(fù)用的,前臺創(chuàng)建多個runable對象,將任務(wù)放到runable中,然后將runable放到隊(duì)列中,線程池創(chuàng)建線程,線程持續(xù)循環(huán)獲取隊(duì)列中的任務(wù)。這就是線程池的實(shí)現(xiàn)邏輯。
下面嘗試自己去實(shí)現(xiàn)一個線程池:該線程只是為了模擬線程池的運(yùn)行,并未做線程安全的考慮,也未做非核心線程超時回收等功能。
package com.cz.lock.distributed.impl.redis;
import java.util.List;
import java.util.concurrent.*;
/**
* @program: Reids
* @description: 自定義線程池
* @author: Cheng Zhi
* @create: 2023-02-28 09:28
**/
public class JefThreadPoolExecutor extends AbstractExecutorService {
/**
* 使用隊(duì)列來保存現(xiàn)有的worker
*/
private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<Worker>();
private static int coreThreadCount = 5;
private static int maxThreadCount = 10;
private static int defaultQueueSize = maxThreadCount * 5;
private static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(defaultQueueSize);
/**
* 默認(rèn)線程池
*/
JefThreadPoolExecutor() {
this(coreThreadCount, maxThreadCount, blockingQueue);
}
/**
* 可以自定義的線程池
* @param coreThreadCount
* @param maxThreadCount
* @param blockingQueue
*/
JefThreadPoolExecutor(int coreThreadCount, int maxThreadCount, BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
this.coreThreadCount = coreThreadCount;
this.maxThreadCount = maxThreadCount;
}
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void execute(Runnable command) {
int currentWorkCount = workers.size(); // 當(dāng)前創(chuàng)建的線程總數(shù)
if (currentWorkCount < coreThreadCount) { // 如果當(dāng)前線程總數(shù)小于核心線程數(shù),則新建線程
Worker worker = new Worker(command);
final Thread thread = worker.thread;
thread.start();
addWorker(worker);
return;
}
if (!blockingQueue.offer(command) && currentWorkCount <= maxThreadCount) { // 隊(duì)列可以正常放入則返回true,如果滿了返回false
// 隊(duì)列如果滿了,需要創(chuàng)建新的線程
Worker worker = new Worker(command);
final Thread thread = worker.thread;
thread.start();
addWorker(worker);
return;
} else if (currentWorkCount > maxThreadCount){
System.out.println("線程池滿了....沒有多余的線程了");
}
}
public void addWorker(Worker worker) {
workers.add(worker);
}
public Runnable getTask() {
Runnable poll = blockingQueue.poll();
return poll;
}
public void runWorker(Worker worker) {
Runnable task = worker.firstTask; // 獲取到new Worker時傳入的那個任務(wù),并在下面運(yùn)行
if (task != null) {
task.run();
}
worker.firstTask = null;
// 循環(huán)從隊(duì)列中獲取任務(wù)處理
while((task = getTask()) != null) {
task.run();
}
}
/**
* 匿名內(nèi)部類
*/
private class Worker implements Runnable{
volatile int state = 0;
public Runnable firstTask;
final Thread thread;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
thread = new Thread(this);
}
@Override
public void run() {
runWorker(this);
}
}
}使用方式:
/**
* 使用默認(rèn)配置
*/
public static void singleThreadPoolExecutor() {
JefThreadPoolExecutor jefThreadPoolExecutor = new JefThreadPoolExecutor();
for (int i=0; i<10; i++) {
jefThreadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "--運(yùn)行");
}
});
}
}
/**
* 自定義配置
*/
public static void diyThreadPoolExecutor() {
JefThreadPoolExecutor jefThreadPoolExecutor = new JefThreadPoolExecutor(2, 10, new ArrayBlockingQueue(50));
for (int i=0; i<500; i++) {
jefThreadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "--運(yùn)行");
}
});
}
}以上就是Java中線程池自定義實(shí)現(xiàn)詳解的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決@Test注解在Maven工程的Test.class類中無法使用的問題
這篇文章主要介紹了解決@Test注解在Maven工程的Test.class類中無法使用的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
使用springboot不自動初始化數(shù)據(jù)庫連接池
這篇文章主要介紹了使用springboot不自動初始化數(shù)據(jù)庫連接池,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
SpringBoot?替換?if?的參數(shù)校驗(yàn)示例代碼
Spring?Validation是對hibernate?validation的二次封裝,用于支持spring?mvc參數(shù)自動校驗(yàn),接下來,我們以spring-boot項(xiàng)目為例,介紹Spring?Validation的使用,需要的朋友可以參考下2022-12-12
Java使用Canal同步MySQL數(shù)據(jù)到Redis
在現(xiàn)代微服務(wù)架構(gòu)中,數(shù)據(jù)同步是一個常見的需求,特別是將?MySQL?數(shù)據(jù)實(shí)時同步到?Redis,下面我們就來看看Java如何使用Canal同步MySQL數(shù)據(jù)到Redis吧2024-11-11
idea中增強(qiáng)for循環(huán)提示unexpected token問題
這篇文章主要介紹了idea中增強(qiáng)for循環(huán)提示unexpected token問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
Java中String.split()的最詳細(xì)源碼解讀及注意事項(xiàng)
以前經(jīng)常使用String.split()方法,但是從來沒有注意,下面這篇文章主要給大家介紹了關(guān)于Java中String.split()最詳細(xì)源碼解讀及注意事項(xiàng)的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07
Java數(shù)據(jù)結(jié)構(gòu)篇之實(shí)現(xiàn)二叉搜索樹的核心方法
二叉搜索樹是一種常用的數(shù)據(jù)結(jié)構(gòu),它是一棵二叉樹,且每個節(jié)點(diǎn)的值都大于其左子樹中任何節(jié)點(diǎn)的值,而小于其右子樹中任何節(jié)點(diǎn)的值,這篇文章主要給大家介紹了關(guān)于Java數(shù)據(jù)結(jié)構(gòu)篇之實(shí)現(xiàn)二叉搜索樹的核心方法,需要的朋友可以參考下2023-12-12

