Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列示例代碼
JavaDelayQueue延遲隊(duì)列
1.DelayQueue概述
DelayQueue
是 Java 并發(fā)包(java.util.concurrent
)中的一個(gè) 無界 阻塞隊(duì)列,用于存儲(chǔ)實(shí)現(xiàn)了 Delayed
接口的元素。隊(duì)列中的元素只有在達(dá)到指定的延遲時(shí)間后才能被獲取。
2.DelayQueue的底層數(shù)據(jù)結(jié)構(gòu)
DelayQueue
的底層數(shù)據(jù)結(jié)構(gòu)是 優(yōu)先級(jí)隊(duì)列(PriorityQueue),它是一個(gè)小頂堆(最小堆),根據(jù)元素的過期時(shí)間進(jìn)行排序。
- 底層采用 PriorityQueue(基于堆的實(shí)現(xiàn))
- 按照到期時(shí)間升序排列,即最早過期的元素在堆頂
- 元素未過期時(shí),take() 方法會(huì)阻塞
- 支持多線程并發(fā)訪問
3.DelayQueue的實(shí)現(xiàn)原理
元素需實(shí)現(xiàn)
Delayed
接口,重寫getDelay()
方法,返回剩余的延遲時(shí)間。DelayQueue
內(nèi)部維護(hù)一個(gè)PriorityQueue<Delayed>
。插入元素時(shí),按照到期時(shí)間排序,最早到期的元素位于堆頂。
take()
方法獲取堆頂元素:
- 若到期,直接返回該元素。
- 若未到期,線程阻塞,直到該元素可用。
- 使用鎖 + 條件變量(
ReentrantLock
+Condition
)控制并發(fā)訪問。
4.DelayQueue的應(yīng)用場(chǎng)景
DelayQueue
適用于 延遲執(zhí)行、定時(shí)任務(wù)、緩存超時(shí)管理 等場(chǎng)景,包括:
- 任務(wù)調(diào)度(如延遲執(zhí)行任務(wù)、重試機(jī)制)
- 定時(shí)消息隊(duì)列(如 Kafka 里的延時(shí)消息)
- 訂單超時(shí)取消(未支付訂單自動(dòng)取消)
- 緩存自動(dòng)過期(定期清除緩存)
- 連接超時(shí)管理(網(wǎng)絡(luò)連接的超時(shí)處理)
5.DelayQueue的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 高效的時(shí)間管理,自動(dòng)處理過期元素
- 線程安全,內(nèi)部使用
ReentrantLock
保證并發(fā)安全 - 無界隊(duì)列,但受內(nèi)存限制
- 阻塞機(jī)制,減少 CPU 輪詢
缺點(diǎn)
- 不支持元素移除(除非手動(dòng)遍歷
remove()
) - 不能提前獲取未到期元素(
poll()
只返回到期元素) - 無上限(可能導(dǎo)致 OOM)
6.DelayQueue的替代方案
需求 | 替代方案 |
---|---|
需要定時(shí)任務(wù) | ScheduledThreadPoolExecutor |
需要分布式延遲隊(duì)列 | Redis ZSet (基于時(shí)間戳排序) |
高吞吐延遲消息隊(duì)列 | Kafka + 延遲插件 |
低延遲任務(wù)調(diào)度 | TimeWheel (時(shí)間輪算法,如 Netty 的 HashedWheelTimer) |
7.DelayQueue使用示例
(1) 定義延遲元素
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; class DelayedTask implements Delayed { private final long delayTime; // 延遲時(shí)間 private final long expireTime; // 過期時(shí)間 private final String name; public DelayedTask(String name, long delay, TimeUnit unit) { this.name = name; this.delayTime = TimeUnit.MILLISECONDS.convert(delay, unit); this.expireTime = System.currentTimeMillis() + this.delayTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "Task{" + "name='" + name + '\'' + ", expireTime=" + expireTime + '}'; } }
(2) 使用DelayQueue
import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class DelayQueueExample { public static void main(String[] args) { DelayQueue<DelayedTask> queue = new DelayQueue<>(); queue.add(new DelayedTask("Task1", 3, TimeUnit.SECONDS)); queue.add(new DelayedTask("Task2", 1, TimeUnit.SECONDS)); queue.add(new DelayedTask("Task3", 5, TimeUnit.SECONDS)); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { while (!queue.isEmpty()) { try { DelayedTask task = queue.take(); // 阻塞獲取到期任務(wù) System.out.println("Executing: " + task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); executor.shutdown(); } }
(3) 運(yùn)行結(jié)果
Executing: Task{name='Task2', expireTime=...} // 1s 后執(zhí)行
Executing: Task{name='Task1', expireTime=...} // 3s 后執(zhí)行
Executing: Task{name='Task3', expireTime=...} // 5s 后執(zhí)行
8. 總結(jié)
DelayQueue
是基于優(yōu)先級(jí)隊(duì)列的小頂堆實(shí)現(xiàn)的阻塞隊(duì)列。- 元素需實(shí)現(xiàn)
Delayed
接口,getDelay()
返回剩余時(shí)間。 - 適用于 延遲任務(wù)調(diào)度、緩存過期管理、訂單超時(shí) 等場(chǎng)景。
- 主要優(yōu)點(diǎn)是 線程安全、自動(dòng)延遲控制,缺點(diǎn)是 無法主動(dòng)獲取未到期元素。
- 可用 ScheduledThreadPoolExecutor、Redis ZSet、Kafka 延時(shí)隊(duì)列 作為替代方案。
分布式微服務(wù)架構(gòu)下,能使用DelayQueue嗎?
在 分布式微服務(wù)架構(gòu) 下,不推薦直接使用 DelayQueue,主要原因如下:
1.DelayQueue的局限性
- 單機(jī)限制:
DelayQueue
是 JVM 內(nèi)存隊(duì)列,它運(yùn)行在單個(gè)進(jìn)程,無法在多個(gè)微服務(wù)實(shí)例間共享數(shù)據(jù),不能保證高可用性和擴(kuò)展性。 - 數(shù)據(jù)丟失風(fēng)險(xiǎn):若微服務(wù)實(shí)例崩潰或重啟,
DelayQueue
中的任務(wù)會(huì)丟失,缺乏持久化機(jī)制。 - 無水平擴(kuò)展能力:隨著流量增長(zhǎng),多個(gè)實(shí)例無法共享隊(duì)列,容易成為瓶頸。
2. 適用于DelayQueue的場(chǎng)景
盡管 DelayQueue
不能直接用于分布式架構(gòu),但在單機(jī)任務(wù)調(diào)度、短時(shí)間小規(guī)模的延遲任務(wù)場(chǎng)景下仍然可行,例如:
- 同一個(gè)微服務(wù)實(shí)例內(nèi)的短期任務(wù)(如 1-10 秒級(jí)的延遲任務(wù))
- 不需要高可靠性的本地任務(wù)(如定期緩存清理)
- 沒有跨實(shí)例同步要求的任務(wù)(如本地事件延遲處理)
3. 分布式替代方案
若要在分布式微服務(wù)架構(gòu)中實(shí)現(xiàn)可擴(kuò)展、高可用的延遲任務(wù)調(diào)度,可以采用以下方案:
(1) Redis ZSet(有序集合)+ 定時(shí)輪詢
原理:利用 Redis 的 ZSet(有序集合),按照
score
存儲(chǔ)任務(wù)的執(zhí)行時(shí)間戳,每隔 N 毫秒 輪詢一次取出到期任務(wù)執(zhí)行。優(yōu)勢(shì):
- 支持 分布式部署,多個(gè)實(shí)例可共享數(shù)據(jù)
- 持久化,即使服務(wù)重啟,任務(wù)仍然存在
- 高性能,Redis 讀寫性能優(yōu)越
示例:
jedis.zadd("delayQueue", System.currentTimeMillis() + 5000, "order:123"); // 5s 后執(zhí)行 Set<String> tasks = jedis.zrangeByScore("delayQueue", 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { tasks.forEach(task -> { process(task); // 處理任務(wù) jedis.zrem("delayQueue", task); // 移除已處理任務(wù) }); }
適用場(chǎng)景:
- 訂單超時(shí)處理
- 定時(shí)消息推送
- 低吞吐的延遲任務(wù)(如秒級(jí)延遲)
(2) Kafka + 延遲隊(duì)列插件
- 原理:Kafka 通過
Kafka Streams
或 延遲隊(duì)列插件(如Kafka Delay Message
)支持延遲消費(fèi)消息。 - 適用場(chǎng)景:
- 高吞吐的延遲任務(wù)
- 可靠的分布式消息隊(duì)列
- 缺點(diǎn):
- 依賴 Kafka,適用于 需要消息隊(duì)列的業(yè)務(wù)
(3) RabbitMQ/ActiveMQ TTL + 死信隊(duì)列
原理:RabbitMQ 支持 TTL(Time-To-Live) 設(shè)置,消息超時(shí)后自動(dòng)進(jìn)入 DLX(Dead Letter Exchange, 死信隊(duì)列),可用 消費(fèi)者監(jiān)聽 處理。
適用場(chǎng)景:
- 需要可靠消息隊(duì)列
- 需要高吞吐延遲任務(wù)
示例:
channel.queueDeclare("delayQueue", true, false, false, Map.of("x-message-ttl", 5000)); channel.basicPublish("", "delayQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, "Delayed Message".getBytes());
缺點(diǎn):
- 依賴消息中間件,適用于 消息驅(qū)動(dòng)的系統(tǒng)
(4) 分布式任務(wù)調(diào)度框架
- 常見框架:
- XXL-JOB(輕量級(jí),適用于小規(guī)模定時(shí)任務(wù))
- Elastic-Job(基于 Zookeeper,適用于高并發(fā)調(diào)度)
- Quartz + DB 持久化(適用于復(fù)雜定時(shí)任務(wù))
- 適用場(chǎng)景:
- 定時(shí)任務(wù)執(zhí)行
- 任務(wù)分片調(diào)度
- 可持久化任務(wù)隊(duì)列
4. 結(jié)論
建議:如果是 單機(jī)應(yīng)用,可以使用 DelayQueue
;如果是 分布式微服務(wù)架構(gòu),建議使用 Redis ZSet / Kafka / RabbitMQ / 任務(wù)調(diào)度框架 實(shí)現(xiàn)延遲任務(wù)。
到此這篇關(guān)于Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列的文章就介紹到這了,更多相關(guān)Java DelayQueue延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring中使用@within與@target的區(qū)別
這篇文章主要介紹了Spring中使用@within與@target的一些區(qū)別,本文通過項(xiàng)目案例給大家詳細(xì)分析,給大家介紹的非常詳細(xì),代碼簡(jiǎn)單易懂,需要的朋友可以參考下2021-09-09Springboot整合mybatisplus的項(xiàng)目實(shí)戰(zhàn)
本文主要介紹了Springboot整合mybatisplus的項(xiàng)目實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06spring boot 圖片上傳與顯示功能實(shí)例詳解
這篇文章主要介紹了spring boot 圖片上傳與顯示功能實(shí)例詳解,需要的朋友可以參考下2017-04-04解決創(chuàng)建springboot后啟動(dòng)報(bào)錯(cuò):Failed?to?bind?properties?under‘spri
在Spring?Boot項(xiàng)目中,application.properties和application.yml是用于配置參數(shù)的兩種文件格式,properties格式簡(jiǎn)潔但不支持層次結(jié)構(gòu),而yml格式支持層次性,可讀性更好,在yml文件中,要注意細(xì)節(jié),比如冒號(hào)后面需要空格2024-10-10分享Java8中通過Stream對(duì)列表進(jìn)行去重的實(shí)現(xiàn)
本文主要介紹了分享Java8中通過Stream對(duì)列表進(jìn)行去重的實(shí)現(xiàn),包括兩種方法,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11Java?hutool?List集合對(duì)象拷貝示例代碼
這篇文章主要介紹了Java?hutool?List集合對(duì)象拷貝的相關(guān)資料,文章還分享了在實(shí)現(xiàn)過程中遇到的一些問題,并強(qiáng)調(diào)了閱讀源碼和正確配置CopyOptions的重要性,需要的朋友可以參考下2024-12-12