Java中Future接口詳解
一、背景
在系統(tǒng)中,異步執(zhí)行任務(wù),是很常見的功能邏輯,但是在不同的場景中,又存在很多細節(jié)差異;
有的任務(wù)只強調(diào)「執(zhí)行過程」,并不需要追溯任務(wù)自身的「執(zhí)行結(jié)果」,這里并不是指對系統(tǒng)和業(yè)務(wù)產(chǎn)生的效果,比如定時任務(wù)、消息隊列等場景;
但是有些任務(wù)即強調(diào)「執(zhí)行過程」,又需要追溯任務(wù)自身的「執(zhí)行結(jié)果」,在流程中依賴某個異步結(jié)果,判斷流程是否中斷,比如「并行」處理;
【串行處理】整個流程按照邏輯逐步推進,如果出現(xiàn)異常會導(dǎo)致流程中斷;

【并行處理】主流程按照邏輯逐步推進,其他「異步」交互的流程執(zhí)行完畢后,將結(jié)果返回到主流程,如果「異步」流程異常,會影響部分結(jié)果;

此前在《「訂單」業(yè)務(wù)》的內(nèi)容中,聊過關(guān)于「串行」和「并行」的應(yīng)用對比,即在訂單詳情的加載過程中,通過「并行」的方式讀取:商品、商戶、訂單、用戶等信息,提升接口的響應(yīng)時間;
二、Future接口
1、入門案例
異步是對流程的解耦,但是有的流程中又依賴異步執(zhí)行的最終結(jié)果,此時就可以使用「Future」接口來達到該目的,先來看一個簡單的入門案例;
public class ServerTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 3;
}
}
public class FutureBase01 {
public static void main(String[] args) throws Exception {
TimeInterval timer = DateUtil.timer();
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 批量任務(wù)
List<ServerTask> serverTasks = new ArrayList<>() ;
for (int i=0;i<3;i++){
serverTasks.add(new ServerTask());
}
List<Future<Integer>> taskResList = executor.invokeAll(serverTasks) ;
// 結(jié)果輸出
for (Future<Integer> intFuture:taskResList){
System.out.println(intFuture.get());
}
// 耗時統(tǒng)計
System.out.println("timer...interval = "+timer.interval());
}
}這里模擬一個場景,以線程池批量執(zhí)行異步任務(wù),在任務(wù)內(nèi)線程休眠2秒,以并行的方式最終獲取全部結(jié)果,只耗時2秒多一點,如果串行的話耗時肯定超過6秒;
2、Future接口
Future表示異步計算的結(jié)果,提供了用于檢查計算是否完成、等待計算完成、以及檢索計算結(jié)果的方法。
【核心方法】
get():等待任務(wù)完成,獲取執(zhí)行結(jié)果,如果任務(wù)取消會拋出異常;get(long timeout, TimeUnit unit):指定等待任務(wù)完成的時間,等待超時會拋出異常;isDone():判斷任務(wù)是否完成;isCancelled():判斷任務(wù)是否被取消;cancel(boolean mayInterruptIfRunning):嘗試取消此任務(wù)的執(zhí)行,如果任務(wù)已經(jīng)完成、已經(jīng)取消或由于其他原因無法取消,則此嘗試將失??;
【基礎(chǔ)用法】
public class FutureBase02 {
public static void main(String[] args) throws Exception {
// 線程池執(zhí)行任務(wù)
ExecutorService executor = Executors.newFixedThreadPool(3);
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "task...OK";
}
}) ;
executor.execute(futureTask);
// 任務(wù)信息獲取
System.out.println("是否完成:"+futureTask.isDone());
System.out.println("是否取消:"+futureTask.isCancelled());
System.out.println("獲取結(jié)果:"+futureTask.get());
System.out.println("嘗試取消:"+futureTask.cancel(Boolean.TRUE));
}
}【FutureTask】
Future接口的基本實現(xiàn)類,提供了計算的啟動和取消、查詢計算是否完成以及檢索計算結(jié)果的方法;

在「FutureTask」類中,可以看到線程異步執(zhí)行任務(wù)時,其中的核心狀態(tài)轉(zhuǎn)換,以及最終結(jié)果寫出的方式;
雖然「Future」從設(shè)計上,實現(xiàn)了異步計算的結(jié)果獲取,但是通過上面的案例也可以發(fā)現(xiàn),流程的主線程在執(zhí)行get()方法時會阻塞,直到最終獲取結(jié)果,顯然對于程序來說并不友好;
在JDK1.8提供「CompletableFuture」類,對「Future」進行優(yōu)化和擴展;
三、CompletableFuture類
1、基礎(chǔ)說明
「CompletableFuture」類提供函數(shù)編程的能力,可以通過回調(diào)的方式處理計算結(jié)果,并且支持組合操作,提供很多方法來實現(xiàn)異步編排,降低異步編程的復(fù)雜度;

「CompletableFuture」實現(xiàn)「Future」和「CompletionStage」兩個接口;
- Future:表示異步計算的結(jié)果;
- CompletionStage:表示異步計算的一個步驟,當(dāng)一個階段計算完成時,可能會觸發(fā)其他階段,即步驟可能由其他CompletionStage觸發(fā);
【入門案例】
public class CompletableBase01 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 任務(wù)執(zhí)行
CompletableFuture<String> cft = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Res...OK";
}, executor);
// 結(jié)果輸出
System.out.println(cft.get());
}
}2、核心方法
2.1 實例方法
public class Completable01 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 1、創(chuàng)建未完成的CompletableFuture,通過complete()方法完成
CompletableFuture<Integer> cft01 = new CompletableFuture<>() ;
cft01.complete(99) ;
// 2、創(chuàng)建已經(jīng)完成CompletableFuture,并且給定結(jié)果
CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value");
// 3、有返回值,默認ForkJoinPool線程池
CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";});
// 4、有返回值,采用Executor自定義線程池
CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor);
// 5、無返回值,默認ForkJoinPool線程池
CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {});
// 6、無返回值,采用Executor自定義線程池
CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor);
}
}2.2 計算方法
public class Completable02 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK";
},executor);
// 1、計算完成后,執(zhí)行后續(xù)處理
// cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex));
// 2、觸發(fā)計算,如果沒有完成,則get設(shè)定的值,如果已完成,則get任務(wù)返回值
// boolean completeFlag = cft01.complete("given...value");
// if (completeFlag){
// System.out.println(cft01.get());
// } else {
// System.out.println(cft01.get());
// }
// 3、開啟新CompletionStage,重新獲取線程執(zhí)行任務(wù)
cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor);
}
}2.3 結(jié)果獲取方法
public class Completable03 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Res...OK";
},executor);
// 1、阻塞直到獲取結(jié)果
// System.out.println(cft01.get());
// 2、設(shè)定超時的阻塞獲取結(jié)果
// System.out.println(cft01.get(4, TimeUnit.SECONDS));
// 3、非阻塞獲取結(jié)果,如果任務(wù)已經(jīng)完成,則返回結(jié)果,如果任務(wù)未完成,返回給定的值
// System.out.println(cft01.getNow("given...value"));
// 4、get獲取拋檢查異常,join獲取非檢查異常
System.out.println(cft01.join());
}
}
2.4 任務(wù)編排方法
public class Completable04 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("OK-1");
return "OK";
},executor);
// 1、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),此處不關(guān)注cft01的結(jié)果
// cft01.thenRun(() -> System.out.println("task...run")) ;
// 2、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),可以獲取cft01的結(jié)果
// cft01.thenAccept((res) -> {
// System.out.println("cft01:"+res);
// System.out.println("task...run");
// });
// 3、cft01任務(wù)執(zhí)行完成后,執(zhí)行之后的任務(wù),獲取cft01的結(jié)果,并且具有返回值
// CompletableFuture<Integer> cft02 = cft01.thenApply((res) -> {
// System.out.println("cft01:"+res);
// return 99 ;
// });
// System.out.println(cft02.get());
// 4、順序執(zhí)行cft01、cft02
// CompletableFuture<String> cft02 = cft01.thenCompose((res) -> CompletableFuture.supplyAsync(() -> {
// System.out.println("cft01:"+res);
// return "OK-2";
// }));
// cft02.whenComplete((res,ex) -> System.out.println("Result:"+res+";Exe:"+ex));
// 5、對比任務(wù)的執(zhí)行效率,由于cft02先完成,所以取cft02的結(jié)果
// CompletableFuture<String> cft02 = cft01.applyToEither(CompletableFuture.supplyAsync(() -> {
// System.out.println("run...cft02");
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return "OK-2";
// }),(res) -> {
// System.out.println("either...result:" + res);
// return res;
// });
// System.out.println("finally...result:" + cft02.get());
// 6、兩組任務(wù)執(zhí)行完成后,對結(jié)果進行合并
// CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> "OK-2") ;
// String finallyRes = cft01.thenCombine(cft02,(res1,res2) -> {
// System.out.println("res1:"+res1+";res2:"+res2);
// return res1+";"+res2 ;
// }).get();
// System.out.println(finallyRes);
CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {
System.out.println("OK-2");
return "OK-2";
}) ;
CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {
System.out.println("OK-3");
return "OK-3";
}) ;
// 7、等待批量任務(wù)執(zhí)行完返回
// CompletableFuture.allOf(cft01,cft02,cft03).get();
// 8、任意一個任務(wù)執(zhí)行完即返回
System.out.println("Sign:"+CompletableFuture.anyOf(cft01,cft02,cft03).get());
}
}
2.5 異常處理方法
public class Completable05 {
public static void main(String[] args) throws Exception {
// 線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
if (1 > 0){
throw new RuntimeException("task...exception");
}
return "OK";
},executor);
// 1、捕獲cft01的異常信息,并提供返回值
String finallyRes = cft01.thenApply((res) -> {
System.out.println("cft01-res:" + res);
return res;
}).exceptionally((ex) -> {
System.out.println("cft01-exe:" + ex.getMessage());
return "error" ;
}).get();
System.out.println("finallyRes="+finallyRes);
CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-2";
},executor);
// 2、如果cft02未完成,則get時拋出指定異常信息
boolean exeFlag = cft02.completeExceptionally(new RuntimeException("given...exception"));
if (exeFlag){
System.out.println(cft02.get());
} else {
System.out.println(cft02.get());
}
}
}
3、線程池問題
- 在實踐中,通常不使用
ForkJoinPool#commonPool()公共線程池,會出現(xiàn)線程競爭問題,從而形成系統(tǒng)瓶頸; - 在任務(wù)編排中,如果出現(xiàn)依賴情況或者父子任務(wù),盡量使用多個線程池,從而避免任務(wù)請求同一個線程池,規(guī)避死鎖情況發(fā)生;
四、CompletableFuture原理
1、核心結(jié)構(gòu)
在分析「CompletableFuture」其原理之前,首先看一下涉及的核心結(jié)構(gòu);

【CompletableFuture】
在該類中有兩個關(guān)鍵的字段:「result」存儲當(dāng)前CF的結(jié)果,「stack」代表棧頂元素,即當(dāng)前CF計算完成后會觸發(fā)的依賴動作;從上面案例中可知,依賴動作可以沒有或者有多個;
【Completion】
依賴動作的封裝類;
【UniCompletion】
繼承Completion類,一元依賴的基礎(chǔ)類,「executor」指線程池,「dep」指依賴的計算,「src」指源動作;
【BiCompletion】
繼承UniCompletion類,二元或者多元依賴的基礎(chǔ)類,「snd」指第二個源動作;
2、零依賴
顧名思義,即各個CF之間不產(chǎn)生依賴關(guān)系;
public class DepZero {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(()-> "OK-1",executor);
CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(()-> "OK-2",executor);
System.out.println(cft1.get()+";"+cft2.get());
}
}3、一元依賴
即CF之間的單個依賴關(guān)系;這里使用「thenApply」方法演示,為了看到效果,使「cft1」長時間休眠,斷點查看「stack」結(jié)構(gòu);
public class DepOne {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-1";
},executor);
CompletableFuture<String> cft2 = cft1.thenApply(res -> {
System.out.println("cft01-res"+res);
return "OK-2" ;
});
System.out.println("cft02-res"+cft2.get());
}
}斷點截圖:

原理分析:

觀察者Completion注冊到「cft1」,注冊時會檢查計算是否完成,未完成則觀察者入棧,當(dāng)「cft1」計算完成會彈棧;已完成則直接觸發(fā)觀察者;
可以調(diào)整斷點代碼,讓「cft1」先處于完成狀態(tài),再查看其運行時結(jié)構(gòu),從而分析完整的邏輯;
4、二元依賴
即一個CF同時依賴兩個CF;這里使用「thenCombine」方法演示;為了看到效果,使「cft1、cft2」長時間休眠,斷點查看「stack」結(jié)構(gòu);
public class DepTwo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-1";
},executor);
CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-2";
},executor);
// cft3 依賴 cft1和cft2 的計算結(jié)果
CompletableFuture<String> cft3 = cft1.thenCombine(cft2,(res1,res2) -> {
System.out.println("cft01-res:"+res1);
System.out.println("cft02-res:"+res2);
return "OK-3" ;
});
System.out.println("cft03-res:"+cft3.get());
}
}
斷點截圖:

原理分析:

在「cft1」和「cft2」未完成的狀態(tài)下,嘗試將BiApply壓入「cft1」和「cft2」兩個棧中,任意CF完成時,會嘗試觸發(fā)觀察者,觀察者檢查「cft1」和「cft2」是否都完成,如果完成則執(zhí)行;
5、多元依賴
即一個CF同時依賴多個CF;這里使用「allOf」方法演示;為了看到效果,使「cft1、cft2、cft3」長時間休眠,斷點查看「stack」結(jié)構(gòu);
public class DepMore {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-1";
},executor);
CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-2";
},executor);
CompletableFuture<String> cft3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK-3";
},executor);
// cft4 依賴 cft1和cft2和cft3 的計算結(jié)果
CompletableFuture<Void> cft4 = CompletableFuture.allOf(cft1,cft2,cft3);
CompletableFuture<String> finallyRes = cft4.thenApply(tm -> {
System.out.println("cft01-res:"+cft1.join());
System.out.println("cft02-res:"+cft2.join());
System.out.println("cft03-res:"+cft3.join());
return "OK-4";
});
System.out.println("finally-res:"+finallyRes.get());
}
}
斷點截圖:

原理分析:

多元依賴的回調(diào)方法除了「allOf」還有「anyOf」,其實現(xiàn)原理都是將依賴的多個CF補全為平衡二叉樹,從斷點圖可知會按照樹的層級處理,核心結(jié)構(gòu)參考二元依賴即可;
五、參考源碼
編程文檔: https://gitee.com/cicadasmile/butte-java-note
應(yīng)用倉庫: https://gitee.com/cicadasmile/butte-flyer-parent
Gitee主頁: https://gitee.com/cicadasmile/butte-java-note
到此這篇關(guān)于Java中Future接口詳解的文章就介紹到這了,更多相關(guān)Java中Future接口內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis注解之@Mapper和@MapperScan的使用
這篇文章主要介紹了mybatis注解之@Mapper和@MapperScan的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
Java開發(fā)環(huán)境配置及Vscode搭建過程
今天通過圖文并茂的形式給大家介紹Java開發(fā)環(huán)境配置及Vscode搭建過程,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-07-07
springMVC前臺傳數(shù)組類型,后臺用list類型接收實例代碼
這篇文章主要介紹了springMVC前臺傳數(shù)組類型,后臺用list類型接收實例代碼,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12
Java HashMap 如何正確遍歷并刪除元素的方法小結(jié)
這篇文章主要介紹了Java HashMap 如何正確遍歷并刪除元素的方法小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05
java查詢近七日數(shù)據(jù)功能的實現(xiàn)
這篇文章主要介紹了java查詢近七日數(shù)據(jù)功能的實現(xiàn),文章內(nèi)容詳細,簡單易懂,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2023-01-01
Java實現(xiàn)Map遍歷key-value的四種方法
本文主要介紹了Java實現(xiàn)Map遍歷key-value的四種方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
SpringBoot ApplicationContextAware拓展接口使用詳解
當(dāng)一個類實現(xiàn)了這個接口(ApplicationContextAware)之后,這個類就可以方便獲得ApplicationContext中的所有bean。換句話說,就是這個類可以直接獲取spring配置文件中,所有有引用到的bean對象2023-04-04
SpringBoot中Bean生命周期自定義初始化和銷毀方法詳解
這篇文章給大家詳細介紹了SpringBoot中Bean生命周期自定義初始化和銷毀方法,文中通過代碼示例講解的非常詳細,對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-01-01

