Java多線程Future松獲取異步任務(wù)結(jié)果輕松實現(xiàn)
前言
最近因為一些個人原因,未能抽出太多精力更新Java多線程系列,一擱置就是好幾個月,先向讀者諸君致歉。
在本系列的其他文章中,已經(jīng)提到過線程之間的相互協(xié)作, 通過分工,將程序系統(tǒng)的不同任務(wù)進行線程分離,充分利用機器性能、提升特定線程的利用率和程序的體驗感。
詳見拙作:Java多線程基礎(chǔ)--線程生命周期與線程協(xié)作詳解.
并在線程池相關(guān)文章中提到:作為程序構(gòu)建者,我們更關(guān)心線程(組)的特性和它們所執(zhí)行的任務(wù),并不愿意分心去做線程操作。
詳見拙作:Java多線程基礎(chǔ)--線程的創(chuàng)建與線程池管理
然而實際開發(fā)中,我們同樣關(guān)心一個任務(wù)對程序系統(tǒng)產(chǎn)生的影響,習慣上稱之為任務(wù)的的執(zhí)行結(jié)果。
Runnable的局限性
在前文中我們談到,通過編碼實現(xiàn)Runnable接口,將獲得具有邊界性的 "任務(wù)",在指定的線程(或者線程池)中運行。
重新觀察該接口,不難發(fā)現(xiàn)它并沒有方法返回值:
public interface Runnable { void run(); }
在JDK1.5之前,想利用任務(wù)的執(zhí)行結(jié)果,需要小心的操作線程訪問臨界區(qū)資源。使用 回調(diào)
進行解耦是非常不錯的選擇。
練手小Demo -- 回顧既往文章知識
注意,為了減少篇幅使用了lambda,但jdk1.5之前并不支持lambda
將計算任務(wù)分離到其他線程執(zhí)行,再回到主線程消費結(jié)果
我們將計算、IO等耗時任務(wù)丟到其他線程,讓主線程專注于自身業(yè)務(wù),假想它在接受用戶輸入以及處理反饋,但我們略去這一部分
我們可以設(shè)計出類似下面的代碼:
雖然它還有很多不合理之處值得優(yōu)化,但也足以用于演示
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
再模擬一個計算的線程和任務(wù)回調(diào):
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
填充一下onStart業(yè)務(wù):
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
復(fù)習:優(yōu)化為使用Runnable
在前文我們提到,如果業(yè)務(wù)僅關(guān)注任務(wù)的執(zhí)行,并不過于關(guān)心線程本身,則可以利用Runnable:
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
不難想象出:我們非常需要
- 讓特定線程、特定類型的線程方便地接收任務(wù),回顧本系列文章中的 線程池篇 ,線程池是應(yīng)運而生
- 擁有比Synchronize更輕量的機制
- 擁有更方便的數(shù)據(jù)結(jié)構(gòu)
至此,我們可以體會到:JDK1.5之前,因為JDK的功能不足,Java程序?qū)τ诰€程的使用 較為粗糙。
為異步而生的Future
終于在JDK1.5中,迎來了新特性: Future
以及先前文章中提到的線程池, 時光荏苒,一晃將近20年了。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
盡管已經(jīng)移除了API注釋,但仍然能夠理解每個API的含義,不多做贅述。
顯而易見,為了增加返回值,沒有必要用如此復(fù)雜的 接口來替代 Runnable
。簡單思考后可以對返回值的情況進行歸納:
- 返回Runnable中業(yè)務(wù)的結(jié)果,例如計算、讀取資源等
- 單純的在Runnable執(zhí)行完畢后返回一個結(jié)果
從業(yè)務(wù)層上看,僅需要如下接口即可,它增加了返回值、并可以更友好地讓使用者處理異常:
作者按:拋開底層實現(xiàn),僅看業(yè)務(wù)方編碼需要
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
顯然,JDK需要提供后向兼容能力:
- Runnable 不能夠丟棄,也不應(yīng)當丟棄
- 不能要求使用者完全的重構(gòu)代碼
所以一并提供了適配器,讓使用者進行簡單的局部重構(gòu)即可用上新特性
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
而Future恰如其名,它代表了在 "未來" 的一個結(jié)果和狀態(tài),為了更方便地處理異步而生。
并且內(nèi)置了 FutureTask
,在 FutureTask詳解 章節(jié)中再行展開。
類圖
在JDK1.8的基礎(chǔ)上,看一下精簡的類圖結(jié)構(gòu):
FutureTask詳解
構(gòu)造函數(shù)
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
生命周期
public class FutureTask { //新建 private static final int NEW = 0; //處理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //異常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中斷中 private static final int INTERRUPTING = 5; //已中斷 private static final int INTERRUPTED = 6; }
可能的生命周期轉(zhuǎn)換如下:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
JDK中原汁原味的解釋如下:
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.
核心方法
本節(jié)從以下三塊入手閱讀源碼
- 狀態(tài)判斷
- 取消
- 獲取結(jié)果
狀態(tài)判斷API的實現(xiàn)非常簡單
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }
取消:
- 當前狀態(tài)為
NEW
且 CAS修改 state 成功,否則返回取消失敗 - 如果
mayInterruptIfRunning
則中斷在執(zhí)行的線程并CAS修改state為INTERRUPTED - 調(diào)用 finishCompletion
刪除并通知所有等待的線程
調(diào)用done()
設(shè)置callable為null
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
獲取結(jié)果: 先判斷狀態(tài),如果未進入到 COMPLETING
(即為NEW狀態(tài)),則阻塞等待狀態(tài)改變,返回結(jié)果或拋出異常
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
如何使用
而使用則非常簡單,也非常的樸素。
我們以文中的的例子進行改造:
- 沿用原Runnable邏輯
- 移除回調(diào),增加
CalcResult
- 將
CalcResult
對象作為既定返回結(jié)果,Runnable中設(shè)置其屬性
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
如果直接使用新特性Callback,則如下:
直接返回結(jié)果,當然也可以直接返回Integer,不再包裹一層
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
相信讀者諸君會有這樣的疑惑:
為何使用Future比原先的回調(diào)看起來粗糙?
首先要明確一點:文中前段的回調(diào)Demo,雖然達成了既定目標,但效率并不高?。≡诋敃r計算很昂貴的背景下,并不會如此莽撞地使用!
而在JDK1.5開始,提供了大量內(nèi)容支持多線程開發(fā)??紤]到篇幅,會在系列文章中逐步展開。
另外,F(xiàn)utureTask中的CAS與Happens-Before本篇中亦不做展開。
接下來,再做一些引申,簡單看一看多線程業(yè)務(wù)模式。
引申,多線程業(yè)務(wù)模式
常用的多線程設(shè)計模式包括:
- Future模式
- Master-Worker模式
- Guarded Suspension模式
- 不變模式
- 生產(chǎn)者-消費
Future模式
文中對于Future的使用方式遵循了Future模式。
業(yè)務(wù)方在使用時,已經(jīng)明確了任務(wù)被分離到其他線程執(zhí)行時有等待期,在此期間,可以干點別的事情,不必浪費系統(tǒng)資源。
Master-Worker模式
在程序系統(tǒng)中設(shè)計兩類線程,并相互協(xié)作:
- Master線程(單個)
- Worker線程
Master線程負責接受任務(wù)、分配任務(wù)、接收(必要時進一步組合)結(jié)果并返回;
Worker線程負責處理子任務(wù),當子任務(wù)處理完成后,向Master線程返回結(jié)果;
作者按:此時可再次回想一下文章開頭的Demo
Guarded Suspension模式
- 使用緩存隊列,使得 服務(wù)線程/服務(wù)進程 在未就緒、忙碌時能夠延遲處理請求。
- 使用等待-通知機制,將消費
服務(wù)的返回結(jié)果
的方式規(guī)范化
不變模式
在并行開發(fā)過程中,為確保數(shù)據(jù)的一致性和正確性,有必要對對象進行同步,而同步操作會對程序系統(tǒng)的性能產(chǎn)生相當?shù)膿p耗。
因此,使用狀態(tài)不可改變的對象,依靠其不變性來確保 并行操作 在 沒有同步機制 的情況下,保持一致性和正確性。
- 對象創(chuàng)建后,其內(nèi)部狀態(tài)和數(shù)據(jù)不再發(fā)生改變
- 對象被共享、被多個線程訪問
生產(chǎn)者-消費
設(shè)計兩類線程:若干個生產(chǎn)者線程和若干個消費者線程。
生產(chǎn)者線程負責提交用戶請求,消費者線程負責處理用戶請求。生產(chǎn)者和消費者之間通過共享內(nèi)存緩沖區(qū)進行通信。
內(nèi)存緩沖區(qū)的意義:
- 解決是數(shù)據(jù)在多線程間的共享問題
- 緩解生產(chǎn)者和消費者之間的性能差
這幾種模式從不同角度出發(fā)解決特定問題,但亦有一定的相似之處,不再展開。
后記
至此,我們已經(jīng)進入尾聲,JDK1.5中,對多線程的支持迎來一波井噴。本文以及系列文章中關(guān)于線程池的內(nèi)容也僅僅是基礎(chǔ)中的基礎(chǔ),仍舊有大量的內(nèi)容值得深入,本篇不再往下挖掘。
在后續(xù)的系列文章中,我們將展開AQS、HAPPENS-BEFORE等內(nèi)容,以及和本文高度關(guān)聯(lián)的CompleteFutureTask,JUC工具等。
更多關(guān)于Java Future異步任務(wù)獲取的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java開發(fā)實現(xiàn)的Socket雙向通信功能示例
這篇文章主要介紹了Java開發(fā)實現(xiàn)的Socket雙向通信功能,結(jié)合實例形式分析了java基于socket實現(xiàn)的服務(wù)器端與客戶端雙向通信相關(guān)操作技巧,需要的朋友可以參考下2018-01-01SpringBoot連接PostgreSQL+MybatisPlus入門案例(代碼詳解)
這篇文章主要介紹了SpringBoot連接PostgreSQL+MybatisPlus入門案例,本文通過實例代碼圖文相結(jié)合給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-07-07Java中NoClassDefFoundError?和?ClassNotFoundException的區(qū)別
Java中NoClassDefFoundError和ClassNotFoundException的區(qū)別,從類繼承層次上來看,ClassNotFoundException是從Exception繼承的,所以ClassNotFoundException是一個檢查異常。具體詳情需要的朋友可以參考下面文章內(nèi)容2022-06-06Java中ShardingSphere 數(shù)據(jù)分片的實現(xiàn)
其實很多人對分庫分表多少都有點恐懼,我們今天用ShardingSphere 給大家演示數(shù)據(jù)分片,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09SpringBoot返回統(tǒng)一的JSON標準格式實現(xiàn)步驟
這篇文章主要介紹了SpringBoot返回統(tǒng)一的JSON標準格式,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-08-08用java實現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細介紹了java實現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-09-09