Java線程池ForkJoinPool(工作竊取算法)的使用
概述

Fork 就是把一個大任務(wù)切分為若干個子任務(wù)并行地執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。Fork/Join 框架使用的是工作竊取算法。
工作竊取算法
工作竊取算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。對于一個比較大的任務(wù),可以把它分割為若干個互不依賴的子任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng)。但是,有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)需要處理,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。由于此時它們訪問同一個隊列,為了減小競爭,通常會使用雙端隊列。被竊取任務(wù)的線程永遠從雙端隊列的頭部獲取任務(wù),竊取任務(wù)的線程永遠從雙端隊列的尾部獲取任務(wù)。

工作竊取算法的優(yōu)缺點
優(yōu)點:充分利用線程進行并行計算,減少了線程間的競爭。
缺點:雙端隊列只存在一個任務(wù)時會導(dǎo)致競爭,會消耗更多的系統(tǒng)資源,因為需要創(chuàng)建多個線程和多個雙端隊列。
使用 ForkJoinPool 進行分叉和合并
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務(wù)分成幾個更小的任務(wù),這些分出來的任務(wù)也將會提交給 ForkJoinPool。任務(wù)可以繼續(xù)分割成更小的子任務(wù),只要它還能分割。可能聽起來有些抽象,因此本節(jié)中我們將會解釋 ForkJoinPool 是如何工作的,還有任務(wù)分割是如何進行的。
分叉和合并解釋
在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合并的原理。
分叉和合并原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合并步驟。
分叉
一個使用了分叉和合并原理的任務(wù)可以將自己分叉(分割)為更小的子任務(wù),這些子任務(wù)可以被并發(fā)執(zhí)行。如下圖所示:

通過把自己分割成多個子任務(wù),每個子任務(wù)可以由不同的 CPU 并行執(zhí)行,或者被同一個 CPU 上的不同線程執(zhí)行。
只有當(dāng)給的任務(wù)過大,把它分割成幾個子任務(wù)才有意義。把任務(wù)分割成子任務(wù)有一定開銷,因此對于小型任務(wù),這個分割的消耗可能比每個子任務(wù)并發(fā)執(zhí)行的消耗還要大。
什么時候把一個任務(wù)分割成子任務(wù)是有意義的,這個界限也稱作一個閥值。這要看每個任務(wù)對有意義閥值的決定。很大程度上取決于它要做的工作的種類。
合并
當(dāng)一個任務(wù)將自己分割成若干子任務(wù)之后,該任務(wù)將進入等待所有子任務(wù)的結(jié)束之中。一旦子任務(wù)執(zhí)行結(jié)束,該任務(wù)可以把所有結(jié)果合并到同一個結(jié)果。圖示如下:

當(dāng)然,并非所有類型的任務(wù)都會返回一個結(jié)果。如果這個任務(wù)并不返回一個結(jié)果,它只需等待所有子任務(wù)執(zhí)行完畢。也就不需要結(jié)果的合并啦。
ForkJoinPool使用
ForkJoinPool 是一個特殊的線程池,它的設(shè)計是為了更好的配合 分叉-和-合并 任務(wù)分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。
創(chuàng)建一個 ForkJoinPool
你可以通過其構(gòu)造子創(chuàng)建一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構(gòu)造子的一個參數(shù),你可以定義你期望的并行級別。并行級別表示你想要傳遞給 ForkJoinPool 的任務(wù)所需的線程或 CPU 數(shù)量。以下是一個 ForkJoinPool 示例:
// 這個示例創(chuàng)建了一個并行級別為 4 的 ForkJoinPool。 如果是默認構(gòu)造會自動識別當(dāng)前電腦的cup核數(shù)進行并行 ForkJoinPool forkJoinPool = new ForkJoinPool(4);
提交任務(wù)到 ForkJoinPool
就像提交任務(wù)到 ExecutorService 那樣,把任務(wù)提交到 ForkJoinPool。你可以提交兩種類型的任務(wù)。一種是沒有任何返回值的(一個 “行動”),另一種是有返回值的(一個"任務(wù)")。這兩種類型分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務(wù),以及如何對它們進行提交。
RecursiveAction
RecursiveAction 是一種沒有任何返回值的任務(wù)。它只是做一些工作,比如寫數(shù)據(jù)到磁盤,然后就退出了。一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執(zhí)行。
你可以通過繼承來實現(xiàn)一個 RecursiveAction。示例如下:
package com;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class MyRecursiveAction extends RecursiveAction {
private long workLoad = 0;
public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}
@Override
protected void compute() {
//如果工作超出閾值,將任務(wù)分解成更小的任務(wù)
if(this.workLoad > 10) {
System.out.println("將工作負載 : " + this.workLoad);
//將工作負載分成多個子任務(wù)
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
subtasks.addAll(createSubtasks());
//將子任務(wù)加入到任務(wù)隊列中
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
System.out.println("自己完成工作量: " + this.workLoad);
}
}
//將工作負載分成多個子任務(wù)
private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>();
//將工作負載分成兩個子任務(wù) 24/2=12 12/2=6
MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
forkJoinPool.invoke(myRecursiveAction);
}
}

RecursiveTask
RecursiveTask 是一種會返回結(jié)果的任務(wù)。它可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行結(jié)果合并到一個集體結(jié)果。用法和RecursiveAction一樣唯一不同的就是可以返回值
以下是一個 RecursiveTask 示例:
package com;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
//配置RecursiveTask,返回值為Long
public class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask (long workLoad) {
this.workLoad = workLoad;
}
@Override
protected Long compute() {
//如果工作超出閾值,將任務(wù)分解成更小的任務(wù)
if(this.workLoad > 10) {
System.out.println("將工作負載 : " + this.workLoad);
//將工作負載分成多個子任務(wù)
List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
subtasks.addAll(createSubtasks());
//將子任務(wù)加入到任務(wù)隊列中
for(RecursiveTask subtask : subtasks){
subtask.fork();
}
//等待子任務(wù)執(zhí)行完,并得到其結(jié)果,并將結(jié)果相加
long result = 0;
for(MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("自己完成工作量: " + this.workLoad);
return 1L ;//返回計算結(jié)果
}
}
//將工作負載分成多個子任務(wù)
private List<MyRecursiveTask > createSubtasks() {
List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
//將工作負載分成兩個子任務(wù) 24/2=12 12/2=6
MyRecursiveTask subtask1 = new MyRecursiveTask (this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask (this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveAction = new MyRecursiveTask(24);
Long invoke = forkJoinPool.invoke(myRecursiveAction);
System.out.println("最終結(jié)果: " + invoke);//4 從結(jié)果可以看出,任務(wù)被分成了4個子任務(wù),每個子任務(wù)都是一個線程
}
}

MyRecursiveTask 類繼承自 RecursiveTask,這也就意味著它將返回一個 Long 類型的結(jié)果。MyRecursiveTask 示例也會將工作分割為子任務(wù),并通過 fork() 方法對這些子任務(wù)計劃執(zhí)行。此外,本示例還通過調(diào)用每個子任務(wù)的 join() 方法收集它們返回的結(jié)果。子任務(wù)的結(jié)果隨后被合并到一個更大的結(jié)果,并最終將其返回。對于不同級別的遞歸,這種子任務(wù)的結(jié)果合并可能會發(fā)生遞歸。
Fork/Join 案例Demo
需求:使用 Fork/Join 計算 1-10000的和,當(dāng)一個任務(wù)的計算數(shù)量大于3000時拆分任務(wù),數(shù)量小于3000時計算。

因為1~10000求和,耗時較少。下面我們將數(shù)據(jù)調(diào)大,求和1 ~ 59999999999(599億),然后來對比一下使用 Fork/Join求和 和 普通求和之間的效率差異。
普通求和
public static void main(String[] args) {
//開始時間
Long start = System.currentTimeMillis();
long sum = 0l;
for (long i = 1; i <= 59999999999L; i++) {
sum+=i;
}
System.out.println(sum); //結(jié)果為負數(shù),因為超出了long的最大值了 ,平均消耗時間:16秒
//結(jié)束時間
Long end = System.currentTimeMillis();
System.out.println("消耗時間:"+(end-start));
}
Fork/Join求和
package com;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
//配置RecursiveTask,返回值為Long
public class SumRecursiveTask extends RecursiveTask<Long> {
//大于3000要拆分(創(chuàng)建一個變量)
//是否要拆分的臨界值
private static final long THRESHOLD = 3000L;
//起始值
private final long start;
//結(jié)束值
private final long end;
//構(gòu)造方法(傳遞起始值、結(jié)束值)
public SumRecursiveTask(long start, long end) {
this.start = start;
this.end = end;
}
//任務(wù)編寫完成
@Override
protected Long compute() {
long length = end - start;
//計算
if(length < THRESHOLD){
long sum = 0;
for (long i = start; i <= end; i++) {
sum +=i;
}
return sum;
}else{
//拆分
long middle = (start + end) /2;
SumRecursiveTask left = new SumRecursiveTask(start,middle);//從小到大
left.fork();
SumRecursiveTask right = new SumRecursiveTask(middle+1,end);//從大到小
right.fork();
return left.join() +right.join();
}
}
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//放入線程池
ForkJoinPool pool = new ForkJoinPool();
SumRecursiveTask task = new SumRecursiveTask(1, 59999999999L);
Long result = pool.invoke(task);
System.out.println("result="+result); //結(jié)果為負數(shù),因為超出了long的最大值了 ,平均消耗時間:4秒
Long end = System.currentTimeMillis();
System.out.println("消耗時間:"+(end-start));
}
}
總結(jié): 可以發(fā)現(xiàn)使用工作竊取算法能大大的提高我們計算的速度,理論上只要你電腦足夠快這個提升是沒有上限的 ,前提是任務(wù)是可拆分的
到此這篇關(guān)于Java線程池ForkJoinPool(工作竊取算法)的使用的文章就介紹到這了,更多相關(guān)Java線程池ForkJoinPool內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java+SpringBoot+Vue前后端分離實現(xiàn)倉庫管理系統(tǒng)
這篇文章主要介紹了一個完整的倉庫管理系統(tǒng)是基于Java+Springboot + Vue前后端分離編寫的,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06
使用CXF和Jersey框架來進行Java的WebService編程
這篇文章主要介紹了使用CXF和Jersey框架來進行Java的WebService編程,Web service是一個平臺獨立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下2015-12-12
Java多線程程序中synchronized修飾方法的使用實例
synchronized關(guān)鍵字主要北用來進行線程同步,這里我們主要來演示Java多線程程序中synchronized修飾方法的使用實例,需要的朋友可以參考下:2016-06-06
SpringBoot解決同名類導(dǎo)致的bean名沖突bean name conflicts問題
這篇文章主要介紹了SpringBoot解決同名類導(dǎo)致的bean名沖突bean name conflicts問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06
SpringBoot項目實現(xiàn)統(tǒng)一異常處理的最佳方案
在前后端分離的項目開發(fā)過程中,我們通常會對數(shù)據(jù)返回格式進行統(tǒng)一的處理,這樣可以方便前端人員取數(shù)據(jù),后端發(fā)生異常時同樣會使用此格式將異常信息返回給前端,本文介紹了如何在SpringBoot項目中實現(xiàn)統(tǒng)一異常處理,如有錯誤,還望批評指正2024-02-02

