亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)

 更新時(shí)間:2024年04月12日 11:53:02   作者:springdoc.cn  
在應(yīng)用中把Redis當(dāng)成消息隊(duì)列來使用已經(jīng)屢見不鮮了,我想主要原因是當(dāng)代應(yīng)用十有八九都會(huì)用到 Redis,因此不用再引入其他消息隊(duì)列系統(tǒng),而且Redis提供了好幾種實(shí)現(xiàn)消息隊(duì)列的方法,用起來也簡單,本文給大家介紹了SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)

使用 Redis 實(shí)現(xiàn)消息隊(duì)列的幾種方式

Redis 提供了多種方式來實(shí)現(xiàn)消息隊(duì)列。

Pub/Sub

訂閱發(fā)布模式,發(fā)布者把消息發(fā)布到某個(gè) Channel,該 Channel 的所有訂閱者都會(huì)收到消息。但是這種方式最大的問題是 「發(fā)布出去的消息,如果沒有被監(jiān)聽消費(fèi),或者消費(fèi)過程中宕機(jī),那么消息就會(huì)永遠(yuǎn)丟失」。適合用于臨時(shí)通知之類的場景,對于需要保證數(shù)據(jù)不丟失的場景不能使用這種方式。

List

List 是 Redis 提供的一種數(shù)據(jù)類型,底層是鏈表,可以用來實(shí)現(xiàn)隊(duì)列、棧。

Stream

Stream 是一個(gè)由 Redis 5 引入的,功能完善的消息隊(duì)列。想必也是 Redis 官方團(tuán)隊(duì)看到太多人拿 Redis 當(dāng)消息隊(duì)列使,于是干脆就在 Redis 上設(shè)計(jì)出一個(gè)類似于 Kafka 的消息隊(duì)列。

Steam 支持消費(fèi)組消費(fèi),一條消息只能被消費(fèi)組中的其中一個(gè)消費(fèi)者消費(fèi)。支持 「消息確認(rèn)」、支持 「回溯消費(fèi)」 還支持把未 ACK(確認(rèn))的消息轉(zhuǎn)移給其他消費(fèi)者進(jìn)行重新消費(fèi),在進(jìn)行轉(zhuǎn)移的時(shí)候還會(huì)累計(jì)消息的轉(zhuǎn)移次數(shù),當(dāng)次數(shù)達(dá)到一定閾值還沒消費(fèi)成功,就可以放入死信隊(duì)列。

這也是 Redis 種最復(fù)雜的一種數(shù)據(jù)類型。如果你真的到了需要使用 Redis Steam 作為消息隊(duì)列的地步,那不如直接使用 RabbitMQ 等更加成熟且穩(wěn)定的消息隊(duì)列系統(tǒng)。

使用 List 實(shí)現(xiàn)可靠的消息隊(duì)列

目前來說,這是用得最多的一種方式,適用于大多數(shù)簡單的消息隊(duì)列應(yīng)用場景。List 類型有很多指令,但是作為消息隊(duì)列來說用到的只有幾個(gè)個(gè):

LPUSH key element [element ...]

把元素插入到 List 的首部,如果 List 不存在,會(huì)自動(dòng)創(chuàng)建。

BRPOPLPUSH source destination timeout

移除并且返回 List (source)尾部的最后一個(gè)元素,并且同時(shí)會(huì)把這個(gè)元素插入到另一個(gè) List (destination)的首部。

當(dāng) source List 中沒有元素時(shí),Redis 會(huì)阻塞連接,直到有其他客戶端向其推送元素或超時(shí)。超時(shí)時(shí)間(秒)為 0 表示永遠(yuǎn)不超時(shí)。

注意,這個(gè)命令是 「原子性」 的,也就是說只要客戶端獲取到了返回的元素,那么這個(gè)元素一定就會(huì)在 destination List 有備份。這是實(shí)現(xiàn)可靠消息隊(duì)列的關(guān)鍵!

RPOPLPUSH source destination

同上,它是 BRPOPLPUSH 命令的 「非阻塞」 版,如果 List 中沒有元素就會(huì)立即返回 null。

LREM key count element

從 List 中刪除元素,count 的值不同,刪除的方式也不同:

  • count > 0:從頭到尾開始搜索,刪除與 element 相等的元素,最多刪除 count 個(gè)。

  • count < 0:從尾到頭開始搜索,刪除與 element 相等的元素,最多刪除 count (絕對值)個(gè)。

  • count = 0:刪除所有與元素相等的元素。

BLMOVE 和 LMOVE 命令」

  • LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>

  • BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout

從 Redis 6.2.0 開始,BRPOPLPUSH 和 RPOPLPUSH 命令就被聲明為廢棄了,取而代之的是語義更加明確的 BLMOVE 和 LMOVE 命令。

BLMOVE 和 LMOVE 可以通過參數(shù)指定元素出隊(duì)列(source)的方向,和入隊(duì)列(destination)的方向,除此以外并無其他區(qū)別。

?

實(shí)現(xiàn)思路

了解了上述幾個(gè)命令后,一個(gè)簡單易用且可靠的消息隊(duì)列就呼之欲出了。

  • 生產(chǎn)者使用 LPUSH 命令往消息隊(duì)列生產(chǎn)消息

  • 消費(fèi)者使用 BRPOPLPUSH 命令從隊(duì)列消費(fèi)消息,并且還會(huì)在獲取并返回消息的時(shí)候把該消息推送到另一個(gè)消息隊(duì)列,也就是 Pending 隊(duì)列,這個(gè)隊(duì)列中存儲(chǔ)的就是未被消費(fèi)者 ACK 的消息

  • 消費(fèi)者成功消費(fèi)完畢后,使用 LREM 命令從 Pending 隊(duì)列中刪除這條消息,整個(gè)消費(fèi)過程結(jié)束

  • 如果消費(fèi)者在消費(fèi)過程中出現(xiàn)異常、宕機(jī),那么需要在恢復(fù)后從 Pending 隊(duì)列中獲取到這條消息,再進(jìn)行重新消費(fèi),從而保證了消息隊(duì)列的可靠性,不會(huì)丟失消息(可能存在重復(fù)消費(fèi),需要做好冪等處理)

在 Spring Boot 中實(shí)現(xiàn)

首先,創(chuàng)建 Spring Boot 項(xiàng)目,并整合 Redis。

創(chuàng)建一個(gè) OrderConsumer Bean 模擬從隊(duì)列中消費(fèi)訂單 ID。

package cn.springdoc.demo.consumer;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer implements ApplicationRunner, Runnable {

    static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    // 消息隊(duì)列
    final String queue = "queue_orders";

    // pending 隊(duì)列,即待確認(rèn)消息的隊(duì)列
    final String pendingQueue = "pending_queue_orders";

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 應(yīng)用啟動(dòng)后,創(chuàng)建新的線程來執(zhí)行消費(fèi)任務(wù)
        Thread thread = new Thread(this);
        thread.setName("order-consumer-thread");
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1:消費(fèi)者,從隊(duì)列未彈出消息,并推送到 pending 隊(duì)列,整個(gè)過程是原子性的
                // 最多阻塞 5 秒,超過 5 秒后還沒有消息,則返回 null
                String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);
                
                if (item == null) {
                    log.info("等待消息 ...");
                    continue ;
                }
                
                try {
                    
                    // 2:解析為 Long
                    Long orderId = Long.parseLong(item);
                    
                    // 模擬消息消費(fèi)
                    log.info("消費(fèi)消息: {}", orderId);
                    
                } catch (Exception e) {
                    log.error("消費(fèi)異常:{}", e.getMessage());
                    continue;
                }
                
                // 3:消費(fèi)成功,從 pending 隊(duì)列刪除記錄,相當(dāng)于確認(rèn)消費(fèi)
                stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);
            } catch (Exception e) {
                log.error("隊(duì)列監(jiān)聽異常:{}", e.getMessage());
                break;
            }
        }
        log.info("退出消費(fèi)");
    }
}

OrderConsumer 實(shí)現(xiàn)了 ApplicationRunner 接口,在應(yīng)用就緒后創(chuàng)建新的消費(fèi)線程進(jìn)行消費(fèi)。

stringRedisTemplate.opsForList().rightPopAndLeftPush 方法從 queue 隊(duì)列消費(fèi)一條消息,同時(shí)把消息添加到  pendingQueue 隊(duì)列。該方法底層調(diào)用的正是 brpoplpush 命令,最多阻塞 5 秒,超時(shí)后返回 null。

得到消息后解析為 Long 類型,模擬消費(fèi),即輸出到日志。如果消費(fèi)成功,則調(diào)用 stringRedisTemplate.opsForList().remove 方法(底層正是 LREM 命令)從 pendingQueue 隊(duì)列中刪除消息。如果消費(fèi)失敗,失敗的消息會(huì)在 pendingQueue 隊(duì)列中繼續(xù)存在,不會(huì)丟失,可以重新投遞消費(fèi)或者是人工處理。

測試

啟動(dòng)應(yīng)用后,通過 Redis 客戶端往 queue_orders 隊(duì)列推送消息:

> lpush queue_orders 10000
"1"
> lpush queue_orders 10010
"1"
> lpush queue_orders 10011
"1"
> lpush queue_orders Nan
"1"

往 queue_orders 隊(duì)列推送了四條訂單的 ID。注意最后一條消息值是 Nan,這會(huì)導(dǎo)致 Long.parseLong 異常從而導(dǎo)致消費(fèi)失敗。

服務(wù)端輸出日志如下:

[           main] cn.springdoc.demo.DemoApplication        : Started DemoApplication in 3.769 seconds (process running for 4.18)
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費(fèi)消息: 10000
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費(fèi)消息: 10010
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費(fèi)消息: 10011
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費(fèi)異常:For input string: "Nan"
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...

符合預(yù)期,前面三條消息都成功消費(fèi),最后一條消息消費(fèi)失敗。按照設(shè)計(jì),這條消費(fèi)失敗的消息應(yīng)該在 Pending 隊(duì)列 pending_queue_orders 中存在。且應(yīng)該只有這一條消息,因?yàn)槠渌龡l消息都消費(fèi)成功。

查看 pending_queue_orders 隊(duì)列中的所有元素:

> lrange pending_queue_orders 0 -1
1) "Nan"

一切 OK,該隊(duì)列中只有 Nan 這條消息,正是消費(fèi)失敗的那條消息。

此時(shí),你如果想查看一下 Redis 中的所有 key,你會(huì)發(fā)現(xiàn)只有 pending_queue_orders 隊(duì)列存在:

> keys *
1) "pending_queue_orders"

queue_orders 隊(duì)列呢?這是 Redis List 的一個(gè)特性,當(dāng)從 List 中彈出最后一個(gè)元素后,Redis 就會(huì)刪除這個(gè) List。queue_orders 中的元素都被彈出了,所以它被刪除了。當(dāng)再次嘗試往 queue_orders 中壓入消息時(shí),它會(huì)自動(dòng)創(chuàng)建。也就是說 「我們不需要手動(dòng)預(yù)先創(chuàng)建隊(duì)列, Redis 會(huì)自己創(chuàng)建,也會(huì)在合適的時(shí)間刪除,而這一切都是線程安全的」。

由于這是線程安全的,所以隊(duì)列中的 「一條消息只能被一個(gè)消費(fèi)者(客戶端)進(jìn)行消費(fèi)」,這非常適合在分布式或者是集群模式下使用,不必?fù)?dān)心同一條消息被多個(gè)消費(fèi)者消費(fèi)到。

 

注意,Pending 隊(duì)列中的消息可能存在重復(fù)消費(fèi)的可能。例如,消費(fèi)者成功消費(fèi)消息后,在調(diào)用 remove 方法從 Pending 隊(duì)列中刪除消息時(shí)失敗,那么 Pending 隊(duì)列中的這條刪除失敗的消息其實(shí)已經(jīng)是被成功消費(fèi)了的,需要在業(yè)務(wù)中考慮到!

使用 BLMOVE 和 LMOVE 命令

上文說過,從 Redis 6.2.0 開始 BRPOPLPUSH 和 RPOPLPUSH 命令就被聲明為廢棄了,后續(xù)版本中推薦使用 BLMOVE 和 LMOVE 命令。

目前 StringRedisTemplate (Spring Boot 3.2.2)并未直接提供與 BLMOVE 和 LMOVE 命令對應(yīng)的 API 方法,但是可以獲取到底層連接對象來調(diào)用 BLMOVE 和 LMOVE 命令。

String item = this.stringRedisTemplate.execute(new RedisCallback<String>() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        // 調(diào)用 bLMove 命令
        byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5);
        return ret == null ? null : new String(ret);
    }
});

Redis 的持久化方式

Redis 是一個(gè)內(nèi)存數(shù)據(jù)庫,為了保證數(shù)據(jù)的安全不丟失,它提供了兩種數(shù)據(jù)備份(持久化)方式,即 「RDB」 和 「AOF」。

  • 「RDB」:生成某一時(shí)刻的數(shù)據(jù)快照,通過子進(jìn)程進(jìn)行備份,數(shù)據(jù)可能不完整(取決于備份周期)。

  • 「AOF」:通過記錄執(zhí)行的指令到文件來實(shí)現(xiàn)數(shù)據(jù)備份,相對完整性較高,但是會(huì)記錄每一條執(zhí)行命令,性能會(huì)有一定影響。

這就需要根據(jù)你的業(yè)務(wù)場景來選擇合適的持久化方式,也可以同時(shí)配合使用 「RDB」 和 「AOF」 兩種方式,兼顧性能和數(shù)據(jù)安全。

總結(jié)

本文介紹了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH/BLMOVE 命令來實(shí)現(xiàn)一個(gè)線程安全且可靠的消息隊(duì)列。

以上就是SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis消息隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論