java多線(xiàn)程教程之如何使用線(xiàn)程池詳解
為什么要用線(xiàn)程池?
諸如 Web 服務(wù)器、數(shù)據(jù)庫(kù)服務(wù)器、文件服務(wù)器或郵件服務(wù)器之類(lèi)的許多服務(wù)器應(yīng)用程序都面臨處理來(lái)自某些遠(yuǎn)程來(lái)源的大量短小的任務(wù)。請(qǐng)求以某種方式到達(dá)服務(wù)器,這種方式可能是通過(guò)網(wǎng)絡(luò)協(xié)議(例如 HTTP、FTP 或 POP)、通過(guò) JMS 隊(duì)列或者可能通過(guò)輪詢(xún)數(shù)據(jù)庫(kù)。不管請(qǐng)求如何到達(dá),服務(wù)器應(yīng)用程序中經(jīng)常出現(xiàn)的情況是:?jiǎn)蝹€(gè)任務(wù)處理的時(shí)間很短而請(qǐng)求的數(shù)目卻是巨大的。
只有當(dāng)任務(wù)都是同類(lèi)型并且相互獨(dú)立時(shí),線(xiàn)程池的性能才能達(dá)到最佳。如果將運(yùn)行時(shí)間較長(zhǎng)的與運(yùn)行時(shí)間較短的任務(wù)混合在一起,那么除非線(xiàn)程池很大,否則將可能造成擁塞,如果提交的任務(wù)依賴(lài)于其他任務(wù),那么除非線(xiàn)程池?zé)o線(xiàn)大,否則將可能造成死鎖。
例如饑餓死鎖:線(xiàn)程池中的任務(wù)需要無(wú)限等待一些必須由池中其他任務(wù)才能提供的資源或條件。
ThreadPoolExecutor的通用構(gòu)造函數(shù):(在調(diào)用完構(gòu)造函數(shù)之后可以繼續(xù)定制ThreadPoolExecutor)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler){ //... }
飽和策略:
ThreadPoolExecutor允許提供一個(gè)BlockingQueue來(lái)保存等待執(zhí)行的任務(wù)。
當(dāng)有界隊(duì)列被填滿(mǎn)后,飽和策略開(kāi)始發(fā)揮作用??梢酝ㄟ^(guò)調(diào)用setRejectedExecutionHandler來(lái)修改。
中止是默認(rèn)的飽和策略,該策略將拋出未檢查的RejectedExecutionException,調(diào)用者可以捕獲這個(gè)異常,然后根據(jù)需求編寫(xiě)自己的處理代碼。
調(diào)用者運(yùn)行策略實(shí)現(xiàn)了一種調(diào)節(jié)機(jī)制,該策略既不會(huì)拋棄任務(wù),也不會(huì)拋出異常,而是將某些任務(wù)回退到調(diào)用者,從而降低新任務(wù)的流量。
例如對(duì)于WebServer,當(dāng)線(xiàn)程池中的所有線(xiàn)程都被占用,并且工作隊(duì)列被填滿(mǎn)后,下一個(gè)任務(wù)在調(diào)用execute時(shí)在主線(xiàn)程中執(zhí)行。
由于執(zhí)行任務(wù)需要一定的時(shí)間,因此主線(xiàn)程至少在一段時(shí)間內(nèi)不能提交任何任務(wù),從而使得工作者線(xiàn)程有時(shí)間來(lái)處理完正在執(zhí)行的任務(wù)。
在這期間,主線(xiàn)程不會(huì)調(diào)用accept,因此到達(dá)的請(qǐng)求將被保存在TCP層的隊(duì)列中而不是在應(yīng)用程序的隊(duì)列中,如果持續(xù)過(guò)載,那么TCP層最終發(fā)現(xiàn)它的請(qǐng)求隊(duì)列被填滿(mǎn),同樣會(huì)開(kāi)始拋棄請(qǐng)求。
因此當(dāng)服務(wù)器過(guò)載時(shí),這種過(guò)載會(huì)逐漸向外蔓延開(kāi)來(lái)---從線(xiàn)程池到工作隊(duì)列到應(yīng)用程序再到TCP層,最終到達(dá)客戶(hù)端,導(dǎo)致服務(wù)器在高負(fù)載下實(shí)現(xiàn)一種平緩的性能降低。
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
當(dāng)工作隊(duì)列被填滿(mǎn)后,沒(méi)有預(yù)定于的飽和策略來(lái)阻塞execute。而通過(guò)Semaphore來(lái)現(xiàn)在任務(wù)的到達(dá)率,可以實(shí)現(xiàn)。
/** * 設(shè)置信號(hào)量的上界設(shè)置為線(xiàn)程池的大小加上可排隊(duì)任務(wù)的數(shù)量,控制正在執(zhí)行和等待執(zhí)行的任務(wù)數(shù)量。 */ public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec,int bound){ this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable task) throws InterruptedException{ semaphore.acquire(); try{ exec.execute(new Runnable(){ public void run(){ try{ task.run(); }finally{ semaphore.release(); } } }); }catch(RejectedExecutionException e){ semaphore.release(); } } }
線(xiàn)程工廠(chǎng)
線(xiàn)程池配置信息中可以定制線(xiàn)程工廠(chǎng),在ThreadFactory中只定義了一個(gè)方法newThread,每當(dāng)線(xiàn)程池需要?jiǎng)?chuàng)建一個(gè)新線(xiàn)程時(shí)都會(huì)調(diào)用這個(gè)方法。
public interface ThreadFactory{ Thread newThread(Runnable r); }
// 示例:將一個(gè)特定于線(xiàn)程池的名字傳遞給MyThread的構(gòu)造函數(shù),從而可以再線(xiàn)程轉(zhuǎn)儲(chǔ)和錯(cuò)誤日志信息中區(qū)分來(lái)自不同線(xiàn)程池的線(xiàn)程。 public class MyThreadFactory implements ThreadFactory{ private final String poolName; public MyThreadFactory(String poolName){ this.poolName = poolName; } public Thread newThread(Runnable runnable){ return new MyThread(runnable,poolName); } }
// 示例:為線(xiàn)程指定名字,設(shè)置自定義UncaughtExceptionHandler向Logger中寫(xiě)入信息及維護(hù)一些統(tǒng)計(jì)信息以及在線(xiàn)程被創(chuàng)建或者終止時(shí)把調(diào)試消息寫(xiě)入日志。 public class MyThread extends Thread{ public static final String default_name = "myThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyThread(Runnable runnable){ this(runnable,default_name); } public MyThread(Runnable runnable, String defaultName) { super(runnable,defaultName + "-" + created.incrementAndGet()); setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE,"uncaught in thread " + t.getName(), e); } }); } public void run(){ boolean debug = debugLifecycle; if(debug){ log.log(Level.FINE,"created " + getName()); } try{ alive.incrementAndGet(); super.run(); }finally{ alive.decrementAndGet(); if(debug){ log.log(Level.FINE,"Exiting " + getName()); } } } }
擴(kuò)展ThreadPoolExecutor
在線(xiàn)程池完成關(guān)閉操作時(shí)調(diào)用terminated,也就是在所有任務(wù)都已經(jīng)完成并且所有工作者線(xiàn)程也已經(jīng)關(guān)閉后。terminated可以用來(lái)釋放Executor在其生命周期里分配的各種資源,此外還可以執(zhí)行發(fā)送通知、記錄日志或者收集finalize統(tǒng)計(jì)信息等操作。
示例:給線(xiàn)程池添加統(tǒng)計(jì)信息
/** * TimingThreadPool中給出了一個(gè)自定義的線(xiàn)程池,通過(guò)beforeExecute、afterExecute、terminated等方法來(lái)添加日志記錄和統(tǒng)計(jì)信息收集。 * 為了測(cè)量任務(wù)的運(yùn)行時(shí)間,beforeExecute必須記錄開(kāi)始時(shí)間并把它保存到一個(gè)afterExecute可用訪(fǎng)問(wèn)的地方。 * 因?yàn)檫@些方法將在執(zhí)行任務(wù)的線(xiàn)程中調(diào)用,因此beforeExecute可以把值保存到一個(gè)ThreadLocal變量中。然后由afterExecute來(lái)取。 * 在TimingThreadPool中使用了兩個(gè)AtomicLong變量,分別用于記錄已處理的任務(wù)和總的處理時(shí)間,并通過(guò)包含平均任務(wù)時(shí)間的日志消息。 */ public class TimingThreadPool extends ThreadPoolExecutor{ public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t,Runnable r){ super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t,r)); startTime.set(System.nanoTime()); } protected void afterExecute(Throwable t,Runnable r){ try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t,r,taskTime)); }finally{ super.afterExecute(r, t); } } protected void terminated(){ try{ log.info(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get())); }finally{ super.terminated(); } } }
#筆記內(nèi)容參考 《java并發(fā)編程實(shí)戰(zhàn)》
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
淺析Spring配置中的classpath:與classpath*:的區(qū)別
這篇文章主要介紹了Spring配置中的"classpath:"與"classpath*:"的區(qū)別,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08SpringBoot如何動(dòng)態(tài)修改Scheduled(系統(tǒng)啟動(dòng)默認(rèn)執(zhí)行,動(dòng)態(tài)修改)
這篇文章主要介紹了SpringBoot如何動(dòng)態(tài)修改Scheduled(系統(tǒng)啟動(dòng)默認(rèn)執(zhí)行,動(dòng)態(tài)修改)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07SSH框架網(wǎng)上商城項(xiàng)目第17戰(zhàn)之購(gòu)物車(chē)基本功能
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項(xiàng)目第17戰(zhàn)之購(gòu)物車(chē)基本功能的實(shí)現(xiàn)過(guò)程,感興趣的小伙伴們可以參考一下2016-06-06java的內(nèi)部類(lèi)和外部類(lèi)用法講解
本文詳細(xì)講解了java的內(nèi)部類(lèi)和外部類(lèi)用法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-12-12深入了解Java核心類(lèi)庫(kù)--String類(lèi)
這篇文章主要為大家詳細(xì)介紹了java String類(lèi)定義與使用的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能給你帶來(lái)幫助2021-07-07MyBatis多對(duì)多一對(duì)多關(guān)系查詢(xún)嵌套處理
這篇文章主要為大家介紹了MyBatis多對(duì)多一對(duì)多關(guān)系查詢(xún)嵌套處理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10詳解Java如何在A(yíng)rray和List之間進(jìn)行轉(zhuǎn)換
這篇文章主要為大家介紹了詳解Java如何在A(yíng)rray和List之間進(jìn)行轉(zhuǎn)換的方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05關(guān)于jdk環(huán)境變量配置以及javac不是內(nèi)部或外部命令的解決
這篇文章主要介紹了關(guān)于jdk環(huán)境變量配置以及javac不是內(nèi)部或外部命令的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01如何解決springmvc文件下載,內(nèi)容損壞的問(wèn)題
這篇文章主要介紹了解決springmvc文件下載,內(nèi)容損壞的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06