Redis簡易延時隊列的實現(xiàn)示例
一、背景
在實際的業(yè)務(wù)場景中,經(jīng)常會遇到需要延時處理的業(yè)務(wù),比如訂單超時未支付,需要取消訂單,或者是用戶注冊后,需要在一段時間內(nèi)激活賬號,否則賬號失效等等。這些業(yè)務(wù)場景都可以通過延時隊列來實現(xiàn)。
最近在實際業(yè)務(wù)當(dāng)中就遇到了這樣的一個場景,需要實現(xiàn)一個延時隊列,用來處理訂單超時未支付的業(yè)務(wù)。在網(wǎng)上找了一些資料,發(fā)現(xiàn)大部分都是使用了mq來實現(xiàn),比如rabbitmq,rocketmq等等,但是這些mq都是需要安裝的,而且還需要配置,對于此項目來說不想增加額外的依賴,所以就想到了使用redis來實現(xiàn)一個簡易的延時隊列。
二、實現(xiàn)思路
1. 業(yè)務(wù)場景
訂單超時未支付,需要取消訂單,這個業(yè)務(wù)場景可以分為兩個步驟來實現(xiàn):
- 用戶下單后,將訂單信息存入數(shù)據(jù)庫,并將訂單信息存入延時隊列中,設(shè)置延時時間為30分鐘。
- 30分鐘后,從延時隊列中取出訂單信息,判斷訂單是否已支付,如果未支付,則取消訂單。
- 如果用戶在30分鐘內(nèi)支付了訂單,則將訂單從延時隊列中刪除。
2. 實現(xiàn)思路
- 使用redis的zset來實現(xiàn)延時隊列,zset的score用來存儲訂單的超時時間,value用來存儲訂單信息。
- 使用redis的set來存儲已支付的訂單,set中的value為訂單id。
三、實現(xiàn)代碼
1. 使用了兩個注解類分別標(biāo)記生產(chǎn)者類、生產(chǎn)者方法,消費(fèi)者方法
/** * @program: * @description: redis延時隊列生產(chǎn)者類注解,標(biāo)記生產(chǎn)者類,用來掃描生產(chǎn)者類中的生產(chǎn)者方法,將生產(chǎn)者方法注冊到redis延時隊列中 * @author: jiangchengxuan * @created: 2023/12/09 10:32 */ @Component @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RedisMessageQueue {}
/** * @program: * @description: * 帶有此注解的方法,方法的入?yún)⑹紫葧晦D(zhuǎn)換為json字符串,然后存入redis的zset中,score為當(dāng)前時間+延時時間,value為json字符串 * 當(dāng)延時時間到達(dá)后,會從redis的zset中取出value,然后將value轉(zhuǎn)換為入?yún)㈩愋?,調(diào)用此方法,執(zhí)行業(yè)務(wù)邏輯 * 此注解只能標(biāo)記在方法上,且方法必須為public,且只能有一個參數(shù) * 此注解標(biāo)記的方法,必須在redis延時隊列生產(chǎn)者類中,否則不會生效 * @author: jiangchengxuan * @created: 2023/12/09 10:37 */ @Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RedisMessageQueueMethod { String threadName() default "redis消息隊列默認(rèn)線程"; String queueKey(); // 隊列key值 int threadNum() default 1; //默認(rèn)線程數(shù)量 int threadSleepTime() default 500; //默認(rèn)線程休眠時間默認(rèn)500ms }
2. 生產(chǎn)者類具體實現(xiàn)
/** * @program: * @description: 生產(chǎn)者類具體實現(xiàn) * @author: jiangchengxuan * @created: 2023/12/09 10:44 */ @Slf4j @Component public class DelayQueueWorkerConfig implements InitializingBean { private volatile boolean monitorStarted = false; private volatile boolean monitorShutDowned = false; private ExecutorService executorService; // 需要監(jiān)控的延時隊列 @Autowired protected IDelayQueue<String> monitorQueue; @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet(){ //spring工具類,可以獲取指定注解的類 Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class); for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) { Object bean = entry.getValue(); Method[] methods = bean.getClass().getMethods(); for (Method method : methods) { Annotation[] annotations = method.getDeclaredAnnotations(); for (Annotation annotation : annotations) { if (annotation instanceof RedisMessageQueueMethod) { RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation; //找的需要使用消息隊列的方法后, initExecuteQueue(queueMethod, method, bean); } } } } } /** * 初始化執(zhí)行造作 * @param queueAnnotations 注解 * @param method 方法 * @param bean 對象 */ void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) { String threadName = queueAnnotations.threadName(); int threadNum = queueAnnotations.threadNum(); int threadSheepTime = queueAnnotations.threadSleepTime(); String queueKey = queueAnnotations.queueKey(); //獲取所有消息隊列名稱 executorService = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { final int num = i; executorService.execute(() -> { Thread.currentThread().setName(threadName + "[" + num + "]"); //如果沒有設(shè)置隊列queuekey或者已經(jīng)暫停則不執(zhí)行 while (!monitorShutDowned) { String value = null; try { value = monitorQueue.get(queueKey); // 獲取數(shù)據(jù)時進(jìn)行刪除操作,刪除成功,則進(jìn)行處理,業(yè)務(wù)邏輯處理失敗則繼續(xù)添加回隊列但是時間設(shè)置最大以達(dá)到保存現(xiàn)場的目的,防止并發(fā)獲取重復(fù)數(shù)據(jù) if (StringUtils.isNotEmpty(value)) { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value); } boolean success = (Boolean) method.invoke(bean, value); // 失敗重試 if (!success) { success = (Boolean) method.invoke(bean, value);; if (!success) { log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value); monitorQueue.add(TimeUnit.DAYS,365, value, queueKey); } } else { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value); } } } else { if (log.isDebugEnabled()) { log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted); } Thread.sleep(threadSheepTime); } } catch (Exception e) { log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e); } } log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed..."); }); } log.info("thread pool is started..."); } }
/** * @program: * @description: * 延時隊列接口實現(xiàn)類, * 使用redis的zset實現(xiàn)延時隊列, * @author: jiangchengxuan * @created: 2023/12/09 23:34 */ public interface IDelayQueue <E> { /** * 向延時隊列中添加數(shù)據(jù) * * @param score 分?jǐn)?shù) * @param data 數(shù)據(jù) * @return true 成功 false 失敗 */ boolean add(long score, E data,String queueKey); /** * 向延時隊列中添加數(shù)據(jù) * * @param timeUnit 時間單位 * @param time 延后時間 * @param data 數(shù)據(jù) * @param queueKey * @return true 成功 false 失敗 */ boolean add(TimeUnit timeUnit, long time, E data, String queueKey); /** * 從延時隊列中獲取數(shù)據(jù) * @param queueKey 隊列key * @return 數(shù)據(jù) */ String get(String queueKey); /** * 刪除數(shù)據(jù) * * @param key * @param data 數(shù)據(jù) * @return */ public<T> boolean rem(String key, T data) ; }
/** * @program: * @description: redis操作類,封裝了redis的操作方法,使用時直接注入即可使用,不需要關(guān)心redis的操作細(xì)節(jié),使用時只需要關(guān)心業(yè)務(wù)邏輯即可 * @author: jiangchengxuan * @created: 2023/12/09 23:35 */ @Service public class RedisDelayQueue implements IDelayQueue<String> { @Autowired private RedisService redisService; @Override public boolean add(long score, String data,String queueKey) { return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score); } @Override public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) { switch (timeUnit) { case SECONDS: return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey); case MINUTES: return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); case HOURS: return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); case DAYS: return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey); default: return false; } } @Override public String get(String queueKey) { long now = System.currentTimeMillis(); long min = Long.MIN_VALUE; Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10); if (!CollectionUtils.isEmpty(res)) { for (String data : res){ // 刪除成功,則進(jìn)行處理,防止并發(fā)獲取重復(fù)數(shù)據(jù) if (rem(queueKey, data)){ return data; } } } return null; } @Override public<T> boolean rem(String key, T data) { return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data); } }
- 使用
@RedisMessageQueue public class SomethingClass { @Autowired private IDelayQueue<String> messageQueue; /** * 生產(chǎn)者,向隊列中添加數(shù)據(jù),30秒后消費(fèi)者進(jìn)行消費(fèi) */ public void test(){ messageQueue.add(TimeUnit.SECONDS,30L,"這是參數(shù)數(shù)據(jù)","new_queue"); } /** * 消費(fèi)者,如果按此配置的話,會啟動一個線程,線程名稱為:測試線程名稱,線程數(shù)量為1,線程休眠時間為10毫秒 * 注意:queueKey需要與生產(chǎn)者中的queueKey保持一致才能進(jìn)行消費(fèi) * @param data */ @Override @RedisMessageQueueMethod(threadName = "測試線程名稱",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10) public void testMethod(String data) { //do something } }
到此這篇關(guān)于Redis簡易延時隊列的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis 延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis數(shù)據(jù)過期策略的實現(xiàn)詳解
最近項目當(dāng)中遇到一個需求場景,需要清空一些存放在Redis的數(shù)據(jù),本文對Redis的過期機(jī)制簡單的講解一下,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09利用redisson快速實現(xiàn)自定義限流注解(接口防刷)
利用redis的有序集合即Sorted?Set數(shù)據(jù)結(jié)構(gòu),構(gòu)造一個令牌桶來實施限流,而redisson已經(jīng)幫我們封裝成了RRateLimiter,通過redisson,即可快速實現(xiàn)我們的目標(biāo),這篇文章主要介紹了利用redisson快速實現(xiàn)自定義限流注解,需要的朋友可以參考下2024-07-07