亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

解讀CompletableFuture異步多線程的使用方式

 更新時間:2024年07月11日 10:32:28   作者:熊出沒  
這篇文章主要介紹了CompletableFuture異步多線程的使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

一、一個示例回顧Future

一些業(yè)務(wù)場景我們需要使用多線程異步執(zhí)行任務(wù),加快任務(wù)執(zhí)行速度。

JDK5新增了Future接口,用于描述一個異步計算的結(jié)果。

雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,我們必須使用Future.get()的方式阻塞調(diào)用線程,或者使用輪詢方式判斷 Future.isDone 任務(wù)是否結(jié)束,再獲取結(jié)果。

這兩種處理方式都不是很優(yōu)雅,相關(guān)代碼如下:

@Test
public void testFuture() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
Thread.sleep(2000);
return "hello";
});
System.out.println(future.get());
System.out.println("end");
}

與此同時,F(xiàn)uture無法解決多個異步任務(wù)需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進行執(zhí)行,這個時候你可能想到了CountDownLatch,沒錯確實可以解決,代碼如下。

這里定義兩個Future,第一個通過用戶id獲取用戶信息,第二個通過商品id獲取商品信息。

@Test
public void testCountDownLatch() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2);
long startTime = System.currentTimeMillis();
Future<String> userFuture = executorService.submit(() -> {
//模擬查詢商品耗時500毫秒
Thread.sleep(500);
downLatch.countDown();
return "用戶A";
});

Future<String> goodsFuture = executorService.submit(() -> {
//模擬查詢商品耗時500毫秒
Thread.sleep(400);
downLatch.countDown();
return "商品A";
});

downLatch.await();
//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("獲取用戶信息:" + userFuture.get());
System.out.println("獲取商品信息:" + goodsFuture.get());
System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");

}

執(zhí)行結(jié)果如下所示:

從運行結(jié)果可以看出結(jié)果都已經(jīng)獲取,而且如果我們不用異步操作,執(zhí)行時間應(yīng)該是:500+400+600 = 1500,用異步操作后實際只用1110。

但是Java8以后我不在認為這是一種優(yōu)雅的解決方式,接下來來了解下CompletableFuture的使用。

二、通過CompletableFuture實現(xiàn)上面的示例

@Test
public void testCompletableInfo() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();

//調(diào)用用戶服務(wù)獲取用戶基本信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
//模擬查詢商品耗時500毫秒
{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用戶A";
});

//調(diào)用商品服務(wù)獲取商品基本信息
CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
//模擬查詢商品耗時500毫秒
{
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品A";
});

System.out.println("獲取用戶信息:" + userFuture.get());
System.out.println("獲取商品信息:" + goodsFuture.get());

//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
}

執(zhí)行結(jié)果如下:

通過CompletableFuture可以很輕松的實現(xiàn)CountDownLatch的功能,你以為這就結(jié)束了,遠遠不止,CompletableFuture比這要強多了。

比如可以實現(xiàn):任務(wù)1執(zhí)行完了再執(zhí)行任務(wù)2,甚至任務(wù)1執(zhí)行的結(jié)果,作為任務(wù)2的入?yún)?shù)等等強大功能,下面就來學學CompletableFuture的API。

三、CompletableFuture的創(chuàng)建方式

3.1 常用的4種創(chuàng)建方式

CompletableFuture源碼中有四個靜態(tài)方法用來執(zhí)行異步任務(wù)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

一般我們用上面的靜態(tài)方法來創(chuàng)建CompletableFuture,這里也解釋下他們的區(qū)別:

supplyAsync」執(zhí)行任務(wù),支持返回值。

runAsync」執(zhí)行任務(wù),沒有返回值。

  • supplyAsync方法」
//使用默認內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定義線程,根據(jù)supplier構(gòu)建執(zhí)行任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync方法」
//使用默認內(nèi)置線程池ForkJoinPool.commonPool(),根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
public static CompletableFuture<Void> runAsync(Runnable runnable)
//自定義線程,根據(jù)runnable構(gòu)建執(zhí)行任務(wù)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

3.2 獲取結(jié)果的4種方式

CompltableFuture類提供了四種方式

//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()

說明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已經(jīng)提供了,后者提供超時處理,如果在指定時間內(nèi)未獲取結(jié)果將拋出超時異常
  • 「getNow」 => 立即獲取結(jié)果不阻塞,結(jié)果計算已完成將返回結(jié)果或計算過程中的異常,如果未計算完成將返回設(shè)定的valueIfAbsent值
  • 「join」 => 方法里不會拋出異常
@Test
public void testCompletableGet() throws InterruptedException, ExecutionException {
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品A";
});

// getNow方法測試
System.out.println(cp1.getNow("商品B"));

//join方法測試
CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
System.out.println(cp2.join());
System.out.println("-----------------------------------------------------");
//get方法測試
CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
System.out.println(cp3.get());
}

「運行結(jié)果」

第一個執(zhí)行結(jié)果為 「商品B」,因為要先睡上1秒結(jié)果不能立即獲取

join方法獲取結(jié)果方法里不會拋異常,但是執(zhí)行結(jié)果會拋異常,拋出的異常為CompletionException

get方法獲取結(jié)果方法里將拋出異常,執(zhí)行結(jié)果拋出的異常為ExecutionException

四、異步回調(diào)方法

4.1 thenRun/thenRunAsync

通俗點講就是,「做完第一個任務(wù)后,再做第二個任務(wù),第二個任務(wù)也沒有返回值」。

示例如下所示:

@Test
public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();

CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
try {
//執(zhí)行任務(wù)A
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}

});

CompletableFuture<Void> cp2 = cp1.thenRun(() -> {
try {
//執(zhí)行任務(wù)B
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// get方法測試
System.out.println(cp2.get());

//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
}

結(jié)果如下:

總共用時1610ms

「thenRun 和thenRunAsync有什么區(qū)別呢?」

如果你執(zhí)行第一個任務(wù)的時候,傳入了一個自定義線程池:

調(diào)用thenRun方法執(zhí)行第二個任務(wù)時,則第二個任務(wù)和第一個任務(wù)是共用同一個線程池。

調(diào)用thenRunAsync執(zhí)行第二個任務(wù)時,則第一個任務(wù)使用的是你自己傳入的線程池,第二個任務(wù)使用的是ForkJoin線程池。

說明: 后面介紹的thenAcceptthenAcceptAsync,thenApplythenApplyAsync等,它們之間的區(qū)別也是這個。

4.2 thenAccept/thenAcceptAsync

第一個任務(wù)執(zhí)行完成后,執(zhí)行第二個回調(diào)方法任務(wù),會將第一個任務(wù)的執(zhí)行結(jié)果,作為入?yún)?/strong>,傳遞到第二個回調(diào)任務(wù)方法中,但是回調(diào)方法是沒有返回值的。

代碼示例:

@Test
public void testCompletableThenAccept() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
return "dev";

});
CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {
System.out.println("上一個任務(wù)的返回結(jié)果為: " + a);
});
cp2.get();
}

結(jié)果:

上一個任務(wù)的返回結(jié)果為: dev

4.3 thenApply/thenApplyAsync

表示第一個任務(wù)執(zhí)行完成后,執(zhí)行第二個回調(diào)方法任務(wù),會將該任務(wù)的執(zhí)行結(jié)果,作為入?yún)?,傳遞到回調(diào)方法中,并且回調(diào)方法是有返回值的。

示例如下:

@Test
public void testCompletableThenApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
return "dev";

}).thenApply((a) -> {
if(Objects.equals(a,"dev")){
return "dev";
}
return "prod";
});
System.out.println("當前環(huán)境為:" + cp1.get());
}

結(jié)果如下:

當前環(huán)境為:dev

五、異常回調(diào)

CompletableFuture的任務(wù)不論是正常完成還是出現(xiàn)異常它都會調(diào)用 「whenComplete」這回調(diào)函數(shù)。

  • 「正常完成」:whenComplete返回結(jié)果和上級任務(wù)一致,異常為null;
  • 「出現(xiàn)異?!?/strong>:whenComplete返回結(jié)果為null,異常為上級任務(wù)的異常;

即調(diào)用get()時,正常完成時就獲取到結(jié)果,出現(xiàn)異常時就會拋出異常,需要你處理該異常。

下面看示例:

5.1 只用whenComplete

@Test
public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {

if (Math.random() < 0.5) {
throw new RuntimeException("出錯了");
}
System.out.println("正常結(jié)束");
return 0.11;

}).whenComplete((aDouble, throwable) -> {
if (aDouble == null) {
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
});
System.out.println("最終返回的結(jié)果 = " + future.get());
}

正常完成,沒有異常時:

出現(xiàn)異常時:get()會拋出異常

5.2 whenComplete + exceptionally示例

@Test
public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出錯了");
}
System.out.println("正常結(jié)束");
return 0.11;

}).whenComplete((aDouble, throwable) -> {
if (aDouble == null) {
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
}).exceptionally((throwable) -> {
System.out.println("exceptionally中異常:" + throwable.getMessage());
return 0.0;
});

System.out.println("最終返回的結(jié)果 = " + future.get());
}

當出現(xiàn)異常時,exceptionally中會捕獲該異常,給出默認返回值0.0。

六、多任務(wù)組合回調(diào)(重點學習,任務(wù)編排)

6.1 AND組合關(guān)系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「當任務(wù)一和任務(wù)二都完成再執(zhí)行任務(wù)三」。

區(qū)別在于:

  • 「runAfterBoth」 不會把執(zhí)行結(jié)果當做方法入?yún)?,且沒有返回值
  • 「thenAcceptBoth」: 會將兩個任務(wù)的執(zhí)行結(jié)果作為方法入?yún)ⅲ瑐鬟f到指定方法中,且無返回值
  • 「thenCombine」:會將兩個任務(wù)的執(zhí)行結(jié)果作為方法入?yún)?,傳遞到指定方法中,且有返回值
@Test
public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
//創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//開啟異步任務(wù)1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("異步任務(wù)1結(jié)束");
return result;
}, executorService);

//開啟異步任務(wù)2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("異步任務(wù)2結(jié)束");
return result;
}, executorService);

//任務(wù)組合
CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {
System.out.println("執(zhí)行任務(wù)3,當前線程是:" + Thread.currentThread().getId());
System.out.println("任務(wù)1返回值:" + f1);
System.out.println("任務(wù)2返回值:" + f2);
return f1 + f2;
}, executorService);

Integer res = task3.get();
System.out.println("最終結(jié)果:" + res);
}

結(jié)果如下:

6.2 OR組合關(guān)系

applyToEither / acceptEither / runAfterEither 都表示:「兩個任務(wù),只要有一個任務(wù)完成,就執(zhí)行任務(wù)三」。

區(qū)別在于:

  • 「runAfterEither」:不會把執(zhí)行結(jié)果當做方法入?yún)?,且沒有返回值
  • 「acceptEither」: 會將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)?,傳遞到指定方法中,且無返回值
  • 「applyToEither」:會將已經(jīng)執(zhí)行完成的任務(wù),作為方法入?yún)?,傳遞到指定方法中,且有返回值

代碼示例:

@Test
public void testCompletableEitherAsync() {
//創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//開啟異步任務(wù)1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId());

int result = 1 + 1;
System.out.println("異步任務(wù)1結(jié)束");
return result;
}, executorService);

//開啟異步任務(wù)2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("異步任務(wù)2結(jié)束");
return result;
}, executorService);

//任務(wù)組合
task.acceptEitherAsync(task2, (res) -> {
System.out.println("執(zhí)行任務(wù)3,當前線程是:" + Thread.currentThread().getId());
System.out.println("上一個任務(wù)的結(jié)果為:"+res);
}, executorService);
}

運行結(jié)果如下:

通過結(jié)果可以看出,異步任務(wù)2都沒有執(zhí)行結(jié)束,任務(wù)3獲取的也是1的執(zhí)行結(jié)果 。

注意

如果把上面的核心線程數(shù)改為1也就是

ExecutorService executorService = Executors.newFixedThreadPool(1);

結(jié)果如下,可以看到任務(wù)3根本沒執(zhí)行,直接丟棄

6.3 多任務(wù)組合

  • 「allOf」:等待所有任務(wù)完成
  • 「anyOf」:只要有一個任務(wù)完成

示例

  • allOf:等待所有任務(wù)完成
@Test
public void testCompletableAallOf() throws ExecutionException, InterruptedException {
//創(chuàng)建線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//開啟異步任務(wù)1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)1,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("異步任務(wù)1結(jié)束");
return result;
}, executorService);

//開啟異步任務(wù)2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)2,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("異步任務(wù)2結(jié)束");
return result;
}, executorService);

//開啟異步任務(wù)3
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("異步任務(wù)3,當前線程是:" + Thread.currentThread().getId());
int result = 1 + 3;
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("異步任務(wù)3結(jié)束");
return result;
}, executorService);

//任務(wù)組合
CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);

//等待所有任務(wù)完成
allOf.get();
//獲取任務(wù)的返回結(jié)果
System.out.println("task結(jié)果為:" + task.get());
System.out.println("task2結(jié)果為:" + task2.get());
System.out.println("task3結(jié)果為:" + task3.get());
}
  • anyOf: 只要有一個任務(wù)完成

結(jié)果如下,可以看到第一個異步任務(wù)完成后就執(zhí)行完主線程了

七、CompletableFuture使用有哪些注意點

CompletableFuture 使我們的異步編程更加便利的、代碼更加優(yōu)雅的同時,我們也要關(guān)注下它,使用的一些注意點。

7.1 Future需要獲取返回值,才能獲取異常信息

注意事項:

join()方法拋出的是unchecked異常(即RuntimeException),不會強制開發(fā)者拋出,會將異常包裝成CompletionException異常 /CancellationException異常,但是本質(zhì)原因還是代碼內(nèi)存在的真正的異常,在運行時會拋出

get()方法拋出的是經(jīng)過檢查的異常,ExecutionException, InterruptedException 需要用戶手動處理(拋出或者 try catch)

7.2 CompletableFuture的get()方法是阻塞的

CompletableFutureget()方法是阻塞的,如果使用它來獲取異步調(diào)用的返回值,需要添加超時時間。

7.3、不建議使用默認線程池

CompletableFuture代碼中又使用了默認的 「ForkJoin線程池」,處理的線程個數(shù)是電腦 「CPU核數(shù)-1」。

在大量請求過來的時候,處理邏輯復雜的話,響應(yīng)會很慢。

一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù)。

7.4、自定義線程池時,注意飽和策略

CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。

但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當線程池飽和時,會直接丟棄任務(wù),不會拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時的異步線程,做好線程池隔離。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論