詳解Java中的ForkJoin
ForkJoin簡(jiǎn)介
Fork/Join框架是Java 7提供的一種用于并行執(zhí)行任務(wù)的框架,它將大任務(wù)分解為若干個(gè)小任務(wù),并行執(zhí)行這些小任務(wù),最終通過(guò)合并每個(gè)小任務(wù)的結(jié)果得到大任務(wù)的結(jié)果。
Fork/Join采用的是分而治之的基本思想,分而治之就是將一個(gè)復(fù)雜的任務(wù),按照規(guī)定的閾值劃分成多個(gè)簡(jiǎn)單的小任務(wù),然后將這些小任務(wù)的結(jié)果再進(jìn)行匯總返回,得到最終的任務(wù)。
并行和并發(fā)的區(qū)別
并行和并發(fā)是計(jì)算機(jī)科學(xué)中的兩個(gè)概念,它們之間有一些相似之處,但也有明顯的區(qū)別。
并行是指多個(gè)處理器或者是多核的處理器同時(shí)處理多個(gè)不同的任務(wù)。并行可以在多處理器系統(tǒng)中實(shí)現(xiàn),利用每個(gè)處理機(jī)來(lái)處理一個(gè)可并發(fā)執(zhí)行的程序,從而實(shí)現(xiàn)多個(gè)程序的同時(shí)執(zhí)行。在并行執(zhí)行時(shí),每個(gè)處理器可以同時(shí)執(zhí)行多個(gè)程序,從而提高計(jì)算效率。
并發(fā)是指邏輯上的同時(shí)發(fā)生(即 true 的同時(shí)性),而并行是物理上的同時(shí)發(fā)生。在多道程序環(huán)境下,并發(fā)性是指在一段時(shí)間內(nèi)宏觀上有多個(gè)程序在同時(shí)運(yùn)行,但在單處理機(jī)系統(tǒng)中,每一時(shí)刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時(shí)地交替執(zhí)行。
簡(jiǎn)而言之,并行是指多個(gè)處理器或多核處理器同時(shí)處理多個(gè)任務(wù),而并發(fā)是指在同一時(shí)間內(nèi)多個(gè)任務(wù)同時(shí)發(fā)生。
工作竊取算法
工作竊取算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來(lái)執(zhí)行。當(dāng)工作隊(duì)列中有空閑任務(wù)時(shí),就將任務(wù)從原線程的隊(duì)列中竊取過(guò)來(lái),執(zhí)行完成后再將結(jié)果返回給原線程。這樣就保證了原線程不會(huì)一直等待空閑任務(wù),從而提高了程序的效率。
Fork/Join框架使用ForkJoinPool這個(gè)特殊的線程池來(lái)處理任務(wù)之間有依賴的情況,其實(shí)現(xiàn)了“work-stealing”算法(工作量竊取算法)并執(zhí)行ForkJoinTask對(duì)象。ForkJoinPool保持多個(gè)線程,其線程數(shù)量默認(rèn)為機(jī)器cpu核心數(shù)。每個(gè)線程都有一個(gè)特殊類型的deques隊(duì)列(雙端隊(duì)列),放置該線程的所有任務(wù),而不是所有線程共享一個(gè)公共隊(duì)列。
每個(gè)線程都會(huì)保證將自己隊(duì)列中的任務(wù)執(zhí)行完,當(dāng)自己的任務(wù)執(zhí)行完成之后,在去看其他線程的任務(wù)隊(duì)列中是否有未處理完的任務(wù),如果有則會(huì)幫助其他線程執(zhí)行。
這時(shí)雙端隊(duì)列的優(yōu)勢(shì)就體現(xiàn)出來(lái)了,被竊取的任務(wù)只會(huì)從隊(duì)列的頭部獲取任務(wù),而正常處理的線程每次都是從隊(duì)列的尾部獲取任務(wù)。
求1到1億的和
package com.fandf.test.forkjoin; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StopWatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * @author fandongfeng */ @Slf4j public class ForkJoinDemo extends RecursiveTask<Long> { /** * 小任務(wù)的大小閾值 */ public static final int TASK_SIZE = 100000; /** * 開始數(shù)字 */ private final Long start; /** * 結(jié)束數(shù)字 */ private final Long end; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0L; //如果任務(wù)足夠小就計(jì)算任務(wù) boolean canCompute = (end - start) <= TASK_SIZE; if (canCompute) { for (Long i = start; i <= end; i++) { sum += i; } } else { // 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算 long middle = (start + end) / 2; ForkJoinDemo leftTask = new ForkJoinDemo(start, middle); ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end); // 執(zhí)行子任務(wù) leftTask.fork(); rightTask.fork(); // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果 Long leftResult = leftTask.join(); Long rightResult = rightTask.join(); // 合并子任務(wù) sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4+...+100000000 ForkJoinDemo task = new ForkJoinDemo(1L, 100000000L); StopWatch stopWatch = new StopWatch(); stopWatch.start(); //執(zhí)行一個(gè)任務(wù) Future<Long> result = forkjoinPool.submit(task); try { System.out.println("result:" + result.get()); } catch (Exception e) { log.error("exception", e); } stopWatch.stop(); System.out.println("總耗時(shí):" + stopWatch.getTotalTimeMillis() + "毫秒"); System.out.println("getParallelism:" + forkjoinPool.getParallelism()); System.out.println("getPoolSize:" + forkjoinPool.getPoolSize()); } }
輸出結(jié)果
result:5000000050000000
總耗時(shí):330毫秒
getParallelism:6
getPoolSize:7
ForkJoin框架實(shí)現(xiàn)
ForkJoinPool
ForkJoinPool
是用于運(yùn)行ForkJoinTasks
的線程池,實(shí)現(xiàn)了Executor
接口
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
ForkJoinPool構(gòu)造方法有四個(gè)參數(shù):
- parallelism:期望并發(fā)數(shù)。默認(rèn)會(huì)使用
Runtime.getRuntime().availableProcessors()
的值 - factory:創(chuàng)建ForkJoin工作線程的工廠,默認(rèn)為defaultForkJoinWorkerThreadFactory
- handler:執(zhí)行任務(wù)時(shí)遇到不可恢復(fù)的錯(cuò)誤時(shí)的處理程序,默認(rèn)為null
- asyncMode:工作線程獲取任務(wù)使用FIFO(先進(jìn)先出)模式還是LIFO(后進(jìn)先出)模式,默認(rèn)為L(zhǎng)IFO
ForkJoinTask
ForkJoinTask
是對(duì)于在ForkJoinPool
中運(yùn)行任務(wù)的抽象類定義。
JDK為我們提供了三種特定類型的ForkJoinTask父類供我們自定義時(shí)繼承使用。
- RecursiveAction:子任務(wù)不返回結(jié)果
- RecursiveTask:子任務(wù)返回結(jié)果
- CountedCompleter:在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行
ForkJoinWorkerThread
ForkJoinPool
中用于執(zhí)行ForkJoinTask
的線程。
ForkJoinPool實(shí)現(xiàn)了Executor接口。但是和我們常用的ThreadPoolExecutor又有一些區(qū)別。
如果使用ThreadPoolExecutor來(lái)實(shí)現(xiàn)上面分治任務(wù),那么每個(gè)子任務(wù)都需要?jiǎng)?chuàng)建一個(gè)線程,如果子任務(wù)的數(shù)量很大,假設(shè)有上萬(wàn)個(gè),那么使用ThreadPoolExecutor創(chuàng)建出上萬(wàn)個(gè)線程,這顯然是不可行也不合理的;
而ForkJoinPool在處理任務(wù)時(shí),并不會(huì)按照任務(wù)開啟線程,而是按照指定的期望并行數(shù)量創(chuàng)建線程。在每個(gè)線程工作時(shí),如果需要繼續(xù)拆分子任務(wù),則會(huì)將當(dāng)前任務(wù)放入ForkJoinWorkerThread的任務(wù)隊(duì)列中,遞歸處理直到最外層的任務(wù)。
ForkJoinTask啟動(dòng)方式
- 異步執(zhí)行
forkjoinPool.execute(task);無(wú)返回結(jié)果 - 同步執(zhí)行
forkjoinPool.invoke(task);等待返回結(jié)果 - 異步執(zhí)行,通過(guò)Future獲取結(jié)果
forkjoinPool.submit(task);
總結(jié)
在使用Fork/Join框架時(shí),需要注意以下幾點(diǎn):
- 必須首先創(chuàng)建一個(gè)ForkJoinTask對(duì)象。
- 在分發(fā)任務(wù)時(shí),需要注意線程安全問(wèn)題,防止多個(gè)線程同時(shí)訪問(wèn)共享資源。可以使用synchronized關(guān)鍵字或者Lock對(duì)象來(lái)保證線程安全。
- 在合并結(jié)果時(shí),也需要注意線程安全問(wèn)題,可以使用CountDownLatch對(duì)象來(lái)確保每個(gè)Fork執(zhí)行完成后才能提交結(jié)果。
- 在使用Fork/Join框架時(shí),需要考慮算法的效率和性能問(wèn)題??梢允褂肅ache技術(shù)來(lái)減少不必要的計(jì)算,使用join策略來(lái)合并結(jié)果等。
總之,F(xiàn)ork/Join框架是一種非常有用的并行計(jì)算框架,可以大大提高程序的執(zhí)行效率和并發(fā)能力。
以上就是詳解Java中的ForkJoin的詳細(xì)內(nèi)容,更多關(guān)于Java ForkJoin的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列
廣搜BFS的基本思想是: 首先訪問(wèn)初始點(diǎn)v并將其標(biāo)志為已經(jīng)訪問(wèn)。接著通過(guò)鄰接關(guān)系將鄰接點(diǎn)入隊(duì)。然后每訪問(wèn)過(guò)一個(gè)頂點(diǎn)則出隊(duì)。按照順序,訪問(wèn)每一個(gè)頂點(diǎn)的所有未被訪問(wèn)過(guò)的頂點(diǎn)直到所有的頂點(diǎn)均被訪問(wèn)過(guò)。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07Java中的HashMap弱引用之WeakHashMap詳解
這篇文章主要介紹了Java中的HashMap弱引用之WeakHashMap詳解,當(dāng)內(nèi)存空間不足,Java虛擬機(jī)寧愿拋出OutOfMemoryError錯(cuò)誤,使程序異常終止,也不會(huì)靠隨意回收具有強(qiáng)引用的對(duì)象來(lái)解決內(nèi)存不足的問(wèn)題,需要的朋友可以參考下2023-09-09java 模仿拼多多紅包遞減算法的實(shí)現(xiàn)
這篇文章主要介紹了java 模仿拼多多紅包遞減算法的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02Spring?Boot?集成JWT實(shí)現(xiàn)前后端認(rèn)證的示例代碼
小程序、H5應(yīng)用的快速發(fā)展,使得前后端分離已經(jīng)成為了趨勢(shì),本文主要介紹了Spring?Boot?集成JWT實(shí)現(xiàn)前后端認(rèn)證,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04springMVC+ajax實(shí)現(xiàn)文件上傳且?guī)нM(jìn)度條實(shí)例
本篇文章主要介紹了springMVC+ajax實(shí)現(xiàn)文件上傳且?guī)нM(jìn)度條實(shí)例,具有一定的參考價(jià)值,有興趣的可以了解一下。2017-01-01SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證的示例步驟
權(quán)限驗(yàn)證是一種用于控制對(duì)系統(tǒng)資源和操作的訪問(wèn)的機(jī)制。它允許開發(fā)人員定義誰(shuí)可以執(zhí)行特定操作或訪問(wèn)特定資源,并確保只有經(jīng)過(guò)授權(quán)的用戶才能執(zhí)行這些操作,這篇文章主要介紹了SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證,需要的朋友可以參考下2023-08-08java使用JSCH實(shí)現(xiàn)SFTP文件管理
這篇文章主要為大家詳細(xì)介紹了java使用JSCH實(shí)現(xiàn)SFTP文件管理,實(shí)現(xiàn)上傳、下載等功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-08-08微信公眾號(hào)測(cè)試賬號(hào)自定義菜單的實(shí)例代碼
這篇文章主要介紹了微信公眾號(hào)測(cè)試賬號(hào)自定義菜單的實(shí)例代碼,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-02-02