Java如何優(yōu)雅關閉異步中的ExecutorService
1.ExecutorService的核心價值
在并發(fā)編程領域,Java的ExecutorService(位于java.util.concurrent包)是線程池管理的關鍵接口。作為Executor框架的核心組件,它通過解耦任務提交與執(zhí)行機制,為開發(fā)者提供了:
- 線程生命周期管理自動化
- 任務隊列智能調(diào)度
- 資源復用優(yōu)化機制
- 異步執(zhí)行結果追蹤能力
2.關閉機制的必要性
不正確的線程池關閉會導致:
- 內(nèi)存泄漏(滯留線程無法回收)
- 應用無法正常終止(非守護線程保持活躍)
- 任務狀態(tài)不一致(突然中斷導致數(shù)據(jù)問題)
- 系統(tǒng)資源耗盡(無限制線程創(chuàng)建)
3.shutdown()方法詳解
3.1 方法特性
void shutdown()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設置為SHUTDOWN,觸發(fā)以下行為:
- 拒絕新任務提交(觸發(fā)RejectedExecutionHandler)
- 繼續(xù)執(zhí)行已存在的任務:
- 正在執(zhí)行的任務(running tasks)
- 等待隊列中的任務(queued tasks)
典型應用場景
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交多個任務...
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("仍有任務未在時限內(nèi)完成");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
3.2 內(nèi)部運作機制
- 原子性狀態(tài)更新:CAS操作修改線程池控制狀態(tài)
- 中斷空閑線程:僅中斷等待任務的Worker線程
- 隊列消費保證:完全處理BlockingQueue中的剩余任務
4.shutdownNow()方法剖析
4.1 方法定義
List<Runnable> shutdownNow()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設置為STOP,觸發(fā):
- 立即拒絕新任務
- 中斷所有工作線程(無論是否在執(zhí)行任務)
- 清空任務隊列,返回未執(zhí)行任務列表
4.2 中斷處理要點
executor.shutdownNow();
// 典型返回值處理
List<Runnable> unprocessed = executor.shutdownNow();
if (!unprocessed.isEmpty()) {
logger.warn("丟棄{}個未執(zhí)行任務", unprocessed.size());
}
任務中斷條件
只有當任務代碼正確處理中斷時才能被終止:
class InterruptibleTask implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
// 執(zhí)行可中斷的操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重置中斷狀態(tài)
break;
}
}
}
}
5.對比分析
| 特性 | shutdown() | shutdownNow() |
|---|---|---|
| 新任務接受 | 立即拒絕 | 立即拒絕 |
| 運行中任務處理 | 等待完成 | 嘗試中斷 |
| 隊列任務處理 | 全部執(zhí)行 | 清除并返回 |
| 返回值 | void | 未執(zhí)行任務列表 |
| 適用場景 | 優(yōu)雅關閉 | 緊急終止 |
| 線程中斷策略 | 僅中斷空閑線程 | 強制中斷所有線程 |
6.最佳實踐代碼示例
6.1 標準關閉模板
public class GracefulShutdownExample {
// 定義超時時間和時間單位(30秒)
private static final int TIMEOUT = 30;
private static final TimeUnit UNIT = TimeUnit.SECONDS;
// 執(zhí)行任務的方法,接收一個任務列表并將其提交給線程池執(zhí)行
public void executeTasks(List<Runnable> tasks) {
// 創(chuàng)建一個固定大小的線程池,大小為系統(tǒng)可用處理器核心數(shù)
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
// 將任務列表中的每個任務提交到線程池
tasks.forEach(executor::submit);
} finally {
// 在所有任務提交完后,禁用線程池接收新任務,開始優(yōu)雅關閉線程池
executor.shutdown(); // 禁止再提交新任務
try {
// 等待線程池中的任務在指定超時內(nèi)完成,如果超時未完成,則強制關閉線程池
if (!executor.awaitTermination(TIMEOUT, UNIT)) {
// 如果未能在超時內(nèi)完成,則調(diào)用 shutdownNow() 強制終止所有活動任務
List<Runnable> unfinished = executor.shutdownNow();
// 處理未完成的任務,例如記錄日志或重新提交
handleUnfinishedTasks(unfinished);
}
} catch (InterruptedException e) {
// 如果在等待終止時被中斷,恢復中斷狀態(tài)并強制關閉線程池
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
}
// 處理未完成任務的方法,這里我們打印未完成任務的數(shù)量
private void handleUnfinishedTasks(List<Runnable> tasks) {
// 如果有未完成的任務,打印任務數(shù)量并執(zhí)行額外的處理
if (!tasks.isEmpty()) {
System.out.println("未完成任務數(shù): " + tasks.size());
// 可在此處記錄日志、重新排隊未完成的任務等
}
}
}構造線程池: Executors.newFixedThreadPool() 創(chuàng)建一個固定大小的線程池,大小為系統(tǒng)可用的處理器核心數(shù),這樣可以更高效地利用 CPU 資源。
提交任務: 使用 tasks.forEach(executor::submit) 提交每個任務到線程池中執(zhí)行。
優(yōu)雅關閉線程池:
executor.shutdown()禁用線程池接收新任務,但仍會執(zhí)行已經(jīng)提交的任務。awaitTermination()方法用于等待所有任務執(zhí)行完成。如果超時后任務未完成,則調(diào)用shutdownNow()強制關閉線程池,停止所有正在運行的任務,并返回未完成的任務。
處理中斷: 如果在等待終止過程中發(fā)生 InterruptedException,線程會恢復中斷狀態(tài),并且強制關閉線程池。
處理未完成任務: handleUnfinishedTasks() 方法會處理未完成的任務,比如記錄日志或者重新排隊未完成的任務。
6.2 帶回調(diào)的增強實現(xiàn)
public class EnhancedExecutorManager {
// 定義線程池對象
private final ExecutorService executor;
// 定義超時時間及單位
private final long timeout;
private final TimeUnit unit;
// 構造函數(shù),初始化線程池并設置超時時間和單位
public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) {
// 創(chuàng)建一個核心池大小為 corePoolSize,最大池大小為 corePoolSize * 2,最大空閑時間 60秒的線程池
this.executor = new ThreadPoolExecutor(
corePoolSize, // 核心線程池大小
corePoolSize * 2, // 最大線程池大小
60L, TimeUnit.SECONDS, // 空閑線程的存活時間
new LinkedBlockingQueue<>(1000), // 使用容量為 1000 的隊列來緩存任務
new CustomThreadFactory(), // 自定義線程工廠
new ThreadPoolExecutor.CallerRunsPolicy() // 當任務無法提交時,調(diào)用者線程執(zhí)行該任務
);
this.timeout = timeout; // 設置超時時間
this.unit = unit; // 設置超時時間單位
}
// 優(yōu)雅關閉線程池的方法
public void shutdown() {
executor.shutdown(); // 首先嘗試正常關閉線程池,不再接收新的任務
try {
// 如果線程池未能在指定的超時時間內(nèi)終止,則強制關閉
if (!executor.awaitTermination(timeout, unit)) {
System.out.println("強制終止線程池...");
// 強制停止所有正在執(zhí)行的任務并返回丟棄的任務列表
List<Runnable> droppedTasks = executor.shutdownNow();
System.out.println("丟棄任務數(shù): " + droppedTasks.size());
}
} catch (InterruptedException e) {
// 如果在等待過程中線程池關閉操作被中斷,立即強制關閉并恢復中斷狀態(tài)
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// 自定義線程工廠類,用于創(chuàng)建線程
private static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1); // 線程池編號,用于生成線程名
private final ThreadGroup group; // 線程組
private final AtomicInteger threadNumber = new AtomicInteger(1); // 線程編號
private final String namePrefix; // 線程名稱前綴
CustomThreadFactory() {
// 獲取當前系統(tǒng)的安全管理器,如果沒有,則使用當前線程的線程組
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 設置線程池的名稱前綴
namePrefix = "pool-" +
poolNumber.getAndIncrement() + // 線程池編號遞增
"-thread-";
}
// 創(chuàng)建新線程的方法
public Thread newThread(Runnable r) {
// 創(chuàng)建新的線程,線程組、名稱及優(yōu)先級均已設置
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0); // 默認優(yōu)先級和daemon設置
// 如果線程是守護線程,則將其設置為非守護線程
if (t.isDaemon())
t.setDaemon(false);
// 設置線程優(yōu)先級為默認
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t; // 返回新創(chuàng)建的線程
}
}
}線程池初始化:
EnhancedExecutorManager的構造方法使用ThreadPoolExecutor創(chuàng)建一個線程池,線程池大小通過corePoolSize參數(shù)傳遞。線程池的最大線程數(shù)是核心線程數(shù)的兩倍。LinkedBlockingQueue用作任務隊列,大小為 1000。若任務量超過隊列容量,則使用CallerRunsPolicy策略,即由提交任務的線程執(zhí)行該任務。- 使用自定義的
CustomThreadFactory來創(chuàng)建線程。
優(yōu)雅關閉線程池:
shutdown()方法首先調(diào)用executor.shutdown()來拒絕接受新的任務,然后等待線程池在指定的超時時間內(nèi)關閉。- 如果線程池在超時時間內(nèi)未能正常關閉,則調(diào)用
shutdownNow()強制關閉并丟棄未執(zhí)行的任務,同時輸出丟棄任務的數(shù)量。 - 如果在等待關閉過程中發(fā)生
InterruptedException,會強制關閉線程池,并恢復中斷狀態(tài)。
自定義線程工廠:
CustomThreadFactory通過實現(xiàn)ThreadFactory接口來定義創(chuàng)建線程的行為,主要包括線程組、線程名稱、守護線程狀態(tài)和線程優(yōu)先級的配置。- 每個線程的名稱遵循
pool-編號-thread-編號的格式。線程池的編號是遞增的,每個線程有自己的編號。
7.關鍵注意事項
- 守護線程問題:默認創(chuàng)建的是非守護線程,需顯式關閉
- 中斷策略一致性:任務必須實現(xiàn)正確的中斷處理邏輯
- 拒絕策略配合:合理配置RejectedExecutionHandler
- 資源釋放順序:數(shù)據(jù)庫連接等資源應先于線程池關閉
- 監(jiān)控機制:建議集成線程池監(jiān)控(如JMX)
8.高級應用場景
分級關閉策略
public class TieredShutdownManager {
// 定義三個優(yōu)先級的線程池列表:高優(yōu)先級、中優(yōu)先級、低優(yōu)先級
private final List<ExecutorService> highPriority;
private final List<ExecutorService> normalPriority;
private final List<ExecutorService> lowPriority;
// 公共方法用于優(yōu)雅關閉所有線程池
public void gracefulShutdown() {
// 依次關閉高、中、低優(yōu)先級的線程池
shutdownTier(highPriority, 10, TimeUnit.SECONDS);
shutdownTier(normalPriority, 30, TimeUnit.SECONDS);
shutdownTier(lowPriority, 60, TimeUnit.SECONDS);
}
// 私有方法,用于優(yōu)雅關閉指定優(yōu)先級的線程池
private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) {
// 對指定的線程池列表執(zhí)行關閉操作
tier.forEach(ExecutorService::shutdown);
// 對每個線程池執(zhí)行等待終止的操作,指定超時時間
tier.forEach(executor -> {
try {
// 如果線程池未在超時時間內(nèi)終止,則調(diào)用 shutdownNow 強制關閉
if (!executor.awaitTermination(timeout, unit)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
// 如果在等待終止過程中線程被中斷,恢復中斷狀態(tài)并強制關閉線程池
Thread.currentThread().interrupt();
executor.shutdownNow();
}
});
}
}gracefulShutdown 方法按照優(yōu)先級順序依次關閉高、中、低優(yōu)先級的線程池。
shutdownTier 方法首先嘗試正常關閉每個線程池(調(diào)用 shutdown),然后通過 awaitTermination 方法等待線程池在指定的時間內(nèi)結束,如果未成功結束,則調(diào)用 shutdownNow 強制關閉。
在關閉過程中,如果發(fā)生中斷,則會捕獲 InterruptedException 異常,并且中斷當前線程,同時強制關閉線程池。
9.性能優(yōu)化建議
根據(jù)任務類型選擇隊列策略:
- CPU密集型:有界隊列(ArrayBlockingQueue)
- IO密集型:無界隊列(LinkedBlockingQueue)
監(jiān)控關鍵指標:
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
System.out.println("活躍線程數(shù): " + executor.getActiveCount());
System.out.println("完成任務數(shù): " + executor.getCompletedTaskCount());
System.out.println("隊列大小: " + executor.getQueue().size());
動態(tài)調(diào)整參數(shù):
executor.setCorePoolSize(newSize); executor.setMaximumPoolSize(newMaxSize);
10.總結建議
根據(jù)Oracle官方文檔建議,在大多數(shù)生產(chǎn)場景中推薦以下關閉流程:
- 優(yōu)先調(diào)用shutdown()
- 設置合理的awaitTermination超時
- 必要時調(diào)用shutdownNow()
- 始終處理返回的未完成任務
- 記錄完整的關閉日志
正確選擇關閉策略需要綜合考量:
- 任務重要性等級
- 系統(tǒng)資源限制
- 業(yè)務連續(xù)性需求
- 數(shù)據(jù)一致性要求
到此這篇關于Java如何優(yōu)雅關閉異步中的ExecutorService的文章就介紹到這了,更多相關Java關閉ExecutorService內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java科學計數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法
我們在處理大數(shù)值的時候,常常會遇到使用科學計數(shù)法表示的數(shù)字,科學計數(shù)法是一種表示大數(shù)值或小數(shù)值的方式,下面這篇文章主要給大家介紹了關于java科學計數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法,需要的朋友可以參考下2024-03-03
SpringBoot熱部署Springloaded實現(xiàn)過程解析
這篇文章主要介紹了SpringBoot熱部署Springloaded實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03
關于springboot集成swagger3時spring-plugin-core報錯的問題
這篇文章主要介紹了關于springboot集成swagger3時spring-plugin-core報錯的問題,本文給大家分享解決方法,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09
java使用Nagao算法實現(xiàn)新詞發(fā)現(xiàn)、熱門詞的挖掘
這篇文章主要介紹了java使用Nagao算法實現(xiàn)新詞發(fā)現(xiàn)、熱門詞的挖掘的思路和詳細代碼,需要的朋友可以參考下2015-07-07
解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問題
這篇文章主要介紹了解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10

