亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

redis之基于SpringBoot實現Redis stream實時流事件處理方式

 更新時間:2023年06月27日 15:50:47   作者:RachelHwang  
這篇文章主要介紹了redis之基于SpringBoot實現Redis stream實時流事件處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

1、redis stream簡介

Redis Stream 是 Redis 5.0 版本新增加的數據結構。Redis Stream 主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。

簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。

Redis5.0中發(fā)布的Stream類型,也用來實現典型的消息隊列。

提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。

該Stream類型的出現,幾乎滿足了消息隊列具備的全部內容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍歷
  • 消息的阻塞和非阻塞讀取
  • 消息的分組消費
  • 未完成消息的處理
  • 消息隊列監(jiān)控

Redis Stream 的結構如下所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容:

在某些特定場景可以使用redis的stream代替kafka等消息隊列,減少系統(tǒng)復雜性,增強系統(tǒng)的穩(wěn)定性

在這里插入圖片描述

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。

上圖解析:

  • Consumer Group :消費組,使用 XGROUP CREATE 命令創(chuàng)建,一個消費組有多個消費者(Consumer)。
  • last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
  • pending_ids :消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)。

2、redis stream基礎命令

添加:XADD

命令格式:XADD stream_name id key-value [key-value …]

127.0.0.1:6379> XADD mytopic * acctid 012 age 1
1527837352024-0

查看隊列長度:xlen

命令格式:xlen xxx

127.0.0.1:6379> xlen mytopic
(integer) 1
127.0.0.1:6379> 

獲取數據:xrange xrevrange

1.xrange 命令格式:

xrange mytopic - +
xrange mytopic 生成的ID + count 2

2.xrevrange命令格式:

xrevrange mytopic + 1527837440632 count 3

該命令的意思為:反向查詢ID以無限大為開始,以1527837440632為結束的entry,但只取出查詢結果集(降序排列)中的前三個entry;

獲取數據:xread

1.非阻塞

從stream 中拿ID比0大的4個Entry,按升序排列

xread count 4 streams mytopic 0

2.阻塞

監(jiān)聽name為mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry

xread block 0 streams mystream $ 

block 0:block表示命令要阻塞,0表示阻塞時間為無限大,不超時,如果設置為>0的整數,即為阻塞超時時間

監(jiān)聽生效后,拿到數據監(jiān)聽就失效,與zk的watcher雷同。意思是該命令執(zhí)行后,只能拿到一條ID比設置ID更大的entry,要想繼續(xù)拿,必須執(zhí)行xread命令,官方推薦下一次拿entry使用上一次得到的ID。注意千萬別亂設置很大的ID ,否則你可能永遠拿不到entry。

xread block 0 streams mystream mytopic $ $

收到任何一個stream的消息,本次監(jiān)聽就失效,只能拿到一條數據,后面還需要拿數據,可以將各自stream拿到的ID作為最大ID,重新執(zhí)行命令

消費者組:Consumer groups

redis5引入了消費者組的概念,一個stream的數據每一個消費者組都發(fā)一份,消費者組里面的消費者競爭同一份數據,亦即在同一個消費者組內,一個消息是不可能發(fā)給多個消費者的:

在這里插入圖片描述

消費者組提供了如下5點保障

  • 組內消費者消費的消息不重復
  • 組內消費者名稱必須唯一
  • 消費者拿到的消息肯定是沒有被組內其他消費者消費過的消息
  • 消費者成功消費消息之后要求發(fā)送ACK,然后這條消息才會從消費者組中移除,也就是說消息至少被消費一次,和kafka一樣
  • 消費者組會跟蹤所有待處理的消息

基礎命令

1.創(chuàng)建消費者組

xgroup create mytopic mygroup $

該命令的意思是:使用xgroup命令創(chuàng)建了一個mygroup消費者組,該消費者組與mytopic stream進行了關聯,以后mygroup消費者組中的消費者就會mytopic stream中拿數據;

符號" $ "代表mytopic stream中目前最大的ID,消費者拿到的entry的id一定會大于此刻$代表的最大ID。你也可以指定這個最大的ID,比如0;

2.從消費者組讀數據

使用xreadgroup命令讓消費者consumer_a從mygroup消費者組的mytopic stream中拿最新的,并且沒有被發(fā)送給其他消費者處理的entry:

xreadgroup group mygroup consumer_a count 1 streams mytopic >

參數:

  • group:該參數是必選項
  • “>”:該符號只有在消費者組命令xreadgroup中有效,意思為mytopic stream中最新且沒有被其他消費者處理的ID,千萬記住不要與上面"$"最大ID搞混了,否則拿出來的數據與你的期望值不符,如果使用的是任何數組ID,那么該消費者就無法拿到任何新的消息,只是從它的已經處理過的消息中拿,并且不會有ACK機制;

如果想一個消費者組關聯多個stream可以這樣做:

xgroup create mystream mygroup $
xgroup create mytopic mygroup $
xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >

讀消息的參數多了一個block 0,就是說讀數據需要阻塞。

3.發(fā)送ACK

將指定ID對應的entry從consumer的已處理消息列表中刪除

XACK mystream mygroup 1527864992409-0

3、結合Spring Boot進行redis實時流處理

樣例應用:

在這里插入圖片描述

項目依賴:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

3.1 生產者

理想情況下,生產者和消費者將是兩個不同的微服務/應用程序。

在這里,我們把消費和生產都弄在同一個項目中。

但是,我們基于名為“ app.role ”的自定義屬性來控制應用程序的行為,使其像生產者或消費者。

基于該值,將在Spring中創(chuàng)建相應的組件。

@Service
@ConditionalOnProperty(name="app.role", havingValue="producer")
public class PurchaseEventProducer {
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    @Value("${stream.key}")
    private String streamKey;
    @Autowired
    private ProductRepository repository;
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Scheduled(fixedRateString= "${publish.rate}")
    public void publishEvent(){
        Product product = this.repository.getRandomProduct();
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Events :: " + atomicInteger.get()
        );
    }
}
  • publishEvent方法定期發(fā)布一些隨機購買的產品訂單。
  • showPublishedEventsSoFar方法僅顯示到目前為止已下的訂單數。

3.2 消費者

我們的發(fā)布者已經準備好。讓我們創(chuàng)建一個消費者。

要使用RedisStreams,我們需要實現StreamListener接口。

@Service
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() + " - consumed :" +
                record.getValue()
        );
        this.redisTemplate
                .opsForZSet()
                .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Consumed :: " + atomicInteger.get()
        );
    }
}

在消費者端,我們只簡單地顯示消費記錄情況。

然后,我們獲得支付價格并將其添加到redis排序集中。

像發(fā)布者一樣,我們會定期顯示此使用者消費到的事件數。

3.3 Redis流配置

創(chuàng)建使用者后,我們需要通過將上述使用者添加到StreamMessageListenerContainer實例中來創(chuàng)建訂閱。

@Configuration
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class RedisStreamConfig {
    @Value("${stream.key}")
    private String streamKey;
    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;
    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        var options = StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .pollTimeout(Duration.ofSeconds(1))
                            .targetType(Product.class)
                            .build();
        var listenerContainer = StreamMessageListenerContainer
                                    .create(redisConnectionFactory, options);
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }
}

總結

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • mybatis-plus插入一條數據,獲取插入數據自動生成的主鍵問題

    mybatis-plus插入一條數據,獲取插入數據自動生成的主鍵問題

    這篇文章主要介紹了mybatis-plus插入一條數據,獲取插入數據自動生成的主鍵問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • SpringBoot整合jasypt進行重要數據加密的操作代碼

    SpringBoot整合jasypt進行重要數據加密的操作代碼

    Jasypt(Java?Simplified?Encryption)是一個專注于簡化Java加密操作的開源工具,它提供了一種簡單而強大的方式來處理數據的加密和解密,使開發(fā)者能夠輕松地保護應用程序中的敏感信息,本文給大家介紹了SpringBoot整合jasypt進行重要數據加密,需要的朋友可以參考下
    2024-05-05
  • java中Hutool工具類的常見使用場景詳解

    java中Hutool工具類的常見使用場景詳解

    在日常開發(fā)中,我們會使用很多工具類來提升項目開發(fā)的速度,而國內用的比較多的 Hutool 框架,就是其中之一,本文我們就來介紹一下Hutool的具體使用吧
    2023-12-12
  • 在SpringBoot中配置日期格式化的方法詳解

    在SpringBoot中配置日期格式化的方法詳解

    通常情況下,發(fā)起一個 Http 請求,Spring Boot 會根據請求路徑映射到指定 Controller 上的某個方法的參數上,接著,Spring 會自動進行類型轉換,對于日期類型的參數,Spring 默認是沒有配置如何將字符串轉換成日期類型的,本文將給大家介紹在SpringBoot中配置日期格式化的方法
    2023-10-10
  • Spring Boot 集成 Kafka的詳細步驟

    Spring Boot 集成 Kafka的詳細步驟

    Spring Boot與Kafka的集成使得消息隊列的使用變得更加簡單和高效,可以配置 Kafka、實現生產者和消費者,并利用 Spring Boot 提供的功能處理消息流,以下是 Spring Boot 集成 Kafka 的詳細步驟,包括配置、生產者和消費者的實現以及一些高級特性,感興趣的朋友一起看看吧
    2024-07-07
  • JSON 格式的弊端與解決方法(真實示例)

    JSON 格式的弊端與解決方法(真實示例)

    JSON 格式是目前最流行的數據交互格式,廣泛應用于前后端分離的系統(tǒng)。但也有一些場合不適合使用 JSON 格式,這篇文章主要介紹了JSON 格式的弊端與解決方法,需要的朋友可以參考下
    2022-09-09
  • Springboot RestTemplate設置超時時間的方法(Spring boot 版本)

    Springboot RestTemplate設置超時時間的方法(Spring boot 

    這篇文章主要介紹了Springboot RestTemplate設置超時時間的方法,包括Spring boot 版本<=1.3和Spring boot 版本>=1.4,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧
    2024-08-08
  • 解決Maven無法下載2.1.7.js7版本的itext依賴問題

    解決Maven無法下載2.1.7.js7版本的itext依賴問題

    本文主要解決使用Maven編譯項目時出現的itext依賴版本問題,通過分析,發(fā)現該問題是由jasperreports依賴的特定版本itext導致的,解決方法是排除jasperreports中的itext依賴,并自行指定更高版本的itext依賴
    2024-12-12
  • MyBatis-Plus實現公共字段自動填充功能詳解

    MyBatis-Plus實現公共字段自動填充功能詳解

    在開發(fā)中經常遇到多個實體類有共同的屬性字段,這些字段屬于公共字段,也就是很多表中都有這些字段,能不能對于這些公共字段在某個地方統(tǒng)一處理,來簡化開發(fā)呢?MyBatis-Plus就提供了這一功能,本文就來為大家詳細講講
    2022-08-08
  • 淺談ArrayList和LinkedList到底誰更快

    淺談ArrayList和LinkedList到底誰更快

    今天給大家?guī)淼氖顷P于Java的相關知識,文章圍繞著ArrayList和LinkedList到底誰更快展開,文中有非常詳細的介紹,需要的朋友可以參考下
    2021-06-06

最新評論