Java中的延遲隊列DelayQueue詳細解析
前言
JDK自身支持延遲隊列的數(shù)據(jù)結(jié)構(gòu),其實類:java.util.concurrent.DelayQueue。
我們通過閱讀源碼的方式理解該延遲隊列類的實現(xiàn)過程。
1.定義
DelayQueue:是一種支持延時獲取元素的無界阻塞隊列。
特性:
- 線程安全(多生產(chǎn)者,多消費者)(單機,如果想實現(xiàn)分布式,可以結(jié)合redis 消息分發(fā),如果需要較高數(shù)據(jù)可靠性可以考慮結(jié)合消息中間件等);
- 內(nèi)部元素有“延遲”特性:只有延遲到期的元素才允許被獲取;
- 具有優(yōu)先級特性的無界隊列,優(yōu)先級以元素延遲時間為標(biāo)準(zhǔn),最先過期的元素優(yōu)先級最高(隊首);
- 入隊操作不會被阻塞,獲取元素在特定情況會阻塞(隊列為空,隊首元素延遲未到期等);
根據(jù)其源碼分析為何如此定義以及其特性的由來。
DelayQueue繼承關(guān)系:
類圖分析:
其核心繼承/實現(xiàn):
1.BlockingQueue:說明其具有阻塞隊列的特性;
2.元素必實現(xiàn)接口Delayed,而Delayed繼承了接口Comparable。因此所有元素必須實現(xiàn)兩個方法:
compareTo方法用于元素比較; getDelay方法用于獲取元素剩余延時時間。
public interface Delayed extends Comparable<Delayed> { /** * 返回關(guān)聯(lián)對象的剩余延遲時間(可指定時間單位) */ long getDelay(TimeUnit unit); }
2.源碼
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** * 可重入鎖,用于保證線程安全 */ private final transient ReentrantLock lock = new ReentrantLock(); /** * 優(yōu)先隊列(容器),實際存儲元素的地方 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * 等待取元素線程的領(lǐng)導(dǎo)(leader)線程,有且僅有一個leader。 * 具有最高優(yōu)先級,第一個嘗試獲取元素的線程。 * leader取完元素后,會喚醒新的等待線程成為新的leader。 */ private Thread leader = null; /** * 觸發(fā)條件,表示是否可以從隊列中讀取元素. * 用于等待(await())/通知(signal())其他線程 */ private final Condition available = lock.newCondition(); /** * 構(gòu)造函數(shù) */ public DelayQueue() { } /** * 構(gòu)造函數(shù): 調(diào)用addAll()方法:將集合c 存入隊列中 * */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); } /*--------------------------添加元素(非阻塞)-------------------------------*/ /** * 插入新元素. * 核心內(nèi)容見:public boolean offer(E e) */ public boolean add(E e) { return offer(e); } /** * 插入新元素. * 核心內(nèi)容見:public boolean offer(E e) */ public void put(E e) { offer(e); } /** * 插入新元素. * 核心內(nèi)容見:public boolean offer(E e) * @param e 元素 * @param timeout 此參數(shù)將被忽略,因為該方法從不阻塞(廢棄) * @param unit 此參數(shù)將被忽略,因為該方法從不阻塞(廢棄) * @return {@code true} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } /** * 插入新元素.(線程安全 lock) * 邏輯: * 1.入隊; * 2.如果入隊元素為隊首元素(原隊列為空),喚醒一個等待的線程,通知獲取數(shù)據(jù)。 * * @param e 元素 * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入隊 q.offer(e); // 若該元素為隊列頭部元素(說明原隊列為空),可以喚醒等待的線程取元素數(shù)據(jù) if (q.peek() == e) { // 如果隊首元素是剛插入的元素,則設(shè)置leader為null(騰位置) leader = null; // 喚醒一個等待的線程 available.signal(); } return true; } finally { lock.unlock(); } } /*--------------------------取出(返回并刪除)元素-------------------------------*/ /** * 取出延遲到期元素(非阻塞的).(線程安全 lock) * poll() 方法是非阻塞的,即調(diào)用之后無論元素是否存在/延遲到期都會立即返回。 * 邏輯: * 1.查詢隊首元素; * 2.元素延遲到期返回,否則返回null */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 查詢隊首元素 E first = q.peek(); // 隊首元素為空或者延時未到期 返回null if (first == null || first.getDelay(NANOSECONDS) > 0) { return null; } else { // 如果到期,取出并刪除隊首元素 return q.poll(); } } finally { lock.unlock(); } } /** * 取出延遲到期元素(帶有超時時間,阻塞).(線程安全 lock) * 如果隊首元素未到期或者為null,等待:直到隊首元素延遲到期或者超出指定等待時間(timeout) * 邏輯(無限循環(huán)等待獲?。? * 宗旨:在不超出timeout的時間內(nèi),循環(huán)去取出延遲到期的隊首元素(前提無其他線程正在取數(shù)--互斥). * 1.查詢隊首元素; * 2.1.隊列空:等待timeout一段時間,直到等待超時(即timeout被重置小于等于0); * 2.2.隊列不為空: * 2.2.1. 隊首元素延遲到期,取出隊首元素(poll()); * 2.2.2. 隊首元素延遲未到期: * 2.2.3 等待超時 ,返回null; * 2.2.4 等待未超時,等待時間<延遲時間或者有其他線程正在取數(shù)據(jù),繼續(xù)等待到超時到期 * 2.2.5 等待為超時,等待時間>=延遲時間并且無其他線程正在取數(shù)據(jù),該線程設(shè)置為leader等待到延遲到期(最后清空leader) * 3. 循環(huán)后,如果leader=null(無正在取數(shù)線程)并且隊列還有數(shù)據(jù),喚醒一個等待線程最終成為leader. */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 以可中斷方式獲取鎖 lock.lockInterruptibly(); try { for (; ; ) { // 獲取隊首元素 E first = q.peek(); if (first == null) { // 若隊首元素為空(即隊列為空,這時就需要關(guān)注,當(dāng)前取值請求是否需要阻塞等待 // 等待時間小于等于0 ,不阻塞等待,直接返回null) if (nanos <= 0) { return null; } else { // 等待相應(yīng)的時間 nanos = available.awaitNanos(nanos); } } else { // 若隊列元素非空,獲取隊首元素剩余延遲時間 long delay = first.getDelay(NANOSECONDS); // 延時過期 返回元素 if (delay <= 0) { return q.poll(); } // 延時未過期 等待時間超時 ,不等待,直接返回null if (nanos <= 0) { return null; } first = null; // 延時和等待都未到期且等待時間<延遲時間 或者 有其他線程在取數(shù)據(jù),當(dāng)前請求繼續(xù)等待 if (nanos < delay || leader != null) { nanos = available.awaitNanos(nanos); } else { // 沒有其他線程等待,將當(dāng)前線程設(shè)置為 leader,類似于“獨占”操作 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待直到延遲到期 long timeLeft = available.awaitNanos(delay); // 計算超時時間 nanos -= delay - timeLeft; } finally { // 該線程操作完畢,把 leader 置空 if (leader == thisThread) { leader = null; } } } } } } finally { // 如果leader線程為空 并且 queue非空,則喚醒其他等待線程 if (leader == null && q.peek() != null) { available.signal(); } lock.unlock(); } } /** * 取出延遲到期元素(無超時時間限制,阻塞).(線程安全 lock) * 邏輯(無限循環(huán)等待獲?。? * 其邏輯參考poll(long timeout, TimeUnit unit). * 其區(qū)別在于:不受超時時間限制(timeout) */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 以可中斷方式獲取鎖 lock.lockInterruptibly(); try { // 無限循環(huán) for (; ; ) { // 獲取隊首元素 E first = q.peek(); if (first == null) { // 若隊首元素為空(隊列為空),則等待 available.await(); } else { // 若隊列元素非空,獲取隊首元素剩余延遲時間 long delay = first.getDelay(NANOSECONDS); // 延遲到期,獲取隊首元素 if (delay <= 0) { return q.poll(); } // 延時未過期 first = null; // leader 不為空表示有其他線程在讀取數(shù)據(jù),當(dāng)前線程等待 if (leader != null) { available.await(); } else { // 沒有其他線程等待,將當(dāng)前線程設(shè)置為 leader Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待延遲時間過期 available.awaitNanos(delay); } finally { if (leader == thisThread) { leader = null; } } } } } } finally { // 如果leader線程為空 并且 queue非空,則喚醒其他等待線程 if (leader == null && q.peek() != null) { available.signal(); } lock.unlock(); } } /*--------------------------讀取隊首元素-------------------------------*/ /** * 讀取隊首元素.(線程安全 lock) */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } /*--------------------------讀取隊列長度-------------------------------*/ /** * 獲取隊列數(shù)據(jù)的長度.(線程安全 lock) */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } /*--------------------------獲取延遲到期元素集合-------------------------------*/ /** * 將隊列中延遲到期數(shù)據(jù) 收集到集合C中.(線程安全 lock) * * @return 返回延遲到期元素數(shù)量 */ public int drainTo(Collection<? super E> c) { if (c == null) { throw new NullPointerException(); } if (c == this) { throw new IllegalArgumentException(); } final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // peekExpired() 判斷隊首元素是否延遲到期 for (E e; (e = peekExpired()) != null; ) { c.add(e); q.poll(); ++n; } return n; } finally { lock.unlock(); } } /** * 將隊列中延遲到期數(shù)據(jù) 收集到集合C中(C集合總數(shù)有限制小于maxElements).(線程安全 lock) * @return 返回延遲到期元素數(shù)量 */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) { throw new NullPointerException(); } if (c == this) { throw new IllegalArgumentException(); } if (maxElements <= 0) { return 0; } final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // peekExpired() 判斷隊首元素是否延遲到期。并且到期元素總數(shù)不允許超過maxElements for (E e; n < maxElements && (e = peekExpired()) != null; ) { c.add(e); q.poll(); ++n; } return n; } finally { lock.unlock(); } } /** * 讀取隊首元素(已延遲到期).(私有方法) */ private E peekExpired() { // 獲取隊首元素 E first = q.peek(); // 隊首元素存在并且延遲到期,否則返回null return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } /*--------------------------刪除元素-------------------------------*/ /** * 清除隊列中所有元素(線程安全 lock)--暴力清除 */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } } /** * 刪除指定元素O.(線程安全 lock) */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } /** * 刪除指定元素O.(這里指的是相同的對象引用/內(nèi)存地址)(線程安全 lock) */ void removeEQ(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { for (Iterator<E> it = q.iterator(); it.hasNext(); ) { // 使用了對象引用/內(nèi)存地址相等比較 if (o == it.next()) { it.remove(); break; } } } finally { lock.unlock(); } } /*--------------------------隊列轉(zhuǎn)數(shù)組-------------------------------*/ /** * 將隊列元素都復(fù)制到數(shù)組中(無序).(線程安全 lock) */ public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } /** * 將隊列元素都復(fù)制到數(shù)組a中(無序). */ public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } } /*--------------------------私有內(nèi)部類--迭代器-------------------------------*/ /** * 返回此隊列中所有元素(已過期和未過期)的迭代器。迭代器不按任何特定順序返回元素。 */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * 快照迭代器,用于處理底層 隊列/數(shù)組的副本。 */ private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E) array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } } }
3.使用demo
使用DelayQueue實現(xiàn)延遲隊列:
優(yōu)點:實現(xiàn)簡單。
缺點:可擴展性較差,內(nèi)存限制、無持久化機制等。
@SneakyThrows public static void main(String[] args) { DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>(); long time = System.currentTimeMillis(); testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build()); testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build()); testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build()); for(;;){ System.out.println(testTaskDelayQueue.take()); TimeUnit.SECONDS.sleep(2); } } @Data @AllArgsConstructor @NoArgsConstructor @Builder private static class TestTask implements Delayed { private String name; private Long endTime; @Override public long getDelay(TimeUnit unit) { return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } }
到此這篇關(guān)于Java中的延遲隊列DelayQueue詳細解析的文章就介紹到這了,更多相關(guān)延遲隊列DelayQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知
本篇文章主要介紹了詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12Java判斷IP地址為內(nèi)網(wǎng)IP還是公網(wǎng)IP的方法
這篇文章主要介紹了Java判斷IP地址為內(nèi)網(wǎng)IP還是公網(wǎng)IP的方法,針對tcp/ip協(xié)議中保留的三個私有地址進行判斷分析,是比較實用的技巧,需要的朋友可以參考下2015-01-01SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證
本文主要介紹了SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-04-04學(xué)習(xí)java編程后可以走哪些職業(yè)道路
在本篇文章里給大家介紹了關(guān)于學(xué)習(xí)java后的職業(yè)道路,以及需要學(xué)習(xí)的相關(guān)知識內(nèi)容,有興趣的朋友們可以跟著學(xué)習(xí)下。2022-11-11基于Java實現(xiàn)多線程下載并允許斷點續(xù)傳
這篇文章主要介紹了基于Java實現(xiàn)多線程下載并允許斷點續(xù)傳,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-03-03Java調(diào)用第三方http接口的四種方式總結(jié)
這篇文章主要給大家介紹了關(guān)于Java調(diào)用第三方http接口的四種方式,在實際開發(fā)中我們經(jīng)常會與第三方公司進行合作,接入第三方接口,文中給出了詳細的代碼實例,需要的朋友可以參考下2023-08-08Mac使用Idea配置傳統(tǒng)SSM項目(非maven項目)
本文主要介紹了Mac使用Idea配置傳統(tǒng)SSM項目(非maven項目),將展示如何設(shè)置項目結(jié)構(gòu)、添加依賴關(guān)系等,具有一定的參考價值,感興趣的可以了解一下2024-01-01