Redis?延時任務(wù)實現(xiàn)及與定時任務(wù)區(qū)別詳解
引言
1. 生成訂單30分鐘未支付,則自動取消
2. 30分鐘未回復(fù),則結(jié)束會話
對上述的任務(wù),我們給一個專業(yè)的名字來形容,那就是延時任務(wù)
一、延時任務(wù)是什么
延時任務(wù)
不同于一般的定時任務(wù),延時任務(wù)是在某事件觸發(fā)
后的未來某個時刻執(zhí)行,沒有重復(fù)的執(zhí)行周期。
二、延時任務(wù)和定時任務(wù)的區(qū)別是什么
- 定時任務(wù)有明確的觸發(fā)時間,延時任務(wù)沒有
- 定時任務(wù)有執(zhí)行周期,而延時任務(wù)在某事件觸發(fā)后一段時間內(nèi)執(zhí)行,沒有執(zhí)行周期
定時任務(wù)一般執(zhí)行的是批處理多個任務(wù),而延時任務(wù)一般是單任務(wù)處理
三、技術(shù)對比
本文主要講解Redis的Zset
實現(xiàn)延時任務(wù),其他方案只做介紹
1.數(shù)據(jù)庫輪詢
通過定時組件
的去掃描數(shù)據(jù)庫,通過時間來判斷是否有超時的訂單,然后進行update或delete等操作
優(yōu)點:
簡單易行
缺點:
- 對服務(wù)器內(nèi)存消耗大
- 時間間隔小,數(shù)據(jù)庫損耗極大
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
- 如果任務(wù)量過大,對數(shù)據(jù)庫造成的壓力很大 。頻繁查詢數(shù)據(jù)庫帶來性能影響
2.JDK的延遲隊列
利用JDK自帶的DelayQueue
來實現(xiàn),這是一個無界阻塞隊列,該隊列只有在延遲期滿的時候才能從中獲取元素,放入DelayQueue
中,是必須實現(xiàn)Delayed接口
的。
優(yōu)點:實現(xiàn)簡單,效率高,任務(wù)觸發(fā)時間延遲低。
缺點:
- 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機
- 因為內(nèi)存條件限制的原因,比如下單未付款的訂單數(shù)太多,那么很容易就出現(xiàn)OOM異常
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
3.時間輪算法
時間輪TimingWheel是一種高效、低延遲的調(diào)度數(shù)據(jù)結(jié)構(gòu),底層采用數(shù)組實現(xiàn)存儲任務(wù)列表的環(huán)形隊列,示意圖如下:時間輪
時間輪算法可以類比于時鐘,如上圖箭頭(指針)按某一個方向按固定頻率輪動,每一次跳動稱為一個 tick。這樣可以看出定時輪由個3個重要的屬數(shù),ticksPerWheel(一輪的tick數(shù)),tickDuration(一個tick的持續(xù)時間)以及 timeUnit(時間單位),例如當(dāng)ticksPerWheel=60,tickDuration=1,timeUnit=秒,這就和現(xiàn)實中的始終的秒針走動完全類似了。
如果當(dāng)前指針指在1上面,我有一個任務(wù)需要4秒以后執(zhí)行,那么這個執(zhí)行的線程回調(diào)或者消息將會被放在5上。那如果需要在20秒之后執(zhí)行怎么辦,由于這個環(huán)形結(jié)構(gòu)槽數(shù)只到8,如果要20秒,指針需要多轉(zhuǎn)2圈。位置是在2圈之后的5上面(20 % 8 + 1)
優(yōu)點:效率高,任務(wù)觸發(fā)時間延遲時間比delayQueue低
缺點:
- 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機
- 容易就出現(xiàn)OOM異常
- 數(shù)據(jù)內(nèi)存態(tài),不可靠
4.使用消息隊列
使用RabbitMQ死信隊列依賴于RabbitMQ的兩個特性:TTL和DLX。
TTL:Time To Live,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。
DLX:Dead Letter Exchange,死信交換器。
優(yōu)點:異步交互可以削峰,高效,可以利用rabbitmq的分布式特性輕易的進行橫向擴展,消息支持持久化增加了可靠性。
缺點:
1.本身的易用度要依賴于rabbitMq的運維.因為要引用rabbitMq,所以復(fù)雜度和成本變高
2.RabbitMq是一個消息中間件;延遲隊列只是其中一個小功能,如果團隊技術(shù)棧中本來就是使用RabbitMq那還好,如果不是,那為了使用延遲隊列而去部署一套RabbitMq成本有點大;
5.Redis的Zset實現(xiàn)延時任務(wù)
為什么采用Redis的ZSet實現(xiàn)延遲任務(wù)?
zset數(shù)據(jù)類型的去重有序(分數(shù)排序)特點進行延遲。例如:時間戳作為score進行排序
5.1 思路分析
- 項目啟動時啟用
一條線程
,線程用于間隔一定時間去查詢redis的待執(zhí)行任務(wù)。其任務(wù)jobId為業(yè)務(wù)id,值為要執(zhí)行的時間。 - 查詢到執(zhí)行的任務(wù)時,將其從redis的信息中進行刪除。(
刪除成功才執(zhí)行延時任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時任務(wù)多次執(zhí)行
。) - 刪除redis中的記錄之后,執(zhí)行任務(wù)。將執(zhí)行jobId也就是業(yè)務(wù)id對應(yīng)的任務(wù)。
實際場景中,還會涉及延時任務(wù)修改,刪除等,這些場景可以指定標(biāo)記,修改標(biāo)識即可,當(dāng)然也可以在業(yè)務(wù)邏輯中做補充條件的判斷。
5.2 Redis中Zset的簡單介紹及使用
Redis 有序集合是 string 類型元素的集合,且不允許重復(fù)的成員。每個元素都會關(guān)聯(lián)一個 double 類型的分數(shù)。redis 正是通過分數(shù)來為集合中的成員進行從小到大的排序。有序集合的成員是唯一的,但分數(shù)(score)卻可以重復(fù)。
常用命令
- ZADD命令 : 將一個或多個成員元素及其分數(shù)值加入到有序集當(dāng)中,或者更新已存在成員的分數(shù)
- ZCARD命令 : 獲取有序集合的成員數(shù)
- ZRANGEBYSCORE: 通過分數(shù)返回有序集合指定區(qū)間內(nèi)的成員
- ZREM : 移除有序集合中的一個或多個成員
java中操作簡單介紹
1.add(K key, V value, double score) 添加元素到變量中同時指定元素的分值。 redisTemplate.opsForZSet().add("zSetValue","A",1); 2.rangeByScore(K key, double min, double max) 根據(jù)設(shè)置的score獲取區(qū)間值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2); 3.rangeByScore(K key, double min, double max,long offset, long count) 根據(jù)設(shè)置的score獲取區(qū)間值從給定下標(biāo)和給定長度獲取最終值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3); 4.rangeWithScores(K key, long start, long end) 獲取RedisZSetCommands.Tuples的區(qū)間值。 Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3); Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); Object value = typedTuple.getValue(); double score = typedTuple.getScore(); } 5.刪除成員 redisTemplate.opsForZSet().remove("myZset","a","b");
以下代碼可以直接使用-基于Spring Boot項目
5.3 延時隊列工廠
代碼中注釋有詳細介紹
/** * 延時隊列工廠 * **/ @Slf4j public abstract class AbstractDelayQueueMachineFactory { @Autowired private RedisUtil redisUtil; @Autowired private ThreadPoolTaskExecutor asyncTaskExecutor; /** * 插入任務(wù)id * * @param jobId 任務(wù)id(隊列內(nèi)唯一) * @param time 延時時間(單位 :毫秒) * @return 是否插入成功 */ public boolean addJob(String jobId, Integer time) { Calendar instance = Calendar.getInstance(); //增加延時時間,獲取最終觸發(fā)時間 instance.add(Calendar.MILLISECOND, time); long delayMillisecond = instance.getTimeInMillis(); log.info("延時隊列添加問題{}",jobId); return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId); } /** * 刪除任務(wù)id * * @param jobId 任務(wù)id(隊列內(nèi)唯一) */ public boolean removeJob(String jobId) { Long num = redisUtil.zRemove(setDelayQueueName(), jobId); if (num > 0) return true; return false; } /** * 延時隊列機器開始運作 */ private void startDelayQueueMachine() { log.info("延時隊列{}開始啟動", setDelayQueueName()); // 監(jiān)聽redis隊列 while (true) { try { // 獲取當(dāng)前時間前的任務(wù)列表 Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis() ); // 如果任務(wù)不為空 if (!CollectionUtils.isEmpty(tuples)) { log.info("延時任務(wù)開始執(zhí)行:{}", JSONUtil.toJsonStr(tuples)); Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); String questionId = Convert.toStr(typedTuple.getValue()); // 移除緩存,如果移除成功則表示當(dāng)前線程處理了延時任務(wù),則執(zhí)行延時任務(wù) // 刪除成功才執(zhí)行延時任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時任務(wù)多次執(zhí)行 Long num = redisUtil.zRemove(setDelayQueueName(), questionId); // 如果移除成功, 則執(zhí)行 if (num > 0) { asyncTaskExecutor.execute(() -> invoke(questionId)); } } } } catch (Exception e) { log.error("處理延時任務(wù)發(fā)生異常,異常原因為{}", e.getMessage(), e); } finally { // 間隔()分鐘執(zhí)行一次 //根據(jù)業(yè)務(wù)場景設(shè)置對應(yīng)時間 try { TimeUnit.MINUTES.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 最終執(zhí)行的任務(wù)方法 * * @param jobId 任務(wù)id */ public abstract void invoke(String jobId); /** * 要實現(xiàn)延時隊列的名字 */ public abstract String setDelayQueueName(); //Spring Boot初始化時開啟一條線程運行 @PostConstruct public void init() { new Thread(this::startDelayQueueMachine).start(); } }
addJob方法是添加任務(wù)id和延時時間(單位毫秒)
redisUtil.zRangeByScore ::根據(jù)設(shè)置的score獲取區(qū)間值
@PostConstruct注解:是針對Bean的初始化完成之后做一些事情,比如注冊一些監(jiān)聽器..(初始化實現(xiàn)方案有很多可自行選擇)
為什么先刪除后執(zhí)行業(yè)務(wù)邏輯?
刪除成功才執(zhí)行延時任務(wù),否則不執(zhí)行,這樣可以避免分布式系統(tǒng)延時任務(wù)多次執(zhí)行
5.4 RedisUtil工具類
@Component @Slf4j public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 向Zset里添加成員 * * @param key key值 * @param score 分數(shù),通常用于排序 * @param value 值 * @return 增加狀態(tài) */ public boolean zAdd(String key, long score, String value) { Boolean result = redisTemplate.opsForZSet().add(key, value, score); return result; } /** * 獲取 某key 下 某一分值區(qū)間的隊列 * * @param key 緩存key * @param from 開始時間 * @param to 結(jié)束時間 * @return 數(shù)據(jù) */ public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) { Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to); return set; } /** * 移除 Zset隊列值 * * @param key key值 * @param value 刪除的集合 * @return 刪除數(shù)量 */ public Long zRemove(String key, String... value) { return redisTemplate.opsForZSet().remove(key, value); } }
5.5 測試延時隊列
繼承上文中的延時隊列工廠重寫invoke(處理業(yè)務(wù))
和setDelayQueueName--延時隊列名稱也就是Zset中的key值
/** * 測試延時隊列 * */ @Slf4j @Component public class DelayQueue extends AbstractDelayQueueMachineFactory { @Autowired private ZnjExpertConsultQuestionRecordMapper questionRecordMapper; /** * 處理業(yè)務(wù)邏輯 */ @Override public void invoke(String jobId) { Integer questionId = Convert.toInt(jobId); ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId); Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity); /** * 延時隊列名統(tǒng)一設(shè)定 */ @Override public String setDelayQueueName() { return "expert_consult:delay_queue"; } }
運行成功,當(dāng)Redis中有任務(wù)時,則執(zhí)行任務(wù)即可
四、總結(jié)
使用redis zset來實現(xiàn)延時任務(wù),總體類說是可行的
- 實時性: 允許存在一定時間內(nèi)的誤差(可以通過時間設(shè)定)
- 高可用性:支持單機,支持集群
- 消息可靠性: 保證至少被消費一次
- 消息持久化: 基于Redis自身的持久化特性,上面的消息可靠性基于Redis的持久化,所以如果redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證
以上就是Redis 延時任務(wù)實現(xiàn)及與定時任務(wù)區(qū)別詳解的詳細內(nèi)容,更多關(guān)于Redis延時任務(wù)定時任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Redis和springboot 整合redisUtil類的示例代碼
這篇文章主要介紹了Redis和springboot 整合redisUtil類的示例代碼,本文通過實例圖文相結(jié)合給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12面試常問:如何保證Redis緩存和數(shù)據(jù)庫的數(shù)據(jù)一致性
在實際開發(fā)過程中,緩存的使用頻率是非常高的,只要使用緩存和數(shù)據(jù)庫存儲,就難免會出現(xiàn)雙寫時數(shù)據(jù)一致性的問題,那我們又該如何解決呢2021-09-09Redis 緩存實現(xiàn)存儲和讀取歷史搜索關(guān)鍵字的操作方法
這篇文章主要介紹了Redis 緩存實現(xiàn)存儲和讀取歷史搜索關(guān)鍵字,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12