java 線程池封裝及拒絕策略示例詳解
前文
提到線程的使用以及線程間通信方式,通常情況下我們通過(guò)new Thread或者new Runnable創(chuàng)建線程,這種情況下,需要開(kāi)發(fā)者手動(dòng)管理線程的創(chuàng)建和回收,線程對(duì)象沒(méi)有復(fù)用,大量的線程對(duì)象創(chuàng)建與銷(xiāo)毀會(huì)引起頻繁GC,那么事否有機(jī)制自動(dòng)進(jìn)行線程的創(chuàng)建,管理和回收呢?線程池可以實(shí)現(xiàn)該能力。
線程池的優(yōu)點(diǎn):
- 線程池中線程重用,避免線程創(chuàng)建和銷(xiāo)毀帶來(lái)的性能開(kāi)銷(xiāo)
- 能有效控制線程數(shù)量,避免大量線程搶占資源造成阻塞
- 對(duì)線程進(jìn)行簡(jiǎn)單管理,提供定時(shí)執(zhí)行預(yù)計(jì)指定間隔執(zhí)行等策略
線程池的封裝實(shí)現(xiàn)
在java.util.concurrent包中提供了一系列的工具類(lèi)以方便開(kāi)發(fā)者創(chuàng)建和使用線程池,這些類(lèi)的繼承關(guān)系及說(shuō)明如下:
類(lèi)名 | 說(shuō)明 | 備注 |
---|---|---|
Executor | Executor接口提供了一種任務(wù)提交后的執(zhí)行機(jī)制,包括線程的創(chuàng)建與運(yùn)行,線程調(diào)度等,通常不直接使用該類(lèi) | / |
ExecutorService | ExecutorService接口,提供了創(chuàng)建,管理,終止Future執(zhí)行的方法,用于跟蹤一個(gè)或多個(gè)異步任務(wù)的進(jìn)度,通常不直接使用該類(lèi) | / |
ScheduledExecutorService | ExecutorService的實(shí)現(xiàn)接口,提供延時(shí),周期性執(zhí)行Future的能力,同時(shí)具備ExecutorService的基礎(chǔ)能力,通常不直接使用該類(lèi) | / |
AbstractExecutorService | AbstractExecutorService是個(gè)虛類(lèi),對(duì)ExecutorService中方法進(jìn)行了默認(rèn)實(shí)現(xiàn),其提供了newTaskFor函數(shù),用于獲取RunnableFuture對(duì)象,該對(duì)象實(shí)現(xiàn)了submit,invokeAny和invokeAll方法,通常不直接使用該類(lèi) | / |
ThreadPoolExecutor | 通過(guò)創(chuàng)建該類(lèi)對(duì)象就可以構(gòu)建一個(gè)線程池,通過(guò)調(diào)用execute方法可以向該線程池提交任務(wù)。通常情況下,開(kāi)發(fā)者通過(guò)自定義參數(shù),構(gòu)造該類(lèi)對(duì)象就來(lái)獲得一個(gè)符合業(yè)務(wù)需求的線程池 | / |
ScheduledThreadPoolExecutor | 通過(guò)創(chuàng)建該類(lèi)對(duì)象就可以構(gòu)建一個(gè)可以周期性執(zhí)行任務(wù)的線程池,通過(guò)調(diào)用schedule,scheduleWithFixedDelay等方法可以向該線程池提交任務(wù)并在指定時(shí)間節(jié)點(diǎn)運(yùn)行。通常情況下,開(kāi)發(fā)者通過(guò)構(gòu)造該類(lèi)對(duì)象就來(lái)獲得一個(gè)符合業(yè)務(wù)需求的可周期性執(zhí)行任務(wù)的線程池 | / |
由上表可知,對(duì)于開(kāi)發(fā)者而言,通常情況下我們可以通過(guò)構(gòu)造ThreadPoolExecutor對(duì)象來(lái)獲取一個(gè)線程池對(duì)象,通過(guò)其定義的execute方法來(lái)向該線程池提交任務(wù)并執(zhí)行,那么怎么創(chuàng)建線程池呢?讓我們一起看下
ThreadPoolExecutor
ThreadPoolExecutor完整參數(shù)的構(gòu)造函數(shù)如下所示:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
從上述代碼可以看出,在構(gòu)建ThreadPoolExecutor時(shí),主要涉及以下參數(shù):
- corePoolSize:核心線程個(gè)數(shù),一般情況下可以使用 處理器個(gè)數(shù)/2 作為核心線程數(shù)的取值,可以通過(guò)Runtime.getRuntime().availableProcessors()來(lái)獲取處理器個(gè)數(shù)
- maximumPoolSize:最大線程個(gè)數(shù),該線程池支持同時(shí)存在的最大線程數(shù)量
- keepAliveTime:非核心線程閑置時(shí)的超時(shí)時(shí)長(zhǎng),超過(guò)這個(gè)時(shí)長(zhǎng),非核心線程就會(huì)被回收,我們也可以通過(guò)allowCoreThreadTimeOut(true)來(lái)設(shè)置核心線程閑置時(shí),在超時(shí)時(shí)間到達(dá)后回收
- unit:keepAliveTime的時(shí)間單位
- workQueue:線程池中的任務(wù)隊(duì)列,當(dāng)核心線程數(shù)滿(mǎn)或最大線程數(shù)滿(mǎn)時(shí),通過(guò)線程池的execute方法提交的Runnable對(duì)象存儲(chǔ)在這個(gè)參數(shù)中,遵循先進(jìn)先出原則
- threadFactory:創(chuàng)建線程的工廠 ,用于批量創(chuàng)建線程,統(tǒng)一在創(chuàng)建線程時(shí)進(jìn)行一些初始化設(shè)置,如是否守護(hù)線程、線程的優(yōu)先級(jí)等。不指定時(shí),默認(rèn)使用Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程,線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程
- handler:任務(wù)拒絕處理策略,當(dāng)線程數(shù)量等于最大線程數(shù)且等待隊(duì)列已滿(mǎn)時(shí),就會(huì)采用拒絕處理策略處理新提交的任務(wù),不指定時(shí),默認(rèn)的處理策略是AbortPolicy,即拋棄該任務(wù)
綜上,我們可以看出創(chuàng)建一個(gè)線程池最少需要明確核心線程數(shù),最大線程數(shù),超時(shí)時(shí)間及單位,等待隊(duì)列這五個(gè)參數(shù),下面我們創(chuàng)建一個(gè)核心線程數(shù)為1,最大線程數(shù)為3,5s超時(shí)回收,等待隊(duì)列最多能存放5個(gè)任務(wù)的線程池,代碼如下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));
隨后我們使用for循環(huán)向該executor中提交任務(wù),代碼如下:
public static void main(String[] args) { // 創(chuàng)建線程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5)); for (int i=0;i<10;i++) { int finalI = i; System.out.println("put runnable "+ finalI +"to executor"); // 向線程池提交任務(wù) executor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start"); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed"); } }); } }
輸出如下:
從輸出可以看到,當(dāng)提交一個(gè)任務(wù)到線程池時(shí),其執(zhí)行流程如下:
線程池拒絕策略
線程池拒絕策略有四類(lèi),定義在ThreadPoolExecutor中,分別是:
- AbortPolicy:默認(rèn)拒絕策略,丟棄提交的任務(wù)并拋出RejectedExecutionException,在該異常輸出信息中,可以看到當(dāng)前線程池狀態(tài)
- DiscardPolicy:丟棄新來(lái)的任務(wù),但是不拋出異常
- DiscardOldestPolicy:丟棄隊(duì)列頭部的舊任務(wù),然后嘗試重新執(zhí)行,如果再次失敗,重復(fù)該過(guò)程
- CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
當(dāng)然,如果上述拒絕策略不能滿(mǎn)足需求,我們也可以自定義異常,實(shí)現(xiàn)RejectedExecutionHandler接口,即可創(chuàng)建自己的線程池拒絕策略,下面是使用自定義拒絕策略的示例代碼:
public static void main(String[] args) { RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("runnable " + r +" in executor "+executor+" is refused"); } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),handler); for (int i=0;i<10;i++) { int finalI = i; Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start"); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed"); } }; System.out.println("put runnable "+ runnable+" index:"+finalI +" to executor:"+executor); executor.execute(runnable); } }
輸出如下:
任務(wù)隊(duì)列
對(duì)于線程池而言,任務(wù)隊(duì)列需要是BlockingQueue的實(shí)現(xiàn)類(lèi),BlockingQueue接口的實(shí)現(xiàn)類(lèi)類(lèi)圖如下:
下面我們針對(duì)常用隊(duì)列做簡(jiǎn)單了解:
ArrayBlockingQueue:ArrayBlockingQueue是基于數(shù)組的阻塞隊(duì)列,在其內(nèi)部維護(hù)一個(gè)定長(zhǎng)數(shù)組,所以使用ArrayBlockingQueue時(shí)必須指定任務(wù)隊(duì)列長(zhǎng)度,因?yàn)椴徽搶?duì)數(shù)據(jù)的寫(xiě)入或者讀取都使用的是同一個(gè)鎖對(duì)象,所以沒(méi)有實(shí)現(xiàn)讀寫(xiě)分離,同時(shí)在創(chuàng)建時(shí)我們可以指定鎖內(nèi)部是否采用公平鎖,默認(rèn)實(shí)現(xiàn)是非公平鎖。
非公平鎖與公平鎖
公平鎖:多個(gè)任務(wù)阻塞在同一鎖時(shí),等待時(shí)長(zhǎng)長(zhǎng)的優(yōu)先獲取鎖
非公平鎖:多個(gè)任務(wù)阻塞在同一鎖時(shí),鎖可獲取時(shí),一起搶鎖,誰(shuí)先搶到誰(shuí)先執(zhí)行
LinkedBlockingQueue:LinkedBlockingQueue是基于鏈表的阻塞隊(duì)列,在創(chuàng)建時(shí)可不指定任務(wù)隊(duì)列長(zhǎng)度,默認(rèn)值是Integer.MAX_VALUE,在LinkedBlockingQueue中讀鎖和寫(xiě)鎖實(shí)現(xiàn)了分支,相對(duì)ArrayBlockingQueue而言,效率提升明顯。
SynchronousQueue:SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,也就是說(shuō)當(dāng)需要插入元素時(shí),必須等待上一個(gè)元素被移出,否則不能插入,其適用于任務(wù)多但是執(zhí)行比較快的場(chǎng)景。
PriorityBlockingQueue:PriorityBlockingQueue是一個(gè)支持指定優(yōu)先即的阻塞隊(duì)列,默認(rèn)初始化長(zhǎng)度為11,最大長(zhǎng)度為Integer.MAX_VALUE - 8,可以通過(guò)讓裝入隊(duì)列的對(duì)象實(shí)現(xiàn)Comparable接口,定義對(duì)象排序規(guī)則來(lái)指定隊(duì)列中元素優(yōu)先級(jí),優(yōu)先級(jí)高的元素會(huì)被優(yōu)先取出。
DelayQueue:DelayQueue是一個(gè)帶有延遲時(shí)間的阻塞隊(duì)列,隊(duì)列中的元素,只有等待延時(shí)時(shí)間到了才可以被取出,由于其內(nèi)部用PriorityBlockingQueue維護(hù)數(shù)據(jù),故其長(zhǎng)度與PriorityBlockingQueue一致。一般用于定時(shí)調(diào)度類(lèi)任務(wù)。
下表從一些角度對(duì)上述隊(duì)列進(jìn)行了比較:
隊(duì)列名稱(chēng) | 底層數(shù)據(jù)結(jié)構(gòu) | 默認(rèn)長(zhǎng)度 | 最大長(zhǎng)度 | 是否讀寫(xiě)分離 | 適用場(chǎng)景 |
---|---|---|---|---|---|
ArrayBlockingQueue | 數(shù)組 | 0 | 開(kāi)發(fā)者指定大小 | 否 | 任務(wù)數(shù)量較少時(shí)使用 |
LinkedBlockingQueue | 鏈表 | Integer.MAX_VALUE | Integer.MAX_VALUE | 是 | 大量任務(wù)時(shí)使用 |
SynchronousQueue | 公平鎖-隊(duì)列/非公平鎖-棧 | 0 | / | 否 | 任務(wù)多但是執(zhí)行速度快的場(chǎng)景 |
PriorityBlockingQueue | 對(duì)象數(shù)組 | 11 | Integer.MAX_VALUE-8 | 否 | 有任務(wù)需要優(yōu)先處理的場(chǎng)景 |
DelayQueue | 對(duì)象數(shù)組 | 11 | Integer.MAX_VALUE-8 | 否 | 定時(shí)調(diào)度類(lèi)場(chǎng)景 |
以上就是java 線程池封裝及拒絕策略示例詳解的詳細(xì)內(nèi)容,更多關(guān)于java 線程池封裝拒絕策略的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
介紹Java的大數(shù)類(lèi)(BigDecimal)和八種舍入模式
在實(shí)際應(yīng)用中,需要對(duì)更大或者更小的數(shù)進(jìn)行運(yùn)算和處理。Java在java.math包中提供的API類(lèi)BigDecimal,用來(lái)對(duì)超過(guò)16位有效位的數(shù)進(jìn)行精確的運(yùn)算。本文將介紹Java中的大數(shù)類(lèi)BigDecimal及其八種舍入模式,有需要的可以參考借鑒。2016-08-08springcloud feign調(diào)其他微服務(wù)時(shí)參數(shù)是對(duì)象的問(wèn)題
這篇文章主要介紹了springcloud feign調(diào)其他微服務(wù)時(shí)參數(shù)是對(duì)象的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Java System類(lèi)兩個(gè)常用方法代碼實(shí)例
這篇文章主要介紹了Java System類(lèi)兩個(gè)常用方法代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02使用SpringBoot 工廠模式自動(dòng)注入到Map
這篇文章主要介紹了使用SpringBoot 工廠模式自動(dòng)注入到Map,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Spring Boot配置特定屬性spring.profiles的方法
這篇文章主要介紹了Spring Boot配置特定屬性spring.profiles的方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-11-11SpringBoot使用mybatis-plus分頁(yè)查詢(xún)無(wú)效的問(wèn)題解決
MyBatis-Plus提供了很多便捷的功能,包括分頁(yè)查詢(xún),本文主要介紹了SpringBoot使用mybatis-plus分頁(yè)查詢(xún)無(wú)效的問(wèn)題解決,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12Spring中@Controller和@RestController的區(qū)別詳解
這篇文章主要介紹了Spring中@Controller和@RestController的區(qū)別詳解,@RestController?是?@Controller?和?@ResponseBody?的結(jié)合體,單獨(dú)使用?@RestController?的效果與?@Controller?和?@ResponseBody?二者同時(shí)使用的效果相同,需要的朋友可以參考下2023-10-10