Java實(shí)現(xiàn)FutureTask的示例詳解
前言
在并發(fā)編程當(dāng)中我們最常見的需求就是啟動一個線程執(zhí)行一個函數(shù)去完成我們的需求,而在這種需求當(dāng)中,我們常常需要函數(shù)有返回值。比如我們需要同一個非常大的數(shù)組當(dāng)中數(shù)據(jù)的和,讓每一個線程求某一個區(qū)間內(nèi)部的和,最終將這些和加起來,那么每個線程都需要返回對應(yīng)區(qū)間的和。而在Java當(dāng)中給我們提供了這種機(jī)制,去實(shí)現(xiàn)這一個效果——FutureTask
。
FutureTask
在自己寫FutureTask
之前我們首先寫一個例子來回顧一下FutureTask
的編程步驟:
寫一個類實(shí)現(xiàn)Callable
接口。
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
實(shí)現(xiàn)接口就實(shí)現(xiàn)call
即可,可以看到這個函數(shù)是有返回值的,而FutureTask
返回給我們的值就是這個函數(shù)的返回值。
new
一個FutureTask
對象,并且new
一個第一步寫的類,new FutureTask<>(callable實(shí)現(xiàn)類)
。
最后將剛剛得到的FutureTask
對象傳入Thread
類當(dāng)中,然后啟動線程即可new Thread(futureTask).start();
。
然后我們可以調(diào)用FutureTask
的get
方法得到返回的結(jié)果futureTask.get();
。
假如有一個數(shù)組data
,長度為100000,現(xiàn)在有10個線程,第i
個線程求數(shù)組[i * 10000, (i + 1) * 10000)
所有數(shù)據(jù)的和,然后將這十個線程的結(jié)果加起來。
import java.lang.reflect.Array; import java.util.Arrays; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { int[] data = new int[100000]; Random random = new Random(); for (int i = 0; i < 100000; i++) { data[i] = random.nextInt(10000); } @SuppressWarnings("unchecked") FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10); // 設(shè)置10個 futuretask 任務(wù)計算數(shù)組當(dāng)中數(shù)據(jù)的和 for (int i = 0; i < 10; i++) { int idx = i; tasks[i] = new FutureTask<>(() -> { int sum = 0; for (int k = idx * 10000; k < (idx + 1) * 10000; k++) { sum += data[k]; } return sum; }); } // 開啟線程執(zhí)行 futureTask 任務(wù) for (FutureTask<Integer> futureTask : tasks) { new Thread(futureTask).start(); } int threadSum = 0; for (FutureTask<Integer> futureTask : tasks) { threadSum += futureTask.get(); } int sum = Arrays.stream(data).sum(); System.out.println(sum == threadSum); // 結(jié)果始終為 true } }
可能你會對FutureTask
的使用方式感覺困惑,或者不是很清楚,現(xiàn)在我們來仔細(xì)捋一下思路。
首先啟動一個線程要么是繼承自Thread
類,然后重寫Thread
類的run
方法,要么是給Thread
類傳遞一個實(shí)現(xiàn)了Runnable
的類對象,當(dāng)然可以用匿名內(nèi)部類實(shí)現(xiàn)。
既然我們的FutureTask
對象可以傳遞給Thread
類,說明FutureTask
肯定是實(shí)現(xiàn)了Runnable
接口,我們現(xiàn)在來看一下FutureTask
的繼承體系。
? 可以發(fā)現(xiàn)的是FutureTask
確實(shí)實(shí)現(xiàn)了Runnable
接口,同時還實(shí)現(xiàn)了Future
接口,這個Future
接口主要提供了后面我們使用FutureTask
的一系列函數(shù)比如get
。
看到這里你應(yīng)該能夠大致想到在FutureTask
中的run
方法會調(diào)用Callable
當(dāng)中實(shí)現(xiàn)的call
方法,然后將結(jié)果保存下來,當(dāng)調(diào)用get
方法的時候再將這個結(jié)果返回。
自己實(shí)現(xiàn)FutureTask
工具準(zhǔn)備
經(jīng)過上文的分析你可能已經(jīng)大致了解了FutureTask
的大致執(zhí)行過程了,但是需要注意的是,如果你執(zhí)行FutureTask
的get
方法是可能阻塞的,因?yàn)榭赡?code>Callable的call
方法還沒有執(zhí)行完成。因此在get
方法當(dāng)中就需要有阻塞線程的代碼,但是當(dāng)call
方法執(zhí)行完成之后需要將這些線程都喚醒。
在本篇文章當(dāng)中使用鎖ReentrantLock
和條件變量Condition
進(jìn)行線程的阻塞和喚醒,在我們自己動手實(shí)現(xiàn)FutureTask
之前,我們先熟悉一下上面兩種工具的使用方法。
ReentrantLock
主要有兩個方法:
lock
對臨界區(qū)代碼塊進(jìn)行加鎖。unlock
對臨界區(qū)代碼進(jìn)行解鎖。
Condition
主要有三個方法:
await
阻塞調(diào)用這個方法的線程,等待其他線程喚醒。signal
喚醒一個被await
方法阻塞的線程。signalAll
喚醒所有被await
方法阻塞的線程。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class LockDemo { private ReentrantLock lock; private Condition condition; LockDemo() { lock = new ReentrantLock(); condition = lock.newCondition(); } public void blocking() { lock.lock(); try { System.out.println(Thread.currentThread() + " 準(zhǔn)備等待被其他線程喚醒"); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public void inform() throws InterruptedException { // 先休眠兩秒 等他其他線程先阻塞 TimeUnit.SECONDS.sleep(2); lock.lock(); try { System.out.println(Thread.currentThread() + " 準(zhǔn)備喚醒其他線程"); condition.signal(); // 喚醒一個被 await 方法阻塞的線程 // condition.signalAll(); // 喚醒所有被 await 方法阻塞的線程 }finally { lock.unlock(); } } public static void main(String[] args) { LockDemo lockDemo = new LockDemo(); Thread thread = new Thread(() -> { lockDemo.blocking(); // 執(zhí)行阻塞線程的代碼 }, "Blocking-Thread"); Thread thread1 = new Thread(() -> { try { lockDemo.inform(); // 執(zhí)行喚醒線程的代碼 } catch (InterruptedException e) { e.printStackTrace(); } }, "Inform-Thread"); thread.start(); thread1.start(); } }
上面的代碼的輸出:
Thread[Blocking-Thread,5,main] 準(zhǔn)備等待被其他線程喚醒
Thread[Inform-Thread,5,main] 準(zhǔn)備喚醒其他線程
FutureTask設(shè)計與實(shí)現(xiàn)
在前文當(dāng)中我們已經(jīng)談到了FutureTask
的實(shí)現(xiàn)原理,主要有以下幾點(diǎn):
- 構(gòu)造函數(shù)需要傳入一個實(shí)現(xiàn)了
Callable
接口的類對象,這個將會在FutureTask
的run
方法執(zhí)行,然后得到函數(shù)的返回值,并且將返回值存儲起來。 - 當(dāng)線程調(diào)用
get
方法的時候,如果這個時候Callable
當(dāng)中的call
已經(jīng)執(zhí)行完成,直接返回call
函數(shù)返回的結(jié)果就行,如果call
函數(shù)還沒有執(zhí)行完成,那么就需要將調(diào)用get
方法的線程掛起,這里我們可以使用condition.await()
將線程掛起。 - 在
call
函數(shù)執(zhí)行完成之后,需要將之前被get
方法掛起的線程喚醒繼續(xù)執(zhí)行,這里使用condition.signalAll()
將所有掛起的線程喚醒。 - 因?yàn)槭俏覀冏约簩?shí)現(xiàn)
FutureTask
,功能不會那么齊全,只需要能夠滿足我們的主要需求即可,主要是幫助大家了解FutureTask
原理。
實(shí)現(xiàn)代碼如下(分析都在注釋當(dāng)中):
import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; // 這里需要實(shí)現(xiàn) Runnable 接口,因?yàn)樾枰獙⑦@個對象放入 Thread 類當(dāng)中 // 而 Thread 要求傳入的對象實(shí)現(xiàn)了 Runnable 接口 public class MyFutureTask<V> implements Runnable { private final Callable<V> callable; private Object returnVal; // 這個表示我們最終的返回值 private final ReentrantLock lock; private final Condition condition; public MyFutureTask(Callable<V> callable) { // 將傳入的 callable 對象存儲起來 方便在后面的 run 方法當(dāng)中調(diào)用 this.callable = callable; lock = new ReentrantLock(); condition = lock.newCondition(); } @SuppressWarnings("unchecked") public V get(long timeout, TimeUnit unit) { if (returnVal != null) // 如果符合條件 說明 call 函數(shù)已經(jīng)執(zhí)行完成 返回值已經(jīng)不為 null 了 return (V) returnVal; // 直接將結(jié)果返回即可 這樣不用競爭鎖資源 提高程序執(zhí)行效率 lock.lock(); try { // 這里需要進(jìn)行二次判斷 (雙重檢查) // 因?yàn)槿绻粋€線程在第一次判斷 returnVal 為空 // 然后這個時候它可能因?yàn)楂@取鎖而被掛起 // 而在被掛起的這段時間,call 可能已經(jīng)執(zhí)行完成 // 如果這個時候不進(jìn)行判斷直接執(zhí)行 await方法 // 那后面這個線程將無法被喚醒 if (returnVal == null) condition.await(timeout, unit); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return (V) returnVal; } @SuppressWarnings("unchecked") public V get() { if (returnVal != null) return (V) returnVal; lock.lock(); try { // 同樣的需要進(jìn)行雙重檢查 if (returnVal == null) condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return (V) returnVal; } @Override public void run() { if (returnVal != null) return; try { // 在 Runnable 的 run 方法當(dāng)中 // 執(zhí)行 Callable 方法的 call 得到返回結(jié)果 returnVal = callable.call(); } catch (Exception e) { e.printStackTrace(); } lock.lock(); try { // 因?yàn)橐呀?jīng)得到了結(jié)果 // 因此需要將所有被 await 方法阻塞的線程喚醒 // 讓他們從 get 方法返回 condition.signalAll(); }finally { lock.unlock(); } } // 下面是測試代碼 public static void main(String[] args) { MyFutureTask<Integer> ft = new MyFutureTask<>(() -> { TimeUnit.SECONDS.sleep(2); return 101; }); Thread thread = new Thread(ft); thread.start(); System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 輸出為 null System.out.println(ft.get()); // 輸出為 101 } }
我們現(xiàn)在用我們自己寫的MyFutureTask
去實(shí)現(xiàn)在前文當(dāng)中數(shù)組求和的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException { int[] data = new int[100000]; Random random = new Random(); for (int i = 0; i < 100000; i++) { data[i] = random.nextInt(10000); } @SuppressWarnings("unchecked") MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10); for (int i = 0; i < 10; i++) { int idx = i; tasks[i] = new MyFutureTask<>(() -> { int sum = 0; for (int k = idx * 10000; k < (idx + 1) * 10000; k++) { sum += data[k]; } return sum; }); } for (MyFutureTask<Integer> MyFutureTask : tasks) { new Thread(MyFutureTask).start(); } int threadSum = 0; for (MyFutureTask<Integer> MyFutureTask : tasks) { threadSum += MyFutureTask.get(); } int sum = Arrays.stream(data).sum(); System.out.println(sum == threadSum); // 輸出結(jié)果為 true }
總結(jié)
在本篇文章當(dāng)中主要給大家介紹了FutureTask
的內(nèi)部原理,并且我們自己通過使用ReentrantLock
和Condition
實(shí)現(xiàn)了我們自己的FutureTask
,本篇文章的主要內(nèi)容如下:
FutureTask
的內(nèi)部原理:
FutureTask
首先會繼承Runnable
接口,這樣就可以將FutureTask
的對象直接放入Thread
類當(dāng)中,作為構(gòu)造函數(shù)的參數(shù)。- 我們在使用
FutureTask
的時候需要傳入一個Callable
實(shí)現(xiàn)類的對象,在函數(shù)call
當(dāng)中實(shí)現(xiàn)我們需要執(zhí)行的函數(shù),執(zhí)行完成之后,將call
函數(shù)的返回值保存下來,當(dāng)有線程調(diào)用get
方法時候?qū)⒈4娴姆祷刂捣祷亍?/li>
我們使用條件變量進(jìn)行對線程的阻塞和喚醒。
- 當(dāng)有線程調(diào)用
get
方法時,如果call
已經(jīng)執(zhí)行完成,那么可以直接將結(jié)果返回,否則需要使用條件變量將線程掛起。 - 當(dāng)
call
函數(shù)執(zhí)行完成的時候,需要使用條件變量將所有阻塞在get
方法的線程喚醒。
雙重檢查:
- 我們在
get
方法當(dāng)中首先判斷returnVal
是否為空,如果不為空直接將結(jié)果返回,這就可以不用去競爭鎖資源了,可以提高程序執(zhí)行的效率。 - 但是我們在使用鎖保護(hù)的臨界區(qū)還需要進(jìn)行判斷,判斷
returnVal
是否為空,因?yàn)槿绻粋€線程在第一次判斷returnVal
為空,然后這個時候它可能因?yàn)楂@取鎖而被掛起, 而在被掛起的這段時間,call 可能已經(jīng)執(zhí)行完成,如果這個時候不進(jìn)行判斷直接執(zhí)行 await方法,那后面這個線程將無法被喚醒,因?yàn)樵?code>call函數(shù)執(zhí)行完成之后調(diào)用了condition.signalAll()
,如果線程在這之后執(zhí)行await
方法,那么將來再沒有線程去將這些線程喚醒。
到此這篇關(guān)于Java實(shí)現(xiàn)FutureTask的示例詳解的文章就介紹到這了,更多相關(guān)Java FutureTask內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Maven在Windows中的配置以及IDE中的項目創(chuàng)建實(shí)例
下面小編就為大家?guī)硪黄狹aven在Windows中的配置以及IDE中的項目創(chuàng)建實(shí)例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-09-09SpringBoot中的Spring Cloud Hystrix原理和用法詳解
在Spring Cloud中,Hystrix是一個非常重要的組件,Hystrix可以幫助我們構(gòu)建具有韌性的分布式系統(tǒng),保證系統(tǒng)的可用性和穩(wěn)定性,在本文中,我們將介紹SpringBoot中的Hystrix,包括其原理和如何使用,需要的朋友可以參考下2023-07-07Java基于JNDI 實(shí)現(xiàn)讀寫分離的示例代碼
本文主要介紹了Java基于JNDI 實(shí)現(xiàn)讀寫分離的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12java與微信小程序?qū)崿F(xiàn)websocket長連接
這篇文章主要為大家詳細(xì)介紹了java與微信小程序?qū)崿F(xiàn)websocket長連接,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-05-05Spring AOP 自定義注解的實(shí)現(xiàn)代碼
本篇文章主要介紹了Spring AOP 自定義注解的實(shí)現(xiàn)代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04