Java實(shí)現(xiàn)手寫(xiě)一個(gè)線(xiàn)程池的示例代碼
概述
線(xiàn)程池技術(shù)想必大家都不陌生把,相信在平時(shí)的工作中沒(méi)有少用,而且這也是面試頻率非常高的一個(gè)知識(shí)點(diǎn),那么大家知道它的實(shí)現(xiàn)原理和細(xì)節(jié)嗎?如果直接去看jdk源碼的話(huà),可能有一定的難度,那么我們可以先通過(guò)手寫(xiě)一個(gè)簡(jiǎn)單的線(xiàn)程池框架,去掌握線(xiàn)程池的基本原理后,再去看jdk的線(xiàn)程池源碼就會(huì)相對(duì)容易,而且不容易忘記。
線(xiàn)程池框架設(shè)計(jì)
我們都知道,線(xiàn)程資源的創(chuàng)建和銷(xiāo)毀并不是沒(méi)有代價(jià)的,甚至開(kāi)銷(xiāo)是非常高的。同時(shí),線(xiàn)程也不是任意多創(chuàng)建的,因?yàn)榛钴S的線(xiàn)程會(huì)消耗系統(tǒng)資源,特別是內(nèi)存,在一定的范圍內(nèi),增加線(xiàn)程可以提高系統(tǒng)的吞吐率,如果超過(guò)了這個(gè)范圍,反而會(huì)降低程序的執(zhí)行速度。
因此,設(shè)計(jì)一個(gè)容納多個(gè)線(xiàn)程的容器,容器中的線(xiàn)程可以重復(fù)使用,省去了頻繁創(chuàng)建和銷(xiāo)毀線(xiàn)程對(duì)象的操作, 達(dá)到下面的目標(biāo):
- 降低資源消耗,減少了創(chuàng)建和銷(xiāo)毀線(xiàn)程的次數(shù),每個(gè)工作線(xiàn)程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)
- 提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),如果有線(xiàn)程可以直接用,不會(huì)出現(xiàn)系統(tǒng)僵死
- 提高線(xiàn)程的可管理性,如果無(wú)限制的創(chuàng)建線(xiàn)程,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線(xiàn)程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控
線(xiàn)程池的核心思想: 線(xiàn)程復(fù)用,同一個(gè)線(xiàn)程可以被重復(fù)使用,來(lái)處理多個(gè)任務(wù)。
為了實(shí)現(xiàn)線(xiàn)程池功能,需要考慮下面幾個(gè)設(shè)計(jì)要點(diǎn):
- 線(xiàn)程池可以接口外部提交的任務(wù)執(zhí)行
- 線(xiàn)程池有工作線(xiàn)程的數(shù)量,有任務(wù)執(zhí)行,沒(méi)有任務(wù)也空閑在那,等待任務(wù)過(guò)來(lái),這樣既避免線(xiàn)程頻繁創(chuàng)建銷(xiāo)毀帶來(lái)的開(kāi)銷(xiāo),同時(shí)也可以避免線(xiàn)程池?zé)o限制的創(chuàng)建線(xiàn)程
- 如果線(xiàn)程池接受提交的任務(wù)超過(guò)工作線(xiàn)程的數(shù)量了,該怎么辦?可以用一個(gè)隊(duì)列把任務(wù)存下來(lái),等工作線(xiàn)程完成任務(wù)后去隊(duì)列中獲取任務(wù),執(zhí)行
- 那如果任務(wù)實(shí)在是太多太多了,達(dá)到了我們認(rèn)為的隊(duì)列最大值,怎么辦,我們可以設(shè)計(jì)一種任務(wù)太多的策略,可以進(jìn)行切換,比如直接丟棄任務(wù)、報(bào)錯(cuò)等等
看了上面的設(shè)計(jì)目標(biāo)和要點(diǎn),是不是能立刻想到一個(gè)非常經(jīng)典的設(shè)計(jì)模型——生產(chǎn)者消費(fèi)者模型。

- 阻塞隊(duì)列存儲(chǔ)執(zhí)行任務(wù),比如外部main函數(shù)作為生產(chǎn)者向隊(duì)列生產(chǎn)任務(wù)。
- 線(xiàn)程池中的工作線(xiàn)程作為消費(fèi)者獲取任務(wù)執(zhí)行。
現(xiàn)在我們將我們的設(shè)計(jì)思路轉(zhuǎn)換為代碼。
代碼實(shí)現(xiàn)
阻塞隊(duì)列的實(shí)現(xiàn)
- 阻塞隊(duì)列主要存放任務(wù),有容量限制
- 阻塞隊(duì)列提供添加和刪除任務(wù)的API, 如果超過(guò)容量,阻塞不能添加任務(wù),如果沒(méi)有任務(wù),阻塞無(wú)法獲取任務(wù)。
/**
* <p>自定義任務(wù)隊(duì)列, 用來(lái)存放任務(wù) </p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 10:15
* @version: 1.0.0
*/
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
// 容量
private int capcity;
// 雙端任務(wù)隊(duì)列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入鎖
private ReentrantLock lock = new ReentrantLock();
// 生產(chǎn)者條件變量
private Condition fullWaitSet = lock.newCondition();
// 生產(chǎn)者條件變量
private Condition emptyWaitSet = lock.newCondition();
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 阻塞的方式添加任務(wù)
public void put(T task) {
lock.lock();
try {
// 通過(guò)while的方式
while (deque.size() >= capcity) {
log.debug("wait to add queue");
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
deque.offer(task);
log.debug("task add successfully");
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 阻塞獲取任務(wù)
public T take() {
lock.lock();
try {
// 通過(guò)while的方式
while (deque.isEmpty()) {
try {
log.debug("wait to take task");
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
T task = deque.poll();
log.debug("take task successfully");
// 從隊(duì)列中獲取元素
return task;
} finally {
lock.unlock();
}
}
}- put()方法是向阻塞隊(duì)列中添加任務(wù)
- take()方法是向阻塞隊(duì)列中獲取任務(wù)
線(xiàn)程池消費(fèi)端實(shí)現(xiàn)
1.定義執(zhí)行器接口
/**
* <p>定義一個(gè)執(zhí)行器的接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 12:31
* @version: 1.0.0
*/
public interface Executor {
/**
* 提交任務(wù)執(zhí)行
* @param task 任務(wù)
*/
void execute(Runnable task);
}2.定義線(xiàn)程池類(lèi)實(shí)現(xiàn)該接口
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
/**
* 任務(wù)隊(duì)列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 核心工作線(xiàn)程數(shù)
*/
private int coreSize;
/**
* 工作線(xiàn)程集合
*/
private Set<Worker> workers = new HashSet<>();
/**
* 創(chuàng)建線(xiàn)程池
* @param coreSize 工作線(xiàn)程數(shù)量
* @param capcity 阻塞隊(duì)列容量
*/
public ThreadPool(int coreSize, int capcity) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
}
/**
* 提交任務(wù)執(zhí)行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作線(xiàn)程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超過(guò)了閾值,加入到隊(duì)列中
taskQueue.put(task);
}
}
}
/**
* 工作線(xiàn)程,對(duì)執(zhí)行的任務(wù)做了一層包裝處理
*/
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 如果任務(wù)不為空,或者可以從隊(duì)列中獲取任務(wù)
while (task != null || (task = taskQueue.take()) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 執(zhí)行完后,設(shè)置任務(wù)為空
task = null;
}
}
// 移除工作線(xiàn)程
synchronized (workers){
log.debug("remove worker successfully");
workers.remove(this);
}
}
}
}- Worker類(lèi)是工作線(xiàn)程類(lèi),包裝了執(zhí)行任務(wù),里面實(shí)現(xiàn)了從隊(duì)列獲取任務(wù),然后執(zhí)行任務(wù)。
- execute方法的實(shí)現(xiàn)中,如果工作線(xiàn)程數(shù)量小于閾值的話(huà),直接創(chuàng)建新的工作線(xiàn)程,否則將任務(wù)添加到隊(duì)列中。
3.演示
@Test
public void testThreadPool1() throws InterruptedException {
Executor executor = new ThreadPool(2, 4);
// 提交任務(wù)
for (int i = 0; i < 6; i++) {
final int j = i;
executor.execute(() -> {
try {
Thread.sleep(10);
log.info("run task {}", j);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(10);
}
Thread.sleep(10000);
}運(yùn)行結(jié)果:

獲取任務(wù)超時(shí)設(shè)計(jì)
目前從隊(duì)列中獲取任務(wù)是永久阻塞等待的,可以改成阻塞一段時(shí)間沒(méi)有獲取任務(wù),丟棄的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
// 容量
private int capcity;
// 雙端任務(wù)隊(duì)列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入鎖
private ReentrantLock lock = new ReentrantLock();
// 生產(chǎn)者條件變量
private Condition fullWaitSet = lock.newCondition();
// 生產(chǎn)者條件變量
private Condition emptyWaitSet = lock.newCondition();
public TimeoutBlockingQueue(int capcity) {
this.capcity = capcity;
}
// 帶超時(shí)時(shí)間的獲取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
// 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
long nanos = unit.toNanos(timeout);
while (deque.isEmpty()){
try {
if (nanos<=0){
return null;
}
// 返回的是剩余的等待時(shí)間,更改navos的值,使虛假喚醒的時(shí)候可以繼續(xù)等待
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
return deque.getFirst();
}finally {
lock.unlock();
}
}
// 帶超時(shí)時(shí)間的增加
public boolean offer(T task , long timeout , TimeUnit unit){
lock.lock();
try{
// 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
long nanos = unit.toNanos(timeout);
while (deque.size() == capcity){
try {
if (nanos<=0){
return false;
}
// 更新剩余需要等待的時(shí)間
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任務(wù)隊(duì)列 {}", task);
deque.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
}新加TimeoutBlockingQueue類(lèi),添加offer和poll待超時(shí)的添加和獲取任務(wù)的方法。
拒絕策略設(shè)計(jì)
目前的實(shí)現(xiàn)還是有個(gè)漏洞,無(wú)法自定義任務(wù)超出閾值的一個(gè)拒絕策略,我們可以通過(guò)利用函數(shù)式編程+策略模式去實(shí)現(xiàn)。
1.定義策略模式的函數(shù)式接口
/**
* <p>拒絕策略的函數(shù)式接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 13:15
* @version: 1.0.0
*/
@FunctionalInterface
public interface RejectPolicy<T> {
/**
* 拒絕策略的接口
* @param queue
* @param task
*/
void reject(BlockingQueue<T> queue, T task);
}2.添加函數(shù)式接口的調(diào)用入口
我們可以在阻塞隊(duì)列添加任務(wù)新加一個(gè)api, 添加任務(wù)如果超過(guò)容量,調(diào)用函數(shù)式接口。
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
........
/**
* 嘗試添加任務(wù)
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
// 如果隊(duì)列超過(guò)容量
if (deque.size()> capcity){
log.debug("task too much, do reject");
rejectPolicy.reject(this, task);
}else {
deque.offer(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}3.修改ThreadPool類(lèi)
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
.....
/**
* 拒絕策略
*/
private RejectPolicy rejectPolicy;
// 通過(guò)構(gòu)造方法傳入執(zhí)行的拒絕策略
public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
this.rejectPolicy = rejectPolicy;
}
/**
* 提交任務(wù)執(zhí)行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作線(xiàn)程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超過(guò)了閾值,加入到隊(duì)列中
//taskQueue.put(task);
// 調(diào)用tryPut的方式
taskQueue.tryPut(rejectPolicy, task);
}
}
}
....
}通過(guò)構(gòu)造方法的方式傳入要執(zhí)行的拒絕策略
調(diào)用tryPut方法添加任務(wù)
4.演示

以上就是Java實(shí)現(xiàn)手寫(xiě)一個(gè)線(xiàn)程池的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java線(xiàn)程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java實(shí)現(xiàn)手寫(xiě)乞丐版線(xiàn)程池的示例代碼
- 一文了解Java?線(xiàn)程池的正確使用姿勢(shì)
- Java手寫(xiě)線(xiàn)程池之向JDK線(xiàn)程池進(jìn)發(fā)
- Java線(xiàn)程池源碼的深度解析
- 一篇文章帶你搞懂Java線(xiàn)程池實(shí)現(xiàn)原理
- 詳解Java線(xiàn)程池如何統(tǒng)計(jì)線(xiàn)程空閑時(shí)間
- Java中的異步與線(xiàn)程池解讀
- 詳解Java線(xiàn)程池隊(duì)列中的延遲隊(duì)列DelayQueue
- 一文帶你弄懂Java中線(xiàn)程池的原理
- java 線(xiàn)程池的實(shí)現(xiàn)原理、優(yōu)點(diǎn)與風(fēng)險(xiǎn)、以及4種線(xiàn)程池實(shí)現(xiàn)
相關(guān)文章
淺談一下Java中的悲觀(guān)鎖和樂(lè)觀(guān)鎖
這篇文章主要介紹了一下Java中的悲觀(guān)鎖和樂(lè)觀(guān)鎖,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展
本文主要介紹了Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
Java中String類(lèi)常用方法總結(jié)詳解
String類(lèi)是一個(gè)很常用的類(lèi),是Java語(yǔ)言的核心類(lèi),用來(lái)保存代碼中的字符串常量的,并且封裝了很多操作字符串的方法。本文為大家總結(jié)了一些String類(lèi)常用方法的使用,感興趣的可以了解一下2022-08-08
Java listener簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java listener簡(jiǎn)介,可以用于統(tǒng)計(jì)用戶(hù)在線(xiàn)人數(shù)等,有興趣的可以了解一下2017-07-07
SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
Java如何實(shí)現(xiàn)可折疊Panel方法示例
這篇文章主要給大家介紹了關(guān)于利用Java如何實(shí)現(xiàn)可折疊Panel的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-07-07
java文件操作練習(xí)代碼 讀取某個(gè)盤(pán)符下的文件
這篇文章主要介紹了java讀取某個(gè)盤(pán)符下的文件示例,代碼中要求的是絕對(duì)路徑,編譯過(guò)程中要注意絕對(duì)路徑問(wèn)題和異常的抓取2014-01-01
Java中?springcloud.openfeign應(yīng)用案例解析
使用OpenFeign能讓編寫(xiě)Web?Service客戶(hù)端更加簡(jiǎn)單,使用時(shí)只需定義服務(wù)接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應(yīng)用案例解析,需要的朋友可以參考下2024-06-06

