Redis中4種延時隊列實現(xiàn)方式小結(jié)
延時隊列是一種特殊的消息隊列,它允許消息在指定的時間后被消費。在微服務(wù)架構(gòu)、電商系統(tǒng)和任務(wù)調(diào)度場景中,延時隊列扮演著關(guān)鍵角色。例如,訂單超時自動取消、定時提醒、延時支付等都依賴延時隊列實現(xiàn)。
Redis作為高性能的內(nèi)存數(shù)據(jù)庫,具備原子操作、數(shù)據(jù)結(jié)構(gòu)豐富和簡單易用的特性,本文將介紹基于Redis實現(xiàn)分布式延時隊列的四種方式。
1. 基于Sorted Set的延時隊列
原理
利用Redis的Sorted Set(有序集合),將消息ID作為member,執(zhí)行時間戳作為score進行存儲。通過ZRANGEBYSCORE
命令可以獲取到達執(zhí)行時間的任務(wù)。
代碼實現(xiàn)
public class RedisZSetDelayQueue { private final StringRedisTemplate redisTemplate; private final String queueKey = "delay_queue:tasks"; public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 添加延時任務(wù) * @param taskId 任務(wù)ID * @param taskInfo 任務(wù)信息(JSON字符串) * @param delayTime 延遲時間(秒) */ public void addTask(String taskId, String taskInfo, long delayTime) { // 計算執(zhí)行時間 long executeTime = System.currentTimeMillis() + delayTime * 1000; // 存儲任務(wù)詳情 redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo); // 添加到延時隊列 redisTemplate.opsForZSet().add(queueKey, taskId, executeTime); System.out.println("Task added: " + taskId + ", will execute at: " + executeTime); } /** * 輪詢獲取到期任務(wù) */ public List<String> pollTasks() { long now = System.currentTimeMillis(); // 獲取當(dāng)前時間之前的任務(wù) Set<String> taskIds = redisTemplate.opsForZSet() .rangeByScore(queueKey, 0, now); if (taskIds == null || taskIds.isEmpty()) { return Collections.emptyList(); } // 獲取任務(wù)詳情 List<String> tasks = new ArrayList<>(); for (String taskId : taskIds) { String taskInfo = (String) redisTemplate.opsForHash() .get("delay_queue:details", taskId); if (taskInfo != null) { tasks.add(taskInfo); // 從集合和詳情中移除任務(wù) redisTemplate.opsForZSet().remove(queueKey, taskId); redisTemplate.opsForHash().delete("delay_queue:details", taskId); } } return tasks; } // 定時任務(wù)示例 public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實際任務(wù)處理邏輯 } }
優(yōu)缺點
優(yōu)點
- 實現(xiàn)簡單,易于理解
- 任務(wù)按執(zhí)行時間自動排序
- 支持精確的時間控制
缺點
- 需要輪詢獲取到期任務(wù),消耗CPU資源
- 大量任務(wù)情況下,
ZRANGEBYSCORE
操作可能影響性能 - 沒有消費確認機制,需要額外實現(xiàn)
2. 基于List + 定時輪詢的延時隊列
原理
這種方式使用多個List作為存儲容器,按延遲時間的不同將任務(wù)分配到不同的隊列中。通過定時輪詢各個隊列,將到期任務(wù)移動到一個立即執(zhí)行隊列。
代碼實現(xiàn)
public class RedisListDelayQueue { private final StringRedisTemplate redisTemplate; private final String readyQueueKey = "delay_queue:ready"; // 待處理隊列 private final Map<Integer, String> delayQueueKeys; // 延遲隊列,按延時時間分級 public RedisListDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化不同延遲級別的隊列 delayQueueKeys = new HashMap<>(); delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒 delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分鐘 delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分鐘 delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分鐘 } /** * 添加延時任務(wù) */ public void addTask(String taskInfo, int delaySeconds) { // 選擇合適的延遲隊列 String queueKey = selectDelayQueue(delaySeconds); // 任務(wù)元數(shù)據(jù),包含任務(wù)信息和執(zhí)行時間 long executeTime = System.currentTimeMillis() + delaySeconds * 1000; String taskData = executeTime + ":" + taskInfo; // 添加到延遲隊列 redisTemplate.opsForList().rightPush(queueKey, taskData); System.out.println("Task added to " + queueKey + ": " + taskData); } /** * 選擇合適的延遲隊列 */ private String selectDelayQueue(int delaySeconds) { // 找到最接近的延遲級別 int closestDelay = delayQueueKeys.keySet().stream() .filter(delay -> delay >= delaySeconds) .min(Integer::compareTo) .orElse(Collections.max(delayQueueKeys.keySet())); return delayQueueKeys.get(closestDelay); } /** * 移動到期任務(wù)到待處理隊列 */ public void moveTasksToReadyQueue() { long now = System.currentTimeMillis(); // 遍歷所有延遲隊列 for (String queueKey : delayQueueKeys.values()) { boolean hasMoreTasks = true; while (hasMoreTasks) { // 查看隊列頭部任務(wù) String taskData = redisTemplate.opsForList().index(queueKey, 0); if (taskData == null) { hasMoreTasks = false; continue; } // 解析任務(wù)執(zhí)行時間 long executeTime = Long.parseLong(taskData.split(":", 2)[0]); // 檢查是否到期 if (executeTime <= now) { // 通過LPOP原子性地移除隊列頭部任務(wù) String task = redisTemplate.opsForList().leftPop(queueKey); // 任務(wù)可能被其他進程處理,再次檢查 if (task != null) { // 提取任務(wù)信息并添加到待處理隊列 String taskInfo = task.split(":", 2)[1]; redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo); System.out.println("Task moved to ready queue: " + taskInfo); } } else { // 隊列頭部任務(wù)未到期,無需檢查后面的任務(wù) hasMoreTasks = false; } } } } /** * 獲取待處理任務(wù) */ public String getReadyTask() { return redisTemplate.opsForList().leftPop(readyQueueKey); } /** * 啟動任務(wù)處理器 */ public void startTaskProcessors() { // 定時移動到期任務(wù) ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 移動任務(wù)線程 scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS); // 處理任務(wù)線程 scheduler.scheduleAtFixedRate(() -> { String task = getReadyTask(); if (task != null) { processTask(task); } }, 0, 100, TimeUnit.MILLISECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實際任務(wù)處理邏輯 } }
優(yōu)缺點
優(yōu)點
- 分級隊列設(shè)計,降低單隊列壓力
- 相比Sorted Set占用內(nèi)存少
- 支持隊列監(jiān)控和任務(wù)優(yōu)先級
缺點
- 延遲時間精度受輪詢頻率影響
- 實現(xiàn)復(fù)雜度高
- 需要維護多個隊列
- 時間判斷和隊列操作非原子性,需特別處理并發(fā)問題
3. 基于發(fā)布/訂閱(Pub/Sub)的延時隊列
原理
結(jié)合Redis發(fā)布/訂閱功能與本地時間輪算法,實現(xiàn)延遲任務(wù)的分發(fā)和處理。任務(wù)信息存儲在Redis中,而時間輪負責(zé)任務(wù)的調(diào)度和發(fā)布。
代碼實現(xiàn)
public class RedisPubSubDelayQueue { private final StringRedisTemplate redisTemplate; private final String TASK_TOPIC = "delay_queue:task_channel"; private final String TASK_HASH = "delay_queue:tasks"; private final HashedWheelTimer timer; public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化時間輪,刻度100ms,輪子大小512 this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); // 啟動消息訂閱 subscribeTaskChannel(); } /** * 添加延時任務(wù) */ public void addTask(String taskId, String taskInfo, long delaySeconds) { // 存儲任務(wù)信息到Redis redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo); // 添加到時間輪 timer.newTimeout(timeout -> { // 發(fā)布任務(wù)就緒消息 redisTemplate.convertAndSend(TASK_TOPIC, taskId); }, delaySeconds, TimeUnit.SECONDS); System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s"); } /** * 訂閱任務(wù)通道 */ private void subscribeTaskChannel() { redisTemplate.getConnectionFactory().getConnection().subscribe( (message, pattern) -> { String taskId = new String(message.getBody()); // 獲取任務(wù)信息 String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId); if (taskInfo != null) { // 處理任務(wù) processTask(taskId, taskInfo); // 刪除任務(wù) redisTemplate.opsForHash().delete(TASK_HASH, taskId); } }, TASK_TOPIC.getBytes() ); } private void processTask(String taskId, String taskInfo) { System.out.println("Processing task: " + taskId + " - " + taskInfo); // 實際任務(wù)處理邏輯 } // 模擬HashedWheelTimer類 public static class HashedWheelTimer { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final long tickDuration; private final TimeUnit unit; public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) { this.tickDuration = tickDuration; this.unit = unit; } public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) { long delayMillis = timeUnit.toMillis(delay); scheduler.schedule( () -> task.run(null), delayMillis, TimeUnit.MILLISECONDS ); } public interface TimerTask { void run(Timeout timeout); } public interface Timeout { } } }
優(yōu)缺點
優(yōu)點:
- 即時觸發(fā),無需輪詢
- 高效的時間輪算法
- 可以跨應(yīng)用訂閱任務(wù)
- 分離任務(wù)調(diào)度和執(zhí)行,降低耦合
缺點:
- 依賴本地時間輪,非純Redis實現(xiàn)
- Pub/Sub模式無消息持久化,可能丟失消息
- 服務(wù)重啟時需要重建時間輪
- 訂閱者需要保持連接
4. 基于Redis Stream的延時隊列
原理
Redis 5.0引入的Stream是一個強大的數(shù)據(jù)結(jié)構(gòu),專為消息隊列設(shè)計。結(jié)合Stream的消費組和確認機制,可以構(gòu)建可靠的延時隊列。
代碼實現(xiàn)
public class RedisStreamDelayQueue { private final StringRedisTemplate redisTemplate; private final String delayQueueKey = "delay_queue:stream"; private final String consumerGroup = "delay_queue_consumers"; private final String consumerId = UUID.randomUUID().toString(); public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 創(chuàng)建消費者組 try { redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xGroupCreate( delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true ); return "OK"; }); } catch (Exception e) { // 消費者組可能已存在 System.out.println("Consumer group may already exist: " + e.getMessage()); } } /** * 添加延時任務(wù) */ public void addTask(String taskInfo, long delaySeconds) { long executeTime = System.currentTimeMillis() + delaySeconds * 1000; Map<String, Object> task = new HashMap<>(); task.put("executeTime", String.valueOf(executeTime)); task.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, task); System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime); } /** * 獲取待執(zhí)行的任務(wù) */ public List<String> pollTasks() { long now = System.currentTimeMillis(); List<String> readyTasks = new ArrayList<>(); // 讀取尚未處理的消息 List<MapRecord<String, Object, Object>> records = redisTemplate.execute( (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> { return connection.streamCommands().xReadGroup( consumerGroup.getBytes(), consumerId.getBytes(), StreamReadOptions.empty().count(10), StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")) ); } ); if (records != null) { for (MapRecord<String, Object, Object> record : records) { String messageId = record.getId().getValue(); Map<Object, Object> value = record.getValue(); long executeTime = Long.parseLong((String) value.get("executeTime")); String taskInfo = (String) value.get("taskInfo"); // 檢查任務(wù)是否到期 if (executeTime <= now) { readyTasks.add(taskInfo); // 確認消息已處理 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 可選:從流中刪除消息 redisTemplate.opsForStream().delete(delayQueueKey, messageId); } else { // 任務(wù)未到期,放回隊列 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 重新添加任務(wù)(可選:使用延遲重新入隊策略) Map<String, Object> newTask = new HashMap<>(); newTask.put("executeTime", String.valueOf(executeTime)); newTask.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, newTask); } } } return readyTasks; } /** * 啟動任務(wù)處理器 */ public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 實際任務(wù)處理邏輯 } }
優(yōu)缺點
優(yōu)點:
- 支持消費者組和消息確認,提供可靠的消息處理
- 內(nèi)置消息持久化機制
- 支持多消費者并行處理
- 消息ID包含時間戳,便于排序
缺點:
- 要求Redis 5.0+版本
- 實現(xiàn)相對復(fù)雜
- 仍需輪詢獲取到期任務(wù)
- 對未到期任務(wù)的處理相對繁瑣
性能對比與選型建議
實現(xiàn)方式 | 性能 | 可靠性 | 實現(xiàn)復(fù)雜度 | 內(nèi)存占用 | 適用場景 |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任務(wù)量適中,需要精確調(diào)度 |
List + 輪詢 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并發(fā),延時精度要求不高 |
Pub/Sub + 時間輪 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 實時性要求高,可容忍服務(wù)重啟丟失 |
Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息確認 |
總結(jié)
在實際應(yīng)用中,可根據(jù)系統(tǒng)規(guī)模、性能需求、可靠性要求和實現(xiàn)復(fù)雜度等因素進行選擇,也可以組合多種方式打造更符合業(yè)務(wù)需求的延時隊列解決方案。無論選擇哪種實現(xiàn),都應(yīng)關(guān)注可靠性、性能和監(jiān)控等方面,確保延時隊列在生產(chǎn)環(huán)境中穩(wěn)定運行。
到此這篇關(guān)于Redis中4種延時隊列實現(xiàn)方式小結(jié)的文章就介紹到這了,更多相關(guān)Redis延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot如何使用Undertow代替Tomcat
這篇文章主要介紹了Spring Boot如何使用Undertow代替Tomcat,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09Java 獲取當(dāng)前類名和方法名的實現(xiàn)方法
這篇文章主要介紹了 Java 獲取當(dāng)前類名和方法名的實現(xiàn)方法的相關(guān)資料,這里不僅提供了實現(xiàn)方法并比較幾種方法的效率,需要的朋友可以參考下2017-07-07SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試方法
這篇文章主要介紹了數(shù)據(jù)庫連接測試程序(SQL Server 2000 Driver for JDBC Service Pack 3 安裝測試),需要的朋友可以參考下2014-10-10Java synchronized重量級鎖實現(xiàn)過程淺析
這篇文章主要介紹了Java synchronized重量級鎖實現(xiàn)過程,synchronized是Java里的一個關(guān)鍵字,起到的一個效果是"監(jiān)視器鎖",它的功能就是保證操作的原子性,同時禁止指令重排序和保證內(nèi)存的可見性2023-02-02java狀態(tài)機方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解
這篇文章主要為大家介紹了java狀態(tài)機方案解決訂單狀態(tài)扭轉(zhuǎn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-03-03Hibernate基于ThreadLocal管理Session過程解析
這篇文章主要介紹了Hibernate基于ThreadLocal管理Session過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10