Redis實(shí)現(xiàn)延遲隊(duì)列的項(xiàng)目示例
最近用到一個(gè)延遲消息的功能,第一時(shí)間想到使用MQ或者M(jìn)Q的插件,因?yàn)閿?shù)據(jù)量不大,所以嘗試使用Redis來實(shí)現(xiàn)了,畢竟Redis也天生支持類似MQ的隊(duì)列消費(fèi),所以,在這里總結(jié)了一下Redis實(shí)現(xiàn)延遲消息隊(duì)列的方式。
一、監(jiān)聽key過期時(shí)間
處理流程:當(dāng)redis的一個(gè)key過期時(shí),redis會(huì)生成一個(gè)事件,通知訂閱了該事件的客戶端(KeyExpirationEventMessageListener),然后在客戶端的回調(diào)方法中處理邏輯。
1)新建SpringBoot項(xiàng)目,maven依賴及yml如下
maven依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
yml文件
server:
port: 8000
spring:
redis:
database: 0
host: xxxx
port: 6379
password: xxxxxx
lettuce:
pool:
#最大連接數(shù)
max-active: 8
#最大阻塞等待時(shí)間
max-wait: -1
#最大空閑
max-idle: 8
#最小空閑
min-idle: 0
#連接超時(shí)時(shí)間
timeout: 5000
2)修改redis.conf文件開啟事件通知配置
默認(rèn)的配置:notify-keyspace-events “”
修改為:notify-keyspace-events Ex,該配置表示監(jiān)聽key的過期事件
3)設(shè)置Redis監(jiān)聽配置,注入Bean RedisMessageListenerContaine
@Configuration
public class RedisTimeoutConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpiredListener keyExpiredListener() {
return new KeyExpiredListener(this.redisMessageListenerContainer());
}
}
4)創(chuàng)建監(jiān)聽器類,重寫key過期回調(diào)方法onMessage
@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
@Autowired
public RedisTemplate<String, String> redisTemplate;
public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] bytes) {
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
//過期的key
String key = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("redis key 過期:bytes={},channel={},key={}", new String(bytes), channel, key);
}
}
5)編寫測(cè)試接口:寫入一個(gè)帶過期時(shí)間的key
@RestController
@RequestMapping("/demo")
public class BasicController {
@Autowired
public RedisTemplate<String, String> redisTemplate;
@GetMapping(value = "/test")
public void redisTest() {
redisTemplate.opsForValue().set("test", "5s后過期", 5, TimeUnit.SECONDS);
}
}
執(zhí)行后,onMessage監(jiān)聽方法打印結(jié)果:
redis key 過期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test
該方案缺點(diǎn):可靠性問題,Redis 是一個(gè)內(nèi)存數(shù)據(jù)庫,盡管它提供了數(shù)據(jù)持久化選項(xiàng)(如 RDB 和 AOF),但在某些情況下(如意外崩潰或重啟),可能會(huì)丟失一些未處理的過期事件。
二、zset + score
基本思路是將消息按需發(fā)送的時(shí)間作為分?jǐn)?shù)存儲(chǔ)在有序集合zset中,然后定期檢查并處理到期的消息。代碼例子如下:
1)創(chuàng)建 DelayedMessageService 類
@Slf4j
@Service
public class DelayedMessageService {
private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addMessage(String message, long delayMillis) {
long score = System.currentTimeMillis() + delayMillis;
redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);
}
@Scheduled(fixedRate = 1000)
public void processMessages() {
long now = System.currentTimeMillis();
Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);
if (messages != null && !messages.isEmpty()) {
for (ZSetOperations.TypedTuple<String> message : messages) {
String msg = message.getValue();
long score = message.getScore().longValue();
if (score <= now) {
// Process the message
System.out.println("Processing message: " + msg);
// Remove the message from the zset
redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);
}
}
}else{
log.info("定時(shí)任務(wù)執(zhí)行~");
}
}
}
2)編寫Controller接口測(cè)試,初始化zset內(nèi)容
@RestController
@RequestMapping("/demo")
public class BasicController {
@Autowired
private DelayedMessageService delayedMessageService;
@GetMapping(value = "/test2")
public void redisZsetTest() {
// Add some messages with delays
delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
}
}
說明:
- redisZsetTest接口通過調(diào)用
DelayedMessageService的addMessage方法,將消息及其到期時(shí)間添加到 Redis 的 zset 中 - 開啟一個(gè)定時(shí)任務(wù),定期檢查和處理到期的消息。使用
@Scheduled注解定期執(zhí)行,每秒檢查一次,注意這里使用@Scheduled,不要忘了啟動(dòng)類上添加@EnableScheduling注解,否則定時(shí)任務(wù)不會(huì)生效。fixedRate屬性表示以固定的頻率(毫秒為單位)執(zhí)行方法。即方法執(zhí)行完成后,會(huì)立即等待指定的毫秒數(shù),然后再次執(zhí)行。 - 通過
redisTemplate.opsForZSet().rangeByScoreWithScores方法按時(shí)間范圍獲取到期的消息,消息處理完成后,從zset 中移除處理過的消息
三、Redisson框架
利用 Redisson 提供的數(shù)據(jù)結(jié)構(gòu)RDelayedQueue和RBlockingDeque,可以自動(dòng)處理過期的任務(wù)并將它們移動(dòng)到阻塞隊(duì)列中,這樣我們就可以從阻塞隊(duì)列中獲取任務(wù)并進(jìn)行消費(fèi)處理。例子如下:
1)添加依賴
<!-- Redisson 依賴項(xiàng) -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>2.15.1</version>
</dependency>
2)創(chuàng)建DelayedMessageService
@Slf4j
@Service
public class DelayedMessageService {
@Autowired
private RedissonClient redissonClient;
private RBlockingDeque<String> blockingDeque;
private RDelayedQueue<String> delayedQueue;
@PostConstruct
public void init() {
this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");
this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
Executors.newSingleThreadExecutor().submit(this::processMessages);
}
public void addMessage(String message, long delayMillis) {
delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);
}
public void processMessages() {
try {
while (true) {
String message = blockingDeque.take();
// Process the message
log.info("消息被處理: " + message);
// ..業(yè)務(wù)邏輯處理
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("中斷異常",e);
}
}
}
3)測(cè)試接口
@GetMapping(value = "/test3")
public void redisQueueTest() {
// Add some messages with delays
delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
}
說明:
RDelayedQueue 是 Redisson 提供的延遲隊(duì)列,它將消息存儲(chǔ)在指定的隊(duì)列中,直到消息到期才會(huì)被轉(zhuǎn)移到該隊(duì)列。它的主要作用包括:
- 延遲消息管理:我們可以使用
RDelayedQueue的offer方法將消息添加到延遲隊(duì)列,并指定延遲時(shí)間,消息在延遲時(shí)間到期前一直保留在RDelayedQueue中。 - 消息轉(zhuǎn)移:一旦消息到期,
RDelayedQueue會(huì)自動(dòng)將消息轉(zhuǎn)移到指定的RBlockingDeque中。
RBlockingQueue是 Redisson 提供的阻塞隊(duì)列,它支持阻塞操作。主要作用包括:
- 阻塞操作:支持阻塞的
take操作,如果隊(duì)列中沒有元素,會(huì)一直阻塞直到有元素可供消費(fèi)。
總結(jié):
個(gè)人推薦使用Redisson 的RDelayedQueue 方式,感覺更加可靠和簡(jiǎn)單一些,當(dāng)然zset+score也可以是個(gè)不錯(cuò)選擇,畢竟更加靈活,延遲消息還有其他不同的方案,比如rocketmq、rabbitmq插件等,假如項(xiàng)目中用了redis,又不想引入更多的中間件,可以嘗試使用redis來實(shí)現(xiàn),為了測(cè)試,這里例子都比較簡(jiǎn)單,在實(shí)際使用過程中,還要考慮補(bǔ)償機(jī)制、冪等性等問題。
參考:
1.https://blog.csdn.net/qq_34826261/article/details/120598731
到此這篇關(guān)于Redis實(shí)現(xiàn)延遲隊(duì)列的項(xiàng)目示例的文章就介紹到這了,更多相關(guān)Redis 延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis教程(四):Hashes數(shù)據(jù)類型
這篇文章主要介紹了Redis教程(四):Hashes數(shù)據(jù)類型,本文講解了Hashes數(shù)據(jù)類型概述、相關(guān)命令列表和命令使用示例等內(nèi)容,需要的朋友可以參考下2015-04-04
Redis設(shè)置密碼的實(shí)現(xiàn)步驟
本文主要介紹了Redis設(shè)置密碼的實(shí)現(xiàn)步驟,主要包括兩種方法:臨時(shí)密碼和持久密碼,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08
redis實(shí)現(xiàn)分布式session的解決方案
session存放在服務(wù)器,關(guān)閉瀏覽器不會(huì)失效,本文主要介紹了redis實(shí)現(xiàn)分布式session的解決方案,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
Redisson分布式限流的實(shí)現(xiàn)原理分析
這篇文章主要介紹了Redisson分布式限流的實(shí)現(xiàn)原理分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
Redis數(shù)據(jù)遷移RedisShake的實(shí)現(xiàn)方法
本文主要介紹了Redis數(shù)據(jù)遷移RedisShake的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04

