Java中的延遲隊(duì)列DelayQueue源碼解析
一、什么是DelayQueue
DelayQueue是一個(gè)支持并發(fā)的無(wú)界延遲隊(duì)列,隊(duì)列中的每個(gè)元素都有個(gè)預(yù)定時(shí)間,當(dāng)線程從隊(duì)列獲取元素時(shí),只有到期元素才會(huì)出隊(duì)列,沒(méi)有到期元素則阻塞等待。
隊(duì)列頭元素是最快要到期的元素。因此DelayQueue可用于實(shí)現(xiàn)定時(shí)任務(wù)隊(duì)列。
DelayQueue中的主要成員變量和方法如下:
q:使用優(yōu)先隊(duì)列PriorityQueue存儲(chǔ)數(shù)據(jù),隊(duì)列中的元素需實(shí)現(xiàn)Delayed接口,實(shí)現(xiàn)getDelay()和compareTo()方法,以實(shí)現(xiàn)優(yōu)先隊(duì)列內(nèi)部的優(yōu)先級(jí)比較,剩余到期時(shí)間越短的元素優(yōu)先級(jí)越高
public interface Delayed extends Comparable<Delayed> { //獲取元素剩余到期時(shí)間 long getDelay(TimeUnit unit); }
lock:使用ReentrantLock對(duì)插入和讀取隊(duì)列元素的方法進(jìn)行加鎖,以實(shí)現(xiàn)多線程并發(fā)讀寫(xiě)隊(duì)列操作的同步。
available:用一個(gè)條件等待隊(duì)列存放等待獲取到期元素的線程。
leader:用于表示當(dāng)前正在等待獲取隊(duì)頭元素的線程,這里使用了一個(gè)Leader-Follower模式的變體,線程獲取完元素后從等待隊(duì)列中選擇一個(gè)線程成為leader繼續(xù)等待獲取隊(duì)頭元素,以避免不必要的競(jìng)爭(zhēng)消耗。
Leader-Follower模式 在并發(fā)IO中,當(dāng)一個(gè)線程收到IO事件后,會(huì)考慮啟動(dòng)一個(gè)新的線程去處理,而自己繼續(xù)等待下一個(gè)請(qǐng)求。但這里可能會(huì)有性能問(wèn)題,就是把工作交給別一個(gè)線程的時(shí)候需上下文切換,包括數(shù)據(jù)拷貝。 而在Leader-Follower模式中所有線程會(huì)有三種身份中的一種:leader和follower,以及一個(gè)干活中的狀態(tài):proccesser。它的基本原則就是,永遠(yuǎn)最多只有一個(gè)leader。而所有follower都在等待成為leader。線程池啟動(dòng)時(shí)會(huì)自動(dòng)產(chǎn)生一個(gè)Leader負(fù)責(zé)等待事件,當(dāng)有一個(gè)事件產(chǎn)生時(shí),Leader線程首先通知一個(gè)Follower線程將其提拔為新的Leader,然后自己去處理這個(gè)事件,處理完畢后加入Follower線程等待隊(duì)列,等待下次成為L(zhǎng)eader。這種方法可以增強(qiáng)CPU高速緩存相似性,及消除動(dòng)態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
二、主要方法源碼解析
1. offer()
插入元素到隊(duì)列。首先獲取鎖,拿到鎖后向優(yōu)先隊(duì)列中插入元素,若插入完畢后發(fā)現(xiàn)隊(duì)頭元素就是自己,即最近到期時(shí)間的元素就是自己,刷新了記錄,那就趕緊從等待隊(duì)列中通知一個(gè)線程準(zhǔn)備來(lái)獲取這個(gè)元素,然后釋放鎖。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
這里為什么要將leader先置為null?
因?yàn)槿绻藭r(shí)leader線程在超時(shí)等待獲取前任隊(duì)頭元素,而signal通知了另一個(gè)線程,看完take()的源碼可以知道如果有l(wèi)eader線程,那么此線程會(huì)直接阻塞等待,讓leader線程超時(shí)完后獲取隊(duì)頭,那顯然時(shí)間就不正確了,只有將leader設(shè)為null,后續(xù)線程才能成為leader并設(shè)置正確的超時(shí)時(shí)間來(lái)等待獲取最新隊(duì)頭元素
因此,leader變量的真正含義是:超時(shí)等待獲取隊(duì)列最新隊(duì)頭元素的線程,等待的時(shí)間即為最新隊(duì)頭元素剩余到期時(shí)間 因此,當(dāng)隊(duì)頭元素發(fā)生變動(dòng)(插入/刪除更新)時(shí),就需要喚醒一個(gè)線程更新leader
2. take()
獲取優(yōu)先隊(duì)列隊(duì)頭元素。首先獲取鎖,拿到鎖后進(jìn)入一個(gè)循環(huán),首先檢測(cè)隊(duì)頭元素,若為空則進(jìn)入等待隊(duì)列阻塞等待,若不為空且隊(duì)頭元素已到期則直接將其出隊(duì)返回,如果還沒(méi)到期就看有沒(méi)有l(wèi)eader線程已經(jīng)在準(zhǔn)備獲取隊(duì)頭元素了,如果有就不用搶了,進(jìn)入等待隊(duì)列阻塞等待,如果沒(méi)有就超時(shí)等待準(zhǔn)備獲取隊(duì)頭元素,被喚醒后進(jìn)入下一次循環(huán)獲取隊(duì)頭元素。獲取完畢后就從等待隊(duì)列中通知一個(gè)線程到同步隊(duì)列準(zhǔn)備獲取隊(duì)頭元素然后釋放鎖。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //從優(yōu)先堆中獲取堆頂元素,即優(yōu)先級(jí)最高,即預(yù)定時(shí)間最近的元素 E first = q.peek(); if (first == null) //若隊(duì)列中無(wú)元素則直接進(jìn)入條件隊(duì)列等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); //若堆頂元素已經(jīng)到期,則直接將其出隊(duì)返回 if (delay <= 0) return q.poll(); //等待期間不持有元素引用,防止該元素被其他線程出隊(duì)消費(fèi)后,仍不能被垃圾回收 first = null; // don't retain ref while waiting if (leader != null) //若已經(jīng)有l(wèi)eader了,則進(jìn)入條件隊(duì)列無(wú)限期等待 available.await(); else { //否則成為leader進(jìn)入條件隊(duì)列超時(shí)等待,到預(yù)期時(shí)間或者有更近時(shí)間元素插入就到同步隊(duì)列競(jìng)爭(zhēng)鎖,再重復(fù)循環(huán)去取堆頂元素 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { //取完元素后若leader為null且隊(duì)列中還有元素則從條件等待隊(duì)列通知一個(gè)線程到同步隊(duì)列 //為什么存在leader不為null的情況:leader線程從awaitNanos()中結(jié)束后沒(méi)有競(jìng)爭(zhēng)過(guò)新進(jìn)take()的線程,因此繼續(xù)在同步隊(duì)列中被阻塞,因此無(wú)需再?gòu)臈l件等待隊(duì)列中通知線程,直接讓leader線程再去競(jìng)爭(zhēng)鎖, if (leader == null && q.peek() != null) available.signal(); lock.unlock();//釋放鎖資源讓同步隊(duì)列中的線程競(jìng)爭(zhēng)鎖 } }
Leader-Follower模式在這里的作用在于,在隊(duì)頭元素還沒(méi)到期的情況下,只需要有一個(gè)線程(leader)超時(shí)等待,其余線程進(jìn)來(lái)后發(fā)現(xiàn)已經(jīng)有l(wèi)eader了,就直接無(wú)限等待就行了,避免了無(wú)意義的超時(shí)等待和競(jìng)爭(zhēng)消耗。
3. poll()
加鎖獲取并移除隊(duì)頭過(guò)期元素,如果沒(méi)有過(guò)期元素則不等待直接返回 null。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
4. size()
加鎖獲取隊(duì)列當(dāng)前剩余元素個(gè)數(shù)
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
三、使用案例
如下使用案例,首先向DelayQueue插入5個(gè)定時(shí)任務(wù),然后用3個(gè)線程并發(fā)讀取
public class DelayQueueTest { //隊(duì)列元素類 static class DelayTask implements Delayed { long exeTime;//預(yù)定執(zhí)行時(shí)間 public DelayTask(long exeTime) { this.exeTime = exeTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.exeTime - System.currentTimeMillis(), unit); } @Override public int compareTo(Delayed o) { long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (int) delta; } } public static void main(String[] args) { DelayQueue<DelayTask> delayQueue = new DelayQueue<>(); for (int i = 1;i <= 5;i++) { delayQueue.offer(new DelayTask(System.currentTimeMillis() + new Random().nextInt(10)*1000)); } for (int i = 1;i <= 3;i++) { new Thread(() -> { try { while (true) { DelayTask task = delayQueue.take(); System.out.printf("取出任務(wù)!取出時(shí)間:%s 任務(wù)預(yù)定執(zhí)行時(shí)間:%s%n", hms(System.currentTimeMillis()), hms(task.exeTime)); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } public static String hms(long milliseconds) { return new SimpleDateFormat("HH:mm:ss").format(milliseconds); } }
運(yùn)行結(jié)果:
取出任務(wù)!取出時(shí)間:10:27:39 任務(wù)預(yù)定執(zhí)行時(shí)間:10:27:39
取出任務(wù)!取出時(shí)間:10:27:39 任務(wù)預(yù)定執(zhí)行時(shí)間:10:27:39
取出任務(wù)!取出時(shí)間:10:27:40 任務(wù)預(yù)定執(zhí)行時(shí)間:10:27:40
取出任務(wù)!取出時(shí)間:10:27:42 任務(wù)預(yù)定執(zhí)行時(shí)間:10:27:42
取出任務(wù)!取出時(shí)間:10:27:46 任務(wù)預(yù)定執(zhí)行時(shí)間:10:27:46
到此這篇關(guān)于Java中的延遲隊(duì)列DelayQueue源碼解析的文章就介紹到這了,更多相關(guān)Java延遲隊(duì)列DelayQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
學(xué)習(xí)Java之如何對(duì)時(shí)間進(jìn)行格式化
當(dāng)我們?cè)谀J(rèn)情況下構(gòu)造出來(lái)的時(shí)間對(duì)象,它的時(shí)間格式并不適合我們閱讀,并且在開(kāi)發(fā)時(shí),pc端、Android端、iOS端等展示的時(shí)間格式可能也并不完全一樣,本文就從這幾個(gè)問(wèn)題給大家介紹如何對(duì)時(shí)間進(jìn)行格式化,感興趣的同學(xué)可以借鑒一下2023-05-05解決IDEA springboot"spring-boot-maven-plugin"報(bào)紅問(wèn)題
這篇文章主要介紹了解決IDEA springboot"spring-boot-maven-plugin"報(bào)紅問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04

使用@Value 注入 List 類型的配置屬性需要注意的 BUG

從Hello?World開(kāi)始理解GraphQL背后處理及執(zhí)行過(guò)程