Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池代碼示例
一、線程池的模式
線程池顧名思義就是管理線程的一個(gè)池子,我們把創(chuàng)建線程的過(guò)程交給線程池來(lái)處理,而這個(gè)線程池當(dāng)中的線程都會(huì)從阻塞隊(duì)列當(dāng)中取獲取任務(wù)執(zhí)行。
我們不在直接把任務(wù)的創(chuàng)建過(guò)程寫(xiě)到我們初始化的線程對(duì)象中,而是通過(guò)調(diào)用線程池的execute()方法,同時(shí)把我們的具體任務(wù)交作為參數(shù)傳給線程池,之后線程池就會(huì)把任務(wù)添加到阻塞隊(duì)列當(dāng)中,而線程池當(dāng)中的線程會(huì)從阻塞隊(duì)列當(dāng)中獲取任務(wù)并執(zhí)行。
二、線程池的一些參數(shù)
- corePoolSize:線程池核心線程大小,即最小線程數(shù)(初始化線程數(shù))。線程池會(huì)維護(hù)當(dāng)前數(shù)量的線程在線程池中,即使這些線程一直處于閑置狀態(tài),也不會(huì)被銷(xiāo)毀,除非設(shè)置了allowCoreThreadTimeOut。
- maximumPoolSize:線程池最大線程數(shù)量。當(dāng)任務(wù)提交到線程池后,如果當(dāng)前線程數(shù)小于核心線程數(shù),則會(huì)創(chuàng)建新線程來(lái)處理任務(wù);如果當(dāng)前線程數(shù)大于或等于核心線程數(shù),但小于最大線程數(shù),并且任務(wù)隊(duì)列已滿,則會(huì)創(chuàng)建新線程來(lái)處理任務(wù)。
- keepAliveTime:空閑線程的存活時(shí)間。當(dāng)線程池中的線程數(shù)量大于核心線程數(shù)且線程處于空閑狀態(tài)時(shí),在指定時(shí)間后,這個(gè)空閑線程將會(huì)被銷(xiāo)毀,從而逐漸恢復(fù)到穩(wěn)定的核心線程數(shù)數(shù)量。
- unit:keepAliveTime的存活時(shí)間的計(jì)量單位,通常使用TimeUnit枚舉類中的方法,如TimeUnit.SECONDS表示秒級(jí)。
- workQueue:任務(wù)隊(duì)列。用于存放等待執(zhí)行的任務(wù),常見(jiàn)的實(shí)現(xiàn)類有LinkedBlockingQueue、ArrayBlockingQueue等。
- threadFactory:線程工廠。用于創(chuàng)建新的線程,可以自定義線程的名稱、優(yōu)先級(jí)等。
- handler:拒絕策略。當(dāng)任務(wù)無(wú)法執(zhí)行(如線程池已滿)時(shí),可以選擇的策略有:AbortPolicy(拋出異常)、CallerRunsPolicy(調(diào)用者運(yùn)行)、DiscardOldestPolicy(丟棄最老的任務(wù))、DiscardPolicy(無(wú)聲丟棄)。
三、代碼實(shí)現(xiàn)
因?yàn)槲覀冎皇呛?jiǎn)單的實(shí)現(xiàn),所以有一些情況和實(shí)際不太相似。
1.BlockingQueue
先來(lái)看看我們阻塞隊(duì)列當(dāng)中的一些參數(shù),為了在多線程環(huán)境下防止并發(fā)問(wèn)題,我使用了ReentrantLock,使用它的目的是為了創(chuàng)建多個(gè)不同的阻塞條件。
在我們調(diào)用一個(gè)對(duì)象的await()方法后,我們的當(dāng)前線程就會(huì)加入到一個(gè)特定的隊(duì)列當(dāng)中去等待,直到有調(diào)用了這個(gè)對(duì)象的notify()方法后才會(huì)從這個(gè)隊(duì)列中抽取一個(gè)線程喚醒。
舉個(gè)例子,我們?nèi)メt(yī)院的時(shí)候,一個(gè)醫(yī)生同一時(shí)間只能看一個(gè)病人,剩下的人都只能等待,如果只有一個(gè)大廳的話,看不同病的病人都只能等待在一個(gè)候診室中。使用ReentrentLock的意思就是為了創(chuàng)建多個(gè)不同的候診室,將不同醫(yī)生的病人分開(kāi)在不同的候診室當(dāng)中。
//1.阻塞隊(duì)列 private Deque<T> deque = new ArrayDeque<>(); //2.實(shí)現(xiàn)阻塞的鎖 private ReentrantLock lock = new ReentrantLock(); //3. 生產(chǎn)者等待條件 private Condition fullWaitSet = lock.newCondition(); //4.消費(fèi)者等待條件 private Condition emptyWaitSet = lock.newCondition(); //5.阻塞隊(duì)列的大小 private int CAPACITY;
在自定義的阻塞隊(duì)列中,我使用了一個(gè)雙向隊(duì)列來(lái)存儲(chǔ)任務(wù),并且設(shè)置了一個(gè)隊(duì)列大小的屬性,在我們創(chuàng)建這個(gè)隊(duì)列的時(shí)候我們可以進(jìn)行初始化。
先來(lái)看看阻塞隊(duì)列任務(wù)的添加過(guò)程。這個(gè)邏輯并不難,我們?cè)诖a的上方上鎖,在finally中解鎖。如果這時(shí)我們的隊(duì)列是滿的,就無(wú)法在繼續(xù)添加任務(wù)了,這個(gè)時(shí)候我們就把當(dāng)前線程掛起(注意我們的掛起條件)。如果隊(duì)列不是滿的話那我們就加入到隊(duì)尾,同時(shí)把另一類掛起的線程喚醒(這類線程在隊(duì)列為空的時(shí)候掛起,等待任務(wù)的添加)。
// 生產(chǎn)者放入數(shù)據(jù) public void put(T t) { lock.lock(); try { while (deque.size() == CAPACITY) { fullWaitSet.await(); } deque.addLast(t); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
在看看我們?nèi)∪蝿?wù)的過(guò)程。同樣加鎖,當(dāng)我們的隊(duì)列為空的時(shí)候,線程掛起,等待任務(wù)的添加之后線程喚醒,如果隊(duì)列不為空的話,我們從隊(duì)列頭部取出一個(gè)任務(wù),并且喚起一類線程(這類線程在任務(wù)已經(jīng)滿了的時(shí)候無(wú)法在添加任務(wù)了,進(jìn)行掛起,等待隊(duì)列不為滿)。
// 消費(fèi)者從線程池當(dāng)中獲取任務(wù) public T take(){ T t = null; lock.lock(); try { while(deque.size() == 0){ emptyWaitSet.await(); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; }
我們上邊的代碼展示的隊(duì)列的存取的過(guò)程都是死等狀態(tài),什么是死等狀態(tài)?就是任務(wù)添加不進(jìn)去或者取不出來(lái)的時(shí)候,線程會(huì)被一直掛起。真實(shí)并不是如此,這里只是簡(jiǎn)單的展示。
阻塞隊(duì)列需要的就是這兩個(gè)存取的過(guò)程。
2.ThreadPool
先看看線程池當(dāng)中的屬性。把剛才創(chuàng)建的任務(wù)隊(duì)列加進(jìn)去,因?yàn)榫€程池要時(shí)常和任務(wù)隊(duì)列溝通。然后創(chuàng)建了一個(gè)HashSet結(jié)構(gòu)用于存儲(chǔ)我們的線程。下邊的都是我們線程池需要的一些參數(shù)了,拒絕策略在這里沒(méi)有寫(xiě)。
// 任務(wù)隊(duì)列 private BlockedQueue<Runnable> taskQueue; // 線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數(shù) private int coreSize; // 超時(shí)時(shí)間 private int timeout; // 超時(shí)單位 private TimeUnit timeUnit;
來(lái)看看我們的線程池是如何工作的吧,可以看到我們線程池保存的是Worker對(duì)象,我們來(lái)看看這個(gè)Worker對(duì)象是干啥的。這個(gè)Worker對(duì)象實(shí)現(xiàn)了Runnable接口,我們可以把這個(gè)類當(dāng)作線程類,這個(gè)類中有一個(gè)task屬性,因?yàn)槲覀兙€程池當(dāng)中的線程是要獲取任務(wù)執(zhí)行的,這個(gè)任務(wù)就用這個(gè)task屬性代表。
這個(gè)Worker類一直在干一件事情,就是不斷地從我們的任務(wù)隊(duì)列當(dāng)中獲取任務(wù)(Worker類是ThreadPool的內(nèi)部類),如果獲取的任務(wù)不為空的話就執(zhí)行任務(wù),一旦沒(méi)有任務(wù)可以執(zhí)行那么就把當(dāng)前的線程從線程池當(dāng)中移除。
class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.take())!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } }
那什么時(shí)候用到這個(gè)Worker類呢?當(dāng)我們調(diào)用ThreadPool中的execute()方法時(shí),線程池中的線程會(huì)就調(diào)用這個(gè)run()方法。
來(lái)看我們的execute()方法。當(dāng)我們的線程數(shù)小于我們的核心線程數(shù)的時(shí)候,我們可以直接創(chuàng)建一個(gè)新的線程,并且把我們的任務(wù)直接交給這個(gè)核心線程。反之我們不能創(chuàng)建,而是把任務(wù)添加到我們的任務(wù)隊(duì)列當(dāng)中,等待核心線程去執(zhí)行這個(gè)任務(wù)。
// 任務(wù)執(zhí)行 public void execute(Runnable task){ synchronized (workers){ if(workers.size() < coreSize){ // 創(chuàng)建核心線程 Worker worker = new Worker(task); workers.add(worker); Thread thread = new Thread(worker); thread.start(); }else { taskQueue.put(task); } } }
寫(xiě)完了上邊的代碼我們測(cè)試一下。
public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10); for(int i = 0;i<12;i++){ int j = i; threadPool.execute(()->{ System.out.println("當(dāng)前線程"+Thread.currentThread().getName()+"task "+j+" is running"); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
方法運(yùn)行了之后,即使任務(wù)全部執(zhí)行,線程也不會(huì)結(jié)束。這是因?yàn)槲覀兊膚orker類中的run方法調(diào)用了任務(wù)隊(duì)列的take()方法,而take方法是會(huì)一直掛起的。
我們現(xiàn)在換一種帶超時(shí)獲取,在規(guī)定時(shí)間內(nèi)獲取不到任務(wù)就自動(dòng)結(jié)束任務(wù)。這時(shí)候就用到我們傳入的時(shí)間參數(shù)了,我們不再調(diào)用await()方法了,而是調(diào)用awaitNanos()方法,方法可以接收一個(gè)時(shí)間參數(shù),這個(gè)方法可以消耗我們的nanos時(shí)間,在這個(gè)時(shí)間內(nèi)如果獲取不到的話線程就不在掛起了,這時(shí)還會(huì)進(jìn)入到我們的while循環(huán)當(dāng)中,判斷我們的nanos是不是被消耗完了,如果被消耗完了就說(shuō)明在規(guī)定時(shí)間內(nèi)獲取不到任務(wù),直接return結(jié)束線程。
// 帶超時(shí)獲取 public T poll(int timeout,TimeUnit timeUnit){ T t = null; lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(deque.size() == 0){ if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; }
修改Worker類。
class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } }
現(xiàn)在就可以正常結(jié)束了。
四、拒絕策略
全部代碼如下。要使用拒絕策略,我們定義一個(gè)函數(shù)式接口,同時(shí)寫(xiě)一個(gè)參數(shù)傳給線程池,參數(shù)的具體內(nèi)容就是拒絕策略的拒絕方法,是我們自己定義的。
同時(shí)我們的execute()方法不在使用put來(lái)添加任務(wù)了,而是使用tryPut,如果大家對(duì)這一塊感興趣的話,可以在bilibili上觀看黑馬程序員的課程學(xué)習(xí)一下。
/** * 自定義線程池 */ public class TestPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);})); for(int i = 0;i<12;i++){ int j = i; threadPool.execute(()->{ System.out.println("當(dāng)前線程"+Thread.currentThread().getName()+"task "+j+" is running"); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } /** * 拒絕策略 */ @FunctionalInterface interface RejectPolicy<T>{ void reject(BlockedQueue<T> queue,T task); } /** * 阻塞隊(duì)列 */ class BlockedQueue <T>{ //1.阻塞隊(duì)列 private Deque<T> deque = new ArrayDeque<>(); //2.實(shí)現(xiàn)阻塞的鎖 private ReentrantLock lock = new ReentrantLock(); //3. 生產(chǎn)者等待條件 private Condition fullWaitSet = lock.newCondition(); //4.消費(fèi)者等待條件 private Condition emptyWaitSet = lock.newCondition(); //5.阻塞隊(duì)列的大小 private int CAPACITY; public BlockedQueue(int queueCapacity) { this.CAPACITY = queueCapacity; } // 生產(chǎn)者放入數(shù)據(jù) public void put(T t) { lock.lock(); try { while (deque.size() == CAPACITY) { fullWaitSet.await(); } deque.addLast(t); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } // 帶超時(shí)添加 public boolean offer(T t,int timeout,TimeUnit timeUnit) { lock.lock(); long nanos = timeUnit.toNanos(timeout); try { while (deque.size() == CAPACITY) { if(nanos <= 0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } deque.addLast(t); emptyWaitSet.signal(); return true; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return true; } // 帶超時(shí)獲取 public T poll(int timeout,TimeUnit timeUnit){ T t = null; lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(deque.size() == 0){ if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; } // 消費(fèi)者從線程池當(dāng)中獲取任務(wù) public T take(){ T t = null; lock.lock(); try { while(deque.size() == 0){ emptyWaitSet.await(); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if(deque.size()==CAPACITY){ rejectPolicy.reject(this,task); }else{ deque.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } /** * 線程池 */ class ThreadPool{ // 任務(wù)隊(duì)列 private BlockedQueue<Runnable> taskQueue; // 線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數(shù) private int coreSize; // 超時(shí)時(shí)間 private int timeout; // 超時(shí)單位 private TimeUnit timeUnit; //拒絕策略 private RejectPolicy<Runnable> rejectPolicy; // 任務(wù)執(zhí)行 public void execute(Runnable task){ synchronized (workers){ if(workers.size() < coreSize){ // 創(chuàng)建核心線程 Worker worker = new Worker(task); workers.add(worker); Thread thread = new Thread(worker); thread.start(); }else { // 任務(wù)隊(duì)列 //taskQueue.offer(task,timeout,timeUnit); taskQueue.tryPut(rejectPolicy,task); //taskQueue.put(task); } } } public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){ this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockedQueue<>(queueCapacity); this.rejectPolicy = rejectPolicy; } class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } } }
這個(gè)代碼我自己覺(jué)得是有些問(wèn)題,因?yàn)槿绻业娜蝿?wù)隊(duì)列大小有10的時(shí)候,我給出了13個(gè)任務(wù),兩個(gè)交給核心線程不占任務(wù)隊(duì)列大小,另外10個(gè)任務(wù)正好占滿,剩下一個(gè)放不進(jìn)去,這時(shí)就會(huì)卡住不輸出。---------未解決
總結(jié)
到此這篇關(guān)于Java實(shí)現(xiàn)一個(gè)線程池的文章就介紹到這了,更多相關(guān)Java實(shí)現(xiàn)線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot?log4j2.xml如何讀取application.yml中屬性值
這篇文章主要介紹了springboot?log4j2.xml如何讀取application.yml中屬性值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12SpringBoot 啟動(dòng)報(bào)錯(cuò)Unable to connect to 
這篇文章主要介紹了SpringBoot 啟動(dòng)報(bào)錯(cuò)Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問(wèn)題的解決方案,文中通過(guò)圖文結(jié)合的方式給大家講解的非常詳細(xì),對(duì)大家解決問(wèn)題有一定的幫助,需要的朋友可以參考下2024-10-10Spring 靜態(tài)變量/構(gòu)造函數(shù)注入失敗的解決方案
我們經(jīng)常會(huì)遇到一下問(wèn)題:Spring對(duì)靜態(tài)變量的注入為空、在構(gòu)造函數(shù)中使用Spring容器中的Bean對(duì)象,得到的結(jié)果為空。不要擔(dān)心,本文將為大家介紹如何解決這些問(wèn)題,跟隨小編來(lái)看看吧2021-11-11