java CompletableFuture實(shí)現(xiàn)異步編排詳解
前言
為什么需要異步執(zhí)行?
場景:電商系統(tǒng)中獲取一個(gè)完整的商品信息可能分為以下幾步:
①獲取商品基本信息
②獲取商品圖片信息
③獲取商品促銷活動信息
④獲取商品各種類的基本信息 等操作,如果使用串行方式去執(zhí)行這些操作,假設(shè)每個(gè)操作執(zhí)行1s,那么用戶看到完整的商品詳情就需要4s的時(shí)間,如果使用并行方式執(zhí)行這些操作,可能只需要1s就可以完成。所以這就是異步執(zhí)行的好處。
JDK5的Future
接口
Future
接口用于代表異步計(jì)算的結(jié)果,通過Future接口提供的方法可以查看異步計(jì)算是否執(zhí)行完成,或者等待執(zhí)行結(jié)果并獲取執(zhí)行結(jié)果,同時(shí)還可以取消執(zhí)行。
列舉Future
接口的方法:
get()
:獲取任務(wù)執(zhí)行結(jié)果,如果任務(wù)還沒完成則會阻塞等待直到任務(wù)執(zhí)行完成。如果任務(wù)被取消則會拋出CancellationException
異常,如果任務(wù)執(zhí)行過程發(fā)生異常則會拋出ExecutionException
異常,如果阻塞等待過程中被中斷則會拋出InterruptedException
異常。get(long timeout,Timeunit unit)
:帶超時(shí)時(shí)間的get()
方法,如果阻塞等待過程中超時(shí)則會拋出TimeoutException
異常。cancel()
:用于取消異步任務(wù)的執(zhí)行。如果異步任務(wù)已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消,則會返回false。如果任務(wù)還沒有被執(zhí)行,則會返回true并且異步任務(wù)不會被執(zhí)行。如果任務(wù)已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成,若mayInterruptIfRunning
為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning
為false,則會返回true且不會中斷任務(wù)執(zhí)行線程。isCanceled()
:判斷任務(wù)是否被取消,如果任務(wù)在結(jié)束(正常執(zhí)行結(jié)束或者執(zhí)行異常結(jié)束)前被取消則返回true,否則返回false。isDone()
:判斷任務(wù)是否已經(jīng)完成,如果完成則返回true,否則返回false。需要注意的是:任務(wù)執(zhí)行過程中發(fā)生異常、任務(wù)被取消也屬于任務(wù)已完成,也會返回true。
使用Future
接口和Callable
接口實(shí)現(xiàn)異步執(zhí)行:
public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executorService = Executors.newFixedThreadPool(4); // 獲取商品基本信息(可以使用Lambda表達(dá)式簡化Callable接口,這里為了便于觀察不使用) Future<String> future1 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "獲取到商品基本信息"; } }); // 獲取商品圖片信息 Future<String> future2 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "獲取商品圖片信息"; } }); // 獲取商品促銷信息 Future<String> future3 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "獲取商品促銷信息"; } }); // 獲取商品各種類基本信息 Future<String> future4 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "獲取商品各種類基本信息"; } }); // 獲取結(jié)果 try { System.out.println(future1.get()); System.out.println(future2.get()); System.out.println(future3.get()); System.out.println(future4.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executorService.shutdown(); } }
既然Future可以實(shí)現(xiàn)異步執(zhí)行并獲取結(jié)果,為什么還會需要CompletableFuture?
簡述一下Future接口的弊端:
- 不支持手動完成
- 當(dāng)提交了一個(gè)任務(wù),但是執(zhí)行太慢了,通過其他路徑已經(jīng)獲取到了任務(wù)結(jié)果,現(xiàn)在沒法把這個(gè)任務(wù)結(jié)果通知到正在執(zhí)行的線程,所以必須主動取消或者一直等待它執(zhí)行完成。
- 不支持進(jìn)一步的非阻塞調(diào)用
- 通過Future的
get()
方法會一直阻塞到任務(wù)完成,但是想在獲取任務(wù)之后執(zhí)行額外的任務(wù),因?yàn)?Future 不支持回調(diào)函數(shù),所以無法實(shí)現(xiàn)這個(gè)功能。
- 通過Future的
- 不支持鏈?zhǔn)秸{(diào)用
- 對于Future的執(zhí)行結(jié)果,想繼續(xù)傳到下一個(gè)Future處理使用,從而形成一個(gè)鏈?zhǔn)降膒ipline調(diào)用,這在 Future中無法實(shí)現(xiàn)。
- 不支持多個(gè) Future 合并
- 比如有10個(gè)Future并行執(zhí)行,想在所有的Future運(yùn)行完畢之后,執(zhí)行某些函數(shù),是無法通過Future實(shí)現(xiàn)的。
- 不支持異常處理
- Future的API沒有任何的異常處理的api,所以在異步運(yùn)行時(shí),如果出了異常問題不好定位。
使用Future接口可以通過get()
阻塞式獲取結(jié)果或者通過輪詢+isDone()
非阻塞式獲取結(jié)果,但是前一種方法會阻塞,后一種會耗費(fèi)CPU資源,所以JDK的Future接口實(shí)現(xiàn)異步執(zhí)行對獲取結(jié)果不太友好,所以在JDK8時(shí)推出了CompletableFuture實(shí)現(xiàn)異步編排。
CompletableFuture的使用
CompletableFuture概述
JDK8中新增加了一個(gè)包含50個(gè)方法左右的類CompletableFuture,提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture
類實(shí)現(xiàn)了Future
接口和CompletionStage
接口,即除了可以使用Future
接口的所有方法之外,CompletionStage<T>
接口提供了更多方法來更好的實(shí)現(xiàn)異步編排,并且大量的使用了JDK8引入的函數(shù)式編程概念。后面會細(xì)致的介紹常用的API。
① 創(chuàng)建CompletableFuture的方式
使用new
關(guān)鍵字創(chuàng)建
// 無返回結(jié)果 CompletableFuture<String> completableFuture = new CompletableFuture<>(); // 已知返回結(jié)果 CompletableFuture<String> completableFuture = new CompletableFuture<>("result"); // 已知返回結(jié)果(底層其實(shí)也是帶參數(shù)的構(gòu)造器賦值) CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");
創(chuàng)建一個(gè)返回結(jié)果類型為String的CompletableFuture,可以使用Future
接口的get()
方法獲取該值(同樣也會阻塞)。
可以使用無參構(gòu)造器返回一個(gè)沒有結(jié)果的CompletableFuture,也可以通過構(gòu)造器的傳參CompletableFuture設(shè)置好返回結(jié)果,或者使用CompletableFuture.completedFuture(U value)
構(gòu)造一個(gè)已知結(jié)果的CompletableFuture。
使用CompletableFuture類的靜態(tài)工廠方法(常用)
runAsync()
無返回值
// 使用默認(rèn)線程池 public static CompletableFuture<Void> runAsync(Runnable runnable) // 使用自定義線程池(推薦) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
runAsync()
方法的參數(shù)是Runnable接口,這是一個(gè)函數(shù)式接口,不允許返回值。當(dāng)需要異步操作且不關(guān)心返回結(jié)果的時(shí)候可以使用runAsync()
方法。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通過Lambda表達(dá)式實(shí)現(xiàn)Runnable接口 CompletableFuture.runAsync(()-> System.out.println("獲取商品基本信息成功"), executor).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } }
supplyAsync()
有返回值
// 使用默認(rèn)線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // 使用自定義線程池(推薦) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
supplyAsync()
方法的參數(shù)是Supplier<U>
供給型接口(無參有返回值),這也是一個(gè)函數(shù)式接口,U
是返回結(jié)果值的類型。當(dāng)需要異步操作且關(guān)心返回結(jié)果的時(shí)候,可以使用supplyAsync()
方法。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通過Lambda表達(dá)式實(shí)現(xiàn)執(zhí)行內(nèi)容,并返回結(jié)果通過CompletableFuture接收 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("獲取商品信息成功"); return "信息"; }, executor); // 輸出結(jié)果 System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } }
關(guān)于第二個(gè)參數(shù)Executor executor
說明
在沒有指定第二個(gè)參數(shù)(即沒有指定線程池)時(shí),CompletableFuture直接使用默認(rèn)的ForkJoinPool.commonPool()
作為它的線程池執(zhí)行異步代碼。
在實(shí)際生產(chǎn)中會使用自定義的線程池來執(zhí)行異步代碼,具體可以參考另一篇文章深入理解線程池ThreadPoolExecutor ,里面的第二節(jié)有生產(chǎn)中怎么創(chuàng)建自定義線程的例子,可以參考一下。
② 獲得異步執(zhí)行結(jié)果
get()
阻塞式獲取執(zhí)行結(jié)果
該方法調(diào)用后如果任務(wù)還沒完成則會阻塞等待直到任務(wù)執(zhí)行完成。如果任務(wù)執(zhí)行過程發(fā)生異常則會拋出ExecutionException
異常,如果阻塞等待過程中被中斷則會拋出InterruptedException
異常。
get(long timeout, TimeUnit unit)
帶超時(shí)的阻塞式獲取執(zhí)行結(jié)果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
該方法調(diào)用后如果如果任務(wù)還沒完成則會阻塞等待直到任務(wù)執(zhí)行完成或者超出timeout時(shí)間,如果阻塞等待過程中超時(shí)則會拋出TimeoutException
異常。
getNow(T valueIfAbsent)
立刻獲取執(zhí)行結(jié)果
public T getNow(T valueIfAbsent)
該方法調(diào)用后,會立刻獲取結(jié)果不會阻塞等待。如果任務(wù)完成則直接返回執(zhí)行完成后的結(jié)果,如果任務(wù)沒有完成,則返回調(diào)用方法時(shí)傳入的參數(shù)valueIfAbsent
值。
join()
不拋異常的阻塞時(shí)獲取執(zhí)行結(jié)果
public T join()
該方法和get()
方法作用一樣,只是不會拋出異常。
complete(T value)
主動觸發(fā)計(jì)算,返回異步是否執(zhí)行完畢
public boolean complete(T value)
該方法調(diào)用后,會主動觸發(fā)計(jì)算結(jié)果,如果此時(shí)異步執(zhí)行并沒有完成(此時(shí)boolean值返回true),則通過get()
拿到的數(shù)據(jù)會是complete()
設(shè)置的參數(shù)value
值,如果此時(shí)異步執(zhí)行已經(jīng)完成(此時(shí)boolean值返回false),則通過get()
拿到的就是執(zhí)行完成的結(jié)果。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通過Lambda表達(dá)式實(shí)現(xiàn)執(zhí)行內(nèi)容,并返回結(jié)果通過CompletableFuture接收 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { // 休眠2秒,使得異步執(zhí)行變慢,會導(dǎo)致主動觸發(fā)計(jì)算先執(zhí)行,此時(shí)返回的get就是555 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 666; }, executor); // 主動觸發(fā)計(jì)算,判斷異步執(zhí)行是否完成 System.out.println(completableFuture.complete(555)); // 輸出結(jié)果 System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: true 555 **/
③ 對執(zhí)行結(jié)果進(jìn)行處理
whenComplete
等待前面任務(wù)執(zhí)行完再執(zhí)行當(dāng)前處理
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)
在創(chuàng)建好的初始任務(wù)或者是上一個(gè)任務(wù)后通過鏈?zhǔn)秸{(diào)用該方法,會在之前任務(wù)執(zhí)行完成后繼續(xù)執(zhí)行whenComplete
里的內(nèi)容(whenComplete
傳入的action只是對之前任務(wù)的結(jié)果進(jìn)行處理),即使用該方法可以避免前面說到的Future
接口的問題,不再需要通過阻塞或者輪詢的方式去獲取結(jié)果,而是通過調(diào)用該方法等任務(wù)執(zhí)行完畢自動調(diào)用。
該方法的參數(shù)為BiConsumer<? super T, ? super Throwable> action
消費(fèi)者接口,可以接收兩個(gè)參數(shù),一個(gè)是任務(wù)執(zhí)行完的結(jié)果,一個(gè)是執(zhí)行任務(wù)時(shí)的異常。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .whenComplete((res, ex) -> System.out.println("任務(wù)執(zhí)行完畢,結(jié)果為" + res + " 異常為" + ex) ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)執(zhí)行完畢,結(jié)果為666 異常為null **/
除了上述的方法外,還有一些類似的方法如XXXAsync()
或者是XXXAsync(XX,Executor executor)
,對于這些方法,這里統(tǒng)一說明,后續(xù)文章中將不會再列舉
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)
XXXAsync()
:表示上一個(gè)任務(wù)執(zhí)行完成后,不會再使用之前任務(wù)中的線程,而是重新使用從默認(rèn)線程(ForkJoinPool 線程池)中重新獲取新的線程執(zhí)行當(dāng)前任務(wù)。
XXXAsync(XX,Executor executor)
:表示不會沿用之前任務(wù)的線程,而是使用自己第二個(gè)參數(shù)指定的線程池重新獲取線程執(zhí)行當(dāng)前任務(wù)。
④ 對執(zhí)行結(jié)果進(jìn)行消費(fèi)
thenRun
前面任務(wù)執(zhí)行完后執(zhí)行當(dāng)前任務(wù),不關(guān)心前面任務(wù)的結(jié)果,也沒返回值
public CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像這樣鏈?zhǔn)秸{(diào)用該方法表示:執(zhí)行任務(wù)A完成后接著執(zhí)行任務(wù)B,但是任務(wù)B不需要A的結(jié)果,并且執(zhí)行完任務(wù)B也不會返回結(jié)果。
thenRun(Runnable action)
的參數(shù)為Runnable接口即沒有傳入?yún)?shù)。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenRun(() -> System.out.println("我都沒有參數(shù)怎么拿到之前的結(jié)果,我也沒有返回值。") ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 我都沒有參數(shù)怎么拿到之前的結(jié)果,我也沒有返回值。 **/
thenAccept
前面任務(wù)執(zhí)行完后執(zhí)行當(dāng)前任務(wù),消費(fèi)前面的結(jié)果,沒有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像這樣鏈?zhǔn)秸{(diào)用該方法表示:執(zhí)行任務(wù)A完成后接著執(zhí)行任務(wù)B,而且任務(wù)B需要A的結(jié)果,但是執(zhí)行完任務(wù)B不會返回結(jié)果。
thenAccept(Consumer<? super T> action)
的參數(shù)為消費(fèi)者接口,即可以傳入一個(gè)參數(shù),該參數(shù)為上一個(gè)任務(wù)的執(zhí)行結(jié)果。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenAccept((res) -> System.out.println("我能拿到上一個(gè)的結(jié)果" + res + ",但是我沒法傳出去。") ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 我能拿到上一個(gè)的結(jié)果666,但是我沒法傳出去。 **/
thenApply
前面任務(wù)執(zhí)行完后執(zhí)行當(dāng)前任務(wù),消費(fèi)前面的結(jié)果,具有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像這樣鏈?zhǔn)秸{(diào)用該方法表示:執(zhí)行任務(wù)A完成后接著執(zhí)行任務(wù)B,而且任務(wù)B需要A的結(jié)果,并且執(zhí)行完任務(wù)B需要有返回結(jié)果。
thenApply(Function<? super T,? extends U> fn)
的參數(shù)為函數(shù)式接口,即可以傳入一個(gè)參數(shù)類型為T,該參數(shù)是上一個(gè)任務(wù)的執(zhí)行結(jié)果,并且函數(shù)式接口需要有返回值,類型為U。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenApply((res) -> { System.out.println("我能拿到上一個(gè)的結(jié)果" + res + "并且我要將結(jié)果傳出去"); return res; } ).whenComplete((res, ex) -> System.out.println("結(jié)果" + res)); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 我能拿到上一個(gè)的結(jié)果666并且我要將結(jié)果傳出去 結(jié)果666 **/
⑤ 異常處理
exceptionally
異常捕獲,只消費(fèi)前面任務(wù)中出現(xiàn)的異常信息,具有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
可以通過鏈?zhǔn)秸{(diào)用該方法來獲取異常信息,并且具有返回值。如果某一個(gè)任務(wù)出現(xiàn)異常被exceptionally
捕獲到則剩余的任務(wù)將不會再執(zhí)行。類似于Java異常處理的catch。
exceptionally(Function<Throwable, ? extends T> fn)
的參數(shù)是函數(shù)式接口,具有一個(gè)參數(shù)以及返回值,該參數(shù)為前面任務(wù)的異常信息。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) throw new RuntimeException("error"); return 666; }, executor) .thenApply((res) -> { System.out.println("不出現(xiàn)異常,結(jié)果為" + res); return res; }).exceptionally((ex) -> { ex.printStackTrace(); return -1; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: // 這是不出現(xiàn)異常的情況 不出現(xiàn)異常,結(jié)果為666 // 這是出現(xiàn)異常的情況 java.util.concurrent.CompletionException: java.lang.RuntimeException: error at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: error at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 3 more **/
handle
異常處理,消費(fèi)前面的結(jié)果及異常信息,具有返回值,不會中斷后續(xù)任務(wù)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
可以通過鏈?zhǔn)秸{(diào)用該方法可以跟thenApply()
一樣可以消費(fèi)前面任務(wù)的結(jié)果并完成自己任務(wù)內(nèi)容,并且具有返回值。不同之處在于出現(xiàn)異常也可以接著往下執(zhí)行,根據(jù)異常參數(shù)做進(jìn)一步處理。
handle(BiFunction<? super T, Throwable, ? extends U> fn)
的參數(shù)是消費(fèi)者接口,一個(gè)參數(shù)是任務(wù)執(zhí)行結(jié)果,一個(gè)是異常信息,并且具有返回值。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenApply((res) -> { if (Math.random() < 0.5) throw new RuntimeException("error"); return res; }).handle((res, ex) -> { System.out.println("結(jié)果" + res + "(null表示之前出現(xiàn)異常導(dǎo)致結(jié)果無法傳過來)"); return res == null ? -1 : res; }).thenApply((res) -> { System.out.println("結(jié)果為" + res + "(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結(jié)果處理成-1)"); return res; }).exceptionally((ex) -> { ex.printStackTrace(); return -1; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: // 這是不出現(xiàn)異常的情況 結(jié)果666(null表示之前出現(xiàn)異常導(dǎo)致結(jié)果無法傳過來) 結(jié)果為666(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結(jié)果處理成-1) // 這是出現(xiàn)異常的情況 結(jié)果null(null表示之前出現(xiàn)異常導(dǎo)致結(jié)果無法傳過來) 結(jié)果為-1(-1表示之前出現(xiàn)異常,經(jīng)過handler使得結(jié)果處理成-1) **/
可以看到通過handle
類似于Java異常處理的finally,出現(xiàn)異常并不會像使用exceptionally
那樣中斷后續(xù)的任務(wù),而是繼續(xù)執(zhí)行,可以通過handle為之前出現(xiàn)異常無法獲得的結(jié)果重新賦值(根據(jù)業(yè)務(wù)需求設(shè)置安全值之類的)。
⑥ 兩組任務(wù)按順序執(zhí)行
thenCompose
實(shí)現(xiàn)兩組任務(wù)按前后順序執(zhí)行
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)
A.thenCompose(B)
相當(dāng)于任務(wù)A要排在任務(wù)B前面,即順序的執(zhí)行任務(wù)A、任務(wù)B。該方法的參數(shù)是函數(shù)式接口,函數(shù)式接口的參數(shù)是調(diào)用者的執(zhí)行結(jié)果,返回值是另一個(gè)任務(wù)B。
public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)A先執(zhí)行結(jié)果為666"); return 666; }, executor); actionA.thenCompose((res) -> CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)B后執(zhí)行結(jié)果加上333"); return 333 + res; })).whenComplete((res, ex) -> System.out.println(res)); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)A先執(zhí)行結(jié)果為666 任務(wù)B后執(zhí)行結(jié)果加上333 999 **/
⑦ 兩組任務(wù)誰快用誰
applyToEither
比較兩組任務(wù)執(zhí)行速度,誰快消費(fèi)誰的執(zhí)行結(jié)果
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
該方法用于比較兩組任務(wù)的執(zhí)行速度,誰先執(zhí)行完就用誰的執(zhí)行結(jié)果。
傳入?yún)?shù)說明:第一個(gè)參數(shù)傳入的是另一個(gè)任務(wù)的執(zhí)行內(nèi)容,第二個(gè)參數(shù)傳入的是最終這兩個(gè)任務(wù)誰快返回誰的結(jié)果,并通過當(dāng)前函數(shù)式接口進(jìn)行接收和處理(使用函數(shù)式接口,有參且有返回值)。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)A等待久一點(diǎn),執(zhí)行結(jié)果為555"); return 555; }, executor); actionA.applyToEither(CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)B很快,執(zhí)行結(jié)果為666"); return 666; }), (res) -> { System.out.println("最終使用的執(zhí)行結(jié)果為" + res); return res; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)B很快,執(zhí)行結(jié)果為666 最終使用的執(zhí)行結(jié)果為666 任務(wù)A等待久一點(diǎn),執(zhí)行結(jié)果為555 **/
除了applyToEither
對任務(wù)最終結(jié)果進(jìn)行獲取并消費(fèi),并且具有返回值的方法外,還有兩個(gè)類似的方法。
// 這個(gè)方法效果和上面的一樣,比誰快拿誰的結(jié)果,不同的是這個(gè)方法只消費(fèi)不具有返回值 public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
// 這個(gè)方法效果和上面的一樣,比誰快拿誰的結(jié)果,不同的是這個(gè)方法不消費(fèi)結(jié)果也不具有返回值 public CompletableFuture<Void> runAfterEither( CompletionStage<?> other, Runnable action)
⑧ 兩組任務(wù)完成后合并
thenCombine
等待兩組任務(wù)執(zhí)行完畢后,合并兩組任務(wù)的執(zhí)行結(jié)果
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
該方法用于兩組任務(wù)都完成后,將兩組任務(wù)的執(zhí)行結(jié)果一起交給當(dāng)前方法的BiFunction處理。先完成的任務(wù)會等待后者任務(wù)完成。
傳入?yún)?shù)說明:第一個(gè)參數(shù)傳入的是另一個(gè)任務(wù)的執(zhí)行內(nèi)容,第二個(gè)參數(shù)傳入的是帶兩個(gè)參數(shù)的函數(shù)式接口(第一個(gè)參數(shù)是任務(wù)1的執(zhí)行結(jié)果,第二個(gè)參數(shù)是任務(wù)2的執(zhí)行結(jié)果,具有返回值)。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)A等待久一點(diǎn),執(zhí)行結(jié)果為333"); return 333; }, executor); CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)B很快,執(zhí)行結(jié)果為666"); return 666; }, executor); actionA.thenCombine(actionB, (res1, res2) -> { System.out.println("最終使用的執(zhí)行結(jié)果為" + (res1 + res2)); return res1 + res2; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)B很快,執(zhí)行結(jié)果為666 任務(wù)A等待久一點(diǎn),執(zhí)行結(jié)果為333 最終使用的執(zhí)行結(jié)果為999 **/
除了thenCombine
對任務(wù)最終結(jié)果進(jìn)行獲取并消費(fèi),并且具有返回值的方法外,還有兩個(gè)類似的方法。
// 這個(gè)方法效果和上面的一樣,獲取合并結(jié)果,不同的是這個(gè)方法只消費(fèi)不具有返回值 public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
// 這個(gè)方法效果和上面的一樣,獲取合并結(jié)果,不同的是這個(gè)方法不消費(fèi)結(jié)果也不具有返回值 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
⑨ 多任務(wù)組合
allOf
實(shí)現(xiàn)并行地執(zhí)行多個(gè)任務(wù),等待所有任務(wù)執(zhí)行完成(無需考慮執(zhí)行順序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
該方法可以實(shí)現(xiàn)并行地執(zhí)行多個(gè)任務(wù),適用于多個(gè)任務(wù)沒有依賴關(guān)系,可以互相獨(dú)立執(zhí)行的,傳入?yún)?shù)為多個(gè)任務(wù),沒有返回值。
allOf()
方法會等待所有的任務(wù)執(zhí)行完畢再返回,可以通過get()
阻塞確保所有任務(wù)執(zhí)行完畢
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)A等待2秒后執(zhí)行完畢"); }, executor); CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> { System.out.println("任務(wù)B很快執(zhí)行完畢"); }, executor); CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)C等待1秒后執(zhí)行完畢"); }, executor); CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)D等待5秒后執(zhí)行完畢"); }, executor); CompletableFuture.allOf(actionA, actionB, actionC, actionD).get(); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)B很快執(zhí)行完畢 任務(wù)C等待1秒后執(zhí)行完畢 任務(wù)A等待2秒后執(zhí)行完畢 任務(wù)D等待5秒后執(zhí)行完畢 **/
anyOf
實(shí)現(xiàn)并行地執(zhí)行多個(gè)任務(wù),只要有個(gè)一個(gè)完成的便會返回執(zhí)行結(jié)果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
該方法可以實(shí)現(xiàn)并行地執(zhí)行多個(gè)任務(wù),傳入?yún)?shù)為多個(gè)任務(wù),具有返回值。該方法不會等待所有任務(wù)執(zhí)行完成后再返回結(jié)果,而是當(dāng)有一個(gè)任務(wù)完成時(shí),便會返回那個(gè)任務(wù)的執(zhí)行結(jié)果。
// 例子 public static void main(String[] args) { // 快速創(chuàng)建線程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)A等待2秒后執(zhí)行完畢"); return 555; }, executor); CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> { System.out.println("任務(wù)B很快執(zhí)行完畢"); return 666; }, executor); CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)C等待1秒后執(zhí)行完畢"); return 999; }, executor); CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務(wù)D等待5秒后執(zhí)行完畢"); return 888; }, executor); System.out.println("最先執(zhí)行完的返回結(jié)果為" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get()); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 輸出結(jié)果: 任務(wù)B很快執(zhí)行完畢 最先執(zhí)行完的返回結(jié)果為666 任務(wù)C等待1秒后執(zhí)行完畢 任務(wù)A等待2秒后執(zhí)行完畢 任務(wù)D等待5秒后執(zhí)行完畢 **/
一個(gè)使用CompletableFuture異步編排的例子
不需要關(guān)心例子中的業(yè)務(wù)內(nèi)容,使用時(shí)按照自己業(yè)務(wù)的需求,對不同的需求調(diào)用不同API即可。
編寫任務(wù)時(shí)主要關(guān)心以下幾點(diǎn):
① 是否需要消費(fèi)之前任務(wù)的結(jié)果
② 是否需要返回結(jié)果給其他任務(wù)消費(fèi)
③ 是否要求順序執(zhí)行(是否允許并行,有沒有前置要求)
/** * 該方法用于獲取單個(gè)商品的所有信息 * 1. 商品的基本信息 * 2. 商品的圖片信息 * 3. 商品的銷售屬性組合 * 4. 商品的各種分類基本信息 * 5. 商品的促銷信息 */ @Override public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException { // 創(chuàng)建商品Vo通過各個(gè)任務(wù)去完善Vo的信息 SkuItemVo skuItemVo = new SkuItemVo(); // 獲取商品基本信息 查詢到后設(shè)置進(jìn)Vo中,返回基本信息給后續(xù)任務(wù)消費(fèi) (使用自定義的線程池進(jìn)行異步) CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> { SkuInfoEntity info = this.getById(skuId); skuItemVo.setInfo(info); return info; }, executor); // 獲取商品的圖片信息 獲取后設(shè)置進(jìn)Vo中,此處不需要消費(fèi)圖片信息,也不需要返回結(jié)果。所以使用runAsync即可 CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> { List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); }, executor); // 獲取商品銷售屬性 因?yàn)橐弥安樵兊降幕拘畔?,但后續(xù)任務(wù)不需要消費(fèi)銷售屬性(不需要返回結(jié)果),所以使用thenAcceptAsync消費(fèi)之前的基本信息,不返回銷售信息。 CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> { List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId()); skuItemVo.setSaleAttr(saleAttrVos); }, executor); // 獲取商品各分類基本信息,同樣要消費(fèi)之前的基本信息,但無需返回,所以使用thenAcceptAsync即可 CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> { SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId()); skuItemVo.setDesc(spuInfoDescEntity); }, executor); // 獲取商品的促銷信息 這個(gè)也不需要消費(fèi)之前任務(wù)的結(jié)果,也不需要返回結(jié)果。所以直接使用runAsync即可 CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> { R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId); if (skuSeckilInfo.getCode() == 0) { SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() { }); skuItemVo.setSeckillSkuVo(seckilInfoData); if (seckilInfoData != null) { long currentTime = System.currentTimeMillis(); if (currentTime > seckilInfoData.getEndTime()) { skuItemVo.setSeckillSkuVo(null); } } } }, executor); // 使用allOf()組合所有任務(wù),并且使用get()阻塞,等待所有任務(wù)完成。 // 補(bǔ)充:infoFuture不能放入allOf中,因?yàn)閍llOf是并行無序執(zhí)行(需要多個(gè)條件是無依賴性的)的,當(dāng)上面任務(wù)中有需要消費(fèi)infoFuture的結(jié)果,所以需要先執(zhí)行infoFuture。 CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get(); // 最后返回商品Vo return skuItemVo; }
以上就是java CompletableFuture實(shí)現(xiàn)異步編排詳解的詳細(xì)內(nèi)容,更多關(guān)于java CompletableFuture異步編排的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Mybatis mapper接口動態(tài)代理開發(fā)步驟解析
這篇文章主要介紹了Mybatis mapper接口動態(tài)代理開發(fā)步驟解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Java postgresql數(shù)組字段類型處理方法詳解
這篇文章主要介紹了Java postgresql數(shù)組字段類型處理方法,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10SpringBoot動態(tài)定時(shí)功能實(shí)現(xiàn)方案詳解
在SpringBoot項(xiàng)目中簡單使用定時(shí)任務(wù),不過由于要借助cron表達(dá)式且都提前定義好放在配置文件里,不能在項(xiàng)目運(yùn)行中動態(tài)修改任務(wù)執(zhí)行時(shí)間,實(shí)在不太靈活?,F(xiàn)在我們就來實(shí)現(xiàn)可以動態(tài)修改cron表達(dá)式的定時(shí)任務(wù),感興趣的可以了解一下2022-11-11elasticsearch通過guice注入Node組裝啟動過程
這篇文章主要為大家介紹了?elasticsearch通過guice注入Node組裝啟動過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04Java數(shù)據(jù)結(jié)構(gòu)(線性表)詳解
本文主要介紹了Java數(shù)據(jù)結(jié)構(gòu)(線性表)的相關(guān)知識。具有很好的參考價(jià)值,下面跟著小編一起來看下吧2017-01-01Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法
這篇文章主要介紹了Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02spring boot項(xiàng)目application.properties文件存放及使用介紹
這篇文章主要介紹了spring boot項(xiàng)目application.properties文件存放及使用介紹,我們的application.properties文件中會有很多敏感信息,大家在使用過程中要多加小心2021-06-06