關(guān)于線程池創(chuàng)建、執(zhí)行、銷毀的原理及分析
線程池的執(zhí)行原理
假設(shè)最大核心數(shù)是2,非核心線程數(shù)為1,隊列長度是3
來第一個任務(wù)的時候,沒有工作線程在工作,需要創(chuàng)建一個
來第二個任務(wù)的時候,發(fā)現(xiàn)當(dāng)前核心線程數(shù)小于最大核心線程數(shù),所以繼續(xù)創(chuàng)建線程來處理任務(wù)
當(dāng)來第三個任務(wù)的時候,發(fā)現(xiàn)當(dāng)前核心線程數(shù)已經(jīng)等于最大核心線程數(shù)了,所以把新來的任務(wù)放到taskQueue中
后面來第四個、第五個任務(wù)也會放在taskQueue中
當(dāng)來第六個任務(wù)的時候,發(fā)現(xiàn)taskQueue已經(jīng)滿了,所以會創(chuàng)建一個非核心線程來處理任務(wù)
當(dāng)來第七個任務(wù)的時候,因為線程數(shù)量到最大限度了,taskQueue也滿了,所以就會走拒絕策略,把其中一個任務(wù)給拋棄掉,具體拋棄哪個需要根據(jù)選擇的拒絕策略來定。
創(chuàng)建線程這里需要考慮并發(fā)的問題,即多個任務(wù)同時過來了,需要串行創(chuàng)建線程,否則,可能會導(dǎo)致超賣的情況(即創(chuàng)建的線程超過了最大線程數(shù)),具體是通過CAS樂觀鎖實現(xiàn),代碼解釋如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get();//獲取當(dāng)前線程池的控制狀態(tài)。 int rs = runStateOf(c);//獲取當(dāng)前線程池的運行狀態(tài)。 // Check if queue empty only if necessary. //下面這個條件判斷用于判斷線程池是否處于關(guān)閉狀態(tài),并且任務(wù)隊列不為空。如果線程池的狀態(tài)大于等于SHUTDOWN,并且不滿足線程池狀態(tài)為SHUTDOWN、首個任務(wù)為null且任務(wù)隊列為空的條件,則返回false。這個判斷是為了確保在線程池關(guān)閉時,不再添加新的工作線程。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//獲取當(dāng)前線程池中的工作線程數(shù)量。 //這個條件判斷用于判斷工作線程數(shù)量是否達(dá)到上限。如果工作線程數(shù)量大于等于CAPACITY(工作線程數(shù)量的上限)或者大于等于核心線程數(shù)(如果core為true)或最大線程數(shù)(如果core為false),則返回false。這個判斷是為了確保工作線程數(shù)量不超過線程池的限制。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //嘗試通過CAS(比較并交換)操作增加工作線程數(shù)量。如果成功增加工作線程數(shù)量,則跳出循環(huán),繼續(xù)執(zhí)行后續(xù)邏輯。 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 重新讀取當(dāng)前線程池的控制狀態(tài)。 //再次判斷線程池的運行狀態(tài)是否發(fā)生了變化。如果運行狀態(tài)發(fā)生了變化,則繼續(xù)重試內(nèi)部循環(huán)。這個判斷是為了處理在CAS操作過程中,線程池的狀態(tài)發(fā)生了變化的情況。 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } ...創(chuàng)建線程的邏輯忽略... }
線程執(zhí)行
花開兩朵,各表一枝。
上面說了任務(wù)來了之后,是怎么創(chuàng)建線程的,又是怎么暫存任務(wù)的。這一節(jié)介紹一下線程是怎么執(zhí)行任務(wù)的,以及在不用的時候,線程怎么被銷毀或者保活。
線程池創(chuàng)建線程并調(diào)了thread的start方法之后,該線程會走到下面的runWorker方法。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
當(dāng)一個線程執(zhí)行完就會自動退出,但是我們知道線程池中的核心線程會一直存活著,想要一直存活,不退出程序就可以了,即循環(huán),從上面的代碼也可以看出來確實是這樣的。但是還有一個疑問,核心線程是一直存活的,但是非核心線程在一定情況是會銷毀的,他們用的是一套代碼邏輯,該怎么實現(xiàn)呢?關(guān)鍵點就在getTask這個方法中。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //銷毀線程需要滿足這兩個條件:1. (允許核心線程銷毀 || 線程數(shù)大于核心線程數(shù))&& 達(dá)到了銷毀時間;2. 任務(wù)隊列中沒有任務(wù)了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 這里返回了null,外層方法就跳出了while循環(huán),從而結(jié)束該線程 return null; continue; } try { // 超時時間就是在這里設(shè)置的,如果允許超時銷毀,那么就用poll進(jìn)行拉取任務(wù),超過了keepAliveTime就返回null。take是阻塞性等待 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
從上面的解析可以看出,如果隊列中沒有任務(wù)時,小于核心數(shù)的線程(核心線程數(shù)不銷毀的情況下)會一直阻塞在獲取任務(wù)的方法,直到返回任務(wù)。(判斷阻塞時并沒有核心線程和非核心線程的概念,只要保證創(chuàng)建出來的線程銷毀到符合預(yù)期數(shù)量就ok)。而且執(zhí)行完后 會繼續(xù)循環(huán)執(zhí)行g(shù)etTask的邏輯,不斷的處理任務(wù)。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解Spring batch 入門學(xué)習(xí)教程(附源碼)
本篇文章主要介紹了Spring batch 入門學(xué)習(xí)教程(附源碼),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-11-11Spring之spring-context-indexer依賴詳解
這篇文章主要介紹了Spring之spring-context-indexer依賴詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11Mybatis如何從數(shù)據(jù)庫中獲取數(shù)據(jù)存為List類型(存為model)
這篇文章主要介紹了Mybatis如何從數(shù)據(jù)庫中獲取數(shù)據(jù)存為List類型(存為model),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01如何用注解的方式實現(xiàn)Mybatis插入數(shù)據(jù)時返回自增的主鍵Id
這篇文章主要介紹了如何用注解的方式實現(xiàn)Mybatis插入數(shù)據(jù)時返回自增的主鍵Id,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07