Redis如何實(shí)現(xiàn)延遲隊(duì)列
Redis實(shí)現(xiàn)延遲隊(duì)列
Redis延遲隊(duì)列
Redis 是通過有序集合(ZSet)的方式來實(shí)現(xiàn)延遲消息隊(duì)列的,ZSet 有一個(gè) Score 屬性可以用來存儲(chǔ)延遲執(zhí)行的時(shí)間。
但需要無限循環(huán)檢查任務(wù),會(huì)消耗系統(tǒng)資源
class RedisDelayQueue(object): """Simple Queue with Redis Backend dq = RedisDelayQueue('delay:commtrans') dq.put( 5 ,{'info':'測試 5555','time': timestamp_to_datetime_str(t + 5)}) print(dq.get()) """ def __init__(self, name, namespace='queue'): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db = get_redis_engine(database_name='spdb') self.key = '%s:%s' % (namespace, name) def qsize(self): """Return the approximate size of the queue.""" return self.__db.zcard(self.key) def empty(self): """Return True if the queue is empty, False otherwise.""" return self.qsize() == 0 def rem(self, value): return self.__db.zrem(self.key, value) def get(self): # 獲取任務(wù),以0和當(dāng)前時(shí)間為區(qū)間,返回一條在當(dāng)前區(qū)間的記錄 items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1) if items: item = items[0] if self.rem(item): # 解決并發(fā)問題 如能刪就讓誰取走 return json.loads(item) return None def put(self, interval, item): """:param interval 延時(shí)秒數(shù)""" # 以時(shí)間作為score,對任務(wù)隊(duì)列按時(shí)間戳從小到大排序 """Put item into the queue.""" d = json.dumps(item) return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
Redis實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)化方案
延時(shí)隊(duì)列的應(yīng)用
近期在開發(fā)部門的新項(xiàng)目,其中有個(gè)關(guān)鍵功能就是智能推送,即根據(jù)用戶行為在特定的時(shí)間點(diǎn)向用戶推送相應(yīng)的提醒消息,比如以下業(yè)務(wù)場景:
- 在用戶點(diǎn)擊充值項(xiàng)后,半小時(shí)內(nèi)未充值,向用戶推送充值未完成提醒。
- 在用戶最近一次閱讀行為2小時(shí)后,向用戶推送繼續(xù)閱讀提醒。
- 在用戶新注冊或退出應(yīng)用N分鐘后,向用戶推送合適的推薦消息。
- …
上述場景的共同特征就是在某事件觸發(fā)后延遲一定時(shí)間后再執(zhí)行特定任務(wù),若事件觸發(fā)時(shí)間點(diǎn)可知,則上述邏輯也可等價(jià)于在指定時(shí)間點(diǎn)(事件觸發(fā)時(shí)間點(diǎn)+延遲時(shí)間長度)執(zhí)行特定任務(wù)。
實(shí)現(xiàn)這類需求一般采用延時(shí)隊(duì)列,其中創(chuàng)建的延時(shí)消息中需要包含任務(wù)延遲時(shí)間或任務(wù)執(zhí)行時(shí)間點(diǎn)等信息,當(dāng)任務(wù)滿足時(shí)間條件需要執(zhí)行時(shí),該消息便會(huì)被消費(fèi),也就是說可以指定隊(duì)列中的消息在哪個(gè)時(shí)間點(diǎn)被消費(fèi)。
延時(shí)隊(duì)列的實(shí)現(xiàn)
在單機(jī)環(huán)境中,JDK
已經(jīng)自帶了很多能夠?qū)崿F(xiàn)延時(shí)隊(duì)列功能的組件,比如DelayQueue
, Timer
, ScheduledExecutorService
等組件,都可以較為簡便地創(chuàng)建延時(shí)任務(wù),但上述組件使用一般需要把任務(wù)存儲(chǔ)在內(nèi)存中,服務(wù)重啟存在任務(wù)丟失風(fēng)險(xiǎn),且任務(wù)規(guī)模體量受內(nèi)存限制,同時(shí)也造成長時(shí)間內(nèi)存占用,并不靈活,通常適用于單進(jìn)程客服端程序中或?qū)θ蝿?wù)要求不高的項(xiàng)目中。
在分布式環(huán)境下,僅使用JDK
自帶組件并不能可靠高效地實(shí)現(xiàn)延時(shí)隊(duì)列,通常需要引入第三方中間件或框架。
比如常見的經(jīng)典任務(wù)調(diào)度框架Quartz
或基于此框架的xxl-job
等其它框架,這些框架的主要功能是實(shí)現(xiàn)定時(shí)任務(wù)或周期性任務(wù),在Redis
、RabbitMQ
還未廣泛應(yīng)用時(shí),譬如常見的超時(shí)未支付取消訂單等功能都是由定時(shí)任務(wù)實(shí)現(xiàn)的,通過定時(shí)輪詢來判斷是否已到達(dá)觸發(fā)執(zhí)行的時(shí)間點(diǎn)。
但由于定時(shí)任務(wù)需要一定的周期性,周期掃描的間隔時(shí)間不好控制,太短會(huì)造成很多無意義的掃描,且增大系統(tǒng)壓力,太長又會(huì)造成執(zhí)行時(shí)間誤差太大,且可能造成單次掃描所處理的堆積記錄數(shù)量過大。
此外,利用MQ
做延時(shí)隊(duì)列也是一種常見的方式,比如通過RabbitMQ
的TTL
和死信隊(duì)列實(shí)現(xiàn)消息的延遲投遞,考慮到投遞出去的MQ
消息無法方便地實(shí)現(xiàn)刪除或修改,即無法實(shí)現(xiàn)任務(wù)的取消或任務(wù)執(zhí)行時(shí)間點(diǎn)的更改,同時(shí)也不能方便地對消息進(jìn)行去重,因此在項(xiàng)目中并未選擇使用MQ
實(shí)現(xiàn)延時(shí)隊(duì)列。
Redis
的數(shù)據(jù)結(jié)構(gòu)zset
,同樣可以實(shí)現(xiàn)延遲隊(duì)列的效果,且更加靈活,可以實(shí)現(xiàn)MQ
無法做到的一些特性,因此項(xiàng)目最終采用Redis
實(shí)現(xiàn)延時(shí)隊(duì)列,并對其進(jìn)行優(yōu)化與封裝。
實(shí)現(xiàn)原理是利用zset
的score
屬性,redis
會(huì)將zset
集合中的元素按照score
進(jìn)行從小到大排序,通過zadd
命令向zset
中添加元素,如下述命令所示,其中value
值為延時(shí)任務(wù)消息,可根據(jù)業(yè)務(wù)定義消息格式,score
值為任務(wù)執(zhí)行的時(shí)間點(diǎn),比如13位毫秒時(shí)間戳。
zadd delayqueue 1614608094000 taskinfo
任務(wù)添加后,獲取任務(wù)的邏輯只需從zset
中篩選score
值小于當(dāng)前時(shí)間戳的元素,所得結(jié)果便是當(dāng)前時(shí)間節(jié)點(diǎn)下需要執(zhí)行的任務(wù),通過zrangebyscore
命令來獲取,如下述命令所示,其中timestamp
為當(dāng)前時(shí)間戳,可用limit
限制每次拉取的記錄數(shù),防止單次獲取記錄數(shù)過大。
zrangebyscore delayqueue 0 timestamp limit 0 1000
在實(shí)際實(shí)現(xiàn)過程中,從zset
中獲取到當(dāng)前需要執(zhí)行的任務(wù)后,需要先確保將任務(wù)對應(yīng)的元素從zset
中刪除,刪除成功后才允許執(zhí)行任務(wù)邏輯,這樣是為了在分布式環(huán)境下,當(dāng)存在多個(gè)線程獲取到同一任務(wù)后,利用redis
刪除操作的原子性,確保只有一個(gè)線程能夠刪除成功并執(zhí)行任務(wù),防止重復(fù)執(zhí)行。
實(shí)際任務(wù)的執(zhí)行通常會(huì)再將其發(fā)送至MQ
異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”兩者分離解耦,更加靈活,“獲取任務(wù)”只負(fù)責(zé)拿到當(dāng)前時(shí)間需要執(zhí)行的任務(wù),并不真正運(yùn)行任務(wù)業(yè)務(wù)邏輯,因此只需相對少量的執(zhí)行線程即可,而實(shí)際的任務(wù)執(zhí)行邏輯則由MQ
消費(fèi)者承擔(dān),方便調(diào)控負(fù)載能力。
整體過程如下圖所示。
采用zset
做延時(shí)隊(duì)列的另一個(gè)好處是可以實(shí)現(xiàn)任務(wù)的取消和任務(wù)執(zhí)行時(shí)間點(diǎn)的更改,只需要將任務(wù)信息從zset
中刪除,便可取消任務(wù),同時(shí)由于zset
擁有集合去重的特性,只需再次寫入同一個(gè)任務(wù)信息,但是value
值設(shè)置為不同的執(zhí)行時(shí)間點(diǎn),便可更改任務(wù)執(zhí)行時(shí)間,實(shí)現(xiàn)單個(gè)任務(wù)執(zhí)行時(shí)間的動(dòng)態(tài)調(diào)整。
了解實(shí)現(xiàn)原理后,再進(jìn)行具體編程實(shí)現(xiàn)。創(chuàng)建延時(shí)任務(wù)較為簡便,準(zhǔn)備好任務(wù)消息和執(zhí)行時(shí)間點(diǎn),寫入zset
即可。獲取延時(shí)任務(wù)最簡單的方案是通過定時(shí)任務(wù),周期性地執(zhí)行上述邏輯,如下代碼所示。
@XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue") public void scanBusiness1() { // 某業(yè)務(wù)邏輯的zset延遲隊(duì)列對應(yīng)的key String zsetKey = "delayqueue:business1"; while (true) { // 篩選score值小于當(dāng)前時(shí)間戳的元素,一次最多拉取1000條 Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000); if (CollectionUtils.isEmpty(tasks)) { // 當(dāng)前時(shí)間下已沒有需要執(zhí)行的任務(wù),結(jié)束本次掃描 return; } for (String task : tasks) { // 先刪除,再執(zhí)行,確保多線程環(huán)境下執(zhí)行的唯一性 Boolean delete = stringRedisTemplate.delete(task); if (delete) { // 刪除成功后,將其再發(fā)送到指定MQ異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”分離解耦 rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task); } } } }
上述方案使用xxl-job
做分布式定時(shí)任務(wù),間隔5秒執(zhí)行一次,代碼借助spring
提供的api
來完成redis
和MQ
的操作。
由于是分布式定時(shí)任務(wù),每次執(zhí)行只有一個(gè)線程在獲取任務(wù),機(jī)器利用率低,當(dāng)數(shù)據(jù)規(guī)模較大時(shí),單靠一個(gè)線程無法滿足吞吐量要求,因此這種方案只適用于小規(guī)模數(shù)據(jù)量級別。
此處間隔時(shí)間也可適當(dāng)調(diào)整,例如縮短為1秒,調(diào)整所需考慮原則在上文已提到:間隔太短會(huì)造成很多無意義的掃描,且增大系統(tǒng)壓力,太長又會(huì)造成執(zhí)行時(shí)間誤差太大。
為了提升整體吞吐量,考慮不使用分布式定時(shí)任務(wù),對集群內(nèi)每臺(tái)機(jī)器(或?qū)嵗┚O(shè)置獨(dú)立的定時(shí)任務(wù),同時(shí)采用多個(gè)zset
隊(duì)列,以數(shù)字后綴區(qū)分。
假設(shè)有M個(gè)zset
隊(duì)列,創(chuàng)建延時(shí)消息時(shí)選取消息的某個(gè)ID
字段,計(jì)算hash
值再對M取余,根據(jù)余數(shù)決定發(fā)送到對應(yīng)數(shù)字后綴的zset
隊(duì)列中(分散消息,此處ID
字段選取需要考慮做到均勻分布,不要造成數(shù)據(jù)傾斜)。
隊(duì)列數(shù)量M的選取需要考慮機(jī)器數(shù)量N,理想情況下有多少臺(tái)機(jī)器就定義多少個(gè)隊(duì)列,保持M與N基本相等即可。
因?yàn)殛?duì)列太少,會(huì)造成機(jī)器對隊(duì)列的競爭訪問處理,隊(duì)列太多又會(huì)導(dǎo)致任務(wù)得不到及時(shí)的處理。
最佳實(shí)踐是隊(duì)列數(shù)量可動(dòng)態(tài)配置,如采用分布式配置中心,這樣當(dāng)集群機(jī)器數(shù)量變化時(shí),可以相應(yīng)調(diào)整隊(duì)列數(shù)量。
每臺(tái)機(jī)器在觸發(fā)定時(shí)任務(wù)時(shí),需要通過適當(dāng)?shù)呢?fù)載均衡來決定從哪個(gè)隊(duì)列拉取消息,負(fù)載均衡的好壞也會(huì)影響整個(gè)集群的效率,如果負(fù)載分布不均可能會(huì)導(dǎo)致多臺(tái)機(jī)器競爭處理同一隊(duì)列,降低效率。
一個(gè)簡單實(shí)用的做法是利用redis
的自增操作再對隊(duì)列數(shù)量取余即可,只要保持隊(duì)列數(shù)量和機(jī)器數(shù)量基本相等,這種做法在很大程度上就可以保證不會(huì)有多臺(tái)機(jī)器競爭同一隊(duì)列。
至于每臺(tái)機(jī)器從對應(yīng)zset
中的任務(wù)獲取邏輯,仍然和前面代碼一致。以上方式簡化實(shí)現(xiàn)代碼如下所示。
@Scheduled(cron = "0/5 * * * * ?") public void scanBusiness1() { // 隊(duì)列數(shù)量M,考慮動(dòng)態(tài)配置,保持和機(jī)器數(shù)量基本一致 int M = 10; // redis自增key,用于負(fù)載均衡 String incrKey = "incrkey:delayqueue:business1"; // 每臺(tái)機(jī)器執(zhí)行時(shí),從不同的zset中拉取消息,盡量確保不同機(jī)器訪問不同zset String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M); while (true) { // 此處邏輯和前面代碼一致,省略。。。 } }
上述方案和第一種方案的主要的不同點(diǎn)在于zsetKey
的獲取上,這里是根據(jù)負(fù)載均衡算法算出來的,確保每臺(tái)機(jī)器訪問不同zset
并拉取消息,同時(shí)定時(shí)任務(wù)采用spring
提供的進(jìn)程內(nèi)注解@Scheduled
,集群內(nèi)每臺(tái)機(jī)器都會(huì)間隔5秒執(zhí)行,因此相比之前的方案,能夠較為明顯地提升整個(gè)集群的吞吐量。
但是這種方案的步驟相對更為復(fù)雜,需要?jiǎng)討B(tài)配置隊(duì)列數(shù)量,同時(shí)在創(chuàng)建延時(shí)任務(wù)時(shí)需要選擇合適的消息ID
字段來決定發(fā)送的目標(biāo)zset
隊(duì)列,此處還要考慮均勻分布,整體實(shí)現(xiàn)要考慮的因素較多。
上面一種方案已經(jīng)能夠較好地滿足整體吞吐量要求,但其缺點(diǎn)是步驟相對復(fù)雜,因此項(xiàng)目中沒有采用這種方案,而是采用下面一種也能滿足吞吐量要求,步驟相對簡單,又方便通用化的方案。
該方案不使用定時(shí)任務(wù),而是單獨(dú)啟動(dòng)后臺(tái)線程,在線程中執(zhí)行永久循環(huán),每次循環(huán)邏輯為:從目標(biāo)zset
中獲取score
值小于當(dāng)前時(shí)間戳的元素集合中的score
最小的那個(gè)元素,相當(dāng)于獲取當(dāng)前時(shí)間點(diǎn)需要執(zhí)行且執(zhí)行時(shí)間點(diǎn)最早的那個(gè)任務(wù),如果獲取不到,表示當(dāng)前時(shí)間點(diǎn)下暫無需要執(zhí)行的任務(wù),則線程休眠100ms
(可視情況調(diào)整),否則,對獲取到的元素進(jìn)行處理,在分布式多線程環(huán)境下,仍然需要先刪除成功才能進(jìn)行處理。
此外,考慮到每個(gè)線程獲取元素后都需要再次訪問redis
嘗試刪除操作,為了避免多線程爭搶浪費(fèi)資源,降低效率,這里采用lua
腳本將獲取和刪除操作原子化。lua
腳本邏輯代碼如下所示。
local zsetKey = 'delayqueue' local timestamp = 1614608094000 local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1) if #items == 0 then return '' else redis.call('zremrangebyrank',zsetKey,0,0) return items[1] end
其中timestamp
為當(dāng)前時(shí)間戳,通過在zrangebyscore
命令中指定limit
為1來獲取score
最小的元素,若獲取不到,即結(jié)果集長度為0,則返回空字符串,否則,通過zremrangebyrank
命令刪除頭部元素,即score
最小的元素,也就是之前獲取到的那個(gè)元素,由于redis
內(nèi)部保證lua
腳本的原子性,上述獲取并刪除的操作能夠運(yùn)行無誤。具體JAVA
實(shí)現(xiàn)中還對其進(jìn)行了多線程操作的封裝和通用化的抽象,使不同業(yè)務(wù)都能夠使用該組件實(shí)現(xiàn)延時(shí)隊(duì)列。具體實(shí)現(xiàn)代碼如下所示。
/** * 基于ZSET實(shí)現(xiàn)消息延遲處理,score存儲(chǔ)執(zhí)行時(shí)間點(diǎn),到達(dá)時(shí)間點(diǎn)即會(huì)向指定隊(duì)列發(fā)送該消息; * 定義一個(gè)繼承本類的bean即可; */ public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean { private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT; static { // 獲取并刪除的lua腳本,使用spring提供的api String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n" + "if #items == 0 then\n" + "\treturn ''\n" + "else\n" + "\tredis.call('zremrangebyrank',KEYS[1],0,0)\n" + "\treturn items[1]\n" + "end"; // 自有工具類,只要能創(chuàng)建出spring包下的 RedisScript 的實(shí)現(xiàn)類對象均可 TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class); } private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(), 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix())); private volatile boolean quit = false; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void startScan() { // bean構(gòu)建完成后,啟動(dòng)若干執(zhí)行線程 int threadNum = getThreadNum(); for (int i = 0; i < threadNum; i++) { EXECUTOR.execute(this); } } @Override public void run() { while (!quit) { try { // 循環(huán),采用lua獲取當(dāng)前需要執(zhí)行的任務(wù)并將其從redis中刪除 String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT, Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis())); if (StringUtils.isNotBlank(msg)) { // 消息不為空,表示獲取任務(wù)成功,將其再發(fā)送到指定MQ異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”分離解耦 rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg); } else { // 獲取不到任務(wù),表示當(dāng)前時(shí)間點(diǎn)下暫無需要執(zhí)行的任務(wù),則線程休眠1S(可視情況調(diào)整) SleepUtils.sleepSeconds(1); } } catch (Exception e) { Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e); } } } @Override public void destroy() throws Exception { quit = true; } public void setQuit(boolean quit) { this.quit = quit; } /** * 獲取消息的工作線程數(shù)量 */ protected abstract int getThreadNum(); /** * 線程名稱前綴,方便問題定位 */ protected abstract String getThreadNamePrefix(); /** * 存放延遲消息的ZSET隊(duì)列名 */ protected abstract String getDelayedMsgSourceKey(); /** * 消息到達(dá)執(zhí)行時(shí)間點(diǎn)時(shí)將其通過指定 exchange 發(fā)送到實(shí)時(shí)消費(fèi)隊(duì)列中 */ protected abstract String getSendExchange(); /** * 消息到達(dá)執(zhí)行時(shí)間點(diǎn)時(shí)將其通過指定 routingKey 發(fā)送到實(shí)時(shí)消費(fèi)隊(duì)列中 */ protected abstract String getSendRoutingKey(); }
在具體業(yè)務(wù)應(yīng)用中,只需定義一個(gè)繼承上述類的bean
即可,需要實(shí)現(xiàn)的方法主要是提供一些配置,比如該業(yè)務(wù)對應(yīng)的zset
延時(shí)隊(duì)列名稱,同時(shí)工作拉取消息的線程數(shù)量,由于采用rabbitMq
,因此這里需要提供exchange
和routingKey
。
實(shí)際使用中只需向該zset
隊(duì)列中添加消息,并將score
設(shè)為該任務(wù)需要執(zhí)行的時(shí)間點(diǎn)(此處為13位毫秒時(shí)間戳),則到該時(shí)間點(diǎn)后,上述組件便會(huì)將該消息從zset
中取出并刪除,再將其通過指定的路由發(fā)送到實(shí)時(shí)MQ
消費(fèi)隊(duì)列中,由消費(fèi)者負(fù)責(zé)執(zhí)行任務(wù)業(yè)務(wù)邏輯。目前該組件在項(xiàng)目中正常平穩(wěn)運(yùn)行。
注意:
本文結(jié)合項(xiàng)目中的實(shí)際需求介紹了延時(shí)隊(duì)列的應(yīng)用場景,分析了延時(shí)隊(duì)列的多種實(shí)現(xiàn),重點(diǎn)講述了利用redis
實(shí)現(xiàn)延時(shí)隊(duì)列的原理,對其實(shí)現(xiàn)方案進(jìn)行比較與優(yōu)化,并將最終方案實(shí)際運(yùn)用于項(xiàng)目需求中。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Redis基于Bitmap實(shí)現(xiàn)用戶簽到功能
很多應(yīng)用上都有用戶簽到的功能,尤其是配合積分系統(tǒng)一起使用。本文主要介紹了Redis基于Bitmap實(shí)現(xiàn)用戶簽到功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-06-06Redis中的配置文件,數(shù)據(jù)持久化,事務(wù)
這篇文章主要介紹了Redis中的配置文件,數(shù)據(jù)持久化,事務(wù)問題,具有很好的參考價(jià)值,希望對大家有所幫助。2022-12-12