Java 自定義線程池和線程總數(shù)控制操作
1 概述
池化是常見的思想,線程池是非常典型的池化的實現(xiàn),《Java并發(fā)編程實戰(zhàn)》也大篇幅去講解了Java中的線程池。本文實現(xiàn)一個簡單的線程池。
2 核心類
【1】接口定義
public interface IThreadPool<Job extends Runnable> {
/**
* 關(guān)閉線程池
*/
public void shutAlldown();
/**
* 執(zhí)行任務(wù)
*
* @param job 任務(wù)
*/
public void execute(Job job);
/**
* 添加工作者
*
* @param addNum 添加數(shù)
*/
public void addWorkers(int addNum);
/**
* 減少工作者
*
* @param reduceNum 減少數(shù)目
*/
public void reduceWorkers(int reduceNum);
}
【2】實現(xiàn)類
線程池的核心是維護了1個任務(wù)列表和1個工作者列表。
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {
// 默認線程數(shù)
private static int DEAFAULT_SIZE = 5;
// 最大線程數(shù)
private static int MAX_SIZE = 10;
// 任務(wù)列表
private LinkedList<Job> tasks = new LinkedList<Job>();
// 工作線程列表
private List<Worker> workers = Collections
.synchronizedList(new ArrayList<Worker>());
/**
* 默認構(gòu)造函數(shù)
*/
public XYThreadPool() {
initWokers(DEAFAULT_SIZE);
}
/**
* 執(zhí)行線程數(shù)
*
* @param threadNums 線程數(shù)
*/
public XYThreadPool(int workerNum) {
workerNum = workerNum <= 0 ? DEAFAULT_SIZE
: workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
initWokers(workerNum);
}
/**
* 初始化線程池
*
* @param threadNums 線程數(shù)
*/
public void initWokers(int threadNums) {
for (int i = 0; i < threadNums; i++) {
Worker worker = new Worker();
worker.start();
workers.add(worker);
}
// 添加關(guān)閉鉤子
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shutAlldown();
}
});
}
@Override
public void shutAlldown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void execute(Job job) {
synchronized (tasks) {
// 提交任務(wù)就是將任務(wù)對象加入任務(wù)隊列,等待工作線程去處理
tasks.addLast(job);
tasks.notifyAll();
}
}
@Override
public void addWorkers(int addNum) {
// 新線程數(shù)必須大于零,并且線程總數(shù)不能大于最大線程數(shù)
if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
initWokers(addNum);
} else {
System.out.println("addNum too large");
}
}
@Override
public void reduceWorkers(int reduceNum) {
if ((workers.size() - reduceNum <= 0))
System.out.println("thread num too small");
else {
// 暫停指定數(shù)量的工作者
int count = 0;
while (count != reduceNum) {
for (Worker w : workers) {
w.shutdown();
count++;
}
}
}
}
/**
* 工作線程
*/
class Worker extends Thread {
private volatile boolean flag = true;
@Override
public void run() {
while (flag) {
Job job = null;
// 加鎖(若只有一個woker可不必加鎖,那就是所謂的單線程的線程池,線程安全)
synchronized (tasks) {
// 任務(wù)隊列為空
while (tasks.isEmpty()) {
try {
// 阻塞,放棄對象鎖,等待被notify喚醒
tasks.wait();
System.out.println("block when tasks is empty");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 不為空取出任務(wù)
job = tasks.removeFirst();
System.out.println("get job:" + job + ",do biz");
job.run();
}
}
}
public void shutdown() {
flag = false;
}
}
}
(1) 當(dāng)調(diào)用wait()方法時線程會放棄對象鎖,進入等待此對象的等待鎖定池,只有針對此對象調(diào)用notify()方法后本線程才進入對象鎖定池準(zhǔn)備
(2) Object的方法:void notify(): 喚醒一個正在等待該對象的線程。void notifyAll(): 喚醒所有正在等待該對象的線程。
notifyAll使所有原來在該對象上等待被notify的線程統(tǒng)統(tǒng)退出wait狀態(tài),變成等待該對象上的鎖,一旦該對象被解鎖,它們會去競爭。
notify只是選擇一個wait狀態(tài)線程進行通知,并使它獲得該對象上的鎖,但不驚動其它同樣在等待被該對象notify的線程們,當(dāng)?shù)谝粋€線程運行完畢以后釋放對象上的鎖,此時如果該對象沒有再次使用notify語句,即便該對象已經(jīng)空閑,其他wait狀態(tài)等待的線程由于沒有得到該對象的通知,繼續(xù)處在wait狀態(tài),直到這個對象發(fā)出一個notify或notifyAll,它們等待的是被notify或notifyAll,而不是鎖。
3 無需控制線程總數(shù)
每調(diào)用一次就會創(chuàng)建一個擁有10個線程工作者的線程池。
public class TestService1 {
public static void main(String[] args) {
// 啟動10個線程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====1 test====");
}
});
}
}
public class TestService2 {
public static void main(String[] args) {
// 啟動10個線程
XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("====2 test====");
}
});
}
}
4 控制線程總數(shù)
在項目中所有的線程調(diào)用,一般都共用1個固定工作者數(shù)大小的線程池。
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool;
/**
* 統(tǒng)一線程池管理類
*/
@Component
public class XYThreadManager {
private XYThreadPool<Runnable> executorPool;
@PostConstruct
public void init() {
executorPool = new XYThreadPool<Runnable>(10);
}
public XYThreadPool<Runnable> getExecutorPool() {
return executorPool;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService3")
public class TestService3 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====3 test====");
}
});
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("testService4")
public class TestService4 {
@Autowired
private XYThreadManager threadManager;
public void test() {
threadManager.getExecutorPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("====4 test====");
}
});
}
}
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestMain {
@SuppressWarnings("resource")
public static void main(String[] args) {
ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");
TestService3 t3 = (TestService3) atc.getBean("testService3");
t3.test();
TestService4 t4 = (TestService4) atc.getBean("testService4");
t4.test();
}
}
補充:論如何優(yōu)雅的自定義ThreadPoolExecutor線程池
前言
線程池想必大家也都用過,JDK的Executors 也自帶一些線程池。但是不知道大家有沒有想過,如何才是最優(yōu)雅的方式去使用過線程池嗎? 生產(chǎn)環(huán)境要怎么去配置自己的線程池才是合理的呢?
今天周末,剛好有時間來總結(jié)一下自己所認為的'優(yōu)雅', 如有問題歡迎大家指正。
線程池使用規(guī)則
要使用好線程池,那么一定要遵循幾個規(guī)則:
線程個數(shù)大小的設(shè)置
線程池相關(guān)參數(shù)配置
利用Hook嵌入你的行為
線程池的關(guān)閉
線程池配置相關(guān)
線程池大小的設(shè)置
這其實是一個面試的考點,很多面試官會問你線程池coreSize 的大小來考察你對于線程池的理解。
首先針對于這個問題,我們必須要明確我們的需求是計算密集型還是IO密集型,只有了解了這一點,我們才能更好的去設(shè)置線程池的數(shù)量進行限制。
1、計算密集型:
顧名思義就是應(yīng)用需要非常多的CPU計算資源,在多核CPU時代,我們要讓每一個CPU核心都參與計算,將CPU的性能充分利用起來,這樣才算是沒有浪費服務(wù)器配置,如果在非常好的服務(wù)器配置上還運行著單線程程序那將是多么重大的浪費。對于計算密集型的應(yīng)用,完全是靠CPU的核數(shù)來工作,所以為了讓它的優(yōu)勢完全發(fā)揮出來,避免過多的線程上下文切換,比較理想方案是:
線程數(shù) = CPU核數(shù)+1,也可以設(shè)置成CPU核數(shù)*2,但還要看JDK的版本以及CPU配置(服務(wù)器的CPU有超線程)。
一般設(shè)置CPU * 2即可。
2、IO密集型
我們現(xiàn)在做的開發(fā)大部分都是WEB應(yīng)用,涉及到大量的網(wǎng)絡(luò)傳輸,不僅如此,與數(shù)據(jù)庫,與緩存間的交互也涉及到IO,一旦發(fā)生IO,線程就會處于等待狀態(tài),當(dāng)IO結(jié)束,數(shù)據(jù)準(zhǔn)備好后,線程才會繼續(xù)執(zhí)行。因此從這里可以發(fā)現(xiàn),對于IO密集型的應(yīng)用,我們可以多設(shè)置一些線程池中線程的數(shù)量,這樣就能讓在等待IO的這段時間內(nèi),線程可以去做其它事,提高并發(fā)處理效率。那么這個線程池的數(shù)據(jù)量是不是可以隨便設(shè)置呢?當(dāng)然不是的,請一定要記得,線程上下文切換是有代價的。目前總結(jié)了一套公式,對于IO密集型應(yīng)用:
線程數(shù) = CPU核心數(shù)/(1-阻塞系數(shù)) 這個阻塞系數(shù)一般為0.8~0.9之間,也可以取0.8或者0.9。
套用公式,對于雙核CPU來說,它比較理想的線程數(shù)就是20,當(dāng)然這都不是絕對的,需要根據(jù)實際情況以及實際業(yè)務(wù)來調(diào)整:final int poolSize = (int)(cpuCore/(1-0.9))
針對于阻塞系數(shù),《Programming Concurrency on the JVM Mastering》即《Java 虛擬機并發(fā)編程》中有提到一句話:
對于阻塞系數(shù),我們可以先試著猜測,抑或采用一些細嫩分析工具或java.lang.management API 來確定線程花在系統(tǒng)/IO操作上的時間與CPU密集任務(wù)所耗的時間比值。
線程池相關(guān)參數(shù)配置
說到這一點,我們只需要謹記一點,一定不要選擇沒有上限限制的配置項。
這也是為什么不建議使用Executors 中創(chuàng)建線程的方法。
比如,Executors.newCachedThreadPool的設(shè)置與無界隊列的設(shè)置因為某些不可預(yù)期的情況,線程池會出現(xiàn)系統(tǒng)異常,導(dǎo)致線程暴增的情況或者任務(wù)隊列不斷膨脹,內(nèi)存耗盡導(dǎo)致系統(tǒng)崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規(guī)范的首要原則! 小心無大錯,千萬別過度自信!
可以看下Executors中四個創(chuàng)建線程池的方法:
//使用無界隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//線程池數(shù)量是無限的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
其他的就不再列舉了,大家可以自行查閱源碼。
第二,合理設(shè)置線程數(shù)量、和線程空閑回收時間,根據(jù)具體的任務(wù)執(zhí)行周期和時間去設(shè)定,避免頻繁的回收和創(chuàng)建,雖然我們使用線程池的目的是為了提升系統(tǒng)性能和吞吐量,但是也要考慮下系統(tǒng)的穩(wěn)定性,不然出現(xiàn)不可預(yù)期問題會很麻煩!
第三,根據(jù)實際場景,選擇適用于自己的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!盡量采用自定義的拒絕策略去進行兜底!
第四,線程池拒絕策略,自定義拒絕策略可以實現(xiàn)RejectedExecutionHandler接口。
JDK自帶的拒絕策略如下:
AbortPolicy:直接拋出異常阻止系統(tǒng)正常工作。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運行當(dāng)前被丟棄的任務(wù)。
DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當(dāng)前任務(wù)。
DiscardPolicy:丟棄無法處理的任務(wù),不給予任何處理。
利用Hook
利用Hook,留下線程池執(zhí)行軌跡:
ThreadPoolExecutor提供了protected類型可以被覆蓋的鉤子方法,允許用戶在任務(wù)執(zhí)行之前會執(zhí)行之后做一些事情。我們可以通過它來實現(xiàn)比如初始化ThreadLocal、收集統(tǒng)計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務(wù)被執(zhí)行完的時候讓用戶插入邏輯,如rerminated 。
如果hook方法執(zhí)行失敗,則內(nèi)部的工作線程的執(zhí)行將會失敗或被中斷。
我們可以使用beforeExecute和afterExecute來記錄線程之前前和后的一些運行情況,也可以直接把運行完成后的狀態(tài)記錄到ELK等日志系統(tǒng)。
關(guān)閉線程池
內(nèi)容當(dāng)線程池不在被引用并且工作線程數(shù)為0的時候,線程池將被終止。我們也可以調(diào)用shutdown來手動終止線程池。如果我們忘記調(diào)用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的!
當(dāng)然,穩(wěn)妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調(diào)用線程池的關(guān)閉方法!
線程池使用實例
線程池核心代碼:
public class AsyncProcessQueue {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Task 包裝類<br>
* 此類型的意義是記錄可能會被 Executor 吃掉的異常<br>
*/
public static class TaskWrapper implements Runnable {
private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
private final Runnable gift;
public TaskWrapper(final Runnable target) {
this.gift = target;
}
@Override
public void run() {
// 捕獲異常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
} catch (Exception e) {
_LOGGER.error("Wrapped target execute exception.", e);
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* 執(zhí)行指定的任務(wù)
*
* @param task
* @return
*/
public static boolean execute(final Runnable task) {
return AsyncProcessor.executeTask(new TaskWrapper(task));
}
}
public class AsyncProcessor {
static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);
/**
* 默認最大并發(fā)數(shù)<br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/**
* 線程池名稱格式
*/
private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
/**
* 線程工廠名稱
*/
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
/**
* 默認隊列大小
*/
private static final int DEFAULT_SIZE = 500;
/**
* 默認線程存活時間
*/
private static final long DEFAULT_KEEP_ALIVE = 60L;
/**NewEntryServiceImpl.java:689
* Executor
*/
private static ExecutorService executor;
/**
* 執(zhí)行隊列
*/
private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
// 創(chuàng)建 Executor
// 此處默認最大值改為處理器數(shù)量的 4 倍
try {
executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executeQueue, FACTORY);
// 關(guān)閉事件的掛鉤
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
AsyncProcessor.LOGGER.info("AsyncProcessor shutting down.");
executor.shutdown();
try {
// 等待1秒執(zhí)行關(guān)閉
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
executor.shutdownNow();
}
AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
}
}));
} catch (Exception e) {
LOGGER.error("AsyncProcessor init error.", e);
throw new ExceptionInInitializerError(e);
}
}
/**
* 此類型無法實例化
*/
private AsyncProcessor() {
}
/**
* 執(zhí)行任務(wù),不管是否成功<br>
* 其實也就是包裝以后的 {@link Executer} 方法
*
* @param task
* @return
*/
public static boolean executeTask(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
return false;
}
return true;
}
/**
* 提交任務(wù),并可以在稍后獲取其執(zhí)行情況<br>
* 當(dāng)提交失敗時,會拋出 {@link }
*
* @param task
* @return
*/
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}
使用方式:
AsyncProcessQueue.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
可以根據(jù)自己的使用場景靈活變更,我這里并沒有用到beforeExecute和afterExecute以及拒絕策略。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
相關(guān)文章
Elasticsearch倒排索引詳解及實際應(yīng)用中的優(yōu)化
Elasticsearch(ES)使用倒排索引來加速文本的搜索速度,倒排索引之所以高效,主要是因為它改變了數(shù)據(jù)的組織方式,使得查詢操作可以快速完成,這篇文章主要給大家介紹了關(guān)于Elasticsearch倒排索引詳解及實際應(yīng)用中優(yōu)化的相關(guān)資料,需要的朋友可以參考下2024-08-08
spring boot切面execution表達式添加多個包路徑問題及解決方案
在Spring Boot中,如果你想為多個包中的方法創(chuàng)建一個切面,你可以在@Pointcut注解中使用||操作符來指定多個包,下面給大家分享spring boot切面execution表達式添加多個包路徑問題及解決方案,感興趣的朋友跟隨小編一起看看吧2024-03-03
詳解Spring的兩種代理方式:JDK動態(tài)代理和CGLIB動態(tài)代理
這篇文章主要介紹了詳解Spring的兩種代理方式:JDK動態(tài)代理和CGLIB動態(tài)代理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04
spring中ApplicationListener的使用小結(jié)
ApplicationListener是spring提供的一個監(jiān)聽器,本文主要介紹了spring中ApplicationListener的使用小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-07-07
JDBC插入數(shù)據(jù)返回數(shù)據(jù)主鍵代碼實例
這篇文章主要介紹了JDBC插入數(shù)據(jù)返回數(shù)據(jù)主鍵代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11

