Java延遲隊列DelayQueue原理詳解
什么是DelayQueue(延時隊列)
DelayQueue 是一個通過PriorityBlockingQueue實現(xiàn)延遲獲取元素的無界隊列無界阻塞隊列,其中添加進(jìn)該隊列的元素必須實現(xiàn)Delayed接口(指定延遲時間),而且只有在延遲期滿后才能從中提取元素。
什么是PriorityBlockingQueue(優(yōu)先隊列)
PriorityBlockingQueue是一個支持優(yōu)先級的無界阻塞隊列,隊列的元素默認(rèn)情況下元素采用自然順序升序排列,或者根據(jù)構(gòu)造隊列時提供的 Comparator 進(jìn)行排序,具體取決于所使用的構(gòu)造方法。
需要注意的是不能保證同優(yōu)先級元素的順序。
PriorityBlockingQueue也是基于最小二叉堆實現(xiàn),使用基于CAS實現(xiàn)的自旋鎖來控制隊列的動態(tài)擴(kuò)容,保證了擴(kuò)容操作不會阻塞take操作的執(zhí)行。
DelayQueue使用場景
DelayQueue可以運用在以下應(yīng)用場景:
- 緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
- 定時任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,從比如TimerQueue就是使用DelayQueue實現(xiàn)的。
DelayQueue原理
DelayQueue的泛型參數(shù)需要實現(xiàn)Delayed接口,Delayed接口繼承了Comparable接口,DelayQueue內(nèi)部使用非線程安全的優(yōu)先隊列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待時間。DelayQueue不允許包含null元素。
Leader/Followers模式:
有若干個線程(一般組成線程池)用來處理大量的事件
有一個線程作為領(lǐng)導(dǎo)者,等待事件的發(fā)生;其他的線程作為追隨者,僅僅是睡眠。
假如有事件需要處理,領(lǐng)導(dǎo)者會從追隨者中指定一個新的領(lǐng)導(dǎo)者,自己去處理事件。
喚醒的追隨者作為新的領(lǐng)導(dǎo)者等待事件的發(fā)生。
處理事件的線程處理完畢以后,就會成為追隨者的一員,直到被喚醒成為領(lǐng)導(dǎo)者。
假如需要處理的事件太多,而線程數(shù)量不夠(能夠動態(tài)創(chuàng)建線程處理另當(dāng)別論),則有的事件可能會得不到處理。
所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態(tài):proccesser。它的基本原則就是,永遠(yuǎn)最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產(chǎn)生一個Leader負(fù)責(zé)等待網(wǎng)絡(luò)IO事件,當(dāng)有一個事件產(chǎn)生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網(wǎng)絡(luò)事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
DelayQueue源碼解析
DelayQueue屬性
//可重入同步鎖 private final transient ReentrantLock lock = new ReentrantLock(); //DelayQueue的實現(xiàn)依賴于PriorityQueue(優(yōu)先隊列) private final PriorityQueue<E> q = new PriorityQueue<E>(); //第一個等待某個延時對象的線程,在延時對象還沒有到期時其他線程看到這個leader不為null,那么就直接wait //主要是為了避免大量線程在同一時間點喚醒,導(dǎo)致大量的競爭,反而影響性能 private Thread leader = null; //條件隊列,用于wait線程 private final Condition available = lock.newCondition();
DelayQueue構(gòu)造方法
//從上面屬性就可以看出,DelayQueue采用了餓漢模式,調(diào)用構(gòu)造方法即創(chuàng)建了隊列實例 public DelayQueue() {} /** * 創(chuàng)建一個DelayQueue,最初包含給定的Collection實例集合。 * @param c 最初包含的元素集合 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
DelayQueue主要方法
offer添加元素
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { //調(diào)用優(yōu)先隊列 q.offer(e); //檢驗元素是否為隊首,是則設(shè)置 leader 為null, 并喚醒一個消費線程 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
take獲取元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //從優(yōu)先隊列中獲取第一個元素,peek方法不會刪除元素 E first = q.peek(); //如果獲取不到數(shù)據(jù),則調(diào)用available.await()進(jìn)入阻塞狀態(tài) if (first == null) available.await(); else { //獲取當(dāng)前延時對象是否到期 long delay = first.getDelay(NANOSECONDS); //到期那么返回這個延時對象 if (delay <= 0) return q.poll(); first = null; // //leader不為空,表明已經(jīng)有其他線程在等待這個延時對象了 //為什么不available.awaitNanos(delay)呢?這將會導(dǎo)致大量的線程在同一時間點被喚醒,然后去競爭 //這個到期的延時任務(wù),影響性能,還不如直接將他們無時間限制的wait,leader線程或者其他新進(jìn)來的線程獲取到延時對象后,去喚醒 //讓他們?nèi)ジ偁幭乱粋€延時對象 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { //指定納秒級別線程阻塞時間,當(dāng)前wait住的線程被喚醒后有可能與其他線程競爭失敗,就會進(jìn)入了同步隊列阻塞,那個搶到鎖的線程就會取走這個延時對象 available.awaitNanos(delay); } finally { //leader線程被喚醒并獲取到鎖之后會將leader設(shè)置為空 if (leader == thisThread) leader = null; } } } } } finally { //leader為空并且隊列不為空,那么喚醒正在等待的線程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); //釋放鎖 } }
從優(yōu)先隊列中取值,如果取到的延時節(jié)點已經(jīng)已經(jīng)到期,那么直接返回,如果還沒有到期并且已經(jīng)有其他線程在執(zhí)行delay時間等待了(也就是leader線程),那么掛起自己(避免延時 相同時間造成大量線程同時喚醒), leader線程在指定delay時間后主動喚醒,然后取競爭鎖,如果競爭成功,那么很大概率可以獲取到延時節(jié)點,如果競爭失敗,將被阻塞。
remove刪除元素
public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } }
Delayed接口
使用DelayQueue的話,放入該隊列的對象必須實現(xiàn)Delayed接口,實現(xiàn)的接口中有兩個參數(shù):延遲時間單位,優(yōu)先級規(guī)則,take方法會根據(jù)規(guī)則按照優(yōu)先級執(zhí)行
Delayed接口源碼:
public interface Delayed extends Comparable<Delayed> { /** * 返回與此對象關(guān)聯(lián)的剩余延遲(給定的時間單位)。 * @param unit 時間單位 * @返回剩余延遲;零值或負(fù)值表示 延遲已過期 */ long getDelay(TimeUnit unit); }
因為Delayed繼承了Comparable,所以還需要實現(xiàn)compareTo方法,具體實現(xiàn)如下:
class MyDelay implements Delayed { long delayTime; // 延遲時間 long expire; // 過期時間 public MyDelay(long delayTime, Thread t) { this.delayTime = delayTime; // 過期時間 = 當(dāng)前時間 + 延遲時間 this.expire = System.currentTimeMillis() + delayTime; } /** * 剩余時間 = 到期時間 - 當(dāng)前時間 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 優(yōu)先級規(guī)則:兩個任務(wù)比較,時間短的優(yōu)先執(zhí)行 */ @Override public int compareTo(Delayed o) { long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (int) f; } }
使用示例
實現(xiàn)Delayed接口:
class MyDelay<T> implements Delayed { long delayTime; // 延遲時間 long expire; // 過期時間 T data; public MyDelay(long delayTime, T t) { this.delayTime = delayTime; // 過期時間 = 當(dāng)前時間 + 延遲時間 this.expire = System.currentTimeMillis() + delayTime; data = t; } /** * 剩余時間 = 到期時間 - 當(dāng)前時間 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 優(yōu)先級規(guī)則:兩個任務(wù)比較,時間短的優(yōu)先執(zhí)行 */ @Override public int compareTo(Delayed o) { long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (int) f; } @Override public String toString() { return "delayTime=" + delayTime + ", expire=" + expire + ", data=" + data; } }
測試用例如下:
public class DelayQueueDemo { static BlockingQueue<Delayed> queue = new DelayQueue(); public static void main(String[] args) throws InterruptedException { queue.add(new MyDelay(8, "第一次添加任務(wù)")); queue.add(new MyDelay(3, "第二次添加任務(wù)")); queue.add(new MyDelay(5, "第三次添加任務(wù)")); while (!queue.isEmpty()) { Delayed delayed = queue.take(); System.out.println(delayed); } } }
輸出如下:
delayTime=3, expire=1625902338874, data=第二次添加任務(wù)
delayTime=5, expire=1625902338876, data=第三次添加任務(wù)
delayTime=8, expire=1625902338879, data=第一次添加任務(wù)
總結(jié)
DelayQueue其實采用了裝飾器模式,在對PriorityQueue進(jìn)行包裝下增加了延時時間獲取元素的功能,其主要特點歸納如下:
- DelayQueue是一個無界阻塞隊列,隊列內(nèi)部使用PriorityQueue來實現(xiàn)。
- 進(jìn)入隊列的元素必須實現(xiàn)Delayed接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素,只有在延遲期滿時才能從中提取元素;
- 該隊列頭部是延遲期滿后保存時間最長的Delayed元素;
- 如果沒有延遲未過期元素,且隊列沒有頭部,并且poll將返回null;
- 當(dāng)一個元素的getDelay(TimeUnit.NANOSECONDS)方法返回一個小于等于0的值時,表示該元素已過期;
- 無法使用poll或take移除未到期的元素,也不會將這些元素作為正常元素對待;例如:size方法返回到期和未到期元素的計數(shù)之和。
- 此隊列不允許使用null元素。
到此這篇關(guān)于Java延遲隊列DelayQueue原理詳解的文章就介紹到這了,更多相關(guān)延遲隊列DelayQueue原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中ByteBuddy動態(tài)字節(jié)碼操作庫的使用技術(shù)指南
ByteBuddy?是一個功能強大的?Java?字節(jié)碼操作庫,可以幫助開發(fā)者在運行時動態(tài)生成和修改類,而無需直接接觸復(fù)雜的?ASM?API,本文給大家介紹了Java?ByteBuddy動態(tài)字節(jié)碼操作庫的使用技術(shù)指南,需要的朋友可以參考下2025-04-04詳解java中List中set方法和add方法的區(qū)別
本文主要介紹了詳解java中List中set方法和add方法的區(qū)別,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08JDK自帶的序列化方式優(yōu)缺點及實現(xiàn)原理面試精講
這篇文章主要為大家介紹了JDK自帶的序列化方式優(yōu)缺點及實現(xiàn)原理面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Springboot升級到2.7.2結(jié)合nacos遇到的坑及解決
這篇文章主要介紹了Springboot升級到2.7.2結(jié)合nacos遇到的坑及解決,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06java static塊和構(gòu)造函數(shù)的實例詳解
這篇文章主要介紹了java static塊和構(gòu)造函數(shù)的實例詳解的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握J(rèn)ava static關(guān)鍵字的函數(shù)方法,需要的朋友可以參考下2017-09-09SpringBoot前后端json數(shù)據(jù)交互的全過程記錄
現(xiàn)在大多數(shù)互聯(lián)網(wǎng)項目都是采用前后端分離的方式開發(fā),下面這篇文章主要給大家介紹了關(guān)于SpringBoot前后端json數(shù)據(jù)交互的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03