詳解Redis Stream做消息隊(duì)列
List
眾所周知redis數(shù)據(jù)結(jié)構(gòu)中的list的lpush與rpop可以用于常規(guī)消息隊(duì)列,從集合的最左端寫(xiě)入,最右端彈出消費(fèi)。并且支持多個(gè)生產(chǎn)者與多個(gè)消費(fèi)者并發(fā)拿數(shù)據(jù),數(shù)據(jù)只能由一個(gè)消費(fèi)者拿到。
但這個(gè)方案并不能保證消費(fèi)者消費(fèi)消息后是否成功處理的問(wèn)題(服務(wù)掛掉或處理異常等),機(jī)制屬于點(diǎn)對(duì)點(diǎn)模式不能做廣播模式(發(fā)布/訂閱模式)
Pub/sub
于是redis提供了相應(yīng)的發(fā)布訂閱功能,為了解除點(diǎn)對(duì)點(diǎn)的強(qiáng)綁定模式引入了Channel管道。
當(dāng)生產(chǎn)者向管道中發(fā)布消息,訂閱了該管道的消費(fèi)者能夠同時(shí)接收到該消息,而且為了簡(jiǎn)化訂閱多個(gè)管道需要顯式關(guān)注多個(gè)名稱提供了pattern能力。

通過(guò)名稱匹配如果接收消息的頻道wmyskxz.chat,consumer3也會(huì)收到消息。
但這個(gè)方案也有很大的詬病就是不會(huì)持久化,如果服務(wù)掛掉重啟數(shù)據(jù)就全丟棄了,也沒(méi)有提供ack機(jī)制,不保證數(shù)據(jù)可靠性,不管有沒(méi)有消費(fèi)成功發(fā)后既忘。
Stream
stream的話結(jié)構(gòu)很像kafka的設(shè)計(jì)思想,提供了consumer group和offset機(jī)制,結(jié)構(gòu)上感覺(jué)跟kafka的topic差不多,只是沒(méi)有對(duì)應(yīng)partation副本機(jī)制,而是一個(gè)追加消息的鏈表結(jié)構(gòu)??蛻舳苏{(diào)用XADD時(shí)候自動(dòng)創(chuàng)建stream。每個(gè)消息都會(huì)持久化并存在唯一的id標(biāo)識(shí)

Consumer Group
消費(fèi)者組的概念跟kafka的消費(fèi)者概念如出一轍,消費(fèi)者既可以用XREAD命令進(jìn)行獨(dú)立消費(fèi),也可以多個(gè)消費(fèi)者同時(shí)加入一個(gè)消費(fèi)者組。一條消息只能由一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。這樣可以在分布式系統(tǒng)中保證消息的唯一性。
其實(shí)這個(gè)特性我后來(lái)仔細(xì)琢磨了一下當(dāng)時(shí)自認(rèn)為無(wú)懈可擊的流式圖表為了保證分布式系統(tǒng)消息唯一做了redis分布式鎖。有點(diǎn)雞肋,明明消費(fèi)者組已經(jīng)保證了數(shù)據(jù)的唯一性。只能說(shuō)加鎖可以壓縮資源成本
last_delivered_id
用于標(biāo)識(shí)消費(fèi)者組消費(fèi)在stream上消費(fèi)位置的游標(biāo),每個(gè)消費(fèi)者組都有一個(gè)stream內(nèi)唯一的名稱,消費(fèi)者組不會(huì)自動(dòng)創(chuàng)建,需要用XGROUP CREATE顯式創(chuàng)建。
pending_ids
每個(gè)消費(fèi)者內(nèi)部都有一個(gè)狀態(tài)變量。用來(lái)表示已經(jīng)被客戶端消費(fèi)但沒(méi)有ack的消費(fèi)。目的是為了保證客戶端至少消費(fèi)了消息一次(atleastonce)。如果消費(fèi)者收到了消息處理完了但是沒(méi)有回復(fù)ack,就會(huì)導(dǎo)致列表不斷增長(zhǎng),如果有很多消費(fèi)組的話,那么這個(gè)列表占用的內(nèi)存就會(huì)放大
curd
- xadd 追加消息
- xdel 刪除消息,這里的刪除僅僅是設(shè)置了標(biāo)志位,不影響消息總長(zhǎng)度
- xrange 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息
- xlen 消息長(zhǎng)度
- del 刪除Stream
pending_ids如何避免消息丟失
在客戶端消費(fèi)者讀取Stream消息時(shí),Redis服務(wù)器將消息回復(fù)給客戶端的過(guò)程中,客戶端突然斷開(kāi)了連接,消息就丟失了。
但是pending_ids里已經(jīng)保存了發(fā)出去的消息ID。待客戶端重新連上之后,可以再次收到pending_ids中的消息ID列表。
不過(guò)此時(shí)xreadgroup的起始消息必須是任意有效的消息ID,一般將參數(shù)設(shè)為0-0,表示讀取所有的pending_ids消息以及自last_delivered_id之后的新消息。
嵌入SpringBoot
redis stream雖然還是有一些弊端,但是相比較而言用kafka之類的消息組件太重,redis用作消息隊(duì)列已經(jīng)很合適了。
這里簡(jiǎn)單提一下思路,本質(zhì)上是提供一個(gè)管理消息的一個(gè)小功能,定義一個(gè)注解用于創(chuàng)建stream管道

創(chuàng)建一個(gè)注解類,標(biāo)注該注解的類必須繼承StreamListener<String, ObjectRecord<String, Object>>類且重寫(xiě)onMessage方法。方法上也加這個(gè)注解
創(chuàng)建一個(gè)config類實(shí)現(xiàn)BeanPostProcessor接口,重寫(xiě)bean聲明周期postProcessAfterInitialization和postProcessBeforeInitialization方法。該方法會(huì)在spring啟動(dòng)流程里的refresh方法加載bean的聲明周期中掃描到所有加了注解的bean。
通過(guò)線程池挨個(gè)創(chuàng)建stream的group組與stream的consumer監(jiān)聽(tīng)連接,config類記得繼承DisposableBean類在destroy方法里把連接關(guān)掉免得oom。
注冊(cè)redis stream api提供的consumer容器
這里一定注意pollTimeout參數(shù),看名字就知道默認(rèn)拉取數(shù)據(jù)時(shí)間間隔,這個(gè)參數(shù)如果寫(xiě)的值很小或者寫(xiě)0,你就看你cpu高不高就完了。
@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10)
.serializer(new StringRedisSerializer())
.executor(new ForkJoinPool())
.pollTimeout(Duration.ofSeconds(3))
.targetType(Object.class)
.build();
return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}創(chuàng)建消費(fèi)者
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();
if (stringRedisTemplate.hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);
AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);
groups.forEach(groupInfo -> {
if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
groupHasKey.set(true);
}
});
if (groups.isEmpty() || !groupHasKey.get()) {
creatGroup(streamKey, group);
} else {
groups.stream().forEach(g -> {
log.info("XInfoGroups:{}", g);
StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
log.info("XInfoConsumers:{}", consumers);
});
}
} else {
creatGroup(streamKey, group);
}
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
Consumer consumer = Consumer.from(group, consumerName);
Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
listenerContainer.start();
this.containerList.add(listenerContainer);
return subscription;
}到此這篇關(guān)于詳解Redis Stream做消息隊(duì)列的文章就介紹到這了,更多相關(guān)Redis Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- 一文詳解消息隊(duì)列中為什么不用redis作為隊(duì)列
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- redis?消息隊(duì)列完成秒殺過(guò)期訂單處理方法(一)
- 如何使用?redis?消息隊(duì)列完成秒殺過(guò)期訂單處理操作(二)
- Redis高階使用消息隊(duì)列分布式鎖排行榜等(高階用法)
- Redis消息隊(duì)列的三種實(shí)現(xiàn)方式
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列使用小結(jié)
- python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
Redis中實(shí)現(xiàn)查找某個(gè)值的范圍
這篇文章主要介紹了Redis中實(shí)現(xiàn)查找某個(gè)值的范圍,本文的題引來(lái)了Redis作者Salvatore Sanfilippo(@antirez)的回答,比較經(jīng)典,需要的朋友可以參考下2015-06-06
redis中事務(wù)機(jī)制及樂(lè)觀鎖的實(shí)現(xiàn)
這篇文章主要介紹了redis中事務(wù)機(jī)制及樂(lè)觀鎖的相關(guān)內(nèi)容,通過(guò)事務(wù)的執(zhí)行分析Redis樂(lè)觀鎖,具有一定參考價(jià)值,需要的朋友可以了解下。2017-10-10
Redis Template實(shí)現(xiàn)分布式鎖的實(shí)例代碼
使用Redis的SETNX命令獲取分布式鎖的步驟,接下來(lái)通過(guò)本文給大家介紹Redis Template實(shí)現(xiàn)分布式鎖的實(shí)例代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2018-09-09
阿里云服務(wù)器安裝配置redis的方法并且加入到開(kāi)機(jī)啟動(dòng)(推薦)
這篇文章主要介紹了阿里云服務(wù)器安裝配置redis并且加入到開(kāi)機(jī)啟動(dòng),需要的朋友可以參考下2017-12-12
使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能
這篇文章主要介紹了使用Redis實(shí)現(xiàn)微信步數(shù)排行榜功能,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06

