Java中的線程池ThreadPoolExecutor深入解析
1.什么是線程池?
線程池,thread pool,是一種線程使用模式,線程池維護(hù)著多個(gè)線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。
通俗來說,就是可管理和維護(hù)以及分配線程的“池子”。
2.為什么使用線程池?
為了減少創(chuàng)建和銷毀線程的次數(shù),讓每個(gè)線程都可以多次的使用,可以根據(jù)系統(tǒng)情況調(diào)整線程的數(shù)量,防止消耗過多內(nèi)存。在實(shí)際使用中,服務(wù)器在創(chuàng)建和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源都相當(dāng)大,使用線程池就可以優(yōu)化。 線程池主要用來解決線程生命周期開銷問題和資源不足問題。通過對(duì)多個(gè)任務(wù)重復(fù)使用線程,線程創(chuàng)建的開銷就被分?jǐn)偟搅硕鄠€(gè)任務(wù)上了,而且由于在請(qǐng)求到達(dá)時(shí)線程已經(jīng)存在,所以消除了線程創(chuàng)建所帶來的延遲。這樣,就可以立即為請(qǐng)求服務(wù),使用應(yīng)用程序響應(yīng)更快。另外,通過適當(dāng)?shù)恼{(diào)整線程中的線程數(shù)目可以防止出現(xiàn)資源不足的情況。
通俗來說,就是為了優(yōu)化線程的內(nèi)存開銷。
3.線程池的核心參數(shù)
public ThreadPoolExecutor(int corePoolSize,//核心線程數(shù) int maximumPoolSize,//最大線程數(shù) long keepAliveTime,//線程空閑時(shí)間 TimeUnit unit,//時(shí)間單位 BlockingQueue<Runnable> workQueue,//任務(wù)隊(duì)列 ThreadFactory threadFactory,//線程工廠 RejectedExecutionHandler handler//拒絕策略) { ... }
4.線程池的執(zhí)行順序
線程池按以下行為執(zhí)行任務(wù)
- 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),創(chuàng)建線程。
- 當(dāng)線程數(shù)大于等于核心線程數(shù),且任務(wù)隊(duì)列未滿時(shí),將任務(wù)放入任務(wù)隊(duì)列。
- 當(dāng)線程數(shù)大于等于核心線程數(shù),且任務(wù)隊(duì)列已滿,若線程數(shù)小于最大線程數(shù),創(chuàng)建線程。
- 若線程數(shù)等于最大線程數(shù),則執(zhí)行拒絕策略
5.線程池的參數(shù)詳解
(a)、corePoolSize
核心線程數(shù),默認(rèn)為1。
設(shè)置規(guī)則:
CPU密集型(CPU密集型也叫計(jì)算密集型,指的是運(yùn)算較多,cpu占用高,讀/寫I/O(硬盤/內(nèi)存)較少):corePoolSize = CPU核數(shù) + 1IO密集型(與cpu密集型相反,系統(tǒng)運(yùn)作,大部分的狀況是CPU在等I/O (硬盤/內(nèi)存) 的讀/寫操作,此時(shí)CPU Loading并不高。):corePoolSize = CPU核數(shù) * 2
(b)、maximumPoolSize最大線程數(shù),默認(rèn)為Integer.MAX_VALUE一般設(shè)置為和核心線程數(shù)一樣
(c)、keepAliveTime線程空閑時(shí)間,默認(rèn)為60s,一般設(shè)置為默認(rèn)60s
(d)、unit時(shí)間單位,默認(rèn)為秒
(e)、workQueue隊(duì)列,當(dāng)線程數(shù)目超過核心線程數(shù)時(shí)用于保存任務(wù)的隊(duì)列。(BlockingQueue workQueue)此隊(duì)列僅保存實(shí)現(xiàn)Runnable接口的任務(wù)。(因?yàn)榫€程池的底層BlockingQueue的泛型為Runnable)
(1)無界隊(duì)列
隊(duì)列大小無限制,常用的為無界的LinkedBlockingQueue,使用該隊(duì)列作為阻塞隊(duì)列時(shí)要尤其當(dāng)心,當(dāng)任務(wù)耗時(shí)較長時(shí)可能會(huì)導(dǎo)致大量新任務(wù)在隊(duì)列中堆積最終導(dǎo)致OOM。閱讀代碼發(fā)現(xiàn),Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是這個(gè)坑,當(dāng)QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務(wù)被添加到這個(gè)無界LinkedBlockingQueue 中,導(dǎo)致cpu和內(nèi)存飆升服務(wù)器掛掉。
當(dāng)然這種隊(duì)列,maximumPoolSize 的值也就無效了。當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長的可能性。
(2)有界隊(duì)列
當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列有助于防止資源耗盡,但是可能較難調(diào)整和控制。常用的有兩類,一類是遵循FIFO原則的隊(duì)列如ArrayBlockingQueue,另一類是優(yōu)先級(jí)隊(duì)列如PriorityBlockingQueue。PriorityBlockingQueue中的優(yōu)先級(jí)由任務(wù)的Comparator決定。
使用有界隊(duì)列時(shí)隊(duì)列大小需和線程池大小互相配合,線程池較小有界隊(duì)列較大時(shí)可減少內(nèi)存消耗,降低cpu使用率和上下文切換,但是可能會(huì)限制系統(tǒng)吞吐量。
(3)同步移交隊(duì)列
如果不希望任務(wù)在隊(duì)列中等待而是希望將任務(wù)直接移交給工作線程,可使用SynchronousQueue作為等待隊(duì)列。SynchronousQueue不是一個(gè)真正的隊(duì)列,而是一種線程之間移交的機(jī)制。要將一個(gè)元素放入SynchronousQueue中,必須有另一個(gè)線程正在等待接收這個(gè)元素。只有在使用無界線程池或者有飽和策略時(shí)才建議使用該隊(duì)列。
(f)、threadFactory
線程工廠,用來創(chuàng)建線程。
為了統(tǒng)一在創(chuàng)建線程時(shí)設(shè)置一些參數(shù),如是否守護(hù)線程,線程一些特性等,如優(yōu)先級(jí)。通過這個(gè)TreadFactory創(chuàng)建出來的線程能保證有相同的特性。
它是一個(gè)接口類,而且方法只有一個(gè),就是創(chuàng)建一個(gè)線程。
如果沒有另外說明,則在同一個(gè)ThreadGroup 中一律使用Executors.defaultThreadFactory() 創(chuàng)建線程,并且這些線程具有相同的NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)。
通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優(yōu)先級(jí)、守護(hù)進(jìn)程狀態(tài),等等。
如果從newThread 返回 null 時(shí)ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行,但不能執(zhí)行任何任務(wù)。
(g)、handler
拒絕策略,默認(rèn)是AbortPolicy,會(huì)拋出異常。
當(dāng)線程數(shù)已經(jīng)達(dá)到maxPoolSize,且隊(duì)列已滿,會(huì)拒絕新任務(wù)。
當(dāng)線程池被調(diào)用shutdown()后,會(huì)等待線程池里的任務(wù)執(zhí)行完畢再shutdown。如果在調(diào)用shutdown()和線程池真正shutdown之間提交任務(wù),會(huì)拒絕新任務(wù)。
- AbortPolicy 丟棄任務(wù),拋運(yùn)行時(shí)異常。
- CallerRunsPolicy 由當(dāng)前調(diào)用的任務(wù)線程執(zhí)行任務(wù)。
- DiscardPolicy 忽視,什么都不會(huì)發(fā)生。
- DiscardOldestPolicy 從隊(duì)列中踢出最先進(jìn)入隊(duì)列(最后一個(gè)執(zhí)行)的任務(wù)。
6.ThreadPoolExecutor和spring封裝的ThreadPoolTaskExecutor案例
ThreadPoolExecutor是Java的線程池 ThreadPoolTaskExecutor是spring封裝的線程池
package com.thgy.bc.common.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.*; @Slf4j @Configuration public class ThreadPoolConfig { @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); int i = Runtime.getRuntime().availableProcessors(); //核心線程數(shù)目 executor.setCorePoolSize(i * 2); //指定最大線程數(shù) executor.setMaxPoolSize(i * 2); //隊(duì)列中最大的數(shù)目 executor.setQueueCapacity(i * 2 * 10); //線程名稱前綴 executor.setThreadNamePrefix("ThreadPoolTaskExecutor-"); //rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) //CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行 //對(duì)拒絕task的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //當(dāng)調(diào)度器shutdown被調(diào)用時(shí)等待當(dāng)前被調(diào)度的任務(wù)完成 executor.setWaitForTasksToCompleteOnShutdown(true); //線程空閑后的最大存活時(shí)間 executor.setKeepAliveSeconds(60); //加載 executor.initialize(); log.info("初始化線程池成功"); return executor; } @Bean public ThreadPoolExecutor threadPoolExecutor() { //獲取cpu核心數(shù) int i = Runtime.getRuntime().availableProcessors(); //核心線程數(shù) int corePoolSize = i * 2; //最大線程數(shù) int maximumPoolSize = i * 2; //線程無引用存活時(shí)間 long keepAliveTime = 60; //時(shí)間單位 TimeUnit unit = TimeUnit.SECONDS; //任務(wù)隊(duì)列,接收一個(gè)整型的參數(shù),這個(gè)整型參數(shù)指的是隊(duì)列的長度, //ArrayBlockingQueue(int,boolean),boolean類型的參數(shù)是作為可重入鎖的參數(shù)進(jìn)行初始化,默認(rèn)false,另外初始化了notEmpty、notFull兩個(gè)信號(hào)量。 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10); //1. 同步阻塞隊(duì)列 (put,take),直接提交。直接提交策略表示線程池不對(duì)任務(wù)進(jìn)行緩存。新進(jìn)任務(wù)直接提交給線程池,當(dāng)線程池中沒有空閑線程時(shí),創(chuàng)建一個(gè)新的線程處理此任務(wù)。 // 這種策略需要線程池具有無限增長的可能性。實(shí)現(xiàn)為:SynchronousQueue //2. 有界隊(duì)列。當(dāng)線程池中線程達(dá)到corePoolSize時(shí),新進(jìn)任務(wù)被放在隊(duì)列里排隊(duì)等待處理。有界隊(duì)列(如ArrayBlockingQueue)有助于防止資源耗盡, // 但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷, // 但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小, // CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。 //3. 無界隊(duì)列。使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。 // 這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí), // 適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長的可能性。 //線程工廠 //defaultThreadFactory() //返回用于創(chuàng)建新線程的默認(rèn)線程工廠。 //privilegedThreadFactory() //返回一個(gè)用于創(chuàng)建與當(dāng)前線程具有相同權(quán)限的新線程的線程工廠。 ThreadFactory threadFactory =Executors.defaultThreadFactory(); //拒絕執(zhí)行處理器 RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); //創(chuàng)建線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); return threadPoolExecutor; } }
7.jdk自帶的四種線程池創(chuàng)建方式
// 第一種線程池:固定個(gè)數(shù)的線程池,可以為每個(gè)CPU核綁定一定數(shù)量的線程數(shù) ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 2); // 緩存線程池,無上限 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 單一線程池,永遠(yuǎn)會(huì)維護(hù)存在一條線程 ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 固定個(gè)數(shù)的線程池,可以執(zhí)行延時(shí)任務(wù),也可以執(zhí)行帶有返回值的任務(wù)。 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
8.調(diào)用線程池的線程案例
public List<AccountRecordVO> requestTest() throws ExecutionException, InterruptedException { List<String> ids = Lists.newArrayList(); ids.add("1"); ids.add("2"); ids.add("3"); ids.add("4"); //有返回值的情況,定義接收返回值 List<AccountRecordVO> futureList2 = Lists.newArrayList(); //分布式計(jì)數(shù)器,若業(yè)務(wù)不需要?jiǎng)t可以不定義 CountDownLatch countDownLatch = new CountDownLatch(ids.size()); for (String id : ids) { //調(diào)用線程池的線程執(zhí)行任務(wù) threadPoolTaskExecutor.submit(new Runnable() { @Override public void run() { test(Lists.newArrayList(id),futureList2); //計(jì)數(shù)器-1 countDownLatch.countDown(); } }); } //await阻塞,直到計(jì)數(shù)器為0 countDownLatch.await(); System.out.println("主線程"+"===="); return futureList2; } public List<AccountRecordVO> test(List<String> ids, List<AccountRecordVO> list2){ //隨便寫的業(yè)務(wù)邏輯代碼,無實(shí)際意義,僅作演示 System.out.println("線程體" + "===="); List<AccountRecordVO> accountRecordVOS = Lists.newArrayList(); int i = 0; AccountRecordVO accountRecordVO = new AccountRecordVO(); accountRecordVO.setUserId("123"); accountRecordVO.setAmount(12333); for (String id : ids){ accountRecordVOS.add(accountRecordVO); list2.add(accountRecordVO); } try{ Thread.sleep(Long.valueOf("1000")); }catch (Exception e){ log.error(e.getMessage()); } System.out.println("線程體結(jié)束" + "===="); return accountRecordVOS; }
到此這篇關(guān)于Java中的線程池ThreadPoolExecutor深入解析的文章就介紹到這了,更多相關(guān)Java線程池ThreadPoolExecutor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)解壓zip和rar包的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)解壓zip和rar包,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-01-01SpringCloud的JPA連接PostgreSql的教程
這篇文章主要介紹了SpringCloud的JPA接入PostgreSql 教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-06-06分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn)
這篇文章主要為大家介紹了分布式開發(fā)醫(yī)療掛號(hào)系統(tǒng)數(shù)據(jù)字典模塊前后端實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04JAVA技術(shù)實(shí)現(xiàn)上傳下載文件到FTP服務(wù)器(完整)
這篇文章主要介紹了JAVA技術(shù)實(shí)現(xiàn)上傳下載文件到FTP服務(wù)器(完整),本文使用 Apache Jakarta Commons Net(commons-net-3.3.jar) 基于FileZilla Server服務(wù)器實(shí)現(xiàn)FTP服務(wù)器上文件的上傳/下載/刪除等操作,需要的朋友可以參考下2015-07-07小白也可以學(xué)會(huì)的Java NIO的Write事件
剛開始對(duì)NIO的寫操作理解的不深,不知道為什么要注冊(cè)寫事件,何時(shí)注冊(cè)寫事件,為什么寫完之后要取消注冊(cè)寫事件,今天特地整理了本篇文章,需要的朋友可以參考下2021-06-06