解讀CompletableFuture異步多線程的使用方式
一、一個示例回顧Future
一些業(yè)務(wù)場景我們需要使用多線程異步執(zhí)行任務(wù),加快任務(wù)執(zhí)行速度。
JDK5新增了Future
接口,用于描述一個異步計算的結(jié)果。
雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,我們必須使用Future.get()
的方式阻塞調(diào)用線程,或者使用輪詢方式判斷 Future.isDone
任務(wù)是否結(jié)束,再獲取結(jié)果。
這兩種處理方式都不是很優(yōu)雅,相關(guān)代碼如下:
@Test public void testFuture() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); Future<String> future = executorService.submit(() -> { Thread.sleep(2000); return "hello"; }); System.out.println(future.get()); System.out.println("end"); }
與此同時,F(xiàn)uture無法解決多個異步任務(wù)需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進行執(zhí)行,這個時候你可能想到了CountDownLatch
,沒錯確實可以解決,代碼如下。
這里定義兩個Future,第一個通過用戶id獲取用戶信息,第二個通過商品id獲取商品信息。
@Test public void testCountDownLatch() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(5); CountDownLatch downLatch = new CountDownLatch(2); long startTime = System.currentTimeMillis(); Future<String> userFuture = executorService.submit(() -> { //模擬查詢商品耗時500毫秒 Thread.sleep(500); downLatch.countDown(); return "用戶A"; }); Future<String> goodsFuture = executorService.submit(() -> { //模擬查詢商品耗時500毫秒 Thread.sleep(400); downLatch.countDown(); return "商品A"; }); downLatch.await(); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("獲取用戶信息:" + userFuture.get()); System.out.println("獲取商品信息:" + goodsFuture.get()); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); }
執(zhí)行結(jié)果如下所示:
從運行結(jié)果可以看出結(jié)果都已經(jīng)獲取,而且如果我們不用異步操作,執(zhí)行時間應(yīng)該是:500+400+600 = 1500
,用異步操作后實際只用1110。
但是Java8以后我不在認為這是一種優(yōu)雅的解決方式,接下來來了解下CompletableFuture
的使用。
二、通過CompletableFuture實現(xiàn)上面的示例
@Test public void testCompletableInfo() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); //調(diào)用用戶服務(wù)獲取用戶基本信息 CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> //模擬查詢商品耗時500毫秒 { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "用戶A"; }); //調(diào)用商品服務(wù)獲取商品基本信息 CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> //模擬查詢商品耗時500毫秒 { try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); System.out.println("獲取用戶信息:" + userFuture.get()); System.out.println("獲取商品信息:" + goodsFuture.get()); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); }
執(zhí)行結(jié)果如下:
通過CompletableFuture
可以很輕松的實現(xiàn)CountDownLatch
的功能,你以為這就結(jié)束了,遠遠不止,CompletableFuture
比這要強多了。
比如可以實現(xiàn):任務(wù)1執(zhí)行完了再執(zhí)行任務(wù)2,甚至任務(wù)1執(zhí)行的結(jié)果,作為任務(wù)2的入?yún)?shù)等等強大功能,下面就來學學CompletableFuture
的API。
三、CompletableFuture的創(chuàng)建方式
3.1 常用的4種創(chuàng)建方式
CompletableFuture
源碼中有四個靜態(tài)方法用來執(zhí)行異步任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} public static CompletableFuture<Void> runAsync(Runnable runnable){..} public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
一般我們用上面的靜態(tài)方法來創(chuàng)建CompletableFuture
,這里也解釋下他們的區(qū)別:
supplyAsync」執(zhí)行任務(wù),支持返回值。
runAsync」執(zhí)行任務(wù),沒有返回值。
- supplyAsync方法」
//使用默認內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)supplier構(gòu)建執(zhí)行任務(wù) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定義線程,根據(jù)supplier構(gòu)建執(zhí)行任務(wù) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync方法」
//使用默認內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)runnable構(gòu)建執(zhí)行任務(wù) public static CompletableFuture<Void> runAsync(Runnable runnable) //自定義線程,根據(jù)runnable構(gòu)建執(zhí)行任務(wù) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
3.2 獲取結(jié)果的4種方式
CompltableFuture
類提供了四種方式
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join()
說明:
- 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已經(jīng)提供了,后者提供超時處理,如果在指定時間內(nèi)未獲取結(jié)果將拋出超時異常
- 「getNow」 => 立即獲取結(jié)果不阻塞,結(jié)果計算已完成將返回結(jié)果或計算過程中的異常,如果未計算完成將返回設(shè)定的valueIfAbsent值
- 「join」 => 方法里不會拋出異常
@Test public void testCompletableGet() throws InterruptedException, ExecutionException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); // getNow方法測試 System.out.println(cp1.getNow("商品B")); //join方法測試 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp2.join()); System.out.println("-----------------------------------------------------"); //get方法測試 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp3.get()); }
「運行結(jié)果」:
第一個執(zhí)行結(jié)果為 「商品B」,因為要先睡上1秒結(jié)果不能立即獲取
join方法獲取結(jié)果方法里不會拋異常,但是執(zhí)行結(jié)果會拋異常,拋出的異常為CompletionException
get方法獲取結(jié)果方法里將拋出異常,執(zhí)行結(jié)果拋出的異常為ExecutionException
四、異步回調(diào)方法
4.1 thenRun/thenRunAsync
通俗點講就是,「做完第一個任務(wù)后,再做第二個任務(wù),第二個任務(wù)也沒有返回值」。
示例如下所示:
@Test public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> { try { //執(zhí)行任務(wù)A Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> cp2 = cp1.thenRun(() -> { try { //執(zhí)行任務(wù)B Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } }); // get方法測試 System.out.println(cp2.get()); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); }
結(jié)果如下:
總共用時1610ms
「thenRun 和thenRunAsync有什么區(qū)別呢?」
如果你執(zhí)行第一個任務(wù)的時候,傳入了一個自定義線程池:
調(diào)用thenRun方法執(zhí)行第二個任務(wù)時,則第二個任務(wù)和第一個任務(wù)是共用同一個線程池。
調(diào)用thenRunAsync
執(zhí)行第二個任務(wù)時,則第一個任務(wù)使用的是你自己傳入的線程池,第二個任務(wù)使用的是ForkJoin線程池。
說明: 后面介紹的thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它們之間的區(qū)別也是這個。
4.2 thenAccept/thenAcceptAsync
第一個任務(wù)執(zhí)行完成后,執(zhí)行第二個回調(diào)方法任務(wù),會將第一個任務(wù)的執(zhí)行結(jié)果,作為入?yún)?/strong>,傳遞到第二個回調(diào)任務(wù)方法中,但是回調(diào)方法是沒有返回值的。
代碼示例:
@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一個任務(wù)的返回結(jié)果為: " + a); }); cp2.get(); }
結(jié)果:
上一個任務(wù)的返回結(jié)果為: dev
4.3 thenApply/thenApplyAsync
表示第一個任務(wù)執(zhí)行完成后,執(zhí)行第二個回調(diào)方法任務(wù),會將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)?,傳遞到回調(diào)方法中,并且回調(diào)方法是有返回值的。
示例如下:
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if(Objects.equals(a,"dev")){ return "dev"; } return "prod"; }); System.out.println("當前環(huán)境為:" + cp1.get()); }
結(jié)果如下:
當前環(huán)境為:dev
五、異常回調(diào)
當CompletableFuture
的任務(wù)不論是正常完成還是出現(xiàn)異常它都會調(diào)用 「whenComplete」這回調(diào)函數(shù)。
- 「正常完成」:whenComplete返回結(jié)果和上級任務(wù)一致,異常為null;
- 「出現(xiàn)異?!?/strong>:whenComplete返回結(jié)果為null,異常為上級任務(wù)的異常;
即調(diào)用get()
時,正常完成時就獲取到結(jié)果,出現(xiàn)異常時就會拋出異常,需要你處理該異常。
下面看示例:
5.1 只用whenComplete
@Test public void testCompletableWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出錯了"); } System.out.println("正常結(jié)束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }); System.out.println("最終返回的結(jié)果 = " + future.get()); }
正常完成,沒有異常時:
出現(xiàn)異常時:get()會拋出異常
5.2 whenComplete + exceptionally示例
@Test public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出錯了"); } System.out.println("正常結(jié)束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }).exceptionally((throwable) -> { System.out.println("exceptionally中異常:" + throwable.getMessage()); return 0.0; }); System.out.println("最終返回的結(jié)果 = " + future.get()); }
當出現(xiàn)異常時,exceptionally
中會捕獲該異常,給出默認返回值0.0。
六、多任務(wù)組合回調(diào)(重點學習,任務(wù)編排)
6.1 AND組合關(guān)系
thenCombine
/ thenAcceptBoth
/ runAfterBoth
都表示:「當任務(wù)一和任務(wù)二都完成再執(zhí)行任務(wù)三」。
區(qū)別在于:
- 「runAfterBoth」 不會把執(zhí)行結(jié)果當做方法入?yún)?,且沒有返回值
- 「thenAcceptBoth」: 會將兩個任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值
- 「thenCombine」:會將兩個任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,傳遞到指定方法中,且有返回值
@Test public void testCompletableThenCombine() throws ExecutionException, InterruptedException { //創(chuàng)建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務(wù)1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務(wù)1結(jié)束"); return result; }, executorService); //開啟異步任務(wù)2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務(wù)2結(jié)束"); return result; }, executorService); //任務(wù)組合 CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> { System.out.println("執(zhí)行任務(wù)3,當前線程是:" + Thread.currentThread().getId()); System.out.println("任務(wù)1返回值:" + f1); System.out.println("任務(wù)2返回值:" + f2); return f1 + f2; }, executorService); Integer res = task3.get(); System.out.println("最終結(jié)果:" + res); }
結(jié)果如下:
6.2 OR組合關(guān)系
applyToEither
/ acceptEither
/ runAfterEither
都表示:「兩個任務(wù),只要有一個任務(wù)完成,就執(zhí)行任務(wù)三」。
區(qū)別在于:
- 「runAfterEither」:不會把執(zhí)行結(jié)果當做方法入?yún)?,且沒有返回值
- 「acceptEither」: 會將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)?,傳遞到指定方法中,且無返回值
- 「applyToEither」:會將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)?,傳遞到指定方法中,且有返回值
代碼示例:
@Test public void testCompletableEitherAsync() { //創(chuàng)建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務(wù)1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務(wù)1結(jié)束"); return result; }, executorService); //開啟異步任務(wù)2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務(wù)2結(jié)束"); return result; }, executorService); //任務(wù)組合 task.acceptEitherAsync(task2, (res) -> { System.out.println("執(zhí)行任務(wù)3,當前線程是:" + Thread.currentThread().getId()); System.out.println("上一個任務(wù)的結(jié)果為:"+res); }, executorService); }
運行結(jié)果如下:
通過結(jié)果可以看出,異步任務(wù)2都沒有執(zhí)行結(jié)束,任務(wù)3獲取的也是1的執(zhí)行結(jié)果 。
注意
如果把上面的核心線程數(shù)改為1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
結(jié)果如下,可以看到任務(wù)3根本沒執(zhí)行,直接丟棄
6.3 多任務(wù)組合
- 「allOf」:等待所有任務(wù)完成
- 「anyOf」:只要有一個任務(wù)完成
示例
- allOf:等待所有任務(wù)完成
@Test public void testCompletableAallOf() throws ExecutionException, InterruptedException { //創(chuàng)建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務(wù)1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務(wù)1結(jié)束"); return result; }, executorService); //開啟異步任務(wù)2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務(wù)2結(jié)束"); return result; }, executorService); //開啟異步任務(wù)3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務(wù)3,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 3; try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務(wù)3結(jié)束"); return result; }, executorService); //任務(wù)組合 CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3); //等待所有任務(wù)完成 allOf.get(); //獲取任務(wù)的返回結(jié)果 System.out.println("task結(jié)果為:" + task.get()); System.out.println("task2結(jié)果為:" + task2.get()); System.out.println("task3結(jié)果為:" + task3.get()); }
- anyOf: 只要有一個任務(wù)完成
結(jié)果如下,可以看到第一個異步任務(wù)完成后就執(zhí)行完主線程了
七、CompletableFuture使用有哪些注意點
CompletableFuture
使我們的異步編程更加便利的、代碼更加優(yōu)雅的同時,我們也要關(guān)注下它,使用的一些注意點。
7.1 Future需要獲取返回值,才能獲取異常信息
注意事項:
join()方法拋出的是unchecked異常(即RuntimeException),不會強制開發(fā)者拋出,會將異常包裝成CompletionException異常 /CancellationException異常,但是本質(zhì)原因還是代碼內(nèi)存在的真正的異常,在運行時會拋出
get()方法拋出的是經(jīng)過檢查的異常,ExecutionException, InterruptedException 需要用戶手動處理(拋出或者 try catch)
7.2 CompletableFuture的get()方法是阻塞的
CompletableFuture
的get()
方法是阻塞的,如果使用它來獲取異步調(diào)用的返回值,需要添加超時時間。
7.3、不建議使用默認線程池
CompletableFuture
代碼中又使用了默認的 「ForkJoin線程池」,處理的線程個數(shù)是電腦 「CPU核數(shù)-1」。
在大量請求過來的時候,處理邏輯復雜的話,響應(yīng)會很慢。
一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù)。
7.4、自定義線程池時,注意飽和策略
CompletableFuture
的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)
。并且一般建議使用自定義線程池。
但是如果線程池拒絕策略是DiscardPolicy
或者DiscardOldestPolicy
,當線程池飽和時,會直接丟棄任務(wù),不會拋棄異常。因此建議,CompletableFuture
線程池策略最好使用AbortPolicy
,然后耗時的異步線程,做好線程池隔離。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
servlet的url-pattern匹配規(guī)則詳細描述(小結(jié))
在利用servlet或Filter進行url請求的匹配時,很關(guān)鍵的一點就是匹配規(guī)則。這篇文章主要介紹了servlet的url-pattern匹配規(guī)則詳細描述(小結(jié)),非常具有實用價值,需要的朋友可以參考下2018-07-07SpringBoot + Mybatis-plus實戰(zhàn)之Mybatis-plus的一級緩存、二級緩存
這篇文章主要介紹了SpringBoot + Mybatis-plus實戰(zhàn)之Mybatis-plus的一級緩存、二級緩存,本文通過實例圖文相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12java eclipse 出現(xiàn) xxx cannot be resolved to a type 錯誤解決方法
這篇文章主要介紹了java eclipse 出現(xiàn) xxx cannot be resolved to a type 錯誤解決方法的相關(guān)資料,需要的朋友可以參考下2017-03-03java使用Hex編碼解碼實現(xiàn)Aes加密解密功能示例
這篇文章主要介紹了java使用Hex編碼解碼實現(xiàn)Aes加密解密功能,結(jié)合完整實例形式分析了Aes加密解密功能的定義與使用方法,需要的朋友可以參考下2017-01-01