Java中Future接口詳解
一、背景
在系統(tǒng)中,異步執(zhí)行任務(wù),是很常見(jiàn)的功能邏輯,但是在不同的場(chǎng)景中,又存在很多細(xì)節(jié)差異;
有的任務(wù)只強(qiáng)調(diào)「執(zhí)行過(guò)程」,并不需要追溯任務(wù)自身的「執(zhí)行結(jié)果」,這里并不是指對(duì)系統(tǒng)和業(yè)務(wù)產(chǎn)生的效果,比如定時(shí)任務(wù)、消息隊(duì)列等場(chǎng)景;
但是有些任務(wù)即強(qiáng)調(diào)「執(zhí)行過(guò)程」,又需要追溯任務(wù)自身的「執(zhí)行結(jié)果」,在流程中依賴某個(gè)異步結(jié)果,判斷流程是否中斷,比如「并行」處理;
【串行處理】整個(gè)流程按照邏輯逐步推進(jìn),如果出現(xiàn)異常會(huì)導(dǎo)致流程中斷;
【并行處理】主流程按照邏輯逐步推進(jìn),其他「異步」交互的流程執(zhí)行完畢后,將結(jié)果返回到主流程,如果「異步」流程異常,會(huì)影響部分結(jié)果;
此前在《「訂單」業(yè)務(wù)》的內(nèi)容中,聊過(guò)關(guān)于「串行」和「并行」的應(yīng)用對(duì)比,即在訂單詳情的加載過(guò)程中,通過(guò)「并行」的方式讀?。荷唐?、商戶、訂單、用戶等信息,提升接口的響應(yīng)時(shí)間;
二、Future接口
1、入門案例
異步是對(duì)流程的解耦,但是有的流程中又依賴異步執(zhí)行的最終結(jié)果,此時(shí)就可以使用「Future」接口來(lái)達(dá)到該目的,先來(lái)看一個(gè)簡(jiǎn)單的入門案例;
public class ServerTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); return 3; } } public class FutureBase01 { public static void main(String[] args) throws Exception { TimeInterval timer = DateUtil.timer(); // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 批量任務(wù) List<ServerTask> serverTasks = new ArrayList<>() ; for (int i=0;i<3;i++){ serverTasks.add(new ServerTask()); } List<Future<Integer>> taskResList = executor.invokeAll(serverTasks) ; // 結(jié)果輸出 for (Future<Integer> intFuture:taskResList){ System.out.println(intFuture.get()); } // 耗時(shí)統(tǒng)計(jì) System.out.println("timer...interval = "+timer.interval()); } }
這里模擬一個(gè)場(chǎng)景,以線程池批量執(zhí)行異步任務(wù),在任務(wù)內(nèi)線程休眠2秒,以并行的方式最終獲取全部結(jié)果,只耗時(shí)2秒多一點(diǎn),如果串行的話耗時(shí)肯定超過(guò)6秒;
2、Future接口
Future表示異步計(jì)算的結(jié)果,提供了用于檢查計(jì)算是否完成、等待計(jì)算完成、以及檢索計(jì)算結(jié)果的方法。
【核心方法】
get():等待任務(wù)完成,獲取執(zhí)行結(jié)果,如果任務(wù)取消會(huì)拋出異常; get(long timeout, TimeUnit unit):指定等待任務(wù)完成的時(shí)間,等待超時(shí)會(huì)拋出異常; isDone():判斷任務(wù)是否完成; isCancelled():判斷任務(wù)是否被取消; cancel(boolean mayInterruptIfRunning):嘗試取消此任務(wù)的執(zhí)行,如果任務(wù)已經(jīng)完成、已經(jīng)取消或由于其他原因無(wú)法取消,則此嘗試將失??;
【基礎(chǔ)用法】
public class FutureBase02 { public static void main(String[] args) throws Exception { // 線程池執(zhí)行任務(wù) ExecutorService executor = Executors.newFixedThreadPool(3); FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return "task...OK"; } }) ; executor.execute(futureTask); // 任務(wù)信息獲取 System.out.println("是否完成:"+futureTask.isDone()); System.out.println("是否取消:"+futureTask.isCancelled()); System.out.println("獲取結(jié)果:"+futureTask.get()); System.out.println("嘗試取消:"+futureTask.cancel(Boolean.TRUE)); } }
【FutureTask】
Future接口的基本實(shí)現(xiàn)類,提供了計(jì)算的啟動(dòng)和取消、查詢計(jì)算是否完成以及檢索計(jì)算結(jié)果的方法;
在「FutureTask」類中,可以看到線程異步執(zhí)行任務(wù)時(shí),其中的核心狀態(tài)轉(zhuǎn)換,以及最終結(jié)果寫出的方式;
雖然「Future」從設(shè)計(jì)上,實(shí)現(xiàn)了異步計(jì)算的結(jié)果獲取,但是通過(guò)上面的案例也可以發(fā)現(xiàn),流程的主線程在執(zhí)行g(shù)et()方法時(shí)會(huì)阻塞,直到最終獲取結(jié)果,顯然對(duì)于程序來(lái)說(shuō)并不友好;
在JDK1.8提供「CompletableFuture」類,對(duì)「Future」進(jìn)行優(yōu)化和擴(kuò)展;
三、CompletableFuture類
1、基礎(chǔ)說(shuō)明
「CompletableFuture」類提供函數(shù)編程的能力,可以通過(guò)回調(diào)的方式處理計(jì)算結(jié)果,并且支持組合操作,提供很多方法來(lái)實(shí)現(xiàn)異步編排,降低異步編程的復(fù)雜度;
「CompletableFuture」實(shí)現(xiàn)「Future」和「CompletionStage」兩個(gè)接口;
Future:表示異步計(jì)算的結(jié)果; CompletionStage:表示異步計(jì)算的一個(gè)步驟,當(dāng)一個(gè)階段計(jì)算完成時(shí),可能會(huì)觸發(fā)其他階段,即步驟可能由其他CompletionStage觸發(fā);
【入門案例】
public class CompletableBase01 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 任務(wù)執(zhí)行 CompletableFuture<String> cft = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Res...OK"; }, executor); // 結(jié)果輸出 System.out.println(cft.get()); } }
2、核心方法
2.1 實(shí)例方法
public class Completable01 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 1、創(chuàng)建未完成的CompletableFuture,通過(guò)complete()方法完成 CompletableFuture<Integer> cft01 = new CompletableFuture<>() ; cft01.complete(99) ; // 2、創(chuàng)建已經(jīng)完成CompletableFuture,并且給定結(jié)果 CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value"); // 3、有返回值,默認(rèn)ForkJoinPool線程池 CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";}); // 4、有返回值,采用Executor自定義線程池 CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor); // 5、無(wú)返回值,默認(rèn)ForkJoinPool線程池 CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {}); // 6、無(wú)返回值,采用Executor自定義線程池 CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor); } }
2.2 計(jì)算方法
public class Completable02 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK"; },executor); // 1、計(jì)算完成后,執(zhí)行后續(xù)處理 // cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex)); // 2、觸發(fā)計(jì)算,如果沒(méi)有完成,則get設(shè)定的值,如果已完成,則get任務(wù)返回值 // boolean completeFlag = cft01.complete("given...value"); // if (completeFlag){ // System.out.println(cft01.get()); // } else { // System.out.println(cft01.get()); // } // 3、開(kāi)啟新CompletionStage,重新獲取線程執(zhí)行任務(wù) cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor); } }
2.3 結(jié)果獲取方法
public class Completable03 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Res...OK"; },executor); // 1、阻塞直到獲取結(jié)果 // System.out.println(cft01.get()); // 2、設(shè)定超時(shí)的阻塞獲取結(jié)果 // System.out.println(cft01.get(4, TimeUnit.SECONDS)); // 3、非阻塞獲取結(jié)果,如果任務(wù)已經(jīng)完成,則返回結(jié)果,如果任務(wù)未完成,返回給定的值 // System.out.println(cft01.getNow("given...value")); // 4、get獲取拋檢查異常,join獲取非檢查異常 System.out.println(cft01.join()); } }
2.4 任務(wù)編排方法
public class Completable04 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("OK-1"); return "OK"; },executor); // 1、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),此處不關(guān)注cft01的結(jié)果 // cft01.thenRun(() -> System.out.println("task...run")) ; // 2、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),可以獲取cft01的結(jié)果 // cft01.thenAccept((res) -> { // System.out.println("cft01:"+res); // System.out.println("task...run"); // }); // 3、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),獲取cft01的結(jié)果,并且具有返回值 // CompletableFuture<Integer> cft02 = cft01.thenApply((res) -> { // System.out.println("cft01:"+res); // return 99 ; // }); // System.out.println(cft02.get()); // 4、順序執(zhí)行cft01、cft02 // CompletableFuture<String> cft02 = cft01.thenCompose((res) -> CompletableFuture.supplyAsync(() -> { // System.out.println("cft01:"+res); // return "OK-2"; // })); // cft02.whenComplete((res,ex) -> System.out.println("Result:"+res+";Exe:"+ex)); // 5、對(duì)比任務(wù)的執(zhí)行效率,由于cft02先完成,所以取cft02的結(jié)果 // CompletableFuture<String> cft02 = cft01.applyToEither(CompletableFuture.supplyAsync(() -> { // System.out.println("run...cft02"); // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // return "OK-2"; // }),(res) -> { // System.out.println("either...result:" + res); // return res; // }); // System.out.println("finally...result:" + cft02.get()); // 6、兩組任務(wù)執(zhí)行完成后,對(duì)結(jié)果進(jìn)行合并 // CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> "OK-2") ; // String finallyRes = cft01.thenCombine(cft02,(res1,res2) -> { // System.out.println("res1:"+res1+";res2:"+res2); // return res1+";"+res2 ; // }).get(); // System.out.println(finallyRes); CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> { System.out.println("OK-2"); return "OK-2"; }) ; CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> { System.out.println("OK-3"); return "OK-3"; }) ; // 7、等待批量任務(wù)執(zhí)行完返回 // CompletableFuture.allOf(cft01,cft02,cft03).get(); // 8、任意一個(gè)任務(wù)執(zhí)行完即返回 System.out.println("Sign:"+CompletableFuture.anyOf(cft01,cft02,cft03).get()); } }
2.5 異常處理方法
public class Completable05 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { if (1 > 0){ throw new RuntimeException("task...exception"); } return "OK"; },executor); // 1、捕獲cft01的異常信息,并提供返回值 String finallyRes = cft01.thenApply((res) -> { System.out.println("cft01-res:" + res); return res; }).exceptionally((ex) -> { System.out.println("cft01-exe:" + ex.getMessage()); return "error" ; }).get(); System.out.println("finallyRes="+finallyRes); CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); // 2、如果cft02未完成,則get時(shí)拋出指定異常信息 boolean exeFlag = cft02.completeExceptionally(new RuntimeException("given...exception")); if (exeFlag){ System.out.println(cft02.get()); } else { System.out.println(cft02.get()); } } }
3、線程池問(wèn)題
在實(shí)踐中,通常不使用ForkJoinPool#commonPool()公共線程池,會(huì)出現(xiàn)線程競(jìng)爭(zhēng)問(wèn)題,從而形成系統(tǒng)瓶頸; 在任務(wù)編排中,如果出現(xiàn)依賴情況或者父子任務(wù),盡量使用多個(gè)線程池,從而避免任務(wù)請(qǐng)求同一個(gè)線程池,規(guī)避死鎖情況發(fā)生;
四、CompletableFuture原理
1、核心結(jié)構(gòu)
在分析「CompletableFuture」其原理之前,首先看一下涉及的核心結(jié)構(gòu);
【CompletableFuture】
在該類中有兩個(gè)關(guān)鍵的字段:「result」存儲(chǔ)當(dāng)前CF的結(jié)果,「stack」代表?xiàng)m斣兀串?dāng)前CF計(jì)算完成后會(huì)觸發(fā)的依賴動(dòng)作;從上面案例中可知,依賴動(dòng)作可以沒(méi)有或者有多個(gè);
【Completion】
依賴動(dòng)作的封裝類;
【UniCompletion】
繼承Completion類,一元依賴的基礎(chǔ)類,「executor」指線程池,「dep」指依賴的計(jì)算,「src」指源動(dòng)作;
【BiCompletion】
繼承UniCompletion類,二元或者多元依賴的基礎(chǔ)類,「snd」指第二個(gè)源動(dòng)作;
2、零依賴
顧名思義,即各個(gè)CF之間不產(chǎn)生依賴關(guān)系;
public class DepZero { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(()-> "OK-1",executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(()-> "OK-2",executor); System.out.println(cft1.get()+";"+cft2.get()); } }
3、一元依賴
即CF之間的單個(gè)依賴關(guān)系;這里使用「thenApply」方法演示,為了看到效果,使「cft1」長(zhǎng)時(shí)間休眠,斷點(diǎn)查看「stack」結(jié)構(gòu);
public class DepOne { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = cft1.thenApply(res -> { System.out.println("cft01-res"+res); return "OK-2" ; }); System.out.println("cft02-res"+cft2.get()); } }
斷點(diǎn)截圖:
原理分析:
觀察者Completion注冊(cè)到「cft1」,注冊(cè)時(shí)會(huì)檢查計(jì)算是否完成,未完成則觀察者入棧,當(dāng)「cft1」計(jì)算完成會(huì)彈棧;已完成則直接觸發(fā)觀察者;
可以調(diào)整斷點(diǎn)代碼,讓「cft1」先處于完成狀態(tài),再查看其運(yùn)行時(shí)結(jié)構(gòu),從而分析完整的邏輯;
4、二元依賴
即一個(gè)CF同時(shí)依賴兩個(gè)CF;這里使用「thenCombine」方法演示;為了看到效果,使「cft1、cft2」長(zhǎng)時(shí)間休眠,斷點(diǎn)查看「stack」結(jié)構(gòu);
public class DepTwo { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); // cft3 依賴 cft1和cft2 的計(jì)算結(jié)果 CompletableFuture<String> cft3 = cft1.thenCombine(cft2,(res1,res2) -> { System.out.println("cft01-res:"+res1); System.out.println("cft02-res:"+res2); return "OK-3" ; }); System.out.println("cft03-res:"+cft3.get()); } }
斷點(diǎn)截圖:
原理分析:
在「cft1」和「cft2」未完成的狀態(tài)下,嘗試將BiApply壓入「cft1」和「cft2」兩個(gè)棧中,任意CF完成時(shí),會(huì)嘗試觸發(fā)觀察者,觀察者檢查「cft1」和「cft2」是否都完成,如果完成則執(zhí)行;
5、多元依賴
即一個(gè)CF同時(shí)依賴多個(gè)CF;這里使用「allOf」方法演示;為了看到效果,使「cft1、cft2、cft3」長(zhǎng)時(shí)間休眠,斷點(diǎn)查看「stack」結(jié)構(gòu);
public class DepMore { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); CompletableFuture<String> cft3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-3"; },executor); // cft4 依賴 cft1和cft2和cft3 的計(jì)算結(jié)果 CompletableFuture<Void> cft4 = CompletableFuture.allOf(cft1,cft2,cft3); CompletableFuture<String> finallyRes = cft4.thenApply(tm -> { System.out.println("cft01-res:"+cft1.join()); System.out.println("cft02-res:"+cft2.join()); System.out.println("cft03-res:"+cft3.join()); return "OK-4"; }); System.out.println("finally-res:"+finallyRes.get()); } }
斷點(diǎn)截圖:
原理分析:
多元依賴的回調(diào)方法除了「allOf」還有「anyOf」,其實(shí)現(xiàn)原理都是將依賴的多個(gè)CF補(bǔ)全為平衡二叉樹(shù),從斷點(diǎn)圖可知會(huì)按照樹(shù)的層級(jí)處理,核心結(jié)構(gòu)參考二元依賴即可;
到此這篇關(guān)于Java中Future接口詳解的文章就介紹到這了,更多相關(guān)Future接口詳解內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java并發(fā)包工具CountDownLatch源碼分析
這篇文章主要為大家介紹了java并發(fā)包工具CountDownLatch源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10基于Spring Cloud Zookeeper實(shí)現(xiàn)服務(wù)注冊(cè)與發(fā)現(xiàn)
這篇文章主要介紹了基于Spring Cloud Zookeeper實(shí)現(xiàn)服務(wù)注冊(cè)與發(fā)現(xiàn),幫助大家更好的理解和學(xué)習(xí)spring框架,感興趣的朋友可以了解下2020-11-114位吸血鬼數(shù)字的java實(shí)現(xiàn)思路與實(shí)例講解
今天小編就為大家分享一篇關(guān)于4位吸血鬼數(shù)字的java實(shí)現(xiàn)思路與實(shí)例講解,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03java學(xué)習(xí)筆記之DBUtils工具包詳解
下面小編就為大家分享一篇java學(xué)習(xí)筆記之DBUtils工具包詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01SpringBoot+Hutool實(shí)現(xiàn)圖片驗(yàn)證碼的示例代碼
圖片驗(yàn)證碼在注冊(cè)、登錄、交易、交互等各類場(chǎng)景中都發(fā)揮著巨大作用,本文主要介紹了SpringBoot+Hutool實(shí)現(xiàn)圖片驗(yàn)證碼的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01現(xiàn)代高效的java構(gòu)建工具gradle的快速入門
和Maven一樣,Gradle只是提供了構(gòu)建項(xiàng)目的一個(gè)框架,真正起作用的是Plugin,本文主要介紹了gradle入門,文中通過(guò)示例代碼介紹的非常詳細(xì),感興趣的小伙伴們可以參考一下2021-11-11