Java中的ForkJoinPool使用方法詳解(附案例)

一、ForkJoinPool 是什么?
ForkJoinPool 是 Java 7 引入的一個線程池實現(xiàn),位于 java.util.concurrent 包下,專為分治算法(Divide-and-Conquer)設(shè)計,特別適合處理可分解為多個子任務(wù)的復(fù)雜任務(wù)。它的核心思想是將大任務(wù) “拆分”(fork)為小任務(wù),并行執(zhí)行后再 “合并”(join)結(jié)果,同時通過工作竊取(Work Stealing)機制提高線程利用率。
二、核心特點
- 工作竊取機制
每個線程都有自己的任務(wù)隊列(雙端隊列),當(dāng)線程完成自身任務(wù)后,會主動從其他線程的隊列中 "竊取"任務(wù)執(zhí)行,減少線程空閑時間。 (簡單理解:有兩個線程做兩個任務(wù),另一個線程做完了,會幫另一個線程分擔(dān)任務(wù),從而提高效率) - 分治任務(wù)模型
適合處理可拆分的任務(wù):通過 fork() 拆分任務(wù),通過 join()等待子任務(wù)完成并合并結(jié)果。 - 并行度控制
默認(rèn)并行度為 CPU 核心數(shù)(Runtime.getRuntime().availableProcessors()),也可手動指定。 - 輕量級線程管理
相比傳統(tǒng)線程池,F(xiàn)orkJoinPool 對線程的調(diào)度更高效,尤其適合計算密集型任務(wù)。
三、核心類與接口
- ForkJoinPool:線程池核心類,負(fù)責(zé)管理線程和任務(wù)調(diào)度。
- ForkJoinTask< v >:任務(wù)抽象類,是所有可在 ForkJoinPool 中執(zhí)行的任務(wù)的父類,主要子類有:
RecursiveAction:無返回值的任務(wù)(void)。
RecursiveTask:有返回值的任務(wù)(泛型 V)。
四、ForkJoinPool 方法介紹
1、構(gòu)造方法
用于創(chuàng)建 ForkJoinPool 實例,控制并行度、線程工廠和異常處理器等:
ForkJoinPool():
默認(rèn)構(gòu)造器:并行度為 CPU 核心數(shù)(Runtime.getRuntime().availableProcessors()),使用默認(rèn)線程工廠和異常處理器。ForkJoinPool(int parallelism):
指定并行度(期望的線程數(shù)),其他參數(shù)使用默認(rèn)值。
parallelism:并行級別,通常設(shè)為 CPU 核心數(shù)(非強制,實際線程數(shù)可能動態(tài)調(diào)整)。ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode)
全參數(shù)構(gòu)造器:
3.1、factory:創(chuàng)建工作線程的工廠(默認(rèn) DefaultForkJoinWorkerThreadFactory)。
3.2、handler:線程未捕獲異常的處理器(默認(rèn) null)。
3.3、asyncMode:任務(wù)隊列是否為異步模式(false 為 LIFO 模式,true 為 FIFO 模式,影響任務(wù)調(diào)度順序)。
2、提交任務(wù)的核心方法
用于向線程池提交 ForkJoinTask 任務(wù)(或其他類型任務(wù)):
- T invoke(ForkJoinTask task)
提交任務(wù)并阻塞等待其完成,返回任務(wù)結(jié)果。
示例:
ForkJoinPool pool = new ForkJoinPool(); Integer result = pool.invoke(new SumTask(array, 0, array.length)); // 阻塞至任務(wù)完成
- void execute(ForkJoinTask<?> task)
提交任務(wù)異步執(zhí)行(無返回值,不阻塞),任務(wù)結(jié)果需通過 task.join() 獲取。
示例:
SumTask task = new SumTask(array, 0, array.length); pool.execute(task); // 后續(xù)通過 task.join() 獲取結(jié)果(會阻塞)
ForkJoinTask submit(ForkJoinTask task)
提交任務(wù)異步執(zhí)行,返回 ForkJoinTask 對象,可通過其 get() 或 join() 方法獲取結(jié)果。
與 execute() 類似,但返回任務(wù)本身,更靈活。List<Future> invokeAll(Collection<? extends Callable> tasks)
批量提交 Callable 任務(wù),阻塞等待所有任務(wù)完成,返回結(jié)果列表(類似普通線程池的 invokeAll)。
3、任務(wù)執(zhí)行相關(guān)方法
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
阻塞等待線程池終止(所有任務(wù)完成且關(guān)閉),超時后返回 false。
需配合 shutdown() 使用。ForkJoinTask<?> pollSubmission()
從提交隊列中獲取并移除一個未執(zhí)行的任務(wù)(主要用于內(nèi)部調(diào)度,一般不直接調(diào)用)。int getQueuedSubmissionCount()
返回等待執(zhí)行的提交任務(wù)數(shù)量。
4、線程池狀態(tài)管理
void shutdown()
平緩關(guān)閉線程池:不再接受新任務(wù),等待已提交任務(wù)完成后終止線程。List shutdownNow()
立即關(guān)閉線程池:嘗試中斷正在執(zhí)行的任務(wù),返回未執(zhí)行的任務(wù)列表。boolean isShutdown()
判斷線程池是否已關(guān)閉(調(diào)用過 shutdown() 或 shutdownNow())。boolean isTerminated()
判斷線程池是否已終止(所有任務(wù)完成且線程已退出)。
static ForkJoinPool commonPool()
返回公共線程池(靜態(tài)單例),適用于輕量級并行任務(wù),無需手動創(chuàng)建線程池。
示例:
ForkJoinPool.commonPool().invoke(new SumTask(array, 0, array.length));
5、獲取線程池信息
- int getParallelism()
返回創(chuàng)建時設(shè)置的并行度。 - int getPoolSize()
返回當(dāng)前線程池中的工作線程數(shù)量。 - int getActiveThreadCount()
返回正在執(zhí)行任務(wù)的活躍線程數(shù)量。 - long getStealCount()
返回工作竊取的總次數(shù)(反映線程利用效率,次數(shù)越高說明負(fù)載越均衡)。 - int getQueuedTaskCount()
返回所有工作線程的任務(wù)隊列中待執(zhí)行的任務(wù)總數(shù)。
6、異常處理
ForkJoinPool 中的任務(wù)異常會被包裝在 ExecutionException 中,可通過以下方式捕獲:
- 對于 RecursiveTask:調(diào)用 join() 或 get() 時,異常會被拋出(join() 拋出未檢查異常,get() 拋出受檢異常)。
- 對于 RecursiveAction:需重寫 completedAbnormally() 方法或通過 getException() 獲取異常。
五、使用步驟
- 定義任務(wù):繼承 RecursiveTask(有返回值)或 RecursiveAction(無返回值),重寫 compute() 方法。
- 在 compute() 中實現(xiàn)任務(wù)拆分邏輯:
若任務(wù)小到無需拆分,則直接執(zhí)行并返回結(jié)果。
若需要拆分,則創(chuàng)建子任務(wù),通過 fork() 提交子任務(wù),通過 join() 獲取結(jié)果并合并。 - 創(chuàng)建 ForkJoinPool 實例,提交任務(wù)并獲取結(jié)果。
1.計算數(shù)組總和(RecursiveTask)
下面用 ForkJoinPool 并行計算數(shù)組總和,展示分治思想:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
// 有返回值的任務(wù):計算數(shù)組指定范圍的和
class SumTask extends RecursiveTask<Integer> {
// 任務(wù)拆分閾值(小于該值則不拆分)
private static final int THRESHOLD = 1000;
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// 若任務(wù)足夠小,直接計算
if (end - start <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 拆分任務(wù):分為左右兩個子任務(wù)
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid); // 左半部分任務(wù)
SumTask rightTask = new SumTask(array, mid, end); // 右半部分任務(wù)
// 執(zhí)行子任務(wù)(fork() 會將任務(wù)提交到線程池,可能并行執(zhí)行(非阻塞)
leftTask.fork();
// 直接執(zhí)行右任務(wù)(當(dāng)前線程可參與執(zhí)行,減少線程創(chuàng)建)
int rightSum = rightTask.compute();
// 等待左任務(wù)完成并獲取結(jié)果
int leftSum = leftTask.join();
// 合并結(jié)果
return leftSum + rightSum;
}
}
}
public class ForkJoinPoolExample {
public static void main(String[] args) {
// 創(chuàng)建測試數(shù)組
int[] array = new int[10_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1; // 1~10,000,000
}
// 創(chuàng)建ForkJoinPool(默認(rèn)并行度為CPU核心數(shù))
try (ForkJoinPool pool = new ForkJoinPool()) {
// 提交任務(wù)
SumTask task = new SumTask(array, 0, array.length);
Integer result = pool.invoke(task); // 阻塞等待結(jié)果
System.out.println("數(shù)組總和:" + result); // 預(yù)期結(jié)果:50000005000000
}
}
}
2.示例:無返回值任務(wù)(RecursiveAction)
如果任務(wù)無需返回值,可繼承 RecursiveAction,重寫 compute() 方法:
代碼如下(示例):
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
// 無返回值的任務(wù):打印數(shù)組指定范圍元素
class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 10;
private int[] array;
private int start;
private int end;
public PrintTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 直接打印
for (int i = start; i < end; i++) {
System.out.print(array[i] + " ");
}
} else {
// 拆分任務(wù)
int mid = (start + end) / 2;
PrintTask leftTask = new PrintTask(array, start, mid);
PrintTask rightTask = new PrintTask(array, mid, end);
leftTask.fork();
rightTask.fork(); // 也可直接執(zhí)行rightTask.compute()
// 等待子任務(wù)完成(無返回值,僅等待)
leftTask.join();
rightTask.join();
}
}
}
public class RecursiveActionExample {
public static void main(String[] args) {
int[] array = new int[30];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
try (ForkJoinPool pool = new ForkJoinPool()) {
pool.invoke(new PrintTask(array, 0, array.length));
}
}
}
3.示例:分批量保存數(shù)據(jù)(RecursiveAction)
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
@Resource
private UserService userService;
/**
* 批量新增用戶數(shù)據(jù)
*/
private void createBomPackage(List<User> users) {
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
if (CollUtil.isNotEmpty(users)) {
EXECUTOR.execute(() -> {
ForkJoinPool forkJoinPool = new ForkJoinPool();
try {
forkJoinPool.submit(new SumRecursiveTask(users, userService));
} catch (Exception e) {
log.error("新增用戶數(shù)據(jù)失敗", e);
exceptionRef.set(e);
} finally {
if (!forkJoinPool.isShutdown()) {
forkJoinPool.shutdown();
}
}
});
if (exceptionRef.get() != null) {
exceptionRef.get().printStackTrace();
throw new BaseException("異步任務(wù)執(zhí)行新增用戶異常");
}
}
}
/**
* 分批創(chuàng)建類
*/
private static class SumRecursiveTask extends RecursiveAction {
private static final int MAX_STRIDE = 200; //可以處理任務(wù)
@Resource
private final UserService userService;
private final List<User> users;
public SumRecursiveTask(List<User> users, UserService userService) {
this.userService= userService; //保存的service接口
this.users= users; //你要保存的List集合
}
@Override
protected void compute() {
int size = users.size(); //本次處理大小
if (size <= MAX_STRIDE) { //如果小于設(shè)定閾值,直接保存
userService.saveBatch(temp);
} else { // 如果大于,將任務(wù)拆分,分而治之。
int middle = (size) / 2;
SumRecursiveTask left = new SumRecursiveTask(temp.subList(0, middle), userService);
SumRecursiveTask right = new SumRecursiveTask(temp.subList(middle, size), userService);
left.fork(); // 提交左任務(wù)(非阻塞)
right.fork(); // 也可直接執(zhí)行rightTask.compute()
}
}
}
六、工作原理
ForkJoinPool 的核心工作原理可以概括為 “分治(Fork)+ 合并(Join)+ 工作竊?。╓ork Stealing)”。下面通過一個具體的例子(計算數(shù)組總和)來詳細(xì)說明其運作過程。
1、場景:計算大數(shù)組的總和
假設(shè)我們有一個包含 8 個元素的數(shù)組 [1, 2, 3, 4, 5, 6, 7, 8],需要計算所有元素的總和。如果用單線程計算,直接遍歷累加即可;但用 ForkJoinPool 時,會通過 “分治” 將大任務(wù)拆分成小任務(wù),并行執(zhí)行后再合并結(jié)果,同時通過 “工作竊取” 提高線程利用率。
2、步驟 1:定義分治任務(wù)(RecursiveTask)
首先需要創(chuàng)建一個繼承 RecursiveTask(有返回值的任務(wù))的子類,重寫 compute() 方法,實現(xiàn) “拆分任務(wù)” 和 “合并結(jié)果” 的邏輯。
import java.util.concurrent.RecursiveTask;
// 計算數(shù)組總和的任務(wù)(有返回值,用 RecursiveTask)
class SumTask extends RecursiveTask<Integer> {
// 任務(wù)拆分的閾值:當(dāng)數(shù)組長度 <= 2 時,直接計算(不再拆分)
private static final int THRESHOLD = 2;
private int[] array; // 目標(biāo)數(shù)組
private int start; // 起始索引
private int end; // 結(jié)束索引(不包含)
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int length = end - start;
// 若任務(wù)足夠?。ㄐ∮陂撝担?,直接計算結(jié)果
if (length <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
System.out.println("直接計算:" + start + "~" + end + ",結(jié)果=" + sum);
return sum;
}
// 否則,拆分任務(wù)(Fork)
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid); // 左半部分任務(wù)
SumTask rightTask = new SumTask(array, mid, end); // 右半部分任務(wù)
// 執(zhí)行子任務(wù)(fork() 會將任務(wù)提交到線程池,可能并行執(zhí)行)
leftTask.fork(); // 拆分左任務(wù)
rightTask.fork(); // 拆分右任務(wù)
// 合并子任務(wù)結(jié)果(join() 會等待子任務(wù)完成并獲取結(jié)果)
int leftResult = leftTask.join();
int rightResult = rightTask.join();
int total = leftResult + rightResult;
System.out.println("合并:" + start + "~" + end + ",左=" + leftResult + ",右=" + rightResult + ",總=" + total);
return total;
}
}
3、步驟 2:用 ForkJoinPool 執(zhí)行任務(wù)
創(chuàng)建 ForkJoinPool 實例,提交任務(wù)并獲取結(jié)果:
import java.util.concurrent.ForkJoinPool;
public class ForkJoinDemo {
public static void main(String[] args) {
int[] array = {1, 2, 3, 4, 5, 6, 7, 8};
// 創(chuàng)建 ForkJoinPool(默認(rèn)并行度為 CPU 核心數(shù),這里假設(shè)為 2 個核心)
try (ForkJoinPool pool = new ForkJoinPool()) {
// 提交根任務(wù)(計算整個數(shù)組的和)
SumTask rootTask = new SumTask(array, 0, array.length);
int result = pool.invoke(rootTask); // 阻塞等待結(jié)果
System.out.println("最終總和:" + result); // 輸出 36
}
}
}
4、步驟 3:詳解工作原理(分治 + 工作竊取)
假設(shè) ForkJoinPool 的并行度為 2(即初始有 2 個工作線程 Thread-1 和 Thread-2),整個執(zhí)行過程如下:
- 分治(Fork):拆分任務(wù)
根任務(wù):SumTask(0~8)(計算整個數(shù)組的和)。由于數(shù)組長度為 8(> 閾值 2),根任務(wù)會拆分成兩個子任務(wù):
- 左任務(wù):SumTask(0~4)(計算前 4 個元素)。
- 右任務(wù):SumTask(4~8)(計算后 4 個元素)。
根任務(wù)調(diào)用 leftTask.fork() 和 rightTask.fork(),將兩個子任務(wù)提交到線程池,由 Thread-1 和 Thread-2 分別執(zhí)行。
- 子任務(wù)繼續(xù)拆分
- Thread-1 執(zhí)行 SumTask(0~4):
數(shù)組長度為 4(> 閾值 2),繼續(xù)拆分:- 左子任務(wù):SumTask(0~2)(元素 [1,2])。
- 右子任務(wù):SumTask(2~4)(元素 [3,4])。
- Thread-1 調(diào)用 fork() 提交這兩個子任務(wù),然后等待它們的結(jié)果(join())。
- Thread-2 執(zhí)行 SumTask(4~8):
數(shù)組長度為 4(> 閾值 2),繼續(xù)拆分:- 左子任務(wù):SumTask(4~6)(元素 [5,6])。
- 右子任務(wù):SumTask(6~8)(元素 [7,8])。
- Thread-2 調(diào)用 fork() 提交這兩個子任務(wù),然后等待結(jié)果(join())。
- 執(zhí)行最小任務(wù)(達到閾值)
當(dāng)子任務(wù)的數(shù)組長度 ≤ 閾值(2)時,不再拆分,直接計算結(jié)果:
- SumTask(0~2) 計算 1+2=3(假設(shè)由 Thread-1 執(zhí)行)。
- SumTask(2~4) 計算 3+4=7(假設(shè) Thread-1 執(zhí)行完左子任務(wù)后,繼續(xù)執(zhí)行右子任務(wù))。
- SumTask(4~6) 計算 5+6=11(假設(shè)由 Thread-2 執(zhí)行)。
- SumTask(6~8) 計算 7+8=15(假設(shè) Thread-2 執(zhí)行完左子任務(wù)后,繼續(xù)執(zhí)行右子任務(wù))。
- 合并結(jié)果(Join)
子任務(wù)完成后,上層任務(wù)會合并結(jié)果:
- SumTask(0~4) 合并 3 + 7 = 10。
- SumTask(4~8) 合并 11 + 15 = 26。
- 根任務(wù) SumTask(0~8) 合并 10 + 26 = 36(最終結(jié)果)。
- 工作竊取(Work Stealing):提升效率的關(guān)鍵
假設(shè) Thread-1 先完成了自己的所有子任務(wù)(0~2 和 2~4),而 Thread-2 還在處理 6~8。此時 Thread-1 的任務(wù)隊列已空,它會主動 “竊取”Thread-2 隊列中未執(zhí)行的任務(wù)(比如 6~8),幫助 Thread-2 執(zhí)行,避免線程空閑。
- 竊取規(guī)則:線程的任務(wù)隊列是 “雙端隊列”(Deque)。自己的任務(wù)從隊列頭部(LIFO)取,竊取的任務(wù)從其他隊列的尾部(FIFO)取,減少競爭。
- 效果:避免某一線程任務(wù)過多而其他線程空閑,充分利用多核 CPU 資源。
5、核心原理總結(jié)
- 分治(Fork):將大任務(wù)遞歸拆分成小任務(wù),直到任務(wù)足夠?。ㄟ_到閾值),適合并行處理。
- 合并(Join):等待所有子任務(wù)完成后,匯總結(jié)果得到最終答案。
- 工作竊取(Work Stealing):空閑線程主動竊取其他線程的任務(wù),平衡負(fù)載,最大化利用 CPU 核心。
七、注意事項
- 任務(wù)拆分粒度:拆分閾值(THRESHOLD)需合理設(shè)置,過細(xì)會增加任務(wù)調(diào)度開銷,過粗則無法充分利用并行性。
- 異常處理:任務(wù)中拋出的異常會被包裝在 ExecutionException 中,需通過 get() 或 join() 捕獲。
- 資源管理:ForkJoinPool 實現(xiàn)了 AutoCloseable 接口,建議用 try-with-resources 自動關(guān)閉。
- 適用場景:適合計算密集型任務(wù)(如大數(shù)據(jù)量計算、排序等),不適合 I/O 密集型任務(wù)(線程等待時間長,工作竊取效率低)。
- 與 Executors.newWorkStealingPool() 的關(guān)系:newWorkStealingPool() 本質(zhì)上是 ForkJoinPool 的封裝(Java 8+),默認(rèn)并行度為 CPU 核心數(shù)。
ForkJoinPool 是 Java 并行計算的重要工具,尤其在處理大規(guī)模數(shù)據(jù)并行任務(wù)時,能有效利用多核 CPU 提升效率。
到此這篇關(guān)于Java中的ForkJoinPool使用方法的文章就介紹到這了,更多相關(guān)Java中ForkJoinPool使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud_Eureka服務(wù)注冊與發(fā)現(xiàn)基礎(chǔ)及構(gòu)建步驟
Eureka服務(wù)注冊中心,主要用于提供服務(wù)注冊功能,當(dāng)微服務(wù)啟動時,會將自己的服務(wù)注冊到Eureka Server,這篇文章主要介紹了SpringCloud中Eureka的配置及詳細(xì)使用,需要的朋友可以參考下2023-01-01
解決springboot沒有啟動標(biāo)識,啟動類也沒有啟動標(biāo)識的問題
這篇文章主要介紹了解決springboot沒有啟動標(biāo)識,啟動類也沒有啟動標(biāo)識的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
Java中ScheduledExecutorService的使用方法詳解
ScheduledExecutorService是ExecutorService的一個子接口,它主要用于在給定的延遲之后或周期性地執(zhí)行任務(wù),本文主要介紹了ScheduledExecutorService的使用方法,感興趣的可以了解下2024-12-12
Java微信公眾平臺開發(fā)(12) 微信用戶信息的獲取
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺開發(fā)第十二步,微信用戶信息的獲取,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04
Spring Boot Security 結(jié)合 JWT 實現(xiàn)無狀態(tài)的分布式API接口
JSON Web Token(縮寫 JWT)是目前最流行的跨域認(rèn)證解決方案。這篇文章主要介紹了Spring Boot Security 結(jié)合 JWT 實現(xiàn)無狀態(tài)的分布式API接口 ,需要的朋友可以參考下2019-04-04

