SpringBoot實(shí)現(xiàn)redis延遲隊(duì)列的示例代碼
1. 業(yè)務(wù)場景
延時(shí)隊(duì)列場景在我們?nèi)粘I(yè)務(wù)開發(fā)中經(jīng)常遇到,它是一種特殊類型的消息隊(duì)列,它允許把消息發(fā)送到隊(duì)列中,但不立即投遞給消費(fèi)者,而是在一定時(shí)間后再將消息投遞給消費(fèi)者。延遲隊(duì)列的常見使用場景有以下幾種:
- 在各種購物平臺上下單,訂單超過30分鐘未支付,自動關(guān)閉。
- 訂單完成后, 如果用戶一直未評價(jià), 5天后自動好評。
- 會員到期前15天, 到期前3天分別發(fā)送短信提醒。
- 當(dāng)訂單一直處于未支付狀態(tài)時(shí),如何及時(shí)的關(guān)閉訂單,并退還庫存?
- 如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功?
2. Redis延遲隊(duì)列實(shí)現(xiàn)原理
目前延遲隊(duì)列的類型主要實(shí)現(xiàn)有:
- 基于消息的延遲:指為每條消息設(shè)置不同的延遲時(shí)間,那么每當(dāng)隊(duì)列中有新消息進(jìn)入的時(shí)候就會重新根據(jù)延遲時(shí)間排序,或者定義時(shí)間輪,新消息落在指定位置;
- 基于隊(duì)列的延遲: 設(shè)置不同延遲級別的隊(duì)列,比如5s、1min、30mins、1h等,每個(gè)隊(duì)列中消息的延遲時(shí)間都是相同的。
基于第一種不少組件都有實(shí)現(xiàn)方案,比如redis的sortset間接實(shí)現(xiàn),kafka內(nèi)部時(shí)間輪,rabbitMQ可安裝插件實(shí)現(xiàn)。第一種實(shí)時(shí)性高,不過主觀看會比較依賴組件本身,但自己實(shí)現(xiàn)就得考慮持久化、高可用等問題,建議直接使用組件本身;第二種方案可以基于組件去實(shí)現(xiàn),通用性會高點(diǎn),不過實(shí)時(shí)性不高,更適合用于重試業(yè)務(wù)場景。當(dāng)然Redis本身并不支持延遲隊(duì)列,所以我們只是實(shí)現(xiàn)一個(gè)比較簡單的延遲隊(duì)列,而且Redis不太適合大量消息堆積,所以只適合比較簡單的場景,然假如我們對消息的實(shí)時(shí)性以及可靠性要求非常高,可能就需要使用MQ或kafka來實(shí)現(xiàn)了。
消息延遲流程圖如下:
Redis延遲隊(duì)列可以通過 zset 來實(shí)現(xiàn),因?yàn)?zset 中有一個(gè) score,我們可以把時(shí)間作為 score,將 value 存到 redis 中,然后通過輪詢的方式,去不斷的讀取消息出來,整體思路為:
- 消息體設(shè)置有效期,設(shè)置好score,然后放入zset中
- 通過排名拉取消息
- 有效期到了,就把當(dāng)前消息從zset中移除
zadd命令
使用方式:ZADD key score member [[score member][score member] …]
將一個(gè)或多個(gè) member 元素及其 score 值加入到有序集 key 當(dāng)中。如果 key 不存在,則創(chuàng)建一個(gè)空的有序集并執(zhí)行 ZADD 操作。如果某個(gè) member 已經(jīng)是有序集的成員,那么更新這個(gè) member 的 score 值,并通過重新插入這個(gè) member 元素,來保證該 member 在正確的位置上。score 值可以是整數(shù)值或雙精度浮點(diǎn)數(shù)。
ZRANGEBYSCORE命令
使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
- 返回有序集 key 中,所有 score 值介于 min 和 max 之間(包括等于 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列。
- 具有相同 score 值的成員按字典序來排列
- 可選的 LIMIT 參數(shù)指定返回結(jié)果的數(shù)量及區(qū)間(就像SQL中的 SELECT LIMIT offset, count ),注意當(dāng) offset 很大時(shí),定位 offset 的操作可能需要遍歷整個(gè)有序集,此過程最壞復(fù)雜度為 O(N) 時(shí)間。
- 可選的 WITHSCORES 參數(shù)決定結(jié)果集是單單返回有序集的成員,還是將有序集成員及其 score 值一起返回。
ZREM命令
使用方式:ZREM key member [member …]
移除有序集 key 中的一個(gè)或多個(gè)成員,不存在的成員將被忽略。
當(dāng) key 存在但不是有序集類型時(shí),返回一個(gè)錯(cuò)誤。
3. 基于springboot實(shí)現(xiàn)redis延遲隊(duì)列
3.1 引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${version}</version> </dependency>
3.2 redis基礎(chǔ)方法
定義RedisService基礎(chǔ)服務(wù)方法,本次案例只涉及到以下三個(gè)基礎(chǔ)方法:
/** * 添加 ZSet 元素 * * @param key * @param value * @param score */ @Override public boolean add(String key, Object value, double score) { return redisTemplate.opsForZSet().add(key, value, score); } /** * 返回 分?jǐn)?shù)范圍內(nèi) 指定 count 數(shù)量的元素集合, 并且從 offset 下標(biāo)開始(從小到大,不帶分?jǐn)?shù)的集合) * * @param key * @param min * @param max * @param offset 從指定下標(biāo)開始 * @param count 輸出指定元素?cái)?shù)量 * @return */ @Override public Set<Object> rangeByScore(String key, double min, double max, long offset, long count) { return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count); } /** * Zset 刪除一個(gè)或多個(gè)元素 * * @param key * @param values * @return */ @Override public Long removeZset(String key, Object... values) { return redisTemplate.opsForZSet().remove(key, values); }
3.3 定義Spring消息事件推送
@Getter @ToString public class DelayMsg extends ApplicationEvent { private String msg; private String topic; public DelayMsg(Object source, String msg, String topic) { super(source); this.msg = msg; this.topic = topic; } }
3.4 消息獲取
定義redis獲取延遲隊(duì)列消息方法:
/** * 從zset中取出score小于當(dāng)前時(shí)間戳的數(shù)據(jù) * * @param key * @return */ public String getDelayOne(String key) { //先查后刪,一次拿3個(gè)做備選,這樣搶占到的概率就會高一些 Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3); if (CollectionUtils.isEmpty(sets)) { return null; } for (Object val : sets) { if (1L.equals(redisService.removeZset(key, val))) { // 刪除成功,表示搶占到 return val.toString(); } } return null; }
這里每次查詢時(shí)取了三個(gè)數(shù)據(jù),然后遍歷獲取到的數(shù)據(jù),依次嘗試去刪除,若刪除成功,則表示當(dāng)前實(shí)例搶占到了這個(gè)消息
- 為什么這樣設(shè)計(jì)? 這里有兩個(gè)點(diǎn),先解釋第一個(gè),為啥先查后刪
如果我們按照正常的實(shí)現(xiàn)流程,每次從zset中取一個(gè),但是無法保證這個(gè)時(shí)候就只有我一個(gè)人拿到了這個(gè)數(shù)據(jù),在多實(shí)例的場景下,可能存在多個(gè)實(shí)例同時(shí)拿到了它,那么如何才能表示只有一個(gè)實(shí)例搶占到呢?
借助redis的單線程機(jī)制,只可能有一個(gè)實(shí)例會刪除成功,所以拿到并刪除成功的那個(gè)小伙伴,就是最終的幸運(yùn)兒;
因此實(shí)現(xiàn)細(xì)節(jié)就是先查,后刪,若刪除成功,表示獲取成功;否則表示被其他的實(shí)例捷足先登。
- 接下來再看第二個(gè),為啥一次拿三個(gè)
從上面的分析可以看出,如果我一次只拿一個(gè),那么我搶占到的幾率并不太大,特別是當(dāng)實(shí)例比較多時(shí),可能會做多次的無效操作;為了減少這個(gè)可能性,所以我一次多拿幾個(gè)做備選,這樣搶占到的概率就會高一些,至于為什么是3,這個(gè)就看實(shí)際的實(shí)例與定時(shí)任務(wù)的執(zhí)行間隔了。
上面定義了如何獲取延遲隊(duì)列中已到期的消息,接下來需要定時(shí)輪訓(xùn)獲取消息:
/** * 每5s定時(shí)輪訓(xùn)消息 */ @Scheduled(fixedRate = 5000) public void schedule() { for (String specialTopic : topic) { String msg = redisDelayQueue.getDelayOne(specialTopic); logger.info("開始輪訓(xùn)獲取消息 {}", msg); if (StringUtil.isNotEmpty(msg)) { //使用Spring推送事件處理 applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic)); } } }
上面的定時(shí)任務(wù),直接借助Spring的@Schedule來實(shí)現(xiàn),遍歷所有的topic,撈出數(shù)據(jù)之后,通過spring的 event/listener事件機(jī)制來實(shí)現(xiàn)消息處理的解耦
3.5 定義消費(fèi)者注解和切面處理
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener public @interface Consumer { String topic(); }
注意這個(gè)注解上面還有 @EventListener,表明它可以監(jiān)聽的spring的事件
3.6 定義延時(shí)業(yè)務(wù)的切面處理
@Aspect @Component public class ConsumerAspect { @Around("@annotation(consumer)") public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable { Object[] args = joinPoint.getArgs(); boolean check = false; for (Object obj : args) { if (obj instanceof DelayMsg) { check = consumer.topic().equals(((DelayMsg) obj).getTopic()); } } if (!check) { // 不滿足條件,直接忽略 return null; } // topic匹配成功,執(zhí)行 return joinPoint.proceed(); } }
3.7 消息監(jiān)聽
//使用自定義的consumer注解監(jiān)聽topic延遲隊(duì)列 @Consumer(topic = RedisKeyConstant.DELAY_QUEUE) public void consumer(DelayMsg delayMsg) { logger.info("預(yù)約單延時(shí)確認(rèn): " + delayMsg.getMsg() + " at:" + System.currentTimeMillis()); //延遲業(yè)務(wù)具體實(shí)現(xiàn) //... //... }
3.8 寫入隊(duì)列的包裝服務(wù)類
@Component public class DelayListWrapper { private Logger logger = LoggerFactory.getLogger(RedisDelayQueue.class); @Autowired RedisService redisService; private Set<String> topic = new CopyOnWriteArraySet<>(); /** * * @param key 隊(duì)列名稱 * @param val 消息內(nèi)容 * @param delayTime 過期時(shí)間 */ public void publish(String key, Object val, long delayTime) { topic.add(key); String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val); redisService.add(key, strVal, System.currentTimeMillis() + delayTime); logger.info("key為:{},time:{}", key,System.currentTimeMillis() + delayTime); } }
3.9 業(yè)務(wù)facade層調(diào)用延遲處理
經(jīng)過以上的延遲隊(duì)列封裝處理,在facade層,也就是我們的業(yè)務(wù)中就可以直接調(diào)用:
@Autowired private DelayListWrapper delayListWrapper; ... delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);
4 總結(jié)
本文以redis的zset來實(shí)現(xiàn)延時(shí)隊(duì)列,并基于SpringBoot實(shí)現(xiàn)了延遲隊(duì)列的推送和消費(fèi)。更多相關(guān)SpringBoot redis延遲隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot利用redis集成消息隊(duì)列的方法
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的場景分析
- springboot整合redis之消息隊(duì)列
- SpringBoot+Redis隊(duì)列實(shí)現(xiàn)Java版秒殺的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- SpringBoot中Redisson延遲隊(duì)列的示例
- springboot使用Redis隊(duì)列實(shí)戰(zhàn)
相關(guān)文章
基于Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法
本篇文章對Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法,進(jìn)行了詳細(xì)的分析介紹。需要的朋友參考下2013-05-05SpringCloud筆記(Hoxton)Netflix之Ribbon負(fù)載均衡示例代碼
這篇文章主要介紹了SpringCloud筆記HoxtonNetflix之Ribbon負(fù)載均衡,Ribbon是管理HTTP和TCP服務(wù)客戶端的負(fù)載均衡器,Ribbon具有一系列帶有名稱的客戶端(Named?Client),對SpringCloud?Ribbon負(fù)載均衡相關(guān)知識感興趣的朋友一起看看吧2022-06-06利用ScriptEngineManager實(shí)現(xiàn)字符串公式靈活計(jì)算的方法
今天小編就為大家分享一篇利用ScriptEngineManager實(shí)現(xiàn)字符串公式靈活計(jì)算的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07springmvc json類型轉(zhuǎn)換錯(cuò)誤解決方案
這篇文章主要介紹了springmvc json類型轉(zhuǎn)換錯(cuò)誤解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12Spring Boot實(shí)現(xiàn)郵件發(fā)送功能
這篇文章主要為大家詳細(xì)介紹了Spring Boot實(shí)現(xiàn)郵件發(fā)送功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06