Java并發(fā)工具Fork/Join原理
正文
我們一直講,并發(fā)編程可以分為三個(gè)層面的問(wèn)題,分別是分工、協(xié)作和互斥,當(dāng)你關(guān)注于任務(wù)的時(shí)候,你會(huì)發(fā)現(xiàn)你的視角已經(jīng)從并發(fā)編程的細(xì)節(jié)中跳出來(lái)了,你應(yīng)用的更多的是現(xiàn)實(shí)世界的思維模式,類(lèi)比的往往是現(xiàn)實(shí)世界里的分工,所以我把線(xiàn)程池、Future、CompletableFuture和CompletionService都列到了分工里面。
下面我用現(xiàn)實(shí)世界里的工作流程圖描述了并發(fā)編程領(lǐng)域的簡(jiǎn)單并行任務(wù)、聚合任務(wù)和批量并行任務(wù),輔以這些流程圖,相信你一定能將你的思維模式轉(zhuǎn)換到現(xiàn)實(shí)世界里來(lái)。
從上到下,依次為簡(jiǎn)單并行任務(wù)、聚合任務(wù)和批量并行任務(wù)示意圖
上面提到的簡(jiǎn)單并行、聚合、批量并行這三種任務(wù)模型,基本上能夠覆蓋日常工作中的并發(fā)場(chǎng)景了,但還是不夠全面,因?yàn)檫€有一種“分治”的任務(wù)模型沒(méi)有覆蓋到。 分治,顧名思義,即分而治之,是一種解決復(fù)雜問(wèn)題的思維方法和模式;具體來(lái)講,指的是 把一個(gè)復(fù)雜的問(wèn)題分解成多個(gè)相似的子問(wèn)題,然后再把子問(wèn)題分解成更小的子問(wèn)題,直到子問(wèn)題簡(jiǎn)單到可以直接求解。理論上來(lái)講,解決每一個(gè)問(wèn)題都對(duì)應(yīng)著一個(gè)任務(wù),所以對(duì)于問(wèn)題的分治,實(shí)際上就是對(duì)于任務(wù)的分治。
分治思想在很多領(lǐng)域都有廣泛的應(yīng)用,例如算法領(lǐng)域有分治算法(歸并排序、快速排序都屬于分治算法,二分法查找也是一種分治算法);大數(shù)據(jù)領(lǐng)域知名的計(jì)算框架MapReduce背后的思想也是分治。既然分治這種任務(wù)模型如此普遍,那Java顯然也需要支持,Java并發(fā)包里提供了一種叫做Fork/Join的并行計(jì)算框架,就是用來(lái)支持分治這種任務(wù)模型的。
分治任務(wù)模型
這里你需要先深入了解一下分治任務(wù)模型,分治任務(wù)模型可分為兩個(gè)階段:一個(gè)階段是 任務(wù)分解,也就是將任務(wù)迭代地分解為子任務(wù),直至子任務(wù)可以直接計(jì)算出結(jié)果;另一個(gè)階段是 結(jié)果合并,即逐層合并子任務(wù)的執(zhí)行結(jié)果,直至獲得最終結(jié)果。下圖是一個(gè)簡(jiǎn)化的分治任務(wù)模型圖,你可以對(duì)照著理解。
簡(jiǎn)版分治任務(wù)模型圖
在這個(gè)分治任務(wù)模型里,任務(wù)和分解后的子任務(wù)具有相似性,這種相似性往往體現(xiàn)在任務(wù)和子任務(wù)的算法是相同的,但是計(jì)算的數(shù)據(jù)規(guī)模是不同的。具備這種相似性的問(wèn)題,我們往往都采用遞歸算法。
Fork/Join的使用
Fork/Join是一個(gè)并行計(jì)算的框架,主要就是用來(lái)支持分治任務(wù)模型的,這個(gè)計(jì)算框架里的 Fork對(duì)應(yīng)的是分治任務(wù)模型里的任務(wù)分解,Join對(duì)應(yīng)的是結(jié)果合并。Fork/Join計(jì)算框架主要包含兩部分,一部分是 分治任務(wù)的線(xiàn)程池ForkJoinPool,另一部分是 分治任務(wù)ForkJoinTask。這兩部分的關(guān)系類(lèi)似于ThreadPoolExecutor和Runnable的關(guān)系,都可以理解為提交任務(wù)到線(xiàn)程池,只不過(guò)分治任務(wù)有自己獨(dú)特類(lèi)型ForkJoinTask。
ForkJoinTask是一個(gè)抽象類(lèi),它的方法有很多,最核心的是fork()方法和join()方法,其中fork()方法會(huì)異步地執(zhí)行一個(gè)子任務(wù),而join()方法則會(huì)阻塞當(dāng)前線(xiàn)程來(lái)等待子任務(wù)的執(zhí)行結(jié)果。ForkJoinTask有兩個(gè)子類(lèi)——RecursiveAction和RecursiveTask,通過(guò)名字你就應(yīng)該能知道,它們都是用遞歸的方式來(lái)處理分治任務(wù)的。這兩個(gè)子類(lèi)都定義了抽象方法compute(),不過(guò)區(qū)別是RecursiveAction定義的compute()沒(méi)有返回值,而RecursiveTask定義的compute()方法是有返回值的。這兩個(gè)子類(lèi)也是抽象類(lèi),在使用的時(shí)候,需要你定義子類(lèi)去擴(kuò)展。
接下來(lái)我們就來(lái)實(shí)現(xiàn)一下,看看如何用Fork/Join這個(gè)并行計(jì)算框架計(jì)算斐波那契數(shù)列(下面的代碼源自Java官方示例)。首先我們需要?jiǎng)?chuàng)建一個(gè)分治任務(wù)線(xiàn)程池以及計(jì)算斐波那契數(shù)列的分治任務(wù),之后通過(guò)調(diào)用分治任務(wù)線(xiàn)程池的 invoke() 方法來(lái)啟動(dòng)分治任務(wù)。由于計(jì)算斐波那契數(shù)列需要有返回值,所以Fibonacci 繼承自RecursiveTask。分治任務(wù)Fibonacci 需要實(shí)現(xiàn)compute()方法,這個(gè)方法里面的邏輯和普通計(jì)算斐波那契數(shù)列非常類(lèi)似,區(qū)別之處在于計(jì)算 Fibonacci(n - 1)
使用了異步子任務(wù),這是通過(guò) f1.fork()
這條語(yǔ)句實(shí)現(xiàn)的。
static void main(String[] args){ //創(chuàng)建分治任務(wù)線(xiàn)程池 ForkJoinPool fjp = new ForkJoinPool(4); //創(chuàng)建分治任務(wù) Fibonacci fib = new Fibonacci(30); //啟動(dòng)分治任務(wù) Integer result = fjp.invoke(fib); //輸出結(jié)果 System.out.println(result); } //遞歸任務(wù) static class Fibonacci extends RecursiveTask<Integer>{ final int n; Fibonacci(int n){this.n = n;} protected Integer compute(){ if (n <= 1) return n; Fibonacci f1 = new Fibonacci(n - 1); //創(chuàng)建子任務(wù) f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); //等待子任務(wù)結(jié)果,并合并結(jié)果 return f2.compute() + f1.join(); } }
ForkJoinPool工作原理
Fork/Join并行計(jì)算的核心組件是ForkJoinPool,所以下面我們就來(lái)簡(jiǎn)單介紹一下ForkJoinPool的工作原理。
ThreadPoolExecutor本質(zhì)上是一個(gè)生產(chǎn)者-消費(fèi)者模式的實(shí)現(xiàn),內(nèi)部有一個(gè)任務(wù)隊(duì)列,這個(gè)任務(wù)隊(duì)列是生產(chǎn)者和消費(fèi)者通信的媒介;ThreadPoolExecutor可以有多個(gè)工作線(xiàn)程,但是這些工作線(xiàn)程都共享一個(gè)任務(wù)隊(duì)列。
ForkJoinPool本質(zhì)上也是一個(gè)生產(chǎn)者-消費(fèi)者的實(shí)現(xiàn),但是更加智能,你可以參考下面的ForkJoinPool工作原理圖來(lái)理解其原理。ThreadPoolExecutor內(nèi)部只有一個(gè)任務(wù)隊(duì)列,而ForkJoinPool內(nèi)部有多個(gè)任務(wù)隊(duì)列,當(dāng)我們通過(guò)ForkJoinPool的invoke()或者submit()方法提交任務(wù)時(shí),F(xiàn)orkJoinPool根據(jù)一定的路由規(guī)則把任務(wù)提交到一個(gè)任務(wù)隊(duì)列中,如果任務(wù)在執(zhí)行過(guò)程中會(huì)創(chuàng)建出子任務(wù),那么子任務(wù)會(huì)提交到工作線(xiàn)程對(duì)應(yīng)的任務(wù)隊(duì)列中。
如果工作線(xiàn)程對(duì)應(yīng)的任務(wù)隊(duì)列空了,是不是就沒(méi)活兒干了呢?不是的,F(xiàn)orkJoinPool支持一種叫做“ 任務(wù)竊取”的機(jī)制,如果工作線(xiàn)程空閑了,那它可以“竊取”其他工作任務(wù)隊(duì)列里的任務(wù),例如下圖中,線(xiàn)程T2對(duì)應(yīng)的任務(wù)隊(duì)列已經(jīng)空了,它可以“竊取”線(xiàn)程T1對(duì)應(yīng)的任務(wù)隊(duì)列的任務(wù)。如此一來(lái),所有的工作線(xiàn)程都不會(huì)閑下來(lái)了。
ForkJoinPool中的任務(wù)隊(duì)列采用的是雙端隊(duì)列,工作線(xiàn)程正常獲取任務(wù)和“竊取任務(wù)”分別是從任務(wù)隊(duì)列不同的端消費(fèi),這樣能避免很多不必要的數(shù)據(jù)競(jìng)爭(zhēng)。我們這里介紹的僅僅是簡(jiǎn)化后的原理,F(xiàn)orkJoinPool的實(shí)現(xiàn)遠(yuǎn)比我們這里介紹的復(fù)雜,如果你感興趣,建議去看它的源碼。
ForkJoinPool工作原理圖
模擬MapReduce統(tǒng)計(jì)單詞數(shù)量
學(xué)習(xí)MapReduce有一個(gè)入門(mén)程序,統(tǒng)計(jì)一個(gè)文件里面每個(gè)單詞的數(shù)量,下面我們來(lái)看看如何用Fork/Join并行計(jì)算框架來(lái)實(shí)現(xiàn)。
我們可以先用二分法遞歸地將一個(gè)文件拆分成更小的文件,直到文件里只有一行數(shù)據(jù),然后統(tǒng)計(jì)這一行數(shù)據(jù)里單詞的數(shù)量,最后再逐級(jí)匯總結(jié)果,你可以對(duì)照前面的簡(jiǎn)版分治任務(wù)模型圖來(lái)理解這個(gè)過(guò)程。
思路有了,我們馬上來(lái)實(shí)現(xiàn)。下面的示例程序用一個(gè)字符串?dāng)?shù)組 String[] fc
來(lái)模擬文件內(nèi)容,fc里面的元素與文件里面的行數(shù)據(jù)一一對(duì)應(yīng)。關(guān)鍵的代碼在 compute()
這個(gè)方法里面,這是一個(gè)遞歸方法,前半部分?jǐn)?shù)據(jù)fork一個(gè)遞歸任務(wù)去處理(關(guān)鍵代碼mr1.fork()),后半部分?jǐn)?shù)據(jù)則在當(dāng)前任務(wù)中遞歸處理(mr2.compute())。
static void main(String[] args){ String[] fc = {"hello world", "hello me", "hello fork", "hello join", "fork join in world"}; //創(chuàng)建ForkJoin線(xiàn)程池 ForkJoinPool fjp = new ForkJoinPool(3); //創(chuàng)建任務(wù) MR mr = new MR( fc, 0, fc.length); //啟動(dòng)任務(wù) Map<String, Long> result = fjp.invoke(mr); //輸出結(jié)果 result.forEach((k, v)-> System.out.println(k+":"+v)); } //MR模擬類(lèi) static class MR extends RecursiveTask<Map<String, Long>> { private String[] fc; private int start, end; //構(gòu)造函數(shù) MR(String[] fc, int fr, int to){ this.fc = fc; this.start = fr; this.end = to; } @Override protected Map<String, Long> compute(){ if (end - start == 1) { return calc(fc[start]); } else { int mid = (start+end)/2; MR mr1 = new MR( fc, start, mid); mr1.fork(); MR mr2 = new MR( fc, mid, end); //計(jì)算子任務(wù),并返回合并的結(jié)果 return merge(mr2.compute(), mr1.join()); } } //合并結(jié)果 private Map<String, Long> merge( Map<String, Long> r1, Map<String, Long> r2) { Map<String, Long> result = new HashMap<>(); result.putAll(r1); //合并結(jié)果 r2.forEach((k, v) -> { Long c = result.get(k); if (c != null) result.put(k, c+v); else result.put(k, v); }); return result; } //統(tǒng)計(jì)單詞數(shù)量 private Map<String, Long> calc(String line) { Map<String, Long> result = new HashMap<>(); //分割單詞 String [] words = line.split("\\s+"); //統(tǒng)計(jì)單詞數(shù)量 for (String w : words) { Long v = result.get(w); if (v != null) result.put(w, v+1); else result.put(w, 1L); } return result; } }
總結(jié)
Fork/Join并行計(jì)算框架主要解決的是分治任務(wù)。分治的核心思想是“分而治之”:將一個(gè)大的任務(wù)拆分成小的子任務(wù)去解決,然后再把子任務(wù)的結(jié)果聚合起來(lái)從而得到最終結(jié)果。這個(gè)過(guò)程非常類(lèi)似于大數(shù)據(jù)處理中的MapReduce,所以你可以把Fork/Join看作單機(jī)版的MapReduce。
Fork/Join并行計(jì)算框架的核心組件是ForkJoinPool。ForkJoinPool支持任務(wù)竊取機(jī)制,能夠讓所有線(xiàn)程的工作量基本均衡,不會(huì)出現(xiàn)有的線(xiàn)程很忙,而有的線(xiàn)程很閑的狀況,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool為基礎(chǔ)的。不過(guò)需要你注意的是,默認(rèn)情況下所有的并行流計(jì)算都共享一個(gè)ForkJoinPool,這個(gè)共享的ForkJoinPool默認(rèn)的線(xiàn)程數(shù)是CPU的核數(shù);如果所有的并行流計(jì)算都是CPU密集型計(jì)算的話(huà),完全沒(méi)有問(wèn)題,但是如果存在I/O密集型的并行流計(jì)算,那么很可能會(huì)因?yàn)橐粋€(gè)很慢的I/O計(jì)算而拖慢整個(gè)系統(tǒng)的性能。所以 建議用不同的ForkJoinPool執(zhí)行不同類(lèi)型的計(jì)算任務(wù)。
以上就是Java并發(fā)工具Fork/Join原理的詳細(xì)內(nèi)容,更多關(guān)于Java并發(fā)工具Fork/Join原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Cloud Alibaba之Sentinel實(shí)現(xiàn)熔斷限流功能
這篇文章主要介紹了Spring Cloud Alibaba之Sentinel,這里使用阿里的sentinel來(lái)實(shí)現(xiàn)熔斷限流功能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04dubbo服務(wù)引用之創(chuàng)建Invoker流程詳解
這篇文章主要為大家介紹了dubbo服務(wù)引用二之創(chuàng)建Invoker流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08聽(tīng)說(shuō)用了YYYY-MM-dd的程序員,前些天都在加班改Bug
這篇文章主要介紹了YYYY-MM-dd的實(shí)用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01總結(jié)Java集合類(lèi)操作優(yōu)化經(jīng)驗(yàn)
本文主要介紹的就是集合框架的使用經(jīng)驗(yàn),告訴大家如何高效、方便地管理對(duì)象,所有代碼基于JDK7,需要的朋友可以參考下2015-08-08springboot項(xiàng)目關(guān)閉swagger如何防止漏洞掃描
這篇文章主要介紹了springboot項(xiàng)目關(guān)閉swagger如何防止漏洞掃描,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-05-05HTTPClient如何在Springboot中封裝工具類(lèi)
這篇文章主要介紹了HTTPClient如何在Springboot中封裝工具類(lèi)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09基于Java實(shí)現(xiàn)的圖的廣度優(yōu)先遍歷算法
這篇文章主要介紹了基于Java實(shí)現(xiàn)的圖的廣度優(yōu)先遍歷算法,需要的朋友可以參考下2014-07-07