Java線程池中的各個參數(shù)如何合理設(shè)置
一、前言
在開發(fā)過程中,好多場景要用到線程池。每次都是自己根據(jù)業(yè)務(wù)場景來設(shè)置線程池中的各個參數(shù)。
這兩天又有需求碰到了,索性總結(jié)一下方便以后再遇到可以直接看著用。
雖說根據(jù)業(yè)務(wù)場景來設(shè)置各個參數(shù)的值,但有些萬變不離其宗,掌握它的原理對如何用好線程池起了至關(guān)重要的作用。
那我們接下來就來進行線程池的分析。
二、ThreadPoolExecutor的重要參數(shù)
我們先來看下ThreadPoolExecutor的帶的那些重要參數(shù)的構(gòu)造器。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
1、corePoolSize: 核心線程數(shù)
這個應(yīng)該是最重要的參數(shù)了,所以如何合理的設(shè)置它十分重要。
- 核心線程會一直存活,及時沒有任務(wù)需要執(zhí)行。
- 當線程數(shù)小于核心線程數(shù)時,即使有線程空閑,線程池也會優(yōu)先創(chuàng)建新線程處理。
- 設(shè)置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關(guān)閉。
如何設(shè)置好的前提我們要很清楚的知道CPU密集型和IO密集型的區(qū)別。
(1)、CPU密集型
CPU密集型也叫計算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對CPU要好很多,此時,系統(tǒng)運作大部分的狀況是CPU Loading 100%,CPU要讀/寫I/O(硬盤/內(nèi)存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading 很高。
在多重程序系統(tǒng)中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之CPU bound。例如一個計算圓周率至小數(shù)點一千位以下的程序,在執(zhí)行的過程當中絕大部分時間用在三角函數(shù)和開根號的計算,便是屬于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相當高。這可能是因為任務(wù)本身不太需要訪問I/O設(shè)備,也可能是因為程序是多線程實現(xiàn)因此屏蔽掉了等待I/O的時間。
(2)、IO密集型
IO密集型指的是系統(tǒng)的CPU性能相對硬盤、內(nèi)存要好很多,此時,系統(tǒng)運作,大部分的狀況是CPU在等I/O (硬盤/內(nèi)存) 的讀/寫操作,此時CPU Loading并不高。
I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務(wù)本身需要大量I/O操作,而pipeline做得不是很好,沒有充分利用處理器能力。
好了,了解完了以后我們就開搞了。
(3)、先看下機器的CPU核數(shù),然后在設(shè)定具體參數(shù):
自己測一下自己機器的核數(shù)
System.out.println(Runtime.getRuntime().availableProcessors());
即CPU核數(shù) = Runtime.getRuntime().availableProcessors()
(4)、分析下線程池處理的程序是CPU密集型還是IO密集型
CPU密集型:corePoolSize = CPU核數(shù) + 1
IO密集型:corePoolSize = CPU核數(shù) * 2
2、maximumPoolSize:最大線程數(shù)
- 當線程數(shù)>=corePoolSize,且任務(wù)隊列已滿時。線程池會創(chuàng)建新線程來處理任務(wù)。
- 當線程數(shù)=maxPoolSize,且任務(wù)隊列已滿時,線程池會拒絕處理任務(wù)而拋出異常。
3、keepAliveTime:線程空閑時間
- 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=corePoolSize。
- 如果allowCoreThreadTimeout=true,則會直到線程數(shù)量=0。
4、queueCapacity:任務(wù)隊列容量(阻塞隊列)
- 當核心線程數(shù)達到最大時,新任務(wù)會放在隊列中排隊等待執(zhí)行
5、allowCoreThreadTimeout:允許核心線程超時
6、rejectedExecutionHandler:任務(wù)拒絕處理器
兩種情況會拒絕處理任務(wù):
- 當線程數(shù)已經(jīng)達到maxPoolSize,且隊列已滿,會拒絕新任務(wù)。
- 當線程池被調(diào)用shutdown()后,會等待線程池里的任務(wù)執(zhí)行完畢再shutdown。如果在調(diào)用shutdown()和線程池真正shutdown之間提交任務(wù),會拒絕新任務(wù)。
線程池會調(diào)用rejectedExecutionHandler來處理這個任務(wù)。如果沒有設(shè)置默認是AbortPolicy,會拋出異常。
ThreadPoolExecutor 采用了策略的設(shè)計模式來處理拒絕任務(wù)的幾種場景。
這幾種策略模式都實現(xiàn)了RejectedExecutionHandler 接口。
- AbortPolicy 丟棄任務(wù),拋運行時異常。
- CallerRunsPolicy 執(zhí)行任務(wù)。
- DiscardPolicy 忽視,什么都不會發(fā)生。
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執(zhí)行)的任務(wù)。
三、如何設(shè)置參數(shù)
默認值:
corePoolSize = 1 maxPoolSize = Integer.MAX_VALUE queueCapacity = Integer.MAX_VALUE keepAliveTime = 60s allowCoreThreadTimeout = false rejectedExecutionHandler = AbortPolicy()
如何來設(shè)置呢?
需要根據(jù)幾個值來決定
tasks :每秒的任務(wù)數(shù),假設(shè)為500~1000
taskcost:每個任務(wù)花費時間,假設(shè)為0.1s
responsetime:系統(tǒng)允許容忍的最大響應(yīng)時間,假設(shè)為1s
做幾個計算
corePoolSize = 每秒需要多少個線程處理?
threadcount = tasks/(1/taskcost) = tasks*taskcout = (500 ~ 1000)*0.1 = 50~100 個線程。
corePoolSize設(shè)置應(yīng)該大于50。
根據(jù)8020原則,如果80%的每秒任務(wù)數(shù)小于800,那么corePoolSize設(shè)置為80即可。
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 800。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執(zhí)行。
切記不能設(shè)置為Integer.MAX_VALUE,這樣隊列會很大,線程數(shù)只會保持在corePoolSize大小,當任務(wù)陡增時,不能新開線程來執(zhí)行,響應(yīng)時間會隨之陡增。
maxPoolSize 最大線程數(shù)在生產(chǎn)環(huán)境上我們往往設(shè)置成corePoolSize一樣,這樣可以減少在處理過程中創(chuàng)建線程的開銷。
rejectedExecutionHandler:根據(jù)具體情況來決定,任務(wù)不重要可丟棄,任務(wù)重要則要利用一些緩沖機制來處理。
keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足。
以上都是理想值,實際情況下要根據(jù)機器性能來決定。如果在未達到最大線程數(shù)的情況機器cpu load已經(jīng)滿了,則需要通過升級硬件和優(yōu)化代碼,降低taskcost來處理。
以下是我自己的的線程池配置:
@Configuration public class ConcurrentThreadGlobalConfig { @Bean public ThreadPoolTaskExecutor defaultThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線程數(shù)目 executor.setCorePoolSize(65); //指定最大線程數(shù) executor.setMaxPoolSize(65); //隊列中最大的數(shù)目 executor.setQueueCapacity(650); //線程名稱前綴 executor.setThreadNamePrefix("DefaultThreadPool_"); //rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù) //CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行 //對拒絕task的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //線程空閑后的最大存活時間 executor.setKeepAliveSeconds(60); //加載 executor.initialize(); return executor; } }
四、線程池隊列的選擇
workQueue - 當線程數(shù)目超過核心線程數(shù)時用于保存任務(wù)的隊列。主要有3種類型的BlockingQueue可供選擇:無界隊列,有界隊列和同步移交。從參數(shù)中可以看到,此隊列僅保存實現(xiàn)Runnable接口的任務(wù)。
這里再重復(fù)一下新任務(wù)進入時線程池的執(zhí)行策略:
- 當正在運行的線程小于corePoolSize,線程池會創(chuàng)建新的線程。
- 當大于corePoolSize而任務(wù)隊列未滿時,就會將整個任務(wù)塞入隊列。
- 當大于corePoolSize而且任務(wù)隊列滿時,并且小于maximumPoolSize時,就會創(chuàng)建新額線程執(zhí)行任務(wù)。
- 當大于maximumPoolSize時,會根據(jù)handler策略處理線程。
1、無界隊列
隊列大小無限制,常用的為無界的LinkedBlockingQueue,使用該隊列作為阻塞隊列時要尤其當心,當任務(wù)耗時較長時可能會導(dǎo)致大量新任務(wù)在隊列中堆積最終導(dǎo)致OOM。
閱讀代碼發(fā)現(xiàn),Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是這個坑,當QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務(wù)被添加到這個無界LinkedBlockingQueue 中,導(dǎo)致cpu和內(nèi)存飆升服務(wù)器掛掉。
當然這種隊列,maximumPoolSize 的值也就無效了。
當每個任務(wù)完全獨立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務(wù)器中。
這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
2、有界隊列
當使用有限的 maximumPoolSizes 時,有界隊列有助于防止資源耗盡,但是可能較難調(diào)整和控制。
常用的有兩類,一類是遵循FIFO原則的隊列如ArrayBlockingQueue,另一類是優(yōu)先級隊列如PriorityBlockingQueue。
PriorityBlockingQueue中的優(yōu)先級由任務(wù)的Comparator決定。
使用有界隊列時隊列大小需和線程池大小互相配合,線程池較小有界隊列較大時可減少內(nèi)存消耗,降低cpu使用率和上下文切換,但是可能會限制系統(tǒng)吞吐量。
3、同步移交隊列
如果不希望任務(wù)在隊列中等待而是希望將任務(wù)直接移交給工作線程,可使用SynchronousQueue作為等待隊列。
SynchronousQueue不是一個真正的隊列,而是一種線程之間移交的機制。要將一個元素放入SynchronousQueue中,必須有另一個線程正在等待接收這個元素。
只有在使用無界線程池或者有飽和策略時才建議使用該隊列。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Cloud 部署時使用 Kubernetes 作為注冊中心和配置中
Spring Cloud Kubernetes提供了使用Kubernete本地服務(wù)的Spring Cloud通用接口實現(xiàn),這篇文章主要介紹了Spring Cloud 部署時如何使用 Kubernetes 作為注冊中心和配置中心,需要的朋友可以參考下2024-05-05面向?qū)ο缶幊?Java中的抽象數(shù)據(jù)類型
面向?qū)ο缶幊?Java中的抽象數(shù)據(jù)類型...2006-12-12