亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java 線程池ThreadPoolExecutor源碼解析

 更新時間:2022年03月11日 13:04:42   作者:Q.E.D.  
這篇文章主要介紹了Java 線程池ThreadPoolExecutor源碼解析

引導語

線程池我們在工作中經常會用到。在請求量大時,使用線程池,可以充分利用機器資源,增加請求的處理速度,本章節(jié)我們就和大家一起來學習線程池。

本章的順序,先說源碼,弄懂原理,接著看一看面試題,最后看看實際工作中是如何運用線程池的。

1、整體架構圖

我們畫了線程池的整體圖,如下:

圖片描述

 本小節(jié)主要就按照這個圖來進行 ThreadPoolExecutor 源碼的講解,大家在看各個方法時,可以結合這個圖一起看。

1.1、類結構

首先我們來看一下 ThreadPoolExecutor 的類結構,如下圖:

圖片描述

從上圖中,我們從命名上來看,都有 Executor 的共同命名,Executor 的中文意思為執(zhí)行的意思,表示對提供的任務進行執(zhí)行,我們在第五章線程中學習到了幾種任務:Runnable、Callable、FutureTask,之前我們都是使用 Thread 來執(zhí)行這些任務的,除了 Thread,這些 Executor 命名的類和接口也是可以執(zhí)行這幾種任務的,接下來我們大概的看下這幾個類的大概含義:

Executor:定義 execute 方法來執(zhí)行任務,入參是 Runnable,無出參:

圖片描述

 ExecutorService:Executor 的功能太弱,ExecutorService 豐富了對任務的執(zhí)行和管理的功能,主要代碼如下:

// 關閉,不會接受新的任務,也不會等待未完成的任務
// 如果需要等待未完成的任務,可以使用 awaitTermination 方法
void shutdown();
// executor 是否已經關閉了,返回值 true 表示已關閉
boolean isShutdown();
// 所有的任務是否都已經終止,是的話,返回 true
boolean isTerminated();
// 在超時時間內,等待剩余的任務終止
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
// 提交有返回值的任務,使用 get 方法可以阻塞等待任務的執(zhí)行結果返回
<T> Future<T> submit(Callable<T> task);
// 提交沒有返回值的任務,如果使用 get 方法的話,任務執(zhí)行完之后得到的是 null 值
Future<?> submit(Runnable task);
// 給定任務集合,返回已經執(zhí)行完成的 Future 集合,每個返回的 Future 都是 isDone = true 的狀態(tài)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
// 給定任務中有一個執(zhí)行成功就返回,如果拋異常,其余未完成的任務將被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

AbstractExecutorService 是一個抽象類,封裝了 Executor 的很多通用功能,比如:

// 把 Runnable 轉化成 RunnableFuture
// RunnableFuture 是一個接口,實現了 Runnable 和 Future
// FutureTask 是 RunnableFuture 的實現類,主要是對任務進行各種管理
// Runnable + Future => RunnableFuture => FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
// 提交無返回值的任務
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // ftask 其實是 FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
// 提交有返回值的任務
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // ftask 其實是 FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

有幾個點需要注意下:

FutureTask 我們在第五章有說,其本身就是一個任務,而且具備對任務管理的功能,比如可以通過 get 方法拿到任務的執(zhí)行結果;

submit 方法是我們平時使用線程池時提交任務的方法,支持 Runable 和 Callable 兩種任務的提交,方法中 execute 方法是其子類 ThreadPoolExecutor 實現的,不管是那種任務入參,execute 方法最終執(zhí)行的任務都是 FutureTask;

ThreadPoolExecutor 繼承了 AbstractExecutorService 抽象類,具備以上三個類的所有功能。

1.2、類注釋

ThreadPoolExecutor 的類注釋有很多,我們選取關鍵的注釋如下:

  1. ExecutorService 使用線程池中的線程執(zhí)行提交的任務,線程池我們可以使用 Executors 進行配置;
  2. 線程池解決兩個問題:1:通過減少任務間的調度開銷 (主要是通過線程池中的線程被重復使用的方式),來提高大量任務時的執(zhí)行性能;2:提供了一種方式來管理線程和消費,維護基本數據統(tǒng)計等工作,比如統(tǒng)計已完成的任務數;
  3. Executors 為常用的場景設定了可直接初始化線程池的方法,比如 Executors#newCachedThreadPool 無界的線程池,并且可以自動回收;Executors#newFixedThreadPool 固定大小線程池;Executors#newSingleThreadExecutor 單個線程的線程池;
  4. 為了在各種上下文中使用線程池,線程池提供可供擴展的參數設置:1:coreSize:當新任務提交時,發(fā)現運行的線程數小于 coreSize,一個新的線程將被創(chuàng)建,即使這時候其它工作線程是空閑的,可以通過 getCorePoolSize 方法獲得 coreSize;2:maxSize: 當任務提交時,coreSize < 運行線程數 <= maxSize,但隊列沒有滿時,任務提交到隊列中,如果隊列滿了,在 maxSize 允許的范圍內新建線程;
  5. 一般來說,coreSize 和 maxSize 在線程池初始化時就已經設定了,但我們也可以通過 setCorePoolSize、setMaximumPoolSize 方法動態(tài)的修改這兩個值;
  6. 默認的,core threads 需要到任務提交后才創(chuàng)建的,但我們可以分別使用 prestartCoreThread、prestartAllCoreThreads 兩個方法來提前創(chuàng)建一個、所有的 core threads;
  7. 新的線程被默認 ThreadFactory 創(chuàng)建時,優(yōu)先級會被限制成 NORM_PRIORITY,默認會被設置成非守護線程,這個和新建線程的繼承是不同的;
  8. Keep-alive times 參數的作用:1:如果當前線程池中有超過 coreSize 的線程;2:并且線程空閑的時間超過 keepAliveTime,當前線程就會被回收,這樣可以避免線程沒有被使用時的資源浪費;
  9. 通過 setKeepAliveTime 方法可以動態(tài)的設置 keepAliveTime 的值;
  10. 如果設置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時間超過 keepAliveTime 的話,也會被回收;
  11. 線程池新建時,有多種隊列可供選擇,比如:1:SynchronousQueue,為了避免任務被拒絕,要求線程池的 maxSize 無界,缺點是當任務提交的速度超過消費的速度時,可能出現無限制的線程增長;2:LinkedBlockingQueue,無界隊列,未消費的任務可以在隊列中等待;3:ArrayBlockingQueue,有界隊列,可以防止資源被耗盡;
  12. 隊列的維護:提供了 getQueue () 方法方便我們進行監(jiān)控和調試,嚴禁用于其他目的,remove 和 purge 兩個方法可以對隊列中的元素進行操作;
  13. 在 Executor 已經關閉或對最大線程和最大隊列都使用飽和時,可以使用 RejectedExecutionHandler 類進行異常捕捉,有如下四種處理策略:ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.DiscardPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy;
  14. 線程池提供了很多可供擴展的鉤子函數,比如有:1:提供在每個任務執(zhí)行之前 beforeExecute 和執(zhí)行之后 afterExecute 的鉤子方法,主要用于操作執(zhí)行環(huán)境,比如初始化 ThreadLocals、收集統(tǒng)計數據、添加日志條目等;2: 如果在執(zhí)行器執(zhí)行完成之后想干一些事情,可以實現 terminated 方法,如果鉤子方法執(zhí)行時發(fā)生異常,工作線程可能會失敗并立即終止。

可以看到 ThreadPoolExecutor 的注釋是非常多的,也是非常重要的,我們很多面試的題目,在注釋上都能找到答案。

1.3、ThreadPoolExecutor 重要屬性

接下來我們來看一看 ThreadPoolExecutor 都有哪些重要屬性,如下:

//ctl 線程池狀態(tài)控制字段,由兩部分組成:
//1:workerCount  wc 工作線程數,我們限制 workerCount 最大到(2^29)-1,大概 5 億個線程
//2:runState rs 線程池的狀態(tài),提供了生命周期的控制,源碼中有很多關于狀態(tài)的校驗,狀態(tài)枚舉如下:
//RUNNING(-536870912):接受新任務或者處理隊列里的任務。
//SHUTDOWN(0):不接受新任務,但仍在處理已經在隊列里面的任務。
//STOP(-536870912):不接受新任務,也不處理隊列中的任務,對正在執(zhí)行的任務進行中斷。
//TIDYING(1073741824): 所以任務都被中斷,workerCount 是 0,整理狀態(tài)
//TERMINATED(1610612736): terminated() 已經完成的時候
//runState 之間的轉變過程:
//RUNNING -> SHUTDOWN:調用 shudown(),finalize()
//(RUNNING or SHUTDOWN) -> STOP:調用shutdownNow()
//SHUTDOWN -> TIDYING -> workerCount ==0
//STOP -> TIDYING -> workerCount ==0
//TIDYING -> TERMINATED -> terminated() 執(zhí)行完成之后
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
private static final int STOP       =  1 << COUNT_BITS;//-536870912
private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
// 已完成任務的計數
volatile long completedTasks;
// 線程池最大容量
private int largestPoolSize;
// 已經完成的任務數
private long completedTaskCount;
// 用戶可控制的參數都是 volatile 修飾的
// 可以使用 threadFactory 創(chuàng)建 thread
// 創(chuàng)建失敗一般不拋出異常,只有在 OutOfMemoryError 時候才會
private volatile ThreadFactory threadFactory;
// 飽和或者運行中拒絕任務的 handler 處理類
private volatile RejectedExecutionHandler handler;
// 線程存活時間設置
private volatile long keepAliveTime;
// 設置 true 的話,核心線程空閑 keepAliveTime 時間后,也會被回收
private volatile boolean allowCoreThreadTimeOut;
// coreSize
private volatile int corePoolSize;
// maxSize 最大限制 (2^29)-1
private volatile int maximumPoolSize;
// 默認的拒絕策略
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
// 隊列會 hold 住任務,并且利用隊列的阻塞的特性,來保持線程的存活周期
private final BlockingQueue<Runnable> workQueue;
// 大多數情況下是控制對 workers 的訪問權限
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
// 包含線程池中所有的工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();

屬性也是非常多,為了方便理解線程池的狀態(tài)扭轉,畫了一個圖:

圖片描述

 Worker 我們可以理解成線程池中任務運行的最小單元,Worker 的大致結構如下:

// 線程池中任務執(zhí)行的最小單元
// Worker 繼承 AQS,具有鎖功能
// Worker 實現 Runnable,本身是一個可執(zhí)行的任務
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 任務運行的線程
    final Thread thread;
    // 需要執(zhí)行的任務
    Runnable firstTask;
    // 非常巧妙的設計,Worker本身是個 Runnable,把自己作為任務傳遞給 thread
    // 內部有個屬性又設置了 Runnable
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 把 Worker 自己作為 thread 運行的任務
        this.thread = getThreadFactory().newThread(this);
    }
   /** Worker 本身是 Runnable,run 方法是 Worker 執(zhí)行的入口, runWorker 是外部的方法 */
    public void run() {
        runWorker(this);
    }
    private static final long serialVersionUID = 6138294804551838833L;
    // Lock methods
    // 0 代表沒有鎖住,1 代表鎖住
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // 嘗試加鎖,CAS 賦值為 1,表示鎖住
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 嘗試釋放鎖,釋放鎖沒有 CAS 校驗,可以任意的釋放鎖
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

理解 Worker 非常關鍵,主要有以下幾點:

Worker 很像是任務的代理,在線程池中,最小的執(zhí)行單位就是 Worker,所以 Worker 實現了 Runnable 接口,實現了 run 方法;

在 Worker 初始化時 this.thread = getThreadFactory ().newThread (this) 這行代碼比較關鍵,它把當前 Worker 作為線程的構造器入參,我們在后續(xù)的實現中會發(fā)現這樣的代碼:Thread t = w.thread;t.start (),此時的 w 是 Worker 的引用申明,此處 t.start 實際上執(zhí)行的就是 Worker 的 run 方法;

Worker 本身也實現了 AQS,所以其本身也是一個鎖,其在執(zhí)行任務的時候,會鎖住自己,任務執(zhí)行完成之后,會釋放自己。

2、線程池的任務提交

線程池的任務提交從 submit 方法說起,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:

  1. 把 Runnable 和 Callable 都轉化成 FutureTask,這個我們之前看過源碼了;
  2. 使用 execute 方法執(zhí)行 FutureTask。

execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 工作的線程小于核心線程數,創(chuàng)建新的線程,成功返回,失敗不拋異常
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 線程池狀態(tài)可能發(fā)生變化
        c = ctl.get();
    }
    // 工作的線程大于等于核心線程數,或者新建線程失敗
    // 線程池狀態(tài)正常,并且可以入隊的話,嘗試入隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果線程池狀態(tài)異常 嘗試從隊列中移除任務,可以移除的話就拒絕掉任務
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 發(fā)現可運行的線程數是 0,就初始化一個線程,這里是個極限情況,入隊的時候,突然發(fā)現
        // 可用線程都被回收了
        else if (workerCountOf(recheck) == 0)
            // Runnable是空的,不會影響新增線程,但是線程在 start 的時候不會運行
            // Thread.run() 里面有判斷
            addWorker(null, false);
    }
    // 隊列滿了,開啟線程到 maxSize,如果失敗直接拒絕,
    else if (!addWorker(command, false))
        reject(command);
}

execute 方法執(zhí)行的就是整體架構圖的左半邊的邏輯,其中多次調用 addWorker 方法,addWorker 方法的作用是新建一個 Worker,我們一起來看下源碼:

// 結合線程池的情況看是否可以添加新的 worker
// firstTask 不為空可以直接執(zhí)行,為空執(zhí)行不了,Thread.run()方法有判斷,Runnable為空不執(zhí)行
// core 為 true 表示線程最大新增個數是 coresize,false 表示最大新增個數是 maxsize
// 返回 true 代表成功,false 失敗
// break retry 跳到retry處,且不再進入循環(huán)
// continue retry 跳到retry處,且再次進入循環(huán)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 先是各種狀態(tài)的校驗
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // rs >= SHUTDOWN 說明線程池狀態(tài)不正常
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // 工作中的線程數大于等于容量,或者大于等于 coreSize or maxSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // break 結束 retry 的 for 循環(huán)
                break retry;
            c = ctl.get();  // Re-read ctl
            // 線程池狀態(tài)被更改
            if (runStateOf(c) != rs)
                // 跳轉到retry位置
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 巧妙的設計,Worker 本身是個 Runnable.
        // 在初始化的過程中,會把 worker 丟給 thread 去初始化
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動線程,實際上去執(zhí)行 Worker.run 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker 方法首先是執(zhí)行了一堆校驗,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 執(zhí)行 Worker,上文我們說了 Worker 在初始化時的關鍵代碼:this.thread = getThreadFactory ().newThread (this),Worker(this) 是作為新建線程的構造器入參的,所以 t.start () 會執(zhí)行到 Worker 的 run 方法上,源碼如下:

public void run() {
    runWorker(this);
}

runWorker 方法是非常重要的方法,我們一起看下源碼實現:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    //幫助gc回收
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task 為空的情況:
        // 1:任務入隊列了,極限情況下,發(fā)現沒有運行的線程,于是新增一個線程;
        // 2:線程執(zhí)行完任務執(zhí)行,再次回到 while 循環(huán)。
        // 如果 task 為空,會使用 getTask 方法阻塞從隊列中拿數據,如果拿不到數據,會阻塞住
        while (task != null || (task = getTask()) != null) {
            //鎖住 worker
            w.lock();
            // 線程池 stop 中,但是線程沒有到達中斷狀態(tài),幫助線程中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //執(zhí)行 before 鉤子函數
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //同步執(zhí)行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //執(zhí)行 after 鉤子函數,如果這里拋出異常,會覆蓋 catch 的異常
                    //所以這里異常最好不要拋出來
                    afterExecute(task, thrown);
                }
            } finally {
                //任務執(zhí)行完成,計算解鎖
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //做一些拋出異常的善后工作
        processWorkerExit(w, completedAbruptly);
    }
}

這個方法執(zhí)行的邏輯是架構圖中的標紅部分:

圖片描述

我們聚焦一下這行代碼:task.run () 此時的 task 是什么呢?此時的 task 是 FutureTask 類,所以我們繼續(xù)追索到 FutureTask 類的 run 方法的源碼,如下:

/**
 * run 方法可以直接被調用
 * 也可以由線程池進行調用
 */
public void run() {
    // 狀態(tài)不是任務創(chuàng)建,或者當前任務已經有線程在執(zhí)行了
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不為空,并且已經初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 調用執(zhí)行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 給 outcome 賦值
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

 run 方法中有兩行關鍵代碼:

  • result = c.call () 這行代碼是真正執(zhí)行業(yè)務代碼的地方;
  • set (result) 這里是給 outCome 賦值,這樣 Future.get 方法執(zhí)行時,就可以從 outCome 中拿值,這個我們在《Future、ExecutorService 源碼解析》章節(jié)中都有說到。

至此,submit 方法就執(zhí)行完成了,整體流程比較復雜,我們畫一個圖釋義一下任務提交執(zhí)行的主流程:

圖片描述

3、線程執(zhí)行完任務之后都在干啥 

線程執(zhí)行完任務之后,是消亡還是干什么呢?這是一個值得思考的問題,我們可以從源碼中找到答案,從 ThreadPoolExecutor 的 runWorker 方法中,不知道有沒有同學注意到一個 while 循環(huán),我們截圖釋義一下:

圖片描述

 這個 while 循環(huán)有個 getTask 方法,getTask 的主要作用是阻塞從隊列中拿任務出來,如果隊列中有任務,那么就可以拿出來執(zhí)行,如果隊列中沒有任務,這個線程會一直阻塞到有任務為止(或者超時阻塞),下面我們一起來看下 getTask 方法,源碼如下:

// 從阻塞隊列中拿任務
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //線程池關閉 && 隊列為空,不需要在運行了,直接放回
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // true  運行的線程數大于 coreSize || 核心線程也可以被滅亡
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 隊列以 LinkedBlockingQueue 為例,timedOut 為 true 的話說明下面 poll 方法執(zhí)行返回的是 null
        // 說明在等待 keepAliveTime 時間后,隊列中仍然沒有數據
        // 說明此線程已經空閑了 keepAliveTime 了
        // 再加上 wc > 1 || workQueue.isEmpty() 的判斷
        // 所以使用 compareAndDecrementWorkerCount 方法使線程池數量減少 1
        // 并且直接 return,return 之后,此空閑的線程會自動被回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 從隊列中阻塞拿 worker
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 設置已超時,說明此時隊列沒有數據
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

代碼有兩處關鍵:

使用隊列的 poll 或 take 方法從隊列中拿數據,根據隊列的特性,隊列中有任務可以返回,隊列中無任務會阻塞;

方法中的第二個 if 判斷,說的是在滿足一定條件下(條件看注釋),會減少空閑的線程,減少的手段是使可用線程數減一,并且直接 return,直接 return 后,該線程就執(zhí)行結束了,JVM 會自動回收該線程。

4、總結

本章節(jié)主要以 submit 方法為主線闡述了 ThreadPoolExecutor 的整體架構和底層源碼,只要有隊列和線程的基礎知識的話,理解 ThreadPoolExecutor 并不復雜。ThreadPoolExecutor 還有一些其他的源碼,比如說拒絕請求的策略、得到各種屬性、設置各種屬性等等方法,這些方法都比較簡單,感興趣的同學可以自己去看一看。

以上就是Java 線程池ThreadPoolExecutor源碼解析的詳細內容,更多關于Java 線程池ThreadPoolExecutor的資料請關注腳本之家其它相關文章!

相關文章

  • 深入理解Java設計模式之策略模式

    深入理解Java設計模式之策略模式

    這篇文章主要介紹了JAVA設計模式之策略模式的的相關資料,文中示例代碼非常詳細,供大家參考和學習,感興趣的朋友可以了解下
    2021-11-11
  • Java使用ScriptEngine動態(tài)執(zhí)行代碼(附Java幾種動態(tài)執(zhí)行代碼比較)

    Java使用ScriptEngine動態(tài)執(zhí)行代碼(附Java幾種動態(tài)執(zhí)行代碼比較)

    這篇文章主要介紹了Java使用ScriptEngine動態(tài)執(zhí)行代碼,并且分享Java幾種動態(tài)執(zhí)行代碼比較,需要的朋友可以參考下
    2021-04-04
  • java實現獲取網站的keywords,description

    java實現獲取網站的keywords,description

    這篇文章主要介紹了java實現獲取網站的keywords,description的相關資料,需要的朋友可以參考下
    2015-03-03
  • Java中LinkedList數據結構的詳細介紹

    Java中LinkedList數據結構的詳細介紹

    這篇文章主要介紹了Java中LinkedList,Linked List 是 java.util 包中 Collection 框架的一部分,文中提供了詳細的代碼說明,需要的朋友可以參考下
    2023-05-05
  • Java日常練習題,每天進步一點點(37)

    Java日常練習題,每天進步一點點(37)

    下面小編就為大家?guī)硪黄狫ava基礎的幾道練習題(分享)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • Java異常架構和異常關鍵字圖文詳解

    Java異常架構和異常關鍵字圖文詳解

    Java異常是Java提供的一種識別及響應錯誤的一致性機制,下面這篇文章主要給大家介紹了關于Java異常架構和異常關鍵字的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-05-05
  • Java日常練習題,每天進步一點點(6)

    Java日常練習題,每天進步一點點(6)

    下面小編就為大家?guī)硪黄狫ava基礎的幾道練習題(分享)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • springboot+vue實現Minio文件存儲的示例代碼

    springboot+vue實現Minio文件存儲的示例代碼

    本文主要介紹了springboot+vue實現Minio文件存儲的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-02-02
  • Java中線程Thread的三種方式和對比

    Java中線程Thread的三種方式和對比

    這篇文章主要介紹了Java中線程Thread的三種方式和對比,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-04-04
  • spring Boot與Mybatis整合優(yōu)化詳解

    spring Boot與Mybatis整合優(yōu)化詳解

    關于spring-boot與mybatis整合優(yōu)化方面的介紹,就是Mybatis-Spring-boot-starter的介紹,具體內容詳情大家參考下本文
    2017-07-07

最新評論