SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
使用 Redis 實(shí)現(xiàn)消息隊(duì)列的幾種方式
Redis 提供了多種方式來實(shí)現(xiàn)消息隊(duì)列。
Pub/Sub
訂閱發(fā)布模式,發(fā)布者把消息發(fā)布到某個(gè) Channel,該 Channel 的所有訂閱者都會(huì)收到消息。但是這種方式最大的問題是 「發(fā)布出去的消息,如果沒有被監(jiān)聽消費(fèi),或者消費(fèi)過程中宕機(jī),那么消息就會(huì)永遠(yuǎn)丟失」。適合用于臨時(shí)通知之類的場景,對于需要保證數(shù)據(jù)不丟失的場景不能使用這種方式。
List
List 是 Redis 提供的一種數(shù)據(jù)類型,底層是鏈表,可以用來實(shí)現(xiàn)隊(duì)列、棧。
Stream
Stream 是一個(gè)由 Redis 5 引入的,功能完善的消息隊(duì)列。想必也是 Redis 官方團(tuán)隊(duì)看到太多人拿 Redis 當(dāng)消息隊(duì)列使,于是干脆就在 Redis 上設(shè)計(jì)出一個(gè)類似于 Kafka 的消息隊(duì)列。
Steam 支持消費(fèi)組消費(fèi),一條消息只能被消費(fèi)組中的其中一個(gè)消費(fèi)者消費(fèi)。支持 「消息確認(rèn)」、支持 「回溯消費(fèi)」 還支持把未 ACK(確認(rèn))的消息轉(zhuǎn)移給其他消費(fèi)者進(jìn)行重新消費(fèi),在進(jìn)行轉(zhuǎn)移的時(shí)候還會(huì)累計(jì)消息的轉(zhuǎn)移次數(shù),當(dāng)次數(shù)達(dá)到一定閾值還沒消費(fèi)成功,就可以放入死信隊(duì)列。
這也是 Redis 種最復(fù)雜的一種數(shù)據(jù)類型。如果你真的到了需要使用 Redis Steam 作為消息隊(duì)列的地步,那不如直接使用 RabbitMQ 等更加成熟且穩(wěn)定的消息隊(duì)列系統(tǒng)。
使用 List 實(shí)現(xiàn)可靠的消息隊(duì)列
目前來說,這是用得最多的一種方式,適用于大多數(shù)簡單的消息隊(duì)列應(yīng)用場景。List 類型有很多指令,但是作為消息隊(duì)列來說用到的只有幾個(gè)個(gè):
「LPUSH key element [element ...]
」
把元素插入到 List 的首部,如果 List 不存在,會(huì)自動(dòng)創(chuàng)建。
「BRPOPLPUSH source destination timeout
」
移除并且返回 List (source)尾部的最后一個(gè)元素,并且同時(shí)會(huì)把這個(gè)元素插入到另一個(gè) List (destination)的首部。
當(dāng) source List 中沒有元素時(shí),Redis 會(huì)阻塞連接,直到有其他客戶端向其推送元素或超時(shí)。超時(shí)時(shí)間(秒)為 0 表示永遠(yuǎn)不超時(shí)。
注意,這個(gè)命令是 「原子性」 的,也就是說只要客戶端獲取到了返回的元素,那么這個(gè)元素一定就會(huì)在 destination List 有備份。這是實(shí)現(xiàn)可靠消息隊(duì)列的關(guān)鍵!
「RPOPLPUSH source destination
」
同上,它是 BRPOPLPUSH
命令的 「非阻塞」 版,如果 List 中沒有元素就會(huì)立即返回 null
。
「LREM key count element
」
從 List 中刪除元素,count 的值不同,刪除的方式也不同:
count > 0:從頭到尾開始搜索,刪除與 element 相等的元素,最多刪除 count 個(gè)。
count < 0:從尾到頭開始搜索,刪除與 element 相等的元素,最多刪除 count (絕對值)個(gè)。
count = 0:刪除所有與元素相等的元素。
「
BLMOVE
和LMOVE
命令」
LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>
BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout
從 Redis 6.2.0 開始,
BRPOPLPUSH
和RPOPLPUSH
命令就被聲明為廢棄了,取而代之的是語義更加明確的BLMOVE
和LMOVE
命令。?
BLMOVE
和LMOVE
可以通過參數(shù)指定元素出隊(duì)列(source)的方向,和入隊(duì)列(destination)的方向,除此以外并無其他區(qū)別。
實(shí)現(xiàn)思路
了解了上述幾個(gè)命令后,一個(gè)簡單易用且可靠的消息隊(duì)列就呼之欲出了。
生產(chǎn)者使用
LPUSH
命令往消息隊(duì)列生產(chǎn)消息消費(fèi)者使用
BRPOPLPUSH
命令從隊(duì)列消費(fèi)消息,并且還會(huì)在獲取并返回消息的時(shí)候把該消息推送到另一個(gè)消息隊(duì)列,也就是 Pending 隊(duì)列,這個(gè)隊(duì)列中存儲(chǔ)的就是未被消費(fèi)者 ACK 的消息消費(fèi)者成功消費(fèi)完畢后,使用
LREM
命令從 Pending 隊(duì)列中刪除這條消息,整個(gè)消費(fèi)過程結(jié)束如果消費(fèi)者在消費(fèi)過程中出現(xiàn)異常、宕機(jī),那么需要在恢復(fù)后從 Pending 隊(duì)列中獲取到這條消息,再進(jìn)行重新消費(fèi),從而保證了消息隊(duì)列的可靠性,不會(huì)丟失消息(可能存在重復(fù)消費(fèi),需要做好冪等處理)
在 Spring Boot 中實(shí)現(xiàn)
首先,創(chuàng)建 Spring Boot 項(xiàng)目,并整合 Redis。
創(chuàng)建一個(gè) OrderConsumer
Bean 模擬從隊(duì)列中消費(fèi)訂單 ID。
package cn.springdoc.demo.consumer; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component public class OrderConsumer implements ApplicationRunner, Runnable { static final Logger log = LoggerFactory.getLogger(OrderConsumer.class); // 消息隊(duì)列 final String queue = "queue_orders"; // pending 隊(duì)列,即待確認(rèn)消息的隊(duì)列 final String pendingQueue = "pending_queue_orders"; @Autowired StringRedisTemplate stringRedisTemplate; @Override public void run(ApplicationArguments args) throws Exception { // 應(yīng)用啟動(dòng)后,創(chuàng)建新的線程來執(zhí)行消費(fèi)任務(wù) Thread thread = new Thread(this); thread.setName("order-consumer-thread"); thread.start(); } @Override public void run() { while (true) { try { // 1:消費(fèi)者,從隊(duì)列未彈出消息,并推送到 pending 隊(duì)列,整個(gè)過程是原子性的 // 最多阻塞 5 秒,超過 5 秒后還沒有消息,則返回 null String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS); if (item == null) { log.info("等待消息 ..."); continue ; } try { // 2:解析為 Long Long orderId = Long.parseLong(item); // 模擬消息消費(fèi) log.info("消費(fèi)消息: {}", orderId); } catch (Exception e) { log.error("消費(fèi)異常:{}", e.getMessage()); continue; } // 3:消費(fèi)成功,從 pending 隊(duì)列刪除記錄,相當(dāng)于確認(rèn)消費(fèi) stringRedisTemplate.opsForList().remove(pendingQueue, 0, item); } catch (Exception e) { log.error("隊(duì)列監(jiān)聽異常:{}", e.getMessage()); break; } } log.info("退出消費(fèi)"); } }
OrderConsumer
實(shí)現(xiàn)了 ApplicationRunner
接口,在應(yīng)用就緒后創(chuàng)建新的消費(fèi)線程進(jìn)行消費(fèi)。
stringRedisTemplate.opsForList().rightPopAndLeftPush
方法從 queue
隊(duì)列消費(fèi)一條消息,同時(shí)把消息添加到 pendingQueue
隊(duì)列。該方法底層調(diào)用的正是 brpoplpush
命令,最多阻塞 5 秒,超時(shí)后返回 null
。
得到消息后解析為 Long
類型,模擬消費(fèi),即輸出到日志。如果消費(fèi)成功,則調(diào)用 stringRedisTemplate.opsForList().remove
方法(底層正是 LREM
命令)從 pendingQueue
隊(duì)列中刪除消息。如果消費(fèi)失敗,失敗的消息會(huì)在 pendingQueue
隊(duì)列中繼續(xù)存在,不會(huì)丟失,可以重新投遞消費(fèi)或者是人工處理。
測試
啟動(dòng)應(yīng)用后,通過 Redis 客戶端往 queue_orders
隊(duì)列推送消息:
> lpush queue_orders 10000 "1" > lpush queue_orders 10010 "1" > lpush queue_orders 10011 "1" > lpush queue_orders Nan "1"
往 queue_orders
隊(duì)列推送了四條訂單的 ID。注意最后一條消息值是 Nan
,這會(huì)導(dǎo)致 Long.parseLong
異常從而導(dǎo)致消費(fèi)失敗。
服務(wù)端輸出日志如下:
[ main] cn.springdoc.demo.DemoApplication : Started DemoApplication in 3.769 seconds (process running for 4.18) [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費(fèi)消息: 10000 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費(fèi)消息: 10010 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費(fèi)消息: 10011 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費(fèi)異常:For input string: "Nan" [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
符合預(yù)期,前面三條消息都成功消費(fèi),最后一條消息消費(fèi)失敗。按照設(shè)計(jì),這條消費(fèi)失敗的消息應(yīng)該在 Pending 隊(duì)列 pending_queue_orders
中存在。且應(yīng)該只有這一條消息,因?yàn)槠渌龡l消息都消費(fèi)成功。
查看 pending_queue_orders
隊(duì)列中的所有元素:
> lrange pending_queue_orders 0 -1 1) "Nan"
一切 OK,該隊(duì)列中只有 Nan
這條消息,正是消費(fèi)失敗的那條消息。
此時(shí),你如果想查看一下 Redis 中的所有 key,你會(huì)發(fā)現(xiàn)只有 pending_queue_orders
隊(duì)列存在:
> keys * 1) "pending_queue_orders"
queue_orders
隊(duì)列呢?這是 Redis List
的一個(gè)特性,當(dāng)從 List
中彈出最后一個(gè)元素后,Redis 就會(huì)刪除這個(gè) List
。queue_orders
中的元素都被彈出了,所以它被刪除了。當(dāng)再次嘗試往 queue_orders
中壓入消息時(shí),它會(huì)自動(dòng)創(chuàng)建。也就是說 「我們不需要手動(dòng)預(yù)先創(chuàng)建隊(duì)列, Redis 會(huì)自己創(chuàng)建,也會(huì)在合適的時(shí)間刪除,而這一切都是線程安全的」。
由于這是線程安全的,所以隊(duì)列中的 「一條消息只能被一個(gè)消費(fèi)者(客戶端)進(jìn)行消費(fèi)」,這非常適合在分布式或者是集群模式下使用,不必?fù)?dān)心同一條消息被多個(gè)消費(fèi)者消費(fèi)到。
注意,Pending 隊(duì)列中的消息可能存在重復(fù)消費(fèi)的可能。例如,消費(fèi)者成功消費(fèi)消息后,在調(diào)用 remove
方法從 Pending 隊(duì)列中刪除消息時(shí)失敗,那么 Pending 隊(duì)列中的這條刪除失敗的消息其實(shí)已經(jīng)是被成功消費(fèi)了的,需要在業(yè)務(wù)中考慮到!
使用 BLMOVE 和 LMOVE 命令
上文說過,從 Redis 6.2.0 開始 BRPOPLPUSH
和 RPOPLPUSH
命令就被聲明為廢棄了,后續(xù)版本中推薦使用 BLMOVE
和 LMOVE
命令。
目前 StringRedisTemplate
(Spring Boot 3.2.2)并未直接提供與 BLMOVE
和 LMOVE
命令對應(yīng)的 API 方法,但是可以獲取到底層連接對象來調(diào)用 BLMOVE
和 LMOVE
命令。
String item = this.stringRedisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { // 調(diào)用 bLMove 命令 byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5); return ret == null ? null : new String(ret); } });
Redis 的持久化方式
Redis 是一個(gè)內(nèi)存數(shù)據(jù)庫,為了保證數(shù)據(jù)的安全不丟失,它提供了兩種數(shù)據(jù)備份(持久化)方式,即 「RDB」 和 「AOF」。
「RDB」:生成某一時(shí)刻的數(shù)據(jù)快照,通過子進(jìn)程進(jìn)行備份,數(shù)據(jù)可能不完整(取決于備份周期)。
「AOF」:通過記錄執(zhí)行的指令到文件來實(shí)現(xiàn)數(shù)據(jù)備份,相對完整性較高,但是會(huì)記錄每一條執(zhí)行命令,性能會(huì)有一定影響。
這就需要根據(jù)你的業(yè)務(wù)場景來選擇合適的持久化方式,也可以同時(shí)配合使用 「RDB」 和 「AOF」 兩種方式,兼顧性能和數(shù)據(jù)安全。
總結(jié)
本文介紹了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH
/BLMOVE
命令來實(shí)現(xiàn)一個(gè)線程安全且可靠的消息隊(duì)列。
以上就是SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis消息隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Mybatis使用JSONObject接收數(shù)據(jù)庫查詢的方法
這篇文章主要介紹了Mybatis使用JSONObject接收數(shù)據(jù)庫查詢,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12springboot項(xiàng)目中統(tǒng)一時(shí)間格式處理方法
JacksonConfiguration主要用于配置JSON的序列化和反序列化,而LocalDateTimeFormatter則用于處理請求和響應(yīng)中的LocalDateTime格式,這兩個(gè)配置項(xiàng)在SpringBoot項(xiàng)目中至關(guān)重要,確保數(shù)據(jù)格式的正確處理和傳輸2024-10-10Springboot3集成Knife4j的步驟以及使用(最完整版)
這篇文章主要介紹了Springboot3集成Knife4j的步驟以及使用的相關(guān)資料,Knife4j是一種增強(qiáng)Swagger的工具,支持黑色主題和更多配置選項(xiàng),它與swagger-bootstrap-ui相比,提供了更現(xiàn)代的外觀和更多的功能,需要的朋友可以參考下2024-11-11SSH框架網(wǎng)上商城項(xiàng)目第2戰(zhàn)之基本增刪查改、Service和Action的抽取
SSH框架網(wǎng)上商城項(xiàng)目第2戰(zhàn)之基本增刪查改、Service和Action的抽取,感興趣的小伙伴們可以參考一下2016-05-05SpringBoot實(shí)現(xiàn)過濾敏感詞的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何利用SpringBoot實(shí)現(xiàn)過濾敏感詞功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以動(dòng)手嘗試一下2022-08-08spring中@autowired、@Qualifier、@Primary注解的使用說明
這篇文章主要介紹了spring中@autowired、@Qualifier、@Primary注解的使用,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java和C語言分別實(shí)現(xiàn)水仙花數(shù)及拓展代碼
這篇文章主要介紹了分別用Java和C語言實(shí)現(xiàn)水仙花數(shù),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-11-11