關(guān)于Future機(jī)制原理及解析
future機(jī)制是
在通過(guò)線程去執(zhí)行某個(gè)任務(wù)的時(shí)候,如果比較耗時(shí),我們可以通過(guò)futureTask機(jī)制,異步返回,繼續(xù)去執(zhí)行其他的任務(wù),在需要獲取執(zhí)行結(jié)果的時(shí)候,通過(guò)future.get()方法去獲取,如果任務(wù)沒(méi)有執(zhí)行完畢,會(huì)通過(guò)lockSupport.park()方法去阻塞主線程,直到run()方法執(zhí)行完畢之后,會(huì)通過(guò)lockSupport.unpark()方法去喚醒線程
應(yīng)用
public class FutureTest {
? ? public static void main(String[] args) throws ExecutionException, InterruptedException {
? ? ? ? MyThread myThread = new MyThread();
? ? ? ? FutureTask<String> futureTask = new FutureTask<>(myThread);
? ? ? ? /**
? ? ? ? ?* 同一個(gè)futureTask對(duì)象,通過(guò)多個(gè)線程進(jìn)行多次調(diào)用,但是只會(huì)執(zhí)行一次
? ? ? ? ?* 如果是計(jì)算的操作,需要進(jìn)行多次計(jì)算操作,需要聲明不同的futureTask對(duì)象
? ? ? ? ?*/
? ? ? ? new Thread(futureTask, "A").start();
? ? ? ? new Thread(futureTask, "B").start();
? ? ? ? System.out.println(Thread.currentThread().getName() + "測(cè)試,在調(diào)用future.get()方法之前,可以執(zhí)行其他邏輯 ");
? ? ? ? System.out.println(futureTask.get());
? ? ? ? System.out.println("測(cè)試futureTask的get方法阻塞");
? ? }
}
class MyThread implements Callable<String> {
? ? @Override
? ? public String call() throws Exception {
? ? ? ? System.out.println(Thread.currentThread().getName() + " 測(cè)試callable");
? ? ? ? TimeUnit.SECONDS.sleep(4);
? ? ? ? return "success";
? ? }
}這里就是模擬thread比較耗時(shí),此時(shí)就可以通過(guò)futureTask異步返回之后,在需要使用的時(shí)候,調(diào)用其get方法去獲取執(zhí)行結(jié)果,如果call()方法還沒(méi)有執(zhí)行完,那futureTask.get()方法會(huì)一直阻塞,直到線程執(zhí)行完畢,獲取到執(zhí)行結(jié)果
原理
我們先說(shuō)future.get()是如何阻塞的,也就是說(shuō)在線程對(duì)應(yīng)的方法還未執(zhí)行完時(shí),主線程如何去阻塞?
在源碼中,有一個(gè)重要的屬性
private volatile int state; ? ? /** ? ? ?* 在構(gòu)造函數(shù)中,設(shè)置為new ? ? ?*/ ? ? private static final int NEW ? ? ? ? ?= 0; ? ? /** ? ? ?* 線程正常執(zhí)行完畢,通過(guò)CAS將state修改為completing ? ? ?*/ ? ? private static final int COMPLETING ? = 1; ? ? /** ? ? ?* ? ? ?*/ ? ? private static final int NORMAL ? ? ? = 2; ? ? /** ? ? ?* 執(zhí)行線程的時(shí)候,如果拋出異常,通過(guò)cas修改為exceptional ? ? ?*/ ? ? private static final int EXCEPTIONAL ?= 3; ? ? /** ? ? ?* 如果調(diào)用了cancel(boolean mayInterruptIfRunning)方法 ? ? ?* 入?yún)⒌膍ayInterruptIfRunning為true,就通過(guò)cas將state設(shè)置為INTERRUPTING ? ? ?* 如果為false,就通過(guò)cas將state修改為cancelled ? ? ?*/ ? ? private static final int CANCELLED ? ?= 4; ? ? private static final int INTERRUPTING = 5; ? ? /** ? ? ?* 在調(diào)用cancel()方法,入?yún)閠rue的情況下,如果中斷成功,通過(guò)cas將state從 ? ? ?* INTERRUPTING修改為INTERRUPTED ? ? ?*/ ? ? private static final int INTERRUPTED ?= 6;
接著來(lái)說(shuō)get()方法
get()
可以看到,在get()方法中,會(huì)先判斷當(dāng)前state是否小于等于 COMPLETING,如果是,就去阻塞
/**
? ? ?* @throws CancellationException {@inheritDoc}
? ? ?* 在調(diào)用future.get()方法的時(shí)候,如果線程的run()沒(méi)有執(zhí)行完畢,主線程會(huì)等待,直到run()方法正確的執(zhí)行完畢
? ? ?* 在內(nèi)部,是通過(guò)lockSupport.park()方法去阻塞線程的
? ? ?* 在該機(jī)制中,涉及到一個(gè)state狀態(tài)標(biāo)識(shí)
? ? ?*/
? ? public V get() throws InterruptedException, ExecutionException {
? ? ? ? int s = state;
? ? ? ? if (s <= COMPLETING)
? ? ? ? ? ? s = awaitDone(false, 0L);
? ? ? ? return report(s);
? ? }對(duì)于get方法來(lái)說(shuō),最為核心的邏輯是在awaitDone()方法中,在該方法中,會(huì)調(diào)用lockSupport.park(this),去阻塞當(dāng)前線程;
所以,我們可以知道,如果主線程在調(diào)用future.get()方法的時(shí)候,假如此時(shí)run()方法還沒(méi)有執(zhí)行完畢,會(huì)阻塞當(dāng)前線程,那阻塞之后,總要有一個(gè)地方去喚起,在run()方法正常執(zhí)行完畢之后,會(huì)喚醒當(dāng)前阻塞的線程,繼續(xù)執(zhí)行業(yè)務(wù)邏輯
run()
public void run() {
? ? ? ? /**
? ? ? ? ?* 1.校驗(yàn)當(dāng)前state是否是new狀態(tài)且通過(guò)cas設(shè)置當(dāng)前線程成功
? ? ? ? ?*/
? ? ? ? if (state != NEW ||
? ? ? ? ? ? !UNSAFE.compareAndSwapObject(this, runnerOffset,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?null, Thread.currentThread()))
? ? ? ? ? ? return;
? ? ? ? try {
? ? ? ? ? ? Callable<V> c = callable;
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 2.校驗(yàn)當(dāng)前線程中的callable是否有效且state為new
? ? ? ? ? ? ?*/
? ? ? ? ? ? if (c != null && state == NEW) {
? ? ? ? ? ? ? ? V result;
? ? ? ? ? ? ? ? boolean ran;
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ? ? ?* 2.1 執(zhí)行call()方法,如果正常執(zhí)行完畢,設(shè)置ran為true
? ? ? ? ? ? ? ? ? ? ? */
? ? ? ? ? ? ? ? ? ? result = c.call();
? ? ? ? ? ? ? ? ? ? ran = true;
? ? ? ? ? ? ? ? } catch (Throwable ex) {
? ? ? ? ? ? ? ? ? ? result = null;
? ? ? ? ? ? ? ? ? ? ran = false;
? ? ? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ? ? ?* 2.2 假如執(zhí)行報(bào)錯(cuò),被捕獲到,在setException方法中,也會(huì)去喚醒阻塞的線程
? ? ? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ? ? setException(ex);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ?* 3.如果ran為true,就調(diào)用set方法,在set方法中,有一步跟重要的操作,就是通過(guò)lockSupport.unpark()喚醒線程
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? if (ran)
? ? ? ? ? ? ? ? ? ? set(result);
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? // runner must be non-null until state is settled to
? ? ? ? ? ? // prevent concurrent calls to run()
? ? ? ? ? ? runner = null;
? ? ? ? ? ? // state must be re-read after nulling runner to prevent
? ? ? ? ? ? // leaked interrupts
? ? ? ? ? ? int s = state;
? ? ? ? ? ? if (s >= INTERRUPTING)
? ? ? ? ? ? ? ? handlePossibleCancellationInterrupt(s);
? ? ? ? }
? ? }在這里的run()方法中,setException和set(result)方法中,都會(huì)調(diào)用一個(gè)方法:finishCompletion()
set()
?? ?/*
?? ?* 在set方法中,會(huì)通過(guò)cas更新當(dāng)前的state狀態(tài)
? ? ?* 然后在調(diào)用finishCompletion的時(shí)候,會(huì)喚醒阻塞的線程
? ? ?* @param v the value
? ? ?*/
? ? protected void set(V v) {
? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
? ? ? ? ? ? outcome = v;
? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
? ? ? ? ? ? finishCompletion();
? ? ? ? }
? ? }在線程正常執(zhí)行完之后,會(huì)通過(guò)cas將state的狀態(tài)從new修改為completing,然后再修改為normal
finishCompletion()
/**
? ? ?* Removes and signals all waiting threads, invokes done(), and
? ? ?* nulls out callable.
? ? ?* 這個(gè)方法大致的意思是:從等待隊(duì)列中獲取到當(dāng)前在等待的線程信息,然后通過(guò)cas將q設(shè)置為null
? ? ?* 最后會(huì)通過(guò)lockSupport.unPark()方法喚醒線程
? ? ?*/
? ? private void finishCompletion() {
? ? ? ? // assert state > COMPLETING;
? ? ? ? /**
? ? ? ? ?* 1.從waiters中獲取到等待的節(jié)點(diǎn)
? ? ? ? ?*/
? ? ? ? for (WaitNode q; (q = waiters) != null;) {
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 2.通過(guò)cas將q設(shè)置為null
? ? ? ? ? ? ?*/
? ? ? ? ? ? if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
? ? ? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? ? ? Thread t = q.thread;
? ? ? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ? ? ?* 3.喚醒線程
? ? ? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? ? ? ? ? q.thread = null;
? ? ? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? WaitNode next = q.next;
? ? ? ? ? ? ? ? ? ? if (next == null)
? ? ? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? ? ? q.next = null; // unlink to help gc
? ? ? ? ? ? ? ? ? ? q = next;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? done();
? ? ? ? callable = null; ? ? ? ?// to reduce footprint
? ? }這個(gè)方法也不需要做過(guò)多解釋了,就是從waitNode中,獲取到當(dāng)前等待的線程,然后喚醒
總結(jié)
所以:對(duì)于future.get()方法,如果run()方法沒(méi)有執(zhí)行完成,該方法會(huì)阻塞線程,并且在方法正常執(zhí)行完畢之后,喚醒阻塞的線程,繼續(xù)去執(zhí)行對(duì)應(yīng)的業(yè)務(wù)代碼
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java 實(shí)現(xiàn)定時(shí)任務(wù)的三種方法
這篇文章主要介紹了Java 實(shí)現(xiàn)定時(shí)任務(wù)的三種方法,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下2021-03-03
SpringBoot啟動(dòng)報(bào)錯(cuò)屬性循環(huán)依賴報(bào)錯(cuò)問(wèn)題的解決
這篇文章主要介紹了SpringBoot啟動(dòng)報(bào)錯(cuò)屬性循環(huán)依賴報(bào)錯(cuò)問(wèn)題的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05
springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲(chóng)的實(shí)例代碼
這篇文章主要介紹了springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲(chóng)的實(shí)例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
Java將不同的List集合復(fù)制到另一個(gè)集合常見(jiàn)的方法
在Java中,有時(shí)候我們需要將一個(gè)List對(duì)象的屬性值復(fù)制到另一個(gè)List對(duì)象中,使得兩個(gè)對(duì)象的屬性值相同,這篇文章主要介紹了Java將不同的List集合復(fù)制到另一個(gè)集合常見(jiàn)的方法,需要的朋友可以參考下2024-09-09
簡(jiǎn)單捋捋@RequestParam 和 @RequestBody的使用
這篇文章主要介紹了簡(jiǎn)單捋捋@RequestParam 和 @RequestBody的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12

