Redis優(yōu)雅地實(shí)現(xiàn)延遲隊(duì)列的方法分享
前言
工作中常常會(huì)遇到這樣的場(chǎng)景,如訂單到期未支付取消,到期自動(dòng)續(xù)費(fèi)等,我們發(fā)現(xiàn)延遲隊(duì)列非常適合在這樣的場(chǎng)景中使用。常見(jiàn)的延遲隊(duì)列的優(yōu)秀實(shí)現(xiàn)有rabbitMQ的死信隊(duì)列,RocketMQ的延遲隊(duì)列等,但是了有時(shí)候項(xiàng)目沒(méi)有特別的大,沒(méi)有引入類(lèi)似的消息中間件,但是了又遇到了特別適合使用延遲隊(duì)列的場(chǎng)景,我們一般會(huì)利用已有的redis實(shí)現(xiàn)一個(gè)簡(jiǎn)陋的延遲隊(duì)列。常見(jiàn)的實(shí)現(xiàn)方式有監(jiān)聽(tīng)過(guò)期key,使用zset利用分值進(jìn)行一個(gè)匹配,但是了這些實(shí)現(xiàn)或多或少有些問(wèn)題,不夠優(yōu)雅。監(jiān)聽(tīng)過(guò)期key是一種危險(xiǎn)行為,一是如果過(guò)redis中key數(shù)量較大監(jiān)聽(tīng)過(guò)期key可能導(dǎo)致服務(wù)負(fù)載異常,二是redis中key過(guò)期后key是惰性刪除的,因此監(jiān)聽(tīng)機(jī)制需要主動(dòng)觸發(fā)。利用zset分值實(shí)現(xiàn)呢,需要自己開(kāi)發(fā)代碼處理定時(shí)輪訓(xùn)以及key刪除的邏輯,具有一定的工作量和復(fù)雜度。哪有沒(méi)有一種優(yōu)雅的redis延遲隊(duì)列的實(shí)現(xiàn)呢?
Redisson是Redis服務(wù)器上的分布式可伸縮Java數(shù)據(jù)結(jié)構(gòu)----駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid,IMDG)。底層使用netty框架,并提供了與java對(duì)象相對(duì)應(yīng)的分布式對(duì)象、分布式集合、分布式鎖和同步器、分布式服務(wù)等一系列的Redisson的分布式對(duì)象。為我們提供了許多開(kāi)箱即用的功能。今天介紹Redisson實(shí)現(xiàn)的優(yōu)雅的延遲隊(duì)列。
使用
依賴(lài)配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.homeey</groupId>
<artifactId>redis-delay-queue</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-delay-queue</name>
<description>redis-delay-queue</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.19.3</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-23</artifactId>
<version>3.19.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>備注:處理redisson和springboot兼容性問(wèn)題
配置文件
springboot整合redisson有三種方式
- 第一種:通用的redis配置+redisson的自動(dòng)配置[最簡(jiǎn)單]
- 第二種:使用單獨(dú)的redisson配置文件
- 第三種:使用spring.redis.redisson這個(gè)配置key下進(jìn)行配置
詳細(xì)的整合查看 springboot整合redisson配置
spring:
redis:
database: 0
host: localhost
port: 6379
timeout: 10000
lettuce:
pool:
max-active: 8
max-wait: -1
min-idle: 0
max-idle: 8
demo代碼
package com.homeey.redisdelayqueue.delay;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 明天的你會(huì)因今天到的努力而幸運(yùn)
*
* @author jt4mrg@qq.com
* 23:11 2023-02-19 2023
**/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayQueue {
private final RDelayedQueue<String> delayedQueue;
private final RBlockingQueue<String> blockingQueue;
@PostConstruct
public void init() {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
while (true) {
try {
String task = blockingQueue.take();
log.info("rev delay task:{}", task);
} catch (Exception e) {
log.error("occur error", e);
}
}
});
}
public void offerTask(String task, long seconds) {
log.info("add delay task:{},delay time:{}s", task, seconds);
delayedQueue.offer(task, seconds, TimeUnit.SECONDS);
}
@Configuration
static class RedissonDelayQueueConfigure {
@Bean
public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
return redissonClient.getBlockingQueue("TOKEN-RENEWAL");
}
@Bean
public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
RedissonClient redissonClient) {
return redissonClient.getDelayedQueue(blockingQueue);
}
}
}執(zhí)行效果

原理分析
從RedissonDelayedQueue實(shí)現(xiàn)中我們看到有四個(gè)角色

- redisson_delay_queue_timeout:xxx,sorted set數(shù)據(jù)類(lèi)型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時(shí)間戳(提交任務(wù)時(shí)的時(shí)間戳 + 延遲時(shí)間)來(lái)排序的,所以列表的最前面的第一個(gè)元素就是整個(gè)延遲隊(duì)列中最早要被執(zhí)行的任務(wù),這個(gè)概念很重要
- redisson_delay_queue:xxx,list數(shù)據(jù)類(lèi)型,暫時(shí)沒(méi)發(fā)現(xiàn)什么用,只是在提交任務(wù)時(shí)會(huì)寫(xiě)入這里面,隊(duì)列轉(zhuǎn)移時(shí)又會(huì)刪除里面的元素
- xxx:list數(shù)據(jù)類(lèi)型,被稱(chēng)為目標(biāo)隊(duì)列,這個(gè)里面存放的任務(wù)都是已經(jīng)到了延遲時(shí)間的,可以被消費(fèi)者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個(gè)目標(biāo)隊(duì)列中獲取到任務(wù)的
- redisson_delay_queue_channel:xxx,是一個(gè)channel,用來(lái)通知客戶端開(kāi)啟一個(gè)延遲任務(wù)
隊(duì)列創(chuàng)建
RedissonDelayedQueue延遲隊(duì)列創(chuàng)建時(shí),指定了隊(duì)列轉(zhuǎn)移服務(wù),以及實(shí)現(xiàn)延遲隊(duì)列的四個(gè)重要校色的key。核心代碼是指定隊(duì)列轉(zhuǎn)移任務(wù)
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "http://拿到zset中過(guò)期的值列表
+ "if #expiredValues > 0 then " //如果有
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"http://解構(gòu)消息,在提交任務(wù)時(shí)打包的消息
+ "redis.call('rpush', KEYS[1], value);" //放入無(wú)前綴的list 隊(duì)頭
+ "redis.call('lrem', KEYS[3], 1, v);"http://移除帶前綴list 隊(duì)尾元素
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));" //移除zset中本次讀取的過(guò)期元素
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "http://取zset最小分值的元素
+ "if v[1] ~= nil then "
+ "return v[2]; " //返回分值,即過(guò)期時(shí)間
+ "end "
+ "return nil;",
Arrays.asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
@Override
protected RTopic getTopic() {
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
}
};生產(chǎn)者

核心代碼RedissonDelayedQueue#offerAsync
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息體:消息id,消息長(zhǎng)度,消息值
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"http://zset中加入消息及其超時(shí)分值
+ "redis.call('rpush', KEYS[3], value);" //向帶前綴的list中添加消息
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "http://取出zset中第一個(gè)元素
+ "if v[1] == value then " //如果最快過(guò)期的元素就是這次發(fā)送的消息
+ "redis.call('publish', KEYS[4], ARGV[1]); " //channel中發(fā)布一下超時(shí)時(shí)間
+ "end;",
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));消費(fèi)者
消費(fèi)者最簡(jiǎn)單,直接從不帶前綴的list中BLPOP讀取就可以
整個(gè)流程

總結(jié)思考
Lua是redis的好朋友,我們可以看到Redisson實(shí)現(xiàn)延遲隊(duì)列時(shí),大量使用到lua腳本,因Redis會(huì)將整個(gè)腳本作為一個(gè)整體執(zhí)行,中間不會(huì)被其他請(qǐng)求插入。因此在腳本運(yùn)行過(guò)程中無(wú)需擔(dān)心會(huì)出現(xiàn)競(jìng)態(tài)條件,無(wú)需使用事務(wù)。我們?cè)谄綍r(shí)開(kāi)發(fā)時(shí)有多個(gè)redis命令操作的有簡(jiǎn)單的業(yè)務(wù)邏輯,不妨嘗試一下lua腳本的方式,可以避免使用分布式鎖來(lái)保障一致性。
Redisson的源碼值得一讀,有很多新東西值得學(xué)習(xí),如果其用到的netty基于時(shí)間輪算法的定時(shí)任務(wù)調(diào)度,可以讓我們基于此實(shí)現(xiàn)自己的任務(wù)調(diào)度框架,也讓我有了去探究這種實(shí)現(xiàn)方式和基于ScheduledThreadPoolExecutor的定時(shí)調(diào)度的差異及各自優(yōu)劣的欲望。
以上就是Redis優(yōu)雅地實(shí)現(xiàn)延遲隊(duì)列的方法分享的詳細(xì)內(nèi)容,更多關(guān)于Redis延遲隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用redis實(shí)現(xiàn)高效分頁(yè)的項(xiàng)目實(shí)踐
在很多場(chǎng)景下,我們需要對(duì)大量的數(shù)據(jù)進(jìn)行分頁(yè)展示,本文主要介紹了使用redis實(shí)現(xiàn)高效分頁(yè)的項(xiàng)目實(shí)踐,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
Redis 操作多個(gè)數(shù)據(jù)庫(kù)的配置的方法實(shí)現(xiàn)
本文主要介紹了Redis 操作多個(gè)數(shù)據(jù)庫(kù)的配置的方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
解決Redis分布式鎖的誤刪問(wèn)題和原子性問(wèn)題
Redis的分布式鎖是通過(guò)利用Redis的原子操作和特性來(lái)實(shí)現(xiàn)的,為了保證數(shù)據(jù)的一致性和避免沖突,可以使用分布式鎖來(lái)進(jìn)行同步控制,本文給大家介紹了如何解決Redis分布式鎖的誤刪問(wèn)題和原子性問(wèn)題,需要的朋友可以參考下2024-02-02

