Java多線程工具CompletableFuture詳解
簡介
CompletableFuture 是 java 1.8 追加的新特性,通俗的話來說,是一個函數(shù)式的,用于控制多任務(wù)同步、異步組合操作的工具,實現(xiàn)諸如:
- 控制若干個線程任務(wù)間是同步還是異步
- 控制若干個線程間的先后執(zhí)行順序、依賴關(guān)系
- 若干個線程任務(wù),任意其中一個完成就執(zhí)行某種邏輯
- ……
將變得十分簡單。如果你對前端有一定了解,你會發(fā)現(xiàn)它和 Javascript 中的 Promise 是十分類似的。
使用方法
CompletableFuture 需要依仗線程池實現(xiàn)自身功能,這個線程池是個非必填值,如果未特殊指明,將會使用 ForkJoinPool 的實例,構(gòu)造方法為 ForkJoinPool.makeCommonPool(),該線程池大小為 Runtime.getRuntime().availableProcessors() - 1 即 當(dāng)前電腦 cpu 可用核心數(shù) -1。
常見 API
| 方法名稱 | 備注 |
| complete | 標識自身已完成任務(wù),并傳入一個參數(shù)作為 CompletableFuture.get() 將獲取的值;標識結(jié)束是 CAS 方式設(shè)置,只有 未結(jié)束→結(jié)束 的變化才能成功,complete 操作返回 true 時才真正影響 CompletableFuture.get() 將獲取的值 |
| completedFuture | 這是個靜態(tài)方法,構(gòu)造一個已完成的 CompletableFuture 對象,并以傳入的參數(shù)作為 CompletableFuture.get() 將獲取的值 |
| get | 阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值 |
| join | 阻塞直至任務(wù)完成,并獲取該任務(wù)的返回值(幾乎與 get 等同,但 join 不會拋出檢查型異常,不強制要求你必須處理) |
| cancel | 標識自身已完成,無法阻斷自身任務(wù),但會構(gòu)造 CancellationException傳給關(guān)聯(lián)在該對象后續(xù)的 CompletableFuture,后續(xù)的 CompletableFuture 會因捕獲到異常而終止任務(wù)。另:該函數(shù)入?yún)魅氲?boolean 不會產(chǎn)生任何作用( javadoc 里這么描述也是絕了) |
| completeExceptionally | 可以理解為 cancel 的可自定義異常版本,其入?yún)⒕褪莻鬟f給后續(xù) CompletableFuture 對象的異常 |
| exceptionally | CompletableFuture 鏈路上發(fā)生異常時會觸發(fā)該方法,給鏈路的最后一個 CompletableFuture 對象配置,即可對全鏈路進行異常捕獲,其入?yún)楫惓L幚頃r需要執(zhí)行的 Function |
| isDone | CompletableFuture 任務(wù)是否已完成 |
剩下的大多 API 是 run、accept、apply、then、either、both、async …… 的組合,本質(zhì)上都是語法糖,用了原生的 @FunctionalInterface,決定傳入的函數(shù)有無返回值、有無入?yún)ⅰ⒃诋?dāng)前任務(wù)結(jié)束后開始執(zhí)行、是否任意一個完成就結(jié)束、是否全部完成才結(jié)束、是否另起線程執(zhí)行任務(wù) ……
// 你的下一步業(yè)務(wù)邏輯 Runnable next = new MyRunnable(); // 因為是 run 所以無返回值, 因為是 then 所以在 completableFuture1 對應(yīng)的任務(wù)結(jié)束后,執(zhí)行一段任務(wù) completableFuture1.thenRun(next); // 因為是 run 所以無返回值, 因為是 then 所以在 completableFuture1 對應(yīng)的任務(wù)結(jié)束后,因為 async ,所以要另起一個線程,執(zhí)行一段任務(wù) completableFuture1.thenRunAsync(next);
在看得懂函數(shù)式編程的情況下,其他你可通過源碼函數(shù)定義以此類推
如果想使用自定義的線程池執(zhí)行任務(wù),那么使用帶 Executor executor 的重載函數(shù)即可,后不再重復(fù)說明,例如:
// 自定義線程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20,
2L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10),
// 你可以根據(jù)你的需要自己實現(xiàn) Handler,此處為簡寫使用現(xiàn)成的 Handler,此處捕獲的是流量過載異常
new ThreadPoolExecutor.AbortPolicy());
// 通過自定義線程池使用 CompletableFuture
CompletableFuture.runAsync(() -> {
System.out.println("我的業(yè)務(wù)代碼");
}, executorService);使用示例
為簡潔代碼,睡眠模擬任務(wù)運行耗時均使用下列函數(shù),后續(xù)不再重復(fù)說明
public static void sleep(Long sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}請時刻注意要進行異常處理,意味著你的 completableFuture 鏈路得保證調(diào)用過下列代碼,進行異常處理(鏈路最后一個使用,則全鏈路可捕獲異常),后續(xù)為簡潔代碼已省略
completableFuture.exceptionally((Throwable ex) -> {
ex.printStackTrace();
// 取決于 completableFuture 有無返回值,類型是啥。此處實例是 completableFuture 為 CompletableFuture<Long>
return -1L;
});將常規(guī)線程任務(wù)轉(zhuǎn)化為 CompletableFuture 對象
CompletableFuture completableFuture = new CompletableFuture();
Long startTs = System.currentTimeMillis();
new Thread(() -> {
sleep(200L);
completableFuture.complete("完成");// 設(shè)置 completableFuture 結(jié)果并將狀態(tài)設(shè)置為已完成
}).start();
while (!completableFuture.isDone()) {
// 非阻塞式獲取結(jié)果,如果當(dāng)前未執(zhí)行完成則返回入?yún)⒆址?我還沒完成"
// 執(zhí)行到此處恰好任務(wù)完成,然后在執(zhí)行 while 循環(huán)判斷跳出,你在循環(huán)內(nèi)最多輸出一次 “完成”
System.out.println(completableFuture.getNow("未完"));
}
System.out.println("最終結(jié)果:" + completableFuture.getNow("未完") + " " + (System.currentTimeMillis() - startTs) + " ms");阻塞到任意一個任務(wù)完成
public static void 阻塞到任意一個完成() throws IOException {
// 模擬一個耗時 20 L 的任務(wù)
CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
sleep(20L);
System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
return 20L;
});
// 模擬一個耗時 10 L 的任務(wù)
CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
sleep(10L);
System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
return 10L;
});
// applyToEitherAsync 代表另起一個線程去執(zhí)行第二個入?yún)⒌拇a塊,這里其實沒啥影響,我就不加 Async 了
CompletableFuture result = completableFuture1.applyToEither(completableFuture2, fasterOne -> {
System.out.println(fasterOne);
return fasterOne;
});
// 除了寫回調(diào)函數(shù)方法外的另一種獲取最快值的方法
System.out.println("最快的為:" + result.join());
// 調(diào)用讀取行阻塞住,防止異步任務(wù)還未完成就退出了
System.in.read();
}遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組
CompletableFuture[] array = new CompletableFuture[2];
array[0] = completableFuture1;
array[1] = completableFuture2;
CompletableFuture fasterOne = CompletableFuture.anyOf(array);
System.out.println("最快的為:" + fasterOne.join());阻塞到全部任務(wù)完成
public static void 阻塞到全部完成() {
Long startTs = System.currentTimeMillis();
// 模擬一個耗時 20 L 的任務(wù)
CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
sleep(20L);
System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
return 20L;
});
// 模擬一個耗時 10 L 的任務(wù)
CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
sleep(10L);
System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
return 10L;
});
// thenAcceptBothAsync 代表另起一個線程去執(zhí)行第二個入?yún)⒌拇a塊,這里其實沒啥影響,我就不加 Async 了
CompletableFuture result = completableFuture1.thenAcceptBoth(completableFuture2, (r1, r2) -> {
System.out.println("completableFuture1 :" + r1);
System.out.println("completableFuture2 :" + r2);
});
// thenAcceptBoth 是沒有返回值的,所以這里是 null ,但這句代碼還是有作用的,相當(dāng)于阻塞到全部任務(wù)都完成
System.out.println("返回這個 null 之后意味著全部任務(wù)已完成:" + result.join());
}遇到特別多任務(wù)的情況下,你可以嘗試數(shù)組
CompletableFuture[] array = new CompletableFuture[2];
array[0] = completableFuture1;
array[1] = completableFuture2;
CompletableFuture<Void> all = CompletableFuture.allOf(array);
System.out.println("返回這個 null 之后意味著全部任務(wù)已完成:" + all.join());合并任務(wù)
合并任務(wù)會涉及到 Compose、Combine,他們區(qū)別在于合并的邏輯不同:
- Compose: 合并的兩個任務(wù)間是同步阻塞執(zhí)行的,后一個任務(wù)需要阻塞等待第一個任務(wù)執(zhí)行完成。你需要傳入一個函數(shù) —— 已知第一個任務(wù)的返回值,返回合并之后的 CompletableFuture 對象
- Combine: 合并的兩個任務(wù)間是異步執(zhí)行的。你需要傳入另一個任務(wù)、一個函數(shù) —— 已知兩個任務(wù)的返回值,合并成最終返回值
public static void 合并() {
Long startTs = System.currentTimeMillis();
// 模擬一個耗時 100 L 的任務(wù)
CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
sleep(100L);
System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
return 100L;
});
// 模擬一個耗時 100 L 的任務(wù)
CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
sleep(100L);
System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
return 100L;
});
// 模擬 completableFuture1 合并一個耗時 120 L 的任務(wù),返回值為兩個任務(wù)總工時
// thenCombineAsync 代表另起一個線程去執(zhí)行第二個入?yún)⒌拇a塊,這里其實沒啥影響,我就不加 Async 了
CompletableFuture<Long> completableFuture3 = completableFuture1.thenCombine(CompletableFuture.supplyAsync(() -> {
sleep(120L);
return 120L;
}), (x, y) -> x + y);
// 模擬 completableFuture2 合并一個耗時 50 L 的任務(wù),返回值為兩個任務(wù)總工時
// thenComposeAsync 代表另起一個線程去執(zhí)行第一個入?yún)⒌拇a塊,這里其實沒啥影響,我就不加 Async 了
CompletableFuture<Long> completableFuture4 = completableFuture2.thenCompose(r2 -> {
CompletableFuture<Long> temp = CompletableFuture.completedFuture(r2);
return temp.thenApply(rTemp -> {
sleep(50L);
return temp.join() + 50L;
});
});
boolean printFlag3 = true;
boolean printFlag4 = true;
String completableFuture3Info = null;
String completableFuture4Info = null;
while (!completableFuture3.isDone() || !completableFuture4.isDone()) {
if (completableFuture3.isDone()) {
if (printFlag3) {
printFlag3 = false;
completableFuture3Info = "completableFuture3 完成:" + completableFuture3.join() + " --" + (System.currentTimeMillis() - startTs);
}
}
if (completableFuture4.isDone()) {
if (printFlag4) {
printFlag4 = false;
completableFuture4Info = "completableFuture4 完成:" + completableFuture4.join() + " --" + (System.currentTimeMillis() - startTs);
}
}
}
System.out.println(completableFuture3Info != null ? completableFuture3Info : "completableFuture3 actual:" + completableFuture3.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
System.out.println(completableFuture4Info != null ? completableFuture4Info : "completableFuture4 actual:" + completableFuture4.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
}你會觀測到總工時更長的反而實際結(jié)束時間點更早,completableFuture3 早于 completableFuture4
到此這篇關(guān)于Java多線程工具CompletableFuture詳解的文章就介紹到這了,更多相關(guān)多線程工具CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Feign如何解決服務(wù)之間調(diào)用傳遞token
這篇文章主要介紹了Feign如何解決服務(wù)之間調(diào)用傳遞token,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
JPA @GeneratedValue 四種標準用法TABLE,SEQUENCE,IDENTITY,
這篇文章主要介紹了@GeneratedValue 四種標準用法TABLE,SEQUENCE,IDENTITY,AUTO詳解,需要的朋友可以參考下2024-03-03
關(guān)于Java異常處理的幾條建議_動力節(jié)點Java學(xué)院整理
Java提供了拋出異常、捕捉異常和finally語句的使用來處理程序異常,下面就來具體看一下關(guān)于Java異常處理的幾條建議2017-06-06
利用Java的Struts框架實現(xiàn)電子郵件發(fā)送功能
這篇文章主要介紹了利用Java的Struts框架實現(xiàn)電子郵件發(fā)送功能,Struts框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12
Java設(shè)計模式之模板方法模式Template Method Pattern詳解
在我們實際開發(fā)中,如果一個方法極其復(fù)雜時,如果我們將所有的邏輯寫在一個方法中,那維護起來就很困難,要替換某些步驟時都要重新寫,這樣代碼的擴展性就很差,當(dāng)遇到這種情況就要考慮今天的主角——模板方法模式2022-11-11
Spring Boot實現(xiàn)郵件發(fā)送必會的5種姿勢
這篇文章主要給大家介紹了關(guān)于Spring Boot實現(xiàn)郵件發(fā)送必會的5種姿勢,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07
MyBatis實現(xiàn)兩種查詢樹形數(shù)據(jù)的方法詳解(嵌套結(jié)果集和遞歸查詢)
樹形結(jié)構(gòu)數(shù)據(jù)在開發(fā)中十分常見,比如:菜單數(shù)、組織樹, 利用 MyBatis 提供嵌套查詢功能可以很方便地實現(xiàn)這個功能需求。本文主要介紹了兩種方法,感興趣的可以了解一下2021-09-09

