java并發(fā)學(xué)習(xí)之Executor源碼解析
Runnable && Thread
Runnable和Thread都是java.lang包最基本的線程操作類,相當(dāng)于官方的,而Executor接口及其實(shí)現(xiàn)都是Doug Lea寫(xiě)的java.util.concurrent包下,屬于民間的,當(dāng)然因?yàn)樘1屏怂砸苍趈dk中
先看官方提供的線程操作,其中Runnable是一個(gè)函數(shù)式接口
@FunctionalInterface public interface Runnable { public abstract void run(); }
可以理解為一個(gè)待執(zhí)行的函數(shù),或者理解為一個(gè)任務(wù)(通過(guò)調(diào)用run方法可以實(shí)際的執(zhí)行任務(wù))
Runable是一個(gè)定義的任務(wù),而Thread是它的一個(gè)執(zhí)行者,它提供start方法可以開(kāi)啟一個(gè)新線程執(zhí)行傳入的Runable任務(wù),這很像命令模式,用戶通過(guò)實(shí)現(xiàn)Runable制定一個(gè)命令,交給Thread這個(gè)執(zhí)行者去具體執(zhí)行
Runnable && Thread
所以一般開(kāi)啟新線程執(zhí)行方法的方式如下
Runnable task = () -> { // do something }; new Thread(task).start(); // 開(kāi)啟新線程執(zhí)行
而開(kāi)啟新線程執(zhí)行方法也只有這一個(gè)途徑可走,就是必須通過(guò)官方的Thread.start
方法
Executor
雖然開(kāi)啟線程執(zhí)行任務(wù)只能走Thread.start
方法,方法只有一個(gè),但我們能做的是可以改變?nèi)蝿?wù)運(yùn)行的方式,比如我們可以決定什么時(shí)候執(zhí)行任務(wù),多少個(gè)任務(wù)共用某個(gè)線程排隊(duì)工作
最具代表性的就是線程池,線程池只是修改了任務(wù)執(zhí)行的方式:即所有任務(wù)共用固定數(shù)量的線程,但最終的運(yùn)行終歸還是通過(guò)Thread.start
方法實(shí)際在線程中執(zhí)行Runnable
方法
Doug Lea所寫(xiě)的java.util.concurrent.Executor
把各種方式的Runnable執(zhí)行器的一個(gè)抽象
public interface Executor { void execute(Runnable command); }
Executor
而且提供了一些常用的執(zhí)行器供我們使用,比如ThreadPoolExecutor(線程池),F(xiàn)orkJoinPool,當(dāng)然我們也可以自己定義一個(gè)Executor按照自己的方式執(zhí)行任務(wù),比如netty中實(shí)現(xiàn)的SingleThreadEventExecutor是一種單個(gè)線程依次處理所有任務(wù)的執(zhí)行器
ExecutorService
如果說(shuō)Executor是一種對(duì)按自己套路執(zhí)行任務(wù)的執(zhí)行器,是一種抽象分類,那么ExecutorService就是其下的一個(gè)子分類,它是一種特殊的執(zhí)行器,從名字直譯來(lái)看:"執(zhí)行器服務(wù)",從一個(gè)執(zhí)行器升級(jí)為執(zhí)行服務(wù),像不像某公司從賣產(chǎn)品業(yè)務(wù)升級(jí)為產(chǎn)品安裝售后整套服務(wù)
所以ExecutorService作為一個(gè)特殊的服務(wù)類Executor,不光能按照自己的方式執(zhí)行任務(wù),還推出了一系列附加"服務(wù)",那就看看這種特殊的執(zhí)行器都提供了什么服務(wù)
public interface ExecutorService extends Executor { void shutdown(); Future<?> submit(Runnable task); <T> Future<T> submit(Callable<T> task); ...... }
只貼了些重要方法,首先繼承了Executor肯定是要繼承void execute(Runnable)
方法代表它首先是一個(gè)任務(wù)執(zhí)行器
shutdown
方法代表這個(gè)執(zhí)行器是有狀態(tài)的,可以關(guān)閉服務(wù)的
然后就是重量級(jí)的submit
方法,這也是ExecutorService提供的最具特色的服務(wù),如果打開(kāi)ExecutorService的類,注釋第一句就寫(xiě)著:
/** * An {@link Executor} that provides methods to manage termination and * methods that can produce a {@link Future} for tracking progress of * one or more asynchronous tasks.
翻譯過(guò)來(lái)大致就是ExecutorService是一個(gè)特殊的Executor執(zhí)行器,他可以終止服務(wù)并且可以創(chuàng)建一個(gè)Future來(lái)跟蹤任務(wù)執(zhí)行進(jìn)度
ExecutorService的submit不光能接受Runnable,還可以接受一種新型任務(wù)形式:Callable
,即有返回結(jié)果和異常的任務(wù)
Callable
上面我們總結(jié)Runnable是一種可執(zhí)行任務(wù),而這種任務(wù)是沒(méi)有返回結(jié)果的,也不能拋出異常,很顯然現(xiàn)實(shí)中很多任務(wù)是需要有返回結(jié)果的,比如計(jì)算1+1等于幾的任務(wù),所以為了擴(kuò)展任務(wù)類型,Doug Lea又定義一種新的任務(wù):Callable
,而ExecutorService可以接受并處理這樣的任務(wù)
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Future
submit執(zhí)行的返回值是一個(gè)Future,從注釋看出他可以跟蹤任務(wù)的進(jìn)度,它就好比任務(wù)的一個(gè)訂單,通過(guò)訂單可以取消任務(wù),查看任務(wù)進(jìn)度等
- boolean cancel(boolean) 取消
取消任務(wù),就好比在某寶買了個(gè)東西,本質(zhì)就是提交一個(gè)"把東西給我送過(guò)來(lái)"的任務(wù),而通過(guò)訂單我們就可以取消這個(gè)任務(wù) - isCancelled() && isDone()
查看任務(wù)狀態(tài),是否取消和是否完成 - V get()
獲取任務(wù)執(zhí)行結(jié)果,如果沒(méi)完成則阻塞,如果是Runnable,返回的就是null
ExecutorService
AbstractExecutorService
ExecutorService制訂了一種新型執(zhí)行器,它的特殊在于可以跟蹤任務(wù)進(jìn)度甚至取消任務(wù),那么如何實(shí)現(xiàn)吶
首先execute方法只會(huì)單純的執(zhí)行任務(wù),按照自己執(zhí)行器的邏輯調(diào)用Thread.start方法,不會(huì)有返回值,也不支持跟蹤進(jìn)度或取消
所以解決方案只有一個(gè):調(diào)包任務(wù),具體這樣操作:當(dāng)用戶提交任務(wù),不是直接去execute執(zhí)行,而是把任務(wù)包裝為一個(gè)新任務(wù),新任務(wù)執(zhí)行原任務(wù)的同時(shí),還負(fù)責(zé)獲取任務(wù)結(jié)果,跟蹤任務(wù)狀態(tài)等工作,相當(dāng)于是原任務(wù)的一個(gè)代理
以上即是AbstractExecutorService負(fù)責(zé)的工作,它是ExecutorService關(guān)于任務(wù)跟蹤業(yè)務(wù)的相關(guān)實(shí)現(xiàn)(解決方案即是代理任務(wù)),繼承了AbstractExecutorService的任務(wù)執(zhí)行器既可以實(shí)現(xiàn)跟蹤任務(wù)的功能
AbstractExecutorService
來(lái)看一下AbstractExecutorService的submit方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 生成一個(gè)依賴于原任務(wù)的新任務(wù) RunnableFuture<Void> ftask = newTaskFor(task, null); // 執(zhí)行新任務(wù) execute(ftask); // 返回新任務(wù)(充當(dāng)任務(wù)跟蹤器) return ftask; }
其中newTaskFor
方法負(fù)責(zé)生成新任務(wù),同時(shí)也是原任務(wù)的跟蹤器(訂單)
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
所以整個(gè)執(zhí)行過(guò)程就是把Runnable或Callable轉(zhuǎn)換為FutureTask的過(guò)程,而FutureTask首先是一個(gè)新任務(wù)(繼承Runnable),又是原任務(wù)的跟蹤器(繼承Future),這種任務(wù)歸類為RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask
接下來(lái)就看FutureTask是如何實(shí)現(xiàn)跟蹤替換原任務(wù)的,首先它的重點(diǎn)屬性如下
- int state; 存儲(chǔ)原任務(wù)的執(zhí)行狀態(tài)
- Callable<V> callable; 原任務(wù)Runnable也可以適配為返回null的Callable
- Object outcome; 原任務(wù)的返回結(jié)果
再看一下重點(diǎn)方法
1.初始化,存儲(chǔ)原任務(wù),任務(wù)狀態(tài)為NEW
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
2.run(),真實(shí)的被執(zhí)行任務(wù),加入狀態(tài)判斷,執(zhí)行原任務(wù)通過(guò)try-catch獲取異常,通過(guò)outcome保存結(jié)果
public void run() { // 如果任務(wù)不是新?tīng)顟B(tài),直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 獲取原任務(wù) Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執(zhí)行原任務(wù) result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 有異常設(shè)置異常 setException(ex); } if (ran) // 設(shè)置outcome存儲(chǔ)返回值 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
3.get(),獲取結(jié)果,如果狀態(tài)未完成阻塞等待,完成則返回outcome
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果任務(wù)未完成,阻塞等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 任務(wù)完成,返回結(jié)果 return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; // 返回結(jié)果即outcome if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
4.cancel(),只要把狀態(tài)設(shè)置為CANCEL,run時(shí)就會(huì)直接return而不會(huì)執(zhí)行原任務(wù)
public boolean cancel(boolean mayInterruptIfRunning) { // 狀態(tài)變?yōu)镃ANCELLED if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; // 如果正在運(yùn)行調(diào)用interrupt阻斷正在執(zhí)行的任務(wù) try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
總結(jié)
AbstractExecutorService只是實(shí)現(xiàn)了讓任務(wù)變得可跟蹤,通過(guò)調(diào)包任務(wù),而具體任務(wù)的執(zhí)行最終依然會(huì)調(diào)用execute,這個(gè)方法AbstractExecutorService并沒(méi)有實(shí)現(xiàn),因?yàn)檫@也是所有執(zhí)行器的差異所在,即按自己的方式選擇線程執(zhí)行任務(wù), 比如ThreadPoolExecutor線程池的固定線程數(shù)執(zhí)行所有任務(wù)的模式,也就是只需要實(shí)現(xiàn)execute方法,而submit則交給父類AbstractExecutorService處理
以上就是java并發(fā)學(xué)習(xí)之Executor源碼解析的詳細(xì)內(nèi)容,更多關(guān)于java并發(fā)Executor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
關(guān)于HashMap相同key累加value的問(wèn)題
這篇文章主要介紹了關(guān)于HashMap相同key累加value的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05解決springboot 2.x 里面訪問(wèn)靜態(tài)資源的坑
這篇文章主要介紹了解決springboot 2.x 里面訪問(wèn)靜態(tài)資源的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Springboot如何根據(jù)實(shí)體類生成數(shù)據(jù)庫(kù)表
這篇文章主要介紹了Springboot如何根據(jù)實(shí)體類生成數(shù)據(jù)庫(kù)表的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09解決java字符串轉(zhuǎn)換成時(shí)間Unparseable date出錯(cuò)的問(wèn)題
這篇文章主要介紹了解決java字符串轉(zhuǎn)換成時(shí)間Unparseable date出錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06springboot通過(guò)jar包啟動(dòng)中文日志亂碼問(wèn)題及解決
這篇文章主要介紹了springboot通過(guò)jar包啟動(dòng)中文日志亂碼問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06Java二叉搜索樹(shù)基礎(chǔ)原理與實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Java二叉搜索樹(shù)基礎(chǔ)原理與實(shí)現(xiàn)方法,結(jié)合圖文與實(shí)例形式詳細(xì)分析了Java二叉搜索樹(shù)的基本概念、原理、實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下2020-03-03Java基礎(chǔ)之finally語(yǔ)句與return語(yǔ)句詳解
這篇文章主要介紹了Java基礎(chǔ)之finally語(yǔ)句與return語(yǔ)句詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04