Java并發(fā)線程池實例分析講解
一.為什么要用線程池
先來看個簡單的例子
1.直接new Thread的情況:
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final List<Integer> list = new ArrayList<>();
final Random random = new Random();
for (int i = 0; i < 100000; i++) {
Thread thread = new Thread() {
@Override
public void run() {
list.add(random.nextInt());
}
};
thread.start();
thread.join();
}
System.out.println("執(zhí)行時間:" + (System.currentTimeMillis() - start));
System.out.println("執(zhí)行大?。? + list.size());
}
執(zhí)行時間:6437
執(zhí)行大?。?00000
2.使用線程池時
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final List<Integer> list = new ArrayList<>();
final Random random = new Random();
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100000; i++) {
executorService.execute(()->{
list.add(random.nextInt());
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("執(zhí)行時間:" + (System.currentTimeMillis() - start));
System.out.println("執(zhí)行大小:" + list.size());
}
執(zhí)行時間:82
執(zhí)行大?。?00000
從執(zhí)行時間可以看出來,使用線程池的效率要遠遠超過直接new Thread。
二.線程池的好處
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
三.原理解析

四.4種線程池
1.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特點:newCachedThreadPool會創(chuàng)建一個可緩存線程池,如果當(dāng)前線程池的長度超過了處理的需要時,可以靈活的回收空閑的線程,當(dāng)需要增加時,它可以靈活的添加新的線程,而不會對線程池的長度作任何限制。
因為其最大線程數(shù)是Integer.MAX_VALUE,若新建的線程數(shù)多了,會超過機器的可用內(nèi)存而OOM,但是因為其不是無界隊列,所以在OOM之前一般會CPU 100%。
2.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
該方法會創(chuàng)建一個固定長度的線程池,控制最大并發(fā)數(shù),超出的線程會在隊列中等待,因為線程的數(shù)量是固定的,但是阻塞隊列是無界的,如果請求數(shù)較多時,會造成阻塞隊列越來越長,超出可用內(nèi)存 進而OOM,所以要根據(jù)系統(tǒng)資源設(shè)置線程池的大小。Runtime.getRuntime().availableProcessors()
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
會創(chuàng)建一個單一的線程,前一個任務(wù)執(zhí)行完畢才會執(zhí)行下一個線程,F(xiàn)IFO,保證順序執(zhí)行。但是高并發(fā)下不太適用
4.newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
創(chuàng)建一個固定長度的線程池,而且支持定時的以及周期性的任務(wù)執(zhí)行,所有任務(wù)都是串行執(zhí)行的,同一時間只能有一個任務(wù)在執(zhí)行,前一個任務(wù)的延遲或異常都將會影響到之后的任務(wù)。
阿里規(guī)范中不推薦使用以上線程池,推薦使用自定義的線程池,當(dāng)然如果你的項目中的數(shù)量級比較小的話那到?jīng)]什么影響。

自定義線程池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());
執(zhí)行優(yōu)先級 : 核心線程>非核心線程>隊列
提交優(yōu)先級 : 核心線程>隊列>非核心線程
五.線程池處理流程
流程圖:

六.源碼分析
流程圖

ThreadPoolExecutor的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.判斷線程數(shù)是否小于核心線程數(shù),如果是則使用入?yún)⑷蝿?wù)通過addWorker方法創(chuàng)建一個新的線程,如果能完成新線程創(chuàng)建execute方法結(jié)束,成功提交任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.在第一步?jīng)]有完成任務(wù)提交;狀態(tài)為運行并且能成功加入任務(wù)到工作隊列后,再進行一次check,如果狀態(tài)在任務(wù)加入隊列后變?yōu)榱朔沁\行(有可能是在執(zhí)行到這里線程池shtdown了),非運行狀態(tài)下當(dāng)然是需要reject;
// offer和add方法差不多,add方法就是調(diào)用的offer,只不過比offer多拋出一個異常 throw new IllegalStateException("Queue full")
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//3.判斷當(dāng)前工作線程池數(shù)是否為0,如果是創(chuàng)建一個null任務(wù),任務(wù)在堵塞隊列存在了就會從隊列中取出這樣做的意義是保證線程池在running狀態(tài)必須有一個任務(wù)在執(zhí)行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//4.如果不能加入任務(wù)到工作隊列,將嘗試使用任務(wù)新增一個線程,如果失敗,則是線程池已經(jīng)shutdown或者線程池已經(jīng)達到飽和狀態(tài),所以reject.拒絕策略不僅僅是在飽和狀態(tài)下使用,在線程池進入到關(guān)閉階段同樣需要使用到;
else if (!addWorker(command, false))
reject(command);
}
}
再進入到addWork方法
private boolean addWorker(Runnable firstTask, boolean core) {
// goto寫法 重試
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//線程狀態(tài)非運行并且非shutdown狀態(tài)任務(wù)為空,隊列非空就不能新增線程了
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//當(dāng)前線程達到了最大閾值 就不再新增線程了
return false;
if (compareAndIncrementWorkerCount(c))
//ctl+1工作線程池數(shù)量+1如果成功 就跳出死循環(huán)
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//進來的狀態(tài)和此時的狀態(tài)發(fā)生改變重頭開始重試
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//內(nèi)部類封裝了線程和任務(wù) 通過threadfactory創(chuàng)建線程
//毎一個worker就是一個線程數(shù)
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新獲取線程狀態(tài)
int rs = runStateOf(ctl.get());
// 狀態(tài)小于shutdown 就是running狀態(tài) 或者 為shutdown并且firstTask為空是從隊列中處理 任務(wù)那就可以放到集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 線程還沒start就是alive就直接異常
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 記錄最大線程數(shù)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//失敗回退從wokers移除w線程數(shù)減1嘗試結(jié)束線程池
addWorkerFailed(w);
}
return workerStarted;
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
//正在運行woker線程
final Thread thread;
/** Initial task to run. Possibly null. */
//傳入的任務(wù)
Runnable firstTask;
/** Per-thread task counter */
//完成的任務(wù)數(shù)監(jiān)控用
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//禁止線程中斷
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
再來看runworker方法
final void runWorker(Worker w) {
//獲取當(dāng)前線程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 把state從‐1改為0意思是可以允許中斷
boolean completedAbruptly = true;
try {
//task不為空或者阻塞隊列中拿到了任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果當(dāng)前線程池狀態(tài)等于stop就中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//這設(shè)置為空等下次循環(huán)就會從隊列里面獲取
task = null;
//完成任務(wù)數(shù)+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
獲取任務(wù)的方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//獲取線程池運行狀態(tài)
// Check if queue empty only if necessary.
//shutdown或者為空那就工作線程‐1同時返回為null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//重新獲取工作線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed是標志超時銷毀 核心線程池也是可以銷毀的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker中的processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

ThreadPoolExecutor內(nèi)部有實現(xiàn)4個拒絕策略:(1)、
- CallerRunsPolicy,由調(diào)用execute方法提交任務(wù)的線程來執(zhí)行這個任務(wù);
- AbortPolicy,拋出異常RejectedExecutionException拒絕提交任務(wù);
- DiscardPolicy,直接拋棄任務(wù),不做任何處理;
- DiscardOldestPolicy,去除任務(wù)隊列中的第一個任務(wù)(最舊的),重新提交
ScheduledThreadPoolExecutor

- schedule:延遲多長時間之后只執(zhí)行一次;
- scheduledAtFixedRate固定:延遲指定時間后執(zhí)行一次,之后按照固定的時長周期執(zhí)行;
- scheduledWithFixedDelay非固定:延遲指定時間后執(zhí)行一次,之后按照:上一次任務(wù)執(zhí)行時長+周期的時長的時間去周期執(zhí)行;
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池不是RUNNING狀態(tài),則使用拒絕策略把提交任務(wù)拒絕掉
if (isShutdown())
reject(task);
else {
//與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊列
super.getQueue().add(task);
//如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;
//增加Worker,確保提交的任務(wù)能夠被執(zhí)行
ensurePrestart();
}
}
add方法里其實是調(diào)用了offer方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//容量擴增50%
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//插入堆尾
siftUp(i, e);
}
if (queue[0] == e) {
//如果新加入的元素成為了堆頂,則原先的leader就無效了
leader = null;
//由于原先leader已經(jīng)無效被設(shè)置為null了,這里隨便喚醒一個線程(未必是原先的leader)來取走堆頂任務(wù)
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
siftup方法:主要是對隊列進行排序
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//獲取父節(jié)點
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//如果key節(jié)點的執(zhí)行時間大于父節(jié)點的執(zhí)行時間,不需要再排序了
if (key.compareTo(e) >= 0)
break;
//如果key.compareTo(e)<0,說明key節(jié)點的執(zhí)行時間小于父節(jié)點的執(zhí)行時間,需要把父節(jié)點移到后面
queue[k] = e;
setIndex(e, k);
//設(shè)置索引為k
k = parent;
}
//key設(shè)置為排序后的位置中
queue[k] = key;
setIndex(key, k);
}
run方法:
public void run() {
//是否周期性,就是判斷period是否為0
boolean periodic = isPeriodic();
//檢查任務(wù)是否可以被執(zhí)行
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果非周期性任務(wù)直接調(diào)用run運行即可
else if (!periodic)
ScheduledFutureTask.super.run();
//如果成功runAndRest,則設(shè)置下次運行時間并調(diào)用reExecutePeriodic
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
//需要重新將任務(wù)(outerTask)放到工作隊列中。此方法源碼會在后文介紹ScheduledThreadPoolExecutor本身API時提及
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
//fixed‐rate模式,時間設(shè)置為上一次時間+p,這里的時間只是可以被執(zhí)行的最小時間,不代表到點就要執(zhí)行
if (p > 0)
time += p;
else
//fixed‐delay模式,計算下一次任務(wù)可以被執(zhí)行的時間, 差不多就是當(dāng)前時間+delay值
time = triggerTime(-p);
}
long triggerTime(long delay) {
//如果delay<Long.Max_VALUE/2,則下次執(zhí)行時間為當(dāng)前時間+delay,否則為了避免隊列中出現(xiàn)由于溢出導(dǎo)致的排序紊亂,需要調(diào)用overflowFree來修正一下delay
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有這么一種情況:
* 工作隊列中維護任務(wù)順序是基于compareTo的,在compareTo中比較兩個任務(wù)的順序會用time相減,負數(shù)則說明優(yōu)先級高,那么就有可能出現(xiàn)一個delay為正數(shù),減去另一個為負數(shù)的delay,結(jié)果上溢為負數(shù),則會導(dǎo)致compareTo產(chǎn)生錯誤的結(jié)果.
* 為了特殊處理這種情況,首先判斷一下隊首的delay是不是負數(shù),如果是正數(shù)不用管了,怎么減都不會溢出。
* 否則可以拿當(dāng)前delay減去隊首的delay來比較看,如果不出現(xiàn)上溢,則整個隊列都ok,排序不會亂。
* 不然就把當(dāng)前delay值給調(diào)整為Long.MAX_VALUE+隊首delay
/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
到此這篇關(guān)于Java并發(fā)線程池實例分析講解的文章就介紹到這了,更多相關(guān)Java并發(fā)線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
postman中參數(shù)和x-www-form-urlencoded傳值的區(qū)別及說明
在Postman中,參數(shù)傳遞有多種方式,其中params和x-www-form-urlencoded最為常用,Params主要用于URL中傳遞查詢參數(shù),適合GET請求和非敏感數(shù)據(jù),其特點是將參數(shù)作為查詢字符串附加在URL末尾,適用于過濾和排序等操作2024-09-09
基于創(chuàng)建Web項目運行時出錯的解決方法(必看篇)
下面小編就為大家?guī)硪黄趧?chuàng)建Web項目運行時出錯的解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08
java線程并發(fā)控制同步工具CountDownLatch
這篇文章主要為大家介紹了java線程并發(fā)控制同步工具CountDownLatch使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08
java使用泛型實現(xiàn)棧結(jié)構(gòu)示例分享
泛型是Java SE5.0的重要特性,使用泛型編程可以使代碼獲得最大的重用。由于在使用泛型時要指明泛型的具體類型,這樣就避免了類型轉(zhuǎn)換。本實例將使用泛型來實現(xiàn)一個棧結(jié)構(gòu),并對其進行測試2014-03-03
Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實現(xiàn)代碼
這篇文章主要介紹了Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04

