亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

深入了解Java線程池:從設(shè)計(jì)思想到源碼解讀

 更新時(shí)間:2021年12月29日 09:09:41   作者:Ayue、  
這篇文章將從設(shè)計(jì)思想到源碼解讀,帶大家深入了解Java的線程池,文中的示例代碼講解詳細(xì),對(duì)我們的學(xué)習(xí)或工作有一定的幫助,需要的可以參考一下

為什么需要線程池

我們知道創(chuàng)建線程的常用方式就是 new Thread() ,而每一次 new Thread() 都會(huì)重新創(chuàng)建一個(gè)線程,而線程的創(chuàng)建和銷毀都需要耗時(shí)的,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性。在 jdk1.5 的 JUC 包中有一個(gè) Executors,他能使我們創(chuàng)建的線程得到復(fù)用,不會(huì)頻繁的創(chuàng)建和銷毀線程。

線程池首先創(chuàng)建一些線程,它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑的線程,程序?qū)⒁粋€(gè)任務(wù)傳給線程池,線程池就會(huì)啟動(dòng)一條線程來執(zhí)行這個(gè)任務(wù),執(zhí)行結(jié)束以后,該線程并不會(huì)死亡,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)。

先不管它到底是個(gè)啥,先看看使用線程池和 new Thread() 的耗時(shí)情況:

public class ThreadPoolTest {

    static CountDownLatch latch = new CountDownLatch(100000);
    static ExecutorService es = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws InterruptedException {

        long timeStart = System.currentTimeMillis();

        for (int i = 0; i < 100000; i++) {
            newThread();
            //executors();
        }
        latch.await();
        System.out.println(System.currentTimeMillis() - timeStart);
        es.shutdown();
    }

    /**
     * 使用線程池
     */
    public static void executors() {
        es.submit(() -> {
            latch.countDown();
        });
    }

    /**
     * 直接new
     */
    public static void newThread() {
        new Thread(() -> {
            latch.countDown();
        }).start();
    }
}

對(duì)于 10 萬(wàn)個(gè)線程同時(shí)跑,如果使用 new 的方式耗時(shí):

使用線程池耗時(shí):

總得來說,合理的使用線程池可以帶來以下幾個(gè)好處:

1.降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的消耗。

2.提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。

3.增加線程的可管理性。線程是稀缺資源,使用線程池可以進(jìn)行統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控。

線程池設(shè)計(jì)思路

我們先了解線程池的思路,哪怕你重來沒了解過什么是線程池,所以不會(huì)一上來就給你講一堆線程池的參數(shù)。我嘗試多種想法來解釋它的設(shè)計(jì)思路,但都過于官方,但在查找資料的時(shí)候在博客上看到了非常通俗易懂的描述,它是這樣描述的,先假想一個(gè)工廠的生產(chǎn)流程:

工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當(dāng)訂單增加,正式工人已經(jīng)忙不過來了,工廠會(huì)將生產(chǎn)原料暫時(shí)堆積在倉(cāng)庫(kù)中,等有空閑的工人時(shí)再處理(因?yàn)楣と丝臻e了也不會(huì)主動(dòng)處理倉(cāng)庫(kù)中的生產(chǎn)任務(wù),所以需要調(diào)度員實(shí)時(shí)調(diào)度)。倉(cāng)庫(kù)堆積滿了后,訂單還在增加怎么辦?工廠只能臨時(shí)擴(kuò)招一批工人來應(yīng)對(duì)生產(chǎn)高峰,而這批工人高峰結(jié)束后是要清退的,所以稱為臨時(shí)工。當(dāng)時(shí)臨時(shí)工也以招滿后(受限于工位限制,臨時(shí)工數(shù)量有上限),后面的訂單只能忍痛拒絕了。

和線程池的映射如下:

  • 工廠——線程池
  • 訂單——任務(wù)(Runnable)
  • 正式工人——核心線程
  • 臨時(shí)工——普通線程
  • 倉(cāng)庫(kù)——任務(wù)隊(duì)列
  • 調(diào)度員——getTask()

getTask()是一個(gè)方法,將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程,源碼分析再去了解。

映射后,形成線程池流程圖如下:

線程池的工作機(jī)制

了解了線程池設(shè)計(jì)思路,我們可以總結(jié)一下線程池的工作機(jī)制:

在線程池的編程模式下,任務(wù)是提交給整個(gè)線程池,而不是直接提交給某個(gè)線程,線程池在拿到任務(wù)后, 在內(nèi)部尋找是否有空閑的線程 ,如果有,則將任務(wù)交給某個(gè)空閑的線程。如果不存在空閑線程,即線程池中的線程數(shù)大于核心線程 corePoolSize ,則將任務(wù)添加到任務(wù)隊(duì)列中 workQueue ,如果任務(wù)隊(duì)列有界且滿了之后則會(huì)判斷線程池中的線程數(shù)是否大于最大線程數(shù) maximumPoolSize ,如果小于則會(huì)創(chuàng)建新的線程來執(zhí)行任務(wù),否則在沒有空閑線程的情況下就會(huì)執(zhí)行決絕策略 handler 。

注意:線程池中剛開始沒有線程,當(dāng)一個(gè)任務(wù)提交給線程池后,線程池會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行任務(wù)。一個(gè)線程同時(shí)只能執(zhí)行一個(gè)任務(wù),但可以同時(shí)向一個(gè)線程池提交多個(gè)任務(wù)。

線程池的參數(shù)及使用

線程池的真正實(shí)現(xiàn)類是 ThreadPoolExecutor ,類的集成關(guān)系如下:

ThreadPoolExecutor的構(gòu)造方法有幾個(gè),掌握最主要的即可,其中包含 7 個(gè)參數(shù):

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

corePoolSize(必需),線程池中的核心線程數(shù)。

當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于 corePoolSize。

如果當(dāng)前線程數(shù)小于 corePoolSize,此時(shí)存在 空閑線程 ,提交的任務(wù)會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行該任務(wù)。

如果當(dāng)前線程數(shù)等于 corePoolSize,則繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行。

如果執(zhí)行了線程池 prestartAllCoreThreads() 方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。

maximumPoolSize(必需),線程池中允許的最大線程數(shù)。

當(dāng)隊(duì)列滿了,且 已創(chuàng)建的線程數(shù)小于 maximumPoolSize ,則線程池會(huì)創(chuàng)建新的線程來執(zhí)行任務(wù)。另外,對(duì)于無(wú)界隊(duì)列,可忽略該參數(shù)。

keepAliveTime(必需),線程存活保持時(shí)間。

當(dāng)線程沒有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于 corePoolSize 時(shí)才有用,即當(dāng)非核心線程處于空閑狀態(tài)的時(shí)間超過這個(gè)時(shí)間后,該線程將被回收。將 allowCoreThreadTimeOut 參數(shù)設(shè)置為 true 后,核心線程也會(huì)被回收。

unit(必需),keepAliveTime 的時(shí)間單位。

workQueue(必需),任務(wù)隊(duì)列。

用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。workQueue 必須是 BlockingQueue 阻塞隊(duì)列。當(dāng)線程池中的線程數(shù)超過它的 corePoolSize 的時(shí)候,線程會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行阻塞等待。

一般來說,我們應(yīng)該盡量使用有界隊(duì)列,因?yàn)槭褂脽o(wú)界隊(duì)列作為工作隊(duì)列會(huì)對(duì)線程池帶來如下影響。

當(dāng)線程池中的線程數(shù)達(dá)到 corePoolSize 后,新任務(wù)將在無(wú)界隊(duì)列中等待,因此線程池中的線程數(shù)不會(huì)超過 corePoolSize。

由于 1,使用無(wú)界隊(duì)列時(shí) maximumPoolSize 將是一個(gè)無(wú)效參數(shù)。

由于 1 和 2,使用無(wú)界隊(duì)列時(shí) keepAliveTime 將是一個(gè)無(wú)效參數(shù)。

更重要的,使用無(wú)界 queue 可能會(huì)耗盡系統(tǒng)資源,有界隊(duì)列則有助于防止資源耗盡,同時(shí)即使使用有界隊(duì)列,也要盡量控制隊(duì)列的大小在一個(gè)合適的范圍。一般使用, ArrayBlockingQueue 、 LinkedBlockingQueue 、 SynchronousQueue 、 PriorityBlockingQueue 等。

threadFactory(可選),創(chuàng)建線程的工廠。

通過自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的 線程名 ,threadFactory 創(chuàng)建的線程也是采用 new Thread() 方式,threadFactory 創(chuàng)建的線程名都具有統(tǒng)一的風(fēng)格: pool-m-thread-n (m 為線程池的編號(hào),n 為線程池內(nèi)的線程編號(hào))。

handler(可選),線程飽和策略。

當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了 四種策略:

AbortPolicy,直接拋出異常,默認(rèn)策略。

CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務(wù)。

DiscardOldestPolicy,丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。

DiscardPolicy,直接丟棄任務(wù)。

當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

線程池的狀態(tài)

ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數(shù)量:

源碼如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//約5億
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

至于為什么這么設(shè)計(jì),我覺得主要原因是為了避免額外的開銷,如果使用 2 個(gè)變量來分別表示狀態(tài)和線程數(shù)量,為了保證原子性必須進(jìn)行額外的加鎖操作,而 ctl 則通過原子類就解決了該問題,在通過位運(yùn)算就能得到狀態(tài)和線程數(shù)量。

提交任務(wù)

可以使用兩個(gè)方法向線程池提交任務(wù),分別為 execute() 和 submit() 方法。

  • execute(),用于提交不需要返回值的任務(wù),所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。
  • submit(),用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè) future 類型的對(duì)象,通過這個(gè) future 對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過 future 的 get() 方法來獲取返回值, get() 方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用 get(long timeout,TimeUnit unit) 方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回,這 時(shí)候有可能任務(wù)沒有執(zhí)行完。

此外, ExecutorService 還提供了兩個(gè)提交任務(wù)的方法, invokeAny() 和 invokeAll() 。

  • invokeAny(),提交所有任務(wù),哪個(gè)任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其它任務(wù)取消。
  • invokeAll(),提交所有的任務(wù)且必須全部執(zhí)行完成。

corePoolSize 和 maximumPoolSize

測(cè)試核心線程數(shù)為 1 ,最大線程數(shù)為 2,任務(wù)隊(duì)列為 1。

@Slf4j(topic = "ayue")
public class ThreadExecutorPoolTest1 {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        for (int i = 1; i < 4; i++) {
            //執(zhí)行任務(wù)
            executor.execute(new MyTask(i));
        }
    }


    //任務(wù)
    static class MyTask implements Runnable {

        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum);
            try {
                //模擬其他操作
                Thread.currentThread().sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task{}執(zhí)行完畢", taskNum);
        }
    }
}

輸出:

<code data-type="codeline">11:07:04.377 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:3</code><code data-type="codeline">11:07:04.377 [pool-1-thread-1] DEBUG ayue - 線程名稱:pool-1-thread-1,正在執(zhí)行task:1</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:2</code><code data-type="codeline">11:07:06.397 [pool-1-thread-2] DEBUG ayue - task2執(zhí)行完畢</code>

當(dāng)有 3 個(gè)線程通過線程池執(zhí)行任務(wù)時(shí),由于核心線程只有一個(gè),且任務(wù)隊(duì)列為 1,所以當(dāng)?shù)?3 個(gè)線程到來的時(shí)候, 會(huì)重新開啟一個(gè)新的線程 pool-1-thread-2 來執(zhí)行任務(wù)。

當(dāng)然,這里可能有人問核心線程會(huì)不會(huì)大于最大線程?當(dāng)然不會(huì),如果 corePoolSize > maximumPoolSize ,則程序啟動(dòng)會(huì)直接報(bào)錯(cuò)。

任務(wù)隊(duì)列

任務(wù)隊(duì)列是基于阻塞隊(duì)列實(shí)現(xiàn)的,即采用生產(chǎn)者消費(fèi)者模式,在 Java 中需要實(shí)現(xiàn) BlockingQueue 接口。但 Java 已經(jīng)為我們提供了 7 種阻塞隊(duì)列的實(shí)現(xiàn):

1.ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。

2.LinkedBlockingQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,在未指明容量時(shí),容量默認(rèn)為 Integer.MAX_VALUE 。

3.PriorityBlockingQueue: 一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列,對(duì)元素沒有要求,可以實(shí)現(xiàn) Comparable 接口也可以提供 Comparator 來對(duì)隊(duì)列中的元素進(jìn)行比較。跟時(shí)間沒有任何關(guān)系,僅僅是 按照優(yōu)先級(jí)取任務(wù) 。

4.DelayQueue:類似于 PriorityBlockingQueue,是二叉堆實(shí)現(xiàn)的無(wú)界優(yōu)先級(jí)阻塞隊(duì)列。要求元素都實(shí)現(xiàn) Delayed 接口,通過執(zhí)行時(shí)延從隊(duì)列中提取任務(wù),時(shí)間沒到任務(wù)取不出來。

5.SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,消費(fèi)者線程調(diào)用 take() 方法的時(shí)候就會(huì)發(fā)生阻塞,直到有一個(gè)生產(chǎn)者線程生產(chǎn)了一個(gè)元素,消費(fèi)者線程就可以拿到這個(gè)元素并返回;生產(chǎn)者線程調(diào)用 put() 方法的時(shí)候也會(huì)發(fā)生阻塞,直到有一個(gè)消費(fèi)者線程消費(fèi)了一個(gè)元素,生產(chǎn)者才會(huì)返回。

6.LinkedBlockingDeque: 使用雙向隊(duì)列實(shí)現(xiàn)的有界雙端阻塞隊(duì)列。雙端意味著可以像普通隊(duì)列一樣 FIFO(先進(jìn)先出),也可以像棧一樣 FILO(先進(jìn)后出)。

7.LinkedTransferQueue: 它是 ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的結(jié)合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無(wú)界的阻塞隊(duì)列。

線程工廠

線程工廠默認(rèn)創(chuàng)建的線程名: pool-m-thread-n ,在 Executors.defaultThreadFactory() 可以看到:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    public Thread newThread(Runnable r) {
        //線程名:namePrefix + threadNumber.getAndIncrement()
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

我們也可以通過 ThreadPoolExecutor 自定義線程名:

@Slf4j(topic = "ayue")
public class ThreadExecutorPoolTest1 {

    public static void main(String[] args) {
        //自增線程id
        AtomicInteger threadNumber = new AtomicInteger(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "javatv-" + threadNumber.getAndIncrement());
            }
        });
        for (int i = 1; i < 4; i++) {
            executor.execute(new MyTask(i));
        }
    }


    static class MyTask implements Runnable {

        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum);
            try {
                //模擬其他操作
                Thread.currentThread().sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task{}執(zhí)行完畢", taskNum);
        }
    }
}

輸出:

<code data-type="codeline">14:08:07.166 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:1</code><code data-type="codeline">14:08:07.166 [javatv-2] DEBUG ayue - 線程名稱:javatv-2,正在執(zhí)行task:3</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:2</code><code data-type="codeline">14:08:09.172 [javatv-1] DEBUG ayue - task2執(zhí)行完畢</code>

拒絕策略

線程池提供了 四種策略:

1.AbortPolicy,直接拋出異常,默認(rèn)策略。

2.CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務(wù)。

3.DiscardOldestPolicy,丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。

4.DiscardPolicy,直接丟棄任務(wù)。、

把上面代碼的循環(huán)次數(shù)改為 4 次,則會(huì)拋出 java.util.concurrent.RejectedExecutionException 異常。

for (int i = 1; i < 5; i++) {
    executor.execute(new MyTask(i));
}

關(guān)閉線程池

可以通過調(diào)用線程池的 shutdown 或 shutdownNow 方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的 interrupt 方法來中斷線程,所以無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止。但是它們存在一定的區(qū)別, shutdownNow 首先將線程池的狀態(tài)設(shè)置成 STOP ,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而 shutdown 只是將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。 簡(jiǎn)單來說:

  • shutdown():線程池狀態(tài)變?yōu)?SHUTDOWN,不會(huì)接收新任務(wù),但已提交任務(wù)會(huì)執(zhí)行完,不會(huì)阻塞調(diào)用線程的執(zhí)行 。
  • shutdownNow():線程池狀態(tài)變?yōu)?STOP,會(huì)接收新任務(wù),會(huì)將隊(duì)列中的任務(wù)返回,并用 interrupt 的方式中斷正在執(zhí)行的任務(wù)。

只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè), isShutdown 方法就會(huì)返回 true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用 isTerminaed 方法會(huì)返回 true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用 shutdown 方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用 shutdownNow 方法。

Executors 靜態(tài)工廠

Executors,提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種類型的線程池,基于 ThreadPoolExecutor。

1.FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }

特點(diǎn):核心線程數(shù)等于最大線程數(shù),因此也無(wú)需超時(shí)時(shí)間,執(zhí)行完立即回收,阻塞隊(duì)列是無(wú)界的,可以放任意數(shù)量的任務(wù)。

場(chǎng)景:適用于任務(wù)量已知,相對(duì)耗時(shí)的任務(wù)。

2.newCachedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }

可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個(gè)新線程并添加到池中,如果有被使用完但是還沒銷毀的線程,就復(fù)用該線程。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長(zhǎng)時(shí)間保持空閑的線程池不會(huì)使用任何資源。這種線程池比較靈活, 對(duì)于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通??商岣叱绦蛐阅?。

特點(diǎn):核心線程數(shù)是 0, 最大線程數(shù)是 Integer.MAX_VALUE ,全部都是空閑線程 60s 后回收。

場(chǎng)景:執(zhí)行大量、耗時(shí)少的任務(wù)。

3.newSingleThreadExecutor

 public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
   }

特點(diǎn):?jiǎn)尉€程線程池。希望多個(gè)任務(wù)排隊(duì)執(zhí)行,線程數(shù)固定為 1,任務(wù)數(shù)多于 1 時(shí),會(huì)放入無(wú)界隊(duì)列排隊(duì),任務(wù)執(zhí)行完畢,這唯一的線程也不會(huì)被釋放。

場(chǎng)景:區(qū)別于自己創(chuàng)建一個(gè)單線程串行執(zhí)行任務(wù),如果使用 new Thread 任務(wù)執(zhí)行失敗而終止那么沒有任何補(bǔ)救措施,而線程池還會(huì)新建一個(gè)線程,保證池的正常工作。

4.ScheduledThreadPool

 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
       return new ScheduledThreadPoolExecutor(corePoolSize);
   }

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecuto 的功能與 Timer 類似,但 ScheduledThreadPoolExecutor 功能更強(qiáng)大、更靈活。Timer 對(duì)應(yīng)的是單個(gè)后臺(tái)線程,而 ScheduledThreadPoolExecutor 可以在構(gòu)造函數(shù)中指定多個(gè)對(duì)應(yīng)的后臺(tái)線程數(shù)。

特點(diǎn):核心線程數(shù)量固定,非核心線程數(shù)量無(wú)限,執(zhí)行完閑置 10ms 后回收,任務(wù)隊(duì)列為延時(shí)阻塞隊(duì)列。

場(chǎng)景:執(zhí)行定時(shí)或周期性的任務(wù)。

合理地配置線程池

需要針對(duì)具體情況而具體處理,不同的任務(wù)類別應(yīng)采用不同規(guī)模的線程池,任務(wù)類別可劃分為 CPU 密集型任務(wù)、IO 密集型任務(wù)和混合型任務(wù)。

  • CPU 密集型任務(wù):線程池中線程個(gè)數(shù)應(yīng)盡量少,不應(yīng)大于 CPU 核心數(shù);
  • IO 密集型任務(wù):由于 IO 操作速度遠(yuǎn)低于 CPU 速度,那么在運(yùn)行這類任務(wù)時(shí),CPU 絕大多數(shù)時(shí)間處于空閑狀態(tài),那么線程池可以配置盡量多些的線程,以提高 CPU 利用率;
  • 混合型任務(wù):可以拆分為 CPU 密集型任務(wù)和 IO 密集型任務(wù),當(dāng)這兩類任務(wù)執(zhí)行時(shí)間相差無(wú)幾時(shí),通過拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距,那么沒有拆分的意義。

線程池的監(jiān)控

如果在系統(tǒng)中大量使用線程池,則有必要對(duì)線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問題時(shí),可以根據(jù)線程池的使用狀況快速定位問題。利用線程池提供的參數(shù)進(jìn)行監(jiān)控,參數(shù)如下:

  • taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
  • completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量,小于或等于 taskCount。
  • largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量,通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
  • getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不減。
  • getActiveCount:獲取活動(dòng)的線程數(shù)。

通過擴(kuò)展線程池進(jìn)行監(jiān)控:繼承線程池并重寫線程池的 beforeExecute() , afterExecute() 和 terminated() 方法,可以在任務(wù)執(zhí)行前、后和線程池關(guān)閉前自定義行為。如監(jiān)控任務(wù)的平均執(zhí)行時(shí)間,最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。

源碼分析

在使用線程池的時(shí)候,我其實(shí)有一些問題也隨之而來,比如線程池的線程怎么創(chuàng)建?任務(wù)怎么執(zhí)行?任務(wù)怎么分配?線程執(zhí)行完后怎么辦?是存活還是死亡?什么時(shí)候死亡?為什么要使用阻塞隊(duì)列等等問題。帶著這些問題,我們?nèi)プx讀源碼,讀源碼怎么入手?通過 ThreadPoolExecutor 的 execute() 方法。submit 底層也是調(diào)用了 execute() 。

execute

public void execute(Runnable command) {
    //如果沒有任務(wù)直接拋出異常
    if (command == null)
        throw new NullPointerException();
    //獲取當(dāng)前線程的狀態(tài)+線程個(gè)數(shù)
    int c = ctl.get();
    /**
     * workerCountOf,線程池當(dāng)前線程數(shù),并判斷是否小于核心線程數(shù)
     */
    if (workerCountOf(c) < corePoolSize) {//如果小于
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // 這里是向任務(wù)隊(duì)列投放任務(wù)成功,對(duì)線程池的運(yùn)行中狀態(tài)做二次檢查
        // 如果線程池二次檢查狀態(tài)是非運(yùn)行中狀態(tài),則從任務(wù)隊(duì)列移除當(dāng)前的任務(wù)調(diào)用拒絕策略處理(也就是移除前面成功入隊(duì)的任務(wù)實(shí)例)
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /* 走到下面的else if分支,說明有以下的前提:
         * 1、待執(zhí)行的任務(wù)已經(jīng)成功加入任務(wù)隊(duì)列
         * 2、線程池可能是RUNNING狀態(tài)
         * 3、傳入的任務(wù)可能從任務(wù)隊(duì)列中移除失敗(移除失敗的唯一可能就是任務(wù)已經(jīng)被執(zhí)行了)
         *
         * 如果當(dāng)前工作線程數(shù)量為0,則創(chuàng)建一個(gè)非核心線程并且傳入的任務(wù)對(duì)象為null - 返回
         * 也就是創(chuàng)建的非核心線程不會(huì)馬上運(yùn)行,而是等待獲取任務(wù)隊(duì)列的任務(wù)去執(zhí)行 
         * 如果前工作線程數(shù)量不為0,原來應(yīng)該是最后的else分支,但是可以什么也不做,
         * 因?yàn)槿蝿?wù)已經(jīng)成功入隊(duì)列,總會(huì)有合適的時(shí)機(jī)分配其他空閑線程去執(zhí)行它。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /* 走到這里說明有以下的前提:
     * 1、線程池中的工作線程總數(shù)已經(jīng)大于等于corePoolSize(簡(jiǎn)單來說就是核心線程已經(jīng)全部懶創(chuàng)建完畢)
     * 2、線程池可能不是RUNNING狀態(tài)
     * 3、線程池可能是RUNNING狀態(tài)同時(shí)任務(wù)隊(duì)列已經(jīng)滿了
     *
     * 如果向任務(wù)隊(duì)列投放任務(wù)失敗,則會(huì)嘗試創(chuàng)建非核心線程傳入任務(wù)執(zhí)行
     * 創(chuàng)建非核心線程失敗,此時(shí)需要拒絕執(zhí)行任務(wù)
     */
    else if (!addWorker(command, false))
        reject(command);
}

addWorker

第一個(gè) if 判斷線程池當(dāng)前線程數(shù)是否小于核心線程數(shù)。

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

如果小于,則進(jìn)入 addWorker 方法:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //外層循環(huán):判斷線程池狀態(tài)
    for (;;) {
        int c = ctl.get();
        //獲取線程池狀態(tài)
        int rs = runStateOf(c);
        // 檢查線程池的狀態(tài)是否存活.
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        //內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結(jié)果
        for (;;) {
            //線程數(shù)量
            int wc = workerCountOf(c);
            //線程數(shù)量超過容量,返回false
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS增加線程數(shù)量,若成功跳出外層循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //否則失敗,并更新c
            c = ctl.get();  // Re-read ctl
            //如果這時(shí)的線程池狀態(tài)發(fā)生變化,重新對(duì)外層循環(huán)進(jìn)行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //如果CAS成功了,則繼續(xù)往下走
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //創(chuàng)建一個(gè)Worker,這個(gè)Worker實(shí)現(xiàn)了Runable,把它看成一個(gè)任務(wù)單元
        w = new Worker(firstTask);
        //這個(gè)Thread就是當(dāng)前的任務(wù)單元Worker,即this
        final Thread t = w.thread;
        if (t != null) {
            //加鎖,因?yàn)榭赡苡卸鄠€(gè)線程來調(diào)用
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次檢查線程池的狀態(tài),避免在獲取鎖前調(diào)用shutdown方法
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    //如果t線程已經(jīng)啟動(dòng)尚未終止,則拋出異常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //否則,加入線程池
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //加入線程池后,啟動(dòng)該線程,上面已經(jīng)設(shè)置為true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果線程啟動(dòng)失敗,則調(diào)用addWorkerFailed,回滾操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker

Worker 是 ThreadPoolExecutor 的內(nèi)部類,繼承了 AQS 并且實(shí)現(xiàn)了 Runnable。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    
    //構(gòu)造方法
    Worker(Runnable firstTask) {
        //在調(diào)用runWorker前禁止中斷
        //當(dāng)其它線程調(diào)用了線程池的 shutdownNow 時(shí)候,如果 worker 狀態(tài) >= 0 則會(huì)中斷該線程
        //具體方法在 interruptIfStarted() 中可以看到
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
  }
    
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    //省略其他代碼...
}

可以看到,在 Worker 的構(gòu)造方法可以知道,其中的 thread 屬性就是通過 this 去創(chuàng)建的,所以線程池核心線程的創(chuàng)建主要是 run 方法中的 runWorker 方法:

runWorker

runWorker 核心線程執(zhí)行邏輯。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 調(diào)用unlock()是為了讓外部可以中斷
    w.unlock(); // allow interrupts
    // 線程退出的原因,true是任務(wù)導(dǎo)致,false是線程正常退出
    boolean completedAbruptly = true;
    try {
        // 1. 如果firstTask不為null,則執(zhí)行firstTask
         // 2. 如果firstTask為null,則調(diào)用getTask()從隊(duì)列獲取任務(wù)
        // 3. 阻塞隊(duì)列的特性就是:當(dāng)隊(duì)列為空時(shí),當(dāng)前線程會(huì)被阻塞等待
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 判斷線程池的狀態(tài),如果線程池正在停止,則對(duì)當(dāng)前線程進(jìn)行中斷操作
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();//中斷
            try {
                //該方法里面沒有內(nèi)容,可以自己擴(kuò)展實(shí)現(xiàn),比如上面提到的線程池的監(jiān)控
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執(zhí)行具體的任務(wù)
                    task.run();
                } catch (RuntimeException x) {//線程異常后操作
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //同 beforeExecute()
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;//help gc
                //統(tǒng)計(jì)當(dāng)前worker完成了多少個(gè)任務(wù)
                w.completedTasks++;
                //釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 處理線程退出,completedAbruptly為true說明由于任務(wù)異常導(dǎo)致線程非正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

而對(duì)于其中的 getTask() 方法,任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程,該方法是非常重要的,為什么重要?其中就涉及到面試官常問的 線程池如何保證核心線程不會(huì)被銷毀,而空閑線程會(huì)被銷毀?

private Runnable getTask() {
    //判斷最新一次的poll是否超時(shí)
    //poll:取走BlockingQueue里排在首位的對(duì)象
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /**
         * 條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài)
         * 條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空
         * 條件1與條件2同時(shí)為true,則workerCount-1,并且返回null
         * 注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會(huì)接受任務(wù),但仍會(huì)處理任務(wù)(前面也講到了)
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        /*
         * 該屬性的作用是判斷當(dāng)前線程是否允許超時(shí):
         * 1.allowCoreThreadTimeOut
         *   如果為 false(默認(rèn)),核心線程即使在空閑時(shí)也保持活動(dòng)狀態(tài)。
         *   如果為 true,則核心線程使用 keepAliveTime 超時(shí)等待工作。
         * 2.wc > corePoolSize
         *   當(dāng)前線程是否已經(jīng)超過核心線程數(shù)量。
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /*
         * 判斷當(dāng)前線程是否可以退出:
         * 1.wc > maximumPoolSize || (timed && timedOut)
         *   wc > maximumPoolSize = true,說明當(dāng)前的工作線程總數(shù)大于線程池最大線程數(shù)。
         *   timed && timedOut = true,說明當(dāng)前線程允許超時(shí)并且已經(jīng)超時(shí)。
         * 2.wc > 1 || workQueue.isEmpty()
         *   工作線程總數(shù)大于1或者任務(wù)隊(duì)列為空,則通過CAS把線程數(shù)減去1,同時(shí)返回null
         */
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /*
       * 1.poll(long timeout, TimeUnit unit):從BlockingQueue取出一個(gè)隊(duì)首的對(duì)象,
       * 如果在指定時(shí)間內(nèi),隊(duì)列一旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)。否則直到時(shí)間超時(shí)還沒有數(shù)據(jù)可取,返回失敗。
       *
       * 2.take():取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入。     
       *
       *
       * 如果timed為true,通過poll()方法做超時(shí)拉取,keepAliveTime時(shí)間內(nèi)沒有等待到有效的任務(wù),則返回null。
       *
             * 如果timed為false,通過take()做阻塞拉取,會(huì)阻塞到有下一個(gè)有效的任務(wù)時(shí)候再返回(一般不會(huì)是null)。
       */
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //通過poll()方法從任務(wù)隊(duì)列中拉取任務(wù)為null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

① 對(duì)于 getTask() 下面的這段代碼,這段邏輯大多數(shù)情況下是針對(duì)非核心線程:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

② 我們這樣來閱讀這段代碼,當(dāng)工作線程數(shù)大于核心線程 corePoolSize ,此時(shí)進(jìn)入 execute() 方法中的第二個(gè) if 語(yǔ)句:

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

此時(shí)線程池總數(shù)已經(jīng)超過了 corePoolSize 但小于 maximumPoolSize ,當(dāng)任務(wù)隊(duì)列已經(jīng)滿了的時(shí)候,會(huì)通過 addWorker(task,false) 添加非核心線程。

而在高并發(fā)的情況下,肯定會(huì)產(chǎn)生多余的線程,也就是出現(xiàn) ① 中的情況 wc > maximumPoolSize ,而這些多余的線程怎么辦,是不是會(huì)被回收?如果 workQueue.poll 沒有獲取到有效的任務(wù),那么①中的邏輯剛好與 addWorker(task,false) 相反,通過 CAS 減少非核心線程,使得工作線程總數(shù)趨向于 corePoolSize 。

如果對(duì)于非核心線程,上一輪循環(huán)獲取任務(wù)對(duì)象為 null ,在默認(rèn)情況下 allowCoreThreadTimeOut = false ,因此, getTask() 中 timed = true ,如果沒有獲取到任務(wù),此時(shí) timedOut = true ,這一輪循環(huán)很容易滿足 timed && timedOut 為 true,這個(gè)時(shí)候 getTask() 返回 null 會(huì)導(dǎo)致 Worker#runWorker() 方法跳出死循環(huán),之后執(zhí)行 processWorkerExit() 方法處理后續(xù)工作,而該非核心線程對(duì)應(yīng)的 Worker 則變成 游離對(duì)象 ,等待被 JVM 回收。

當(dāng) allowCoreThreadTimeOut 設(shè)置為 true 的時(shí)候,這里分析的非核心線程的生命周期終結(jié)邏輯同時(shí)會(huì)適用于核心線程。

由此推出一個(gè)面試題: 線程池有多個(gè)線程同時(shí)沒取到任務(wù),會(huì)全部回收嗎?

舉個(gè)例子:線程池核心線程數(shù)是 5,最大線程數(shù)為 5,當(dāng)前工作線程數(shù)為 6(6>5,意味著當(dāng)前可以觸發(fā)線程回收),如果此時(shí)有 3 個(gè)線程同時(shí)超時(shí)沒有獲取到任務(wù),這 3 個(gè)線程會(huì)都被回收銷毀嗎?

思路:這道題的核心點(diǎn)在于有多個(gè)線程同時(shí)超時(shí)獲取不到任務(wù)。正常情況下,此時(shí)會(huì)觸發(fā)線程回收的流程。但是我們知道,正常不設(shè)置 allowCoreThreadTimeOut 變量時(shí),線程池即使沒有任務(wù)處理,也會(huì)保持核心線程數(shù)的線程。如果這邊 3 個(gè)線程被全部回收,那此時(shí)線程數(shù)就變成了 3 個(gè),不符合核心線程數(shù) 5 個(gè),所以這邊我們可以首先得出答案:不會(huì)被全部回收。這個(gè)時(shí)候面試官肯定會(huì)問為什么?

根據(jù)答案不難推測(cè),為了防止本題的這種并發(fā)回收問題的出現(xiàn),線程回收的流程必然會(huì)有并發(fā)控制。compareAndDecrementWorkerCount(c) 用的是 CAS 方法,如果 CAS 失敗就 continue,進(jìn)入下一輪循環(huán),重新判斷。

像上述例子,其中一條線程會(huì) CAS 失敗,然后重新進(jìn)入循環(huán),發(fā)現(xiàn)工作線程數(shù)已經(jīng)只有 5 了, timed = false , 這條線程就不會(huì)被銷毀,可以一直阻塞了,此時(shí)就會(huì)調(diào)用 workQueue.take() 阻塞等待下一次的任務(wù),也就是說核心線程并不會(huì)死亡。

從這里也可以看出,雖然有核心線程數(shù),但線程并沒有區(qū)分是核心還是非核心,并不是先創(chuàng)建的就是核心,超過核心線程數(shù)后創(chuàng)建的就是非核心,最終保留哪些線程,完全隨機(jī)。

然后可以回答出前面的問題,線程池如何保證核心線程不會(huì)被銷毀,而空閑線程會(huì)被銷毀?

核心線程是因?yàn)檎{(diào)用了阻塞方法而不會(huì)被銷毀,空閑線程調(diào)用了超時(shí)方法在下次執(zhí)行時(shí)獲取不到任務(wù)而死亡。

這樣回答其實(shí)是可以的,但是這可能顯示出你是背得八股文,所以你應(yīng)該回答核心線程不僅僅是因?yàn)檎{(diào)用了阻塞方法而不會(huì)被銷毀,同時(shí)利用了 CAS 來保證。

還可以得出 getTask() 返回 null 的情況 :

1.線程池的狀態(tài)已經(jīng)為 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作隊(duì)列為空。

2.工作線程數(shù)大于最大線程數(shù)或當(dāng)前工作線程已超時(shí),且,其存在工作線程或任務(wù)隊(duì)列為空。

runWorker 的流程:

processWorkerExit

在 runWorker 的 finally 塊中,當(dāng)任務(wù)執(zhí)行之后,要對(duì)其做處理,作線程在執(zhí)行完 processWorkerExit() 方法才算真正的終結(jié),該方法如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 因?yàn)閽伋鲇脩舢惓?dǎo)致線程終結(jié),直接使工作線程數(shù)減1即可
    // 如果沒有任何異常拋出的情況下是通過getTask()返回null引導(dǎo)線程正常跳出runWorker()方法的while死循環(huán)從而正常終結(jié),這種情況下,在getTask()中已經(jīng)把線程數(shù)減1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 全局的已完成任務(wù)記錄數(shù)加上此將要終結(jié)的Worker中的已完成任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // 工作線程集合中移除此將要終結(jié)的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
     
    // 見下一小節(jié)分析,用于根據(jù)當(dāng)前線程池的狀態(tài)判斷是否需要進(jìn)行線程池terminate處理
    tryTerminate();

    int c = ctl.get();
    // 如果線程池的狀態(tài)小于STOP,也就是處于RUNNING或者SHUTDOWN狀態(tài)的前提下:
    // 1.如果線程不是由于拋出用戶異常終結(jié),如果允許核心線程超時(shí),則保持線程池中至少存在一個(gè)工作線程
    // 2.如果線程由于拋出用戶異常終結(jié),或者當(dāng)前工作線程數(shù),那么直接添加一個(gè)新的非核心線程
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允許核心線程超時(shí),最小值為0,否則為corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果最小值為0,同時(shí)任務(wù)隊(duì)列不空,則更新最小值為1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 工作線程數(shù)大于等于最小值,直接返回不新增非核心線程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

代碼的后面部分區(qū)域,會(huì)判斷線程池的狀態(tài),如果線程池是 RUNNING 或者 SHUTDOWN 狀態(tài)的前提下,如果當(dāng)前的工作線程由于拋出異常被終結(jié),那么會(huì)新創(chuàng)建一個(gè)非核心線程。如果當(dāng)前的工作線程并不是拋出用戶異常被終結(jié)(正常情況下的終結(jié)),那么會(huì)這樣處理:

allowCoreThreadTimeOut 為 true,也就是允許核心線程超時(shí)的前提下,如果任務(wù)隊(duì)列空,則會(huì)通過創(chuàng)建一個(gè)非核心線程保持線程池中至少有一個(gè)工作線程。

allowCoreThreadTimeOut 為 false,如果工作線程總數(shù)大于 corePoolSize 則直接返回,否則創(chuàng)建一個(gè)非核心線程,也就是會(huì)趨向于保持線程池中的工作線程數(shù)量趨向于 corePoolSize 。

processWorkerExit() 執(zhí)行完畢之后,意味著該工作線程的生命周期已經(jīng)完結(jié)。

面試題

1、線程池的線程怎么創(chuàng)建?任務(wù)怎么執(zhí)行?

主要在于 Worker 類以及 Worker#runWorker() 方法。

2、任務(wù)怎么分配?

參考 getTask() 方法。

3、線程池如何保證核心線程不會(huì)被銷毀,而空閑線程會(huì)被銷毀?

參考上文。

4、線程池有多個(gè)線程同時(shí)沒取到任務(wù),會(huì)全部回收嗎?

參考上文。

5、為什么要使用阻塞隊(duì)列?

線程池是采用生產(chǎn)者-消費(fèi)者模式設(shè)計(jì)的。線程池為消費(fèi)者。

在線程池中活躍線程數(shù)達(dá)到 corePoolSize 時(shí),線程池將會(huì)將后續(xù)的任務(wù)提交到 BlockingQueue 中, (每個(gè) task 都是單獨(dú)的生產(chǎn)者線程)進(jìn)入到堵塞對(duì)列中的 task 線程會(huì) wait() 從而釋放 cpu,從而提高 cpu 利用率。?

以上就是深入了解Java線程池:從設(shè)計(jì)思想到源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java ConcurrentHashMap鎖分段技術(shù)及原理詳解

    java ConcurrentHashMap鎖分段技術(shù)及原理詳解

    這篇文章主要介紹了java ConcurrentHashMap鎖分段技術(shù)詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-07-07
  • Java模擬實(shí)現(xiàn)斗地主的洗牌和發(fā)牌

    Java模擬實(shí)現(xiàn)斗地主的洗牌和發(fā)牌

    這篇文章主要為大家詳細(xì)介紹了Java模擬實(shí)現(xiàn)斗地主的洗牌和發(fā)牌,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-04-04
  • 關(guān)于Java中的CAS如何使用

    關(guān)于Java中的CAS如何使用

    這篇文章主要介紹了關(guān)于Java中的CAS如何使用,CAS是Compare And Swap(比較并交換)的縮寫,是一種非阻塞式并發(fā)控制技術(shù),用于保證多個(gè)線程在修改同一個(gè)共享資源時(shí)不會(huì)出現(xiàn)競(jìng)爭(zhēng)條件,從而避免了傳統(tǒng)鎖機(jī)制的各種問題,需要的朋友可以參考下
    2023-09-09
  • Java生成二維碼的實(shí)現(xiàn)方式匯總

    Java生成二維碼的實(shí)現(xiàn)方式匯總

    本文將基于Spring Boot介紹兩種生成二維碼的實(shí)現(xiàn)方式,一種是基于Google開發(fā)工具包,另一種是基于Hutool來實(shí)現(xiàn),下面我們將基于Spring Boot,并采用兩種方式實(shí)現(xiàn)二維碼的生成,對(duì)于每一種方式還提供兩種類型的二維碼返回形式,需要的朋友可以參考下
    2023-09-09
  • 使用jvisualvm配合Visual GC插件監(jiān)控Java程序詳細(xì)總結(jié)

    使用jvisualvm配合Visual GC插件監(jiān)控Java程序詳細(xì)總結(jié)

    本節(jié)將會(huì)介紹一下jvisualvm的特性及作用、各個(gè)功能是如何使用的、最后會(huì)介紹jvisualvm的插件Visual GC的安裝及使用
    2021-09-09
  • 老生常談Java字符串進(jìn)階(必看篇)

    老生常談Java字符串進(jìn)階(必看篇)

    下面小編就為大家?guī)硪黄仙U凧ava字符串進(jìn)階(必看篇)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-06-06
  • Mybatis select記錄封裝的實(shí)現(xiàn)

    Mybatis select記錄封裝的實(shí)現(xiàn)

    這篇文章主要介紹了Mybatis select記錄封裝的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • Java函數(shù)式編程(十二):監(jiān)控文件修改

    Java函數(shù)式編程(十二):監(jiān)控文件修改

    這篇文章主要介紹了Java函數(shù)式編程(十二):監(jiān)控文件修改,本文是系列文章的第12篇,其它文章請(qǐng)參閱本文底部的相關(guān)文章,需要的朋友可以參考下
    2014-09-09
  • Java多線程volatile原理及用法解析

    Java多線程volatile原理及用法解析

    這篇文章主要介紹了Java多線程volatile原理及用法解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07

最新評(píng)論