redis之基于SpringBoot實現Redis stream實時流事件處理方式
1、redis stream簡介
Redis Stream 是 Redis 5.0 版本新增加的數據結構。Redis Stream 主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。
簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。
Redis5.0中發(fā)布的Stream類型,也用來實現典型的消息隊列。
提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
該Stream類型的出現,幾乎滿足了消息隊列具備的全部內容,包括但不限于:
- 消息ID的序列化生成
- 消息遍歷
- 消息的阻塞和非阻塞讀取
- 消息的分組消費
- 未完成消息的處理
- 消息隊列監(jiān)控
Redis Stream 的結構如下所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容:
在某些特定場景可以使用redis的stream代替kafka等消息隊列,減少系統(tǒng)復雜性,增強系統(tǒng)的穩(wěn)定性
每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。
上圖解析:
- Consumer Group :消費組,使用 XGROUP CREATE 命令創(chuàng)建,一個消費組有多個消費者(Consumer)。
- last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
- pending_ids :消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)。
2、redis stream基礎命令
添加:XADD
命令格式:XADD stream_name id key-value [key-value …]
127.0.0.1:6379> XADD mytopic * acctid 012 age 1 1527837352024-0
查看隊列長度:xlen
命令格式:xlen xxx
127.0.0.1:6379> xlen mytopic (integer) 1 127.0.0.1:6379>
獲取數據:xrange xrevrange
1.xrange 命令格式:
xrange mytopic - + xrange mytopic 生成的ID + count 2
2.xrevrange命令格式:
xrevrange mytopic + 1527837440632 count 3
該命令的意思為:反向查詢ID以無限大為開始,以1527837440632為結束的entry,但只取出查詢結果集(降序排列)中的前三個entry;
獲取數據:xread
1.非阻塞
從stream 中拿ID比0大的4個Entry,按升序排列
xread count 4 streams mytopic 0
2.阻塞
監(jiān)聽name為mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry
xread block 0 streams mystream $
block 0:block表示命令要阻塞,0表示阻塞時間為無限大,不超時,如果設置為>0的整數,即為阻塞超時時間
監(jiān)聽生效后,拿到數據監(jiān)聽就失效,與zk的watcher雷同。意思是該命令執(zhí)行后,只能拿到一條ID比設置ID更大的entry,要想繼續(xù)拿,必須執(zhí)行xread命令,官方推薦下一次拿entry使用上一次得到的ID。注意千萬別亂設置很大的ID ,否則你可能永遠拿不到entry。
xread block 0 streams mystream mytopic $ $
收到任何一個stream的消息,本次監(jiān)聽就失效,只能拿到一條數據,后面還需要拿數據,可以將各自stream拿到的ID作為最大ID,重新執(zhí)行命令
消費者組:Consumer groups
redis5引入了消費者組的概念,一個stream的數據每一個消費者組都發(fā)一份,消費者組里面的消費者競爭同一份數據,亦即在同一個消費者組內,一個消息是不可能發(fā)給多個消費者的:
消費者組提供了如下5點保障
- 組內消費者消費的消息不重復
- 組內消費者名稱必須唯一
- 消費者拿到的消息肯定是沒有被組內其他消費者消費過的消息
- 消費者成功消費消息之后要求發(fā)送ACK,然后這條消息才會從消費者組中移除,也就是說消息至少被消費一次,和kafka一樣
- 消費者組會跟蹤所有待處理的消息
基礎命令
1.創(chuàng)建消費者組
xgroup create mytopic mygroup $
該命令的意思是:使用xgroup命令創(chuàng)建了一個mygroup消費者組,該消費者組與mytopic stream進行了關聯,以后mygroup消費者組中的消費者就會mytopic stream中拿數據;
符號" $ "代表mytopic stream中目前最大的ID,消費者拿到的entry的id一定會大于此刻$代表的最大ID。你也可以指定這個最大的ID,比如0;
2.從消費者組讀數據
使用xreadgroup命令讓消費者consumer_a從mygroup消費者組的mytopic stream中拿最新的,并且沒有被發(fā)送給其他消費者處理的entry:
xreadgroup group mygroup consumer_a count 1 streams mytopic >
參數:
- group:該參數是必選項
- “>”:該符號只有在消費者組命令xreadgroup中有效,意思為mytopic stream中最新且沒有被其他消費者處理的ID,千萬記住不要與上面"$"最大ID搞混了,否則拿出來的數據與你的期望值不符,如果使用的是任何數組ID,那么該消費者就無法拿到任何新的消息,只是從它的已經處理過的消息中拿,并且不會有ACK機制;
如果想一個消費者組關聯多個stream可以這樣做:
xgroup create mystream mygroup $ xgroup create mytopic mygroup $ xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >
讀消息的參數多了一個block 0,就是說讀數據需要阻塞。
3.發(fā)送ACK
將指定ID對應的entry從consumer的已處理消息列表中刪除
XACK mystream mygroup 1527864992409-0
3、結合Spring Boot進行redis實時流處理
樣例應用:
項目依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
3.1 生產者
理想情況下,生產者和消費者將是兩個不同的微服務/應用程序。
在這里,我們把消費和生產都弄在同一個項目中。
但是,我們基于名為“ app.role ”的自定義屬性來控制應用程序的行為,使其像生產者或消費者。
基于該值,將在Spring中創(chuàng)建相應的組件。
@Service @ConditionalOnProperty(name="app.role", havingValue="producer") public class PurchaseEventProducer { private AtomicInteger atomicInteger = new AtomicInteger(0); @Value("${stream.key}") private String streamKey; @Autowired private ProductRepository repository; @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Scheduled(fixedRateString= "${publish.rate}") public void publishEvent(){ Product product = this.repository.getRandomProduct(); ObjectRecord<String, Product> record = StreamRecords.newRecord() .ofObject(product) .withStreamKey(streamKey); this.redisTemplate .opsForStream() .add(record) .subscribe(System.out::println); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Events :: " + atomicInteger.get() ); } }
- publishEvent方法定期發(fā)布一些隨機購買的產品訂單。
- showPublishedEventsSoFar方法僅顯示到目前為止已下的訂單數。
3.2 消費者
我們的發(fā)布者已經準備好。讓我們創(chuàng)建一個消費者。
要使用RedisStreams,我們需要實現StreamListener接口。
@Service @ConditionalOnProperty(name="app.role", havingValue="consumer") public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> { private AtomicInteger atomicInteger = new AtomicInteger(0); @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Override @SneakyThrows public void onMessage(ObjectRecord<String, Product> record) { System.out.println( InetAddress.getLocalHost().getHostName() + " - consumed :" + record.getValue() ); this.redisTemplate .opsForZSet() .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice()) .subscribe(); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Consumed :: " + atomicInteger.get() ); } }
在消費者端,我們只簡單地顯示消費記錄情況。
然后,我們獲得支付價格并將其添加到redis排序集中。
像發(fā)布者一樣,我們會定期顯示此使用者消費到的事件數。
3.3 Redis流配置
創(chuàng)建使用者后,我們需要通過將上述使用者添加到StreamMessageListenerContainer實例中來創(chuàng)建訂閱。
@Configuration @ConditionalOnProperty(name="app.role", havingValue="consumer") public class RedisStreamConfig { @Value("${stream.key}") private String streamKey; @Autowired private StreamListener<String, ObjectRecord<String, Product>> streamListener; @Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(Product.class) .build(); var listenerContainer = StreamMessageListenerContainer .create(redisConnectionFactory, options); var subscription = listenerContainer.receiveAutoAck( Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; } }
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
mybatis-plus插入一條數據,獲取插入數據自動生成的主鍵問題
這篇文章主要介紹了mybatis-plus插入一條數據,獲取插入數據自動生成的主鍵問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12SpringBoot整合jasypt進行重要數據加密的操作代碼
Jasypt(Java?Simplified?Encryption)是一個專注于簡化Java加密操作的開源工具,它提供了一種簡單而強大的方式來處理數據的加密和解密,使開發(fā)者能夠輕松地保護應用程序中的敏感信息,本文給大家介紹了SpringBoot整合jasypt進行重要數據加密,需要的朋友可以參考下2024-05-05Springboot RestTemplate設置超時時間的方法(Spring boot
這篇文章主要介紹了Springboot RestTemplate設置超時時間的方法,包括Spring boot 版本<=1.3和Spring boot 版本>=1.4,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-08-08解決Maven無法下載2.1.7.js7版本的itext依賴問題
本文主要解決使用Maven編譯項目時出現的itext依賴版本問題,通過分析,發(fā)現該問題是由jasperreports依賴的特定版本itext導致的,解決方法是排除jasperreports中的itext依賴,并自行指定更高版本的itext依賴2024-12-12