Java多線程工具CompletableFuture的使用教程
前言
Future的問題
寫多線程程序的時候,可以使用Future從一個異步線程中拿到結(jié)果,但是如果使用過程中會發(fā)現(xiàn)一些問題:
- 如果想要對Future的結(jié)果做進一步的操作,需要阻塞當前線程
- 多個Future不能被鏈式的執(zhí)行,每個Future的結(jié)果都是獨立的,期望對一個Future的結(jié)果做另外一件異步的事情;
- 沒有異常處理策略,如果Future執(zhí)行失敗了,需要手動捕捉
CompletableFuture應運而生
為了解決Future問題,JDK在1.8的時候給我們提供了一個好用的工具類CompletableFuture;
它實現(xiàn)了Future和CompletionStage接口,針對Future的不足之處給出了相應的處理方式。
- 在異步線程執(zhí)行結(jié)束后可以自動回調(diào)我們新的處理邏輯,無需阻塞
- 可以對多個異步任務進行編排,組合或者排序
- 異常處理
CompletableFuture的核心思想是將每個異步任務都可以看做一個步驟(CompletionStage),然后其他的異步任務可以根據(jù)這個步驟做一些想做的事情。
CompletionStage定義了許多步驟處理的方法,功能非常強大,這里就只列一下日常中常用到的一些方法供大家參考。
使用方式
基本使用-提交異步任務
簡單的使用方式
異步執(zhí)行,無需結(jié)果:
// 可以執(zhí)行Executors異步執(zhí)行,如果不指定,默認使用ForkJoinPool CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
異步執(zhí)行,同時返回結(jié)果:
// 同樣可以指定線程池 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!"); System.out.println(stringCompletableFuture.get());
處理上個異步任務結(jié)果
thenRun: 不需要上一步的結(jié)果,直接直接新的操作
thenAccept:獲取上一步異步處理的內(nèi)容,進行新的操作
thenApply: 獲取上一步的內(nèi)容,然后產(chǎn)生新的內(nèi)容
所有加上Async后綴的,代表新的處理操作仍然是異步的。Async的操作都可以指定Executors進行處理
// Demo CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") // 針對上一步的結(jié)果做處理,產(chǎn)生新的結(jié)果 .thenApplyAsync(s -> s.toUpperCase()) // 針對上一步的結(jié)果做處理,不返回結(jié)果 .thenAcceptAsync(s -> System.out.println(s)) // 不需要上一步返回的結(jié)果,直接進行操作 .thenRunAsync(() -> System.out.println("end")); ;
對兩個結(jié)果進行選用-acceptEither
當我們有兩個回調(diào)在處理的時候,任何完成都可以使用,兩者結(jié)果沒有關系,那么使用acceptEither。
兩個異步線程誰先執(zhí)行完成,用誰的結(jié)果,其余類型的方法也是如此。
// 返回abc CompletableFuture .supplyAsync(() -> { SleepUtils.sleep(100); return "Hello CompletableFuture!"; }) .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }); // 返回Hello CompletableFuture! CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .acceptEither(CompletableFuture.supplyAsync(() -> { SleepUtils.sleep(100); return "abc"; }), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } });
對兩個結(jié)果進行合并-thenCombine, thenAcceptBoth
thenCombine
當我們有兩個CompletionStage時,需要對兩個的結(jié)果進行整合處理,然后計算得出一個新的結(jié)果。
- thenCompose是對上一個CompletionStage的結(jié)果進行處理,返回結(jié)果,并且返回類型必須是CompletionStage。
- thenCombine是得到第一個CompletionStage的結(jié)果,然后拿到當前的CompletionStage,兩者的結(jié)果進行處理。
CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172); CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65) .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() { @Override public Double apply(Integer wight, Integer height) { return wight * 10000.0 / (height * height); } }) ;
thenAcceptBoth
需要兩個異步CompletableFuture的結(jié)果,兩者都完成的時候,才進入thenAcceptBoth回調(diào)。
// thenAcceptBoth案例: CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() { // 參數(shù)一為我們剛開始運行時的CompletableStage,新傳入的作為第二個參數(shù) @Override public void accept(String s, String s2) { System.out.println("param1=" + s + ", param2=" + s2); } }); // 結(jié)果:param1=Hello CompletableFuture!, param2=abc
異常處理
當我們使用CompleteFuture進行鏈式調(diào)用的時候,多個異步回調(diào)中,如果有一個執(zhí)行出現(xiàn)問題,那么接下來的回調(diào)都會停止,所以需要一種異常處理策略。
exceptionally
exceptionally是當出現(xiàn)錯誤時,給我們機會進行恢復,自定義返回內(nèi)容。
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("發(fā)生錯誤"); }).exceptionally(throwable -> { log.error("調(diào)用錯誤 {}", throwable.getMessage(), throwable); return "異常處理內(nèi)容"; });
handle
exceptionally是只有發(fā)生異常時才會執(zhí)行,而handle則是不管是否發(fā)生錯誤都會執(zhí)行。
CompletableFuture.supplyAsync(() -> { return "abc"; }) .handle((r,err) -> { log.error("調(diào)用錯誤 {}", err.getMessage(), err); // 對結(jié)果做額外的處理 return r; }) ;
案例
大量用戶發(fā)送短信|消息
需求為對某個表中特定條件的用戶進行短信通知,但是短信用戶有成百上千萬,如果使用單線程讀取效率會很慢。這個時候可以考慮使用多線程的方式進行讀??;
1、將讀取任務拆分為多個不同的子任務,指定讀取的偏移量和個數(shù)
// 假設有500萬條記錄 long recordCount = 500 * 10000; int subTaskRecordCount = 10000; // 對記錄進行分片 List<Map> subTaskList = new LinkedList<>(); for (int i = 0; i < recordCount / 500; i++) { // 如果子任務結(jié)構(gòu)復雜,建議使用對象 HashMap<String, Integer> subTask = new HashMap<>(); subTask.put("index", i); subTask.put("offset", i * subTaskRecordCount); subTask.put("count", subTaskRecordCount); subTaskList.add(subTask); }
2、使用多線程進行批量讀取
// 進行subTask批量處理,拆分為不同的任務 subTaskList.stream() .map(subTask -> CompletableFuture.runAsync(()->{ // 讀取數(shù)據(jù),然后處理 // dataTunel.read(subTask); },excuturs)) // 使用應用的通用任務線程池 .map(c -> ((CompletableFuture<?>) c).join());
3、進行業(yè)務邏輯處理,或者直接在讀取完進行業(yè)務邏輯處理也是可以;
并發(fā)獲取商品不同信息
在系統(tǒng)拆分比較細的時候,價格,優(yōu)惠券,庫存,商品詳情等信息分散在不同的系統(tǒng)中,有時候需要同時獲取商品的所有信息, 有時候可能只需要獲取商品的部分信息。
當然問題點在于要調(diào)用多個不同的系統(tǒng),需要將RT降低下來,那么需要進行并發(fā)調(diào)用;
List<Task> taskList = new ArrayList<>(); List<Object> result = taskList.stream() .map(task -> CompletableFuture.supplyAsync(()->{ // handlerMap.get(task).query(); return ""; }, executorService)) .map(c -> c.join()) .collect(Collectors.toList());
問題
thenRun和thenRunAsync有什么區(qū)別
- 如果不使用傳入的線程池,大家用默認的線程池ForkJoinPool
- thenRun用的默認和上一個任務使用相同的線程池
- thenRunAsync在執(zhí)行新的任務的時候可以接受傳入一個新的線程池,使用新的線程池執(zhí)行任務;
handle和exceptional有什么區(qū)別
exceptionally是只有發(fā)生異常時才會執(zhí)行,而handle則是不管是否發(fā)生錯誤都會執(zhí)行。
最后
一般情況下上述簡單的API已經(jīng)滿足絕大部分的場景了,如果有更復雜的訴求,可繼續(xù)深入研究。
到此這篇關于Java多線程工具CompletableFuture的使用教程的文章就介紹到這了,更多相關Java CompletableFuture內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java的動態(tài)代理和靜態(tài)代理及反射常用API詳解
這篇文章主要介紹了Java的動態(tài)代理和靜態(tài)代理及反射常用API詳解,動態(tài)代理是一種在運行時動態(tài)生成代理對象的技術,它是一種設計模式,用于在不修改原始對象的情況下,通過代理對象來間接訪問原始對象,并在訪問前后執(zhí)行額外的操作,需要的朋友可以參考下2024-01-01Java+MyBatis+MySQL開發(fā)環(huán)境搭建流程詳解
Java的MyBatis框架提供了強大的數(shù)據(jù)庫操作支持,這里我們先在本地的開發(fā)環(huán)境中上手,來看一下Java+MyBatis+MySQL開發(fā)環(huán)境搭建流程詳2016-06-06修改Springboot默認序列化工具Jackson配置的實例代碼
這篇文章主要介紹了如何修改Springboot默認序列化工具Jackson的配置,當Spring容器中存在多個同類型的Bean時,默認情況下最后一個創(chuàng)建的Bean將作為首選Bean,文中通過代碼給大家介紹的非常詳細,需要的朋友可以參考下2024-02-02java中JsonObject與JsonArray轉(zhuǎn)換方法實例
在項目日常開發(fā)中常常會遇到JSONArray和JSONObject的轉(zhuǎn)換,很多公司剛?cè)肼毜男∶刃聲ㄔ谶@里,下面這篇文章主要給大家介紹了關于java中JsonObject與JsonArray轉(zhuǎn)換方法的相關資料,需要的朋友可以參考下2023-04-04java實戰(zhàn)技巧之if-else代碼優(yōu)化技巧大全
代碼中如果if-else比較多,閱讀起來比較困難,維護起來也比較困難,很容易出bug,下面這篇文章主要給大家介紹了關于java實戰(zhàn)技巧之if-else代碼優(yōu)化技巧的相關資料,需要的朋友可以參考下2022-02-02Spring MVC文件上傳大小和類型限制以及超大文件上傳bug問題
這篇文章主要介紹了Spring MVC文件上傳大小和類型限制以及超大文件上傳bug問題,非常具有實用價值,需要的朋友可以參考下2017-10-10