Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
Redis實(shí)現(xiàn)消息隊(duì)列和RabbitMQ的優(yōu)缺點(diǎn)
Redis實(shí)現(xiàn)消息隊(duì)列的優(yōu)點(diǎn):
- 性能高:Redis是內(nèi)存數(shù)據(jù)庫(kù),讀寫(xiě)速度快,適合高并發(fā)的消息推送。
- 數(shù)據(jù)結(jié)構(gòu)豐富:Redis支持多種數(shù)據(jù)結(jié)構(gòu),如列表(list)、集合(set)、有序集合(zset)等,可以實(shí)現(xiàn)多種消息隊(duì)列模式。
- 易用性:Redis的命令簡(jiǎn)單,學(xué)習(xí)成本低,易于上手。
- 持久化:Redis支持RDB和AOF持久化,能夠保證數(shù)據(jù)的安全。
- 分布式支持:Redis支持主從復(fù)制、哨兵和集群,可以實(shí)現(xiàn)高可用和分布式架構(gòu)。
Redis實(shí)現(xiàn)消息隊(duì)列的缺點(diǎn):
- 功能有限:相比于專(zhuān)業(yè)的消息隊(duì)列中間件,Redis的消息隊(duì)列功能相對(duì)簡(jiǎn)單,不支持復(fù)雜的消息路由、事務(wù)、消息優(yōu)先級(jí)等特性。
- 消息可靠性:Redis沒(méi)有內(nèi)置的消息確認(rèn)機(jī)制,需要自行實(shí)現(xiàn),可能會(huì)增加開(kāi)發(fā)復(fù)雜度。
- 數(shù)據(jù)量限制:受內(nèi)存限制,Redis不適合存儲(chǔ)大量消息。
- 消息持久化問(wèn)題:雖然Redis支持持久化,但在大數(shù)據(jù)量情況下,持久化可能會(huì)影響性能。
RabbitMQ的優(yōu)點(diǎn):
- 功能強(qiáng)大:RabbitMQ是一個(gè)專(zhuān)業(yè)的消息隊(duì)列中間件,支持消息持久化、事務(wù)、消息優(yōu)先級(jí)、延遲消息、死信隊(duì)列等高級(jí)特性。
- 高可用性:RabbitMQ支持鏡像隊(duì)列,可以實(shí)現(xiàn)高可用架構(gòu)。
- 靈活的路由:RabbitMQ支持多種交換機(jī)類(lèi)型(direct, topic, headers, fanout),可以實(shí)現(xiàn)復(fù)雜的消息路由。
- 客戶端支持廣泛:RabbitMQ有多種語(yǔ)言客戶端,方便集成。
- 社區(qū)活躍:RabbitMQ有活躍的社區(qū),問(wèn)題解決速度快。
RabbitMQ的缺點(diǎn):
- 性能相對(duì)較低:相比于Redis,RabbitMQ的性能略低,尤其是在高并發(fā)場(chǎng)景下。
- 資源消耗:RabbitMQ需要更多的系統(tǒng)資源,如CPU和內(nèi)存。
- 復(fù)雜性:RabbitMQ的概念模型較為復(fù)雜,學(xué)習(xí)曲線較陡峭
Redis適合于需要高速讀寫(xiě)、輕量級(jí)消息隊(duì)列的場(chǎng)景,如果業(yè)務(wù)對(duì)消息隊(duì)列的功能要求不高,且已經(jīng)使用了Redis,可以考慮使用Redis實(shí)現(xiàn)消息隊(duì)列。其他情況下還是建議使用RabbitMQ
1.Spring Data Redis
這是Spring框架提供的一個(gè)用于簡(jiǎn)化Redis操作的模塊。
初始準(zhǔn)備
1.1首先配置Pom依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
1.2 在yml中配置redis相關(guān)信息
由于spring.redis.host 這種配置已經(jīng)被棄用,在新版的springboot中,需要在spring.data.redis.host 進(jìn)行配置
spring: data: redis: # 改為自己的地址和密碼 host: 10.69.37.213 port: 6379 password: Jolly # 連接超時(shí)時(shí)間,單位ms connect-timeout: 50000 # 選擇第幾個(gè)數(shù)據(jù)庫(kù),默認(rèn)為0,最大值15 database: 0 lettuce: pool: # 最大的活躍連接數(shù),不會(huì)超過(guò)這個(gè)數(shù),根據(jù)項(xiàng)目預(yù)期并發(fā)量調(diào)整 max-active: 50 # max-idle 指定了連接池中的最大空閑連接數(shù)。 # 空閑連接是指那些沒(méi)有被使用,但是已經(jīng)創(chuàng)建并且保持在連接池中的連接 # 這個(gè)值應(yīng)該與max-active相匹配或者稍微低一些, # 以保持連接池中有足夠的空閑連接來(lái)處理突發(fā)請(qǐng)求。 # 設(shè)置得太高可能會(huì)導(dǎo)致資源浪費(fèi),因?yàn)榭臻e連接會(huì)占用內(nèi)存和其他資源。 max-idle: 30 #這個(gè)配置指定了連接池中的最小空閑連接數(shù)。 #這個(gè)設(shè)置可以確保連接池始終保持一定數(shù)量的空閑連接,以便在請(qǐng)求到來(lái)時(shí)可以立即使用,而不需要等待連接的創(chuàng)建。 # 這個(gè)值應(yīng)該根據(jù)你的應(yīng)用程序的基線負(fù)載來(lái)設(shè)置 min-idle: 10 # 當(dāng)連接池達(dá)到最大活躍連接數(shù)時(shí),客戶端等待可用連接的最大時(shí)間(以毫秒為單位)。-1 表示無(wú)限等待 # 如果設(shè)置為一個(gè)正數(shù),那么在等待時(shí)間超過(guò)這個(gè)值后,會(huì)拋出一個(gè)異常。 max-wait: -1
1.3 設(shè)置redis的序列化
為了防止存入到redis的數(shù)據(jù)出現(xiàn)亂碼的情況,進(jìn)行序列化的設(shè)置
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @Configuration public class redisConfig { @ConditionalOnMissingBean(name = "redisTemplate") @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); // 默認(rèn)為utf-8,可以進(jìn)行修改 template.setKeySerializer(new StringRedisSerializer()); // 原版默認(rèn)使用jdk的序列化方式JdkSerializationRedisSerializer Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); template.setValueSerializer(serializer); // 設(shè)置Hash的序列化化方式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); // 設(shè)置屬性 template.afterPropertiesSet(); return template; } }
2.Redis實(shí)現(xiàn)消息隊(duì)列的方式
2.1 使用Redis的List實(shí)現(xiàn)消息隊(duì)列
首先構(gòu)造一個(gè)簡(jiǎn)單的訂單類(lèi),用于后面消息隊(duì)列測(cè)試
import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class order implements Serializable { private int id; private String userid; private String goodName; }
我們使用最簡(jiǎn)單的方式來(lái)實(shí)現(xiàn)消息隊(duì)列,直接不斷輪詢List中是否有消息
import jakarta.annotation.Resource; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class Mq { @Resource private RedisTemplate<String, Object> redisTemplate; // 隊(duì)列名 private final String queue = "order_queue"; @GetMapping("/order") public void order(){ // 為了模擬消息的獲取,異步開(kāi)啟一個(gè)線程,進(jìn)行消息處理 Thread thread = new Thread(() -> { try { processData(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); thread.start(); // 模擬產(chǎn)生10條消息,放入隊(duì)列 for (int i = 0; i < 10; i++) { order order = new order(i, i, "goods" + i); redisTemplate.opsForList().leftPush(queue, order); System.out.println("放入消息隊(duì)列:"+i); } } // 處理消息,不斷的輪詢隊(duì)列中的消息 public void processData() throws InterruptedException { while (true){ Object order = redisTemplate.opsForList().rightPop(queue); if(order == null){ System.out.println("當(dāng)前沒(méi)有消息"); Thread.sleep(1000); }else{ System.out.println("處理消息:"+order); } } } }
這種方式是最簡(jiǎn)單的方式,但是不推薦,因?yàn)橐恢陛喸兪菚?huì)浪費(fèi)CPU資源的,拉低服務(wù)端的性能。
2.2 消息訂閱模式
Redis 支持消息隊(duì)列的一種模式是通過(guò)其發(fā)布訂閱(Publish/Subscribe)功能。這種模式允許客戶端訂閱一個(gè)或多個(gè)頻道(channel),并接收發(fā)送到這些頻道的消息。
2.2.1 發(fā)布消息
這一步是比較簡(jiǎn)單的,直接調(diào)用方法即可.
@Resource private RedisTemplate<String, Object> redisTemplate; private final String CHANNEL_NAME = "order_queue"; @GetMapping("/order") public void order(){ // 模擬產(chǎn)生10條消息,放入隊(duì)列 for (int i = 0; i < 10; i++) { order order = new order(i, i, "goods" + i); //發(fā)布消息 redisTemplate.convertAndSend(CHANNEL_NAME, order); System.out.println("放入消息隊(duì)列:"+i); } }
2.2.2 消息監(jiān)聽(tīng)
首先我們需要取實(shí)現(xiàn)MessageListener接口的方法
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Service; @Service public class sub implements MessageListener { // 當(dāng)監(jiān)聽(tīng)到有消息的時(shí)候,就會(huì)執(zhí)行這個(gè)方法 @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); // 模擬延遲處理 try { Thread.sleep(2000); // 假設(shè)處理需要2秒 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("處理消息:"+msg); } }
然后可以在開(kāi)始的redisConfig類(lèi)里面加上下面的代碼
/** * 因?yàn)闃?biāo)記了@Bean 注解,所以會(huì)在springboot啟動(dòng)的時(shí)候調(diào)用該方法創(chuàng)建,也可以放在其他地方進(jìn)行創(chuàng)建 * 當(dāng)調(diào)用這個(gè)方法時(shí),RedisConnectionFactory 這個(gè)對(duì)象已經(jīng)存在于springboot的容器內(nèi),然后調(diào)用這個(gè) * 方法的時(shí)候就會(huì)傳入該參數(shù),執(zhí)行方法后會(huì)創(chuàng)建一個(gè)RedisMessageListenerContainer,這樣可以在其他類(lèi) * 里面管理這些監(jiān)聽(tīng)MessageListener * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) { // 首先創(chuàng)建一個(gè)監(jiān)聽(tīng)的容器,這個(gè)容器可以傳入多個(gè)MessageListener RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 注入一個(gè)連接池工廠 container.setConnectionFactory(connectionFactory); // 創(chuàng)建一個(gè)自己的監(jiān)聽(tīng)類(lèi) sub sub = new sub(); // 然后和名為order_queue的通道進(jìn)行綁定 container.addMessageListener(sub,new ChannelTopic("order_queue")); return container; }
2.2.3 結(jié)果
2.3 基于Stream進(jìn)行實(shí)現(xiàn)
Redis Stream 是 Redis 5.0 版本引入的一種新的數(shù)據(jù)結(jié)構(gòu),用于存儲(chǔ)和操作消息流。它類(lèi)似于消息隊(duì)列,但提供了更豐富的功能,允許你以有序、可持久化的方式存儲(chǔ)多個(gè)字段-值對(duì)的消息。
2.3.1 優(yōu)點(diǎn)
- 持久化存儲(chǔ):Stream可以持久化消息到磁盤(pán),即使Redis服務(wù)器重啟,消息也不會(huì)丟失。
- 有序性:Stream保證了消息的有序性,每個(gè)消息都有一個(gè)唯一的ID,按照進(jìn)入隊(duì)列的順序排列。
- 多消費(fèi)者:Stream支持多個(gè)消費(fèi)者,可以有不同的組(Consumer Groups)消費(fèi)同一個(gè)Stream,而且每個(gè)消費(fèi)者可以獨(dú)立消費(fèi),不會(huì)互相影響。
- 消息確認(rèn)機(jī)制:消費(fèi)者可以讀取消息并進(jìn)行確認(rèn)(ACK),確保消息不會(huì)因?yàn)橄M(fèi)者故障而丟失。
- 消息回溯:Stream允許新的消費(fèi)者從任意位置開(kāi)始消費(fèi)消息,包括從Stream的開(kāi)始位置,這使得新的消費(fèi)者可以回溯并處理之前的消息。
- 靈活的消息長(zhǎng)度:Stream中的消息可以是任意長(zhǎng)度的字符串,可以包含復(fù)雜的數(shù)據(jù)結(jié)構(gòu),如JSON。
- 阻塞讀取:消費(fèi)者可以使用
BLPOP
或BRPOP
等命令進(jìn)行阻塞讀取,直到有新的消息到來(lái)。 - 事務(wù)支持:可以利用Redis的事務(wù)特性,確保消息的寫(xiě)入和讀取操作是原子性的。
- 時(shí)間戳:每個(gè)消息可以包含時(shí)間戳字段,便于進(jìn)行基于時(shí)間的消息管理。
- 易于監(jiān)控:Stream的結(jié)構(gòu)便于監(jiān)控隊(duì)列的長(zhǎng)度、消費(fèi)者狀態(tài)等信息。
2.3.2 實(shí)現(xiàn)
我們模擬一個(gè)搶購(gòu)訂單場(chǎng)景,比如我們的服務(wù)器只能每秒處理50個(gè)請(qǐng)求,請(qǐng)求太多可能會(huì)導(dǎo)致我們的服務(wù)直接宕機(jī),那么我們可以把請(qǐng)求放入消息隊(duì)列,讓消息隊(duì)列來(lái)抗住大量的請(qǐng)求。
我們的策略可以是消息隊(duì)列限量50個(gè)請(qǐng)求,當(dāng)請(qǐng)求到來(lái)時(shí),消息數(shù)量大于50n我們直接返回讓用戶重試,服務(wù)太忙的提示,這也是很常見(jiàn)的提示。
import com.xujialin.springboot3_study.entity.order; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.Duration; import java.util.HashMap; import java.util.List; @RestController public class streamCont { @Resource private RedisTemplate<String,Object> redisTemplate; private String stream_key = "stream_key"; @GetMapping("/order") public void order() { //封裝請(qǐng)求,假裝這是高并發(fā)場(chǎng)景 for (int j = 0; j < 100; j++) { new Thread(() -> { for (int i = 0; i < 100; i++) { order order = new order(i, i, "goods" + i); HashMap<String,order> map = new HashMap<>(); map.put("order", order); Long size = redisTemplate.opsForStream().size(stream_key); if(size > 500){ System.out.println("活動(dòng)太火爆了,請(qǐng)重試!"); continue; } redisTemplate.opsForStream().add(stream_key,map); } }).start(); } System.out.println("恭喜你搶到了"); } @PostConstruct public void init(){ // 第一個(gè)是stream的key,第二個(gè)是組名 // redisTemplate.opsForStream().createGroup(stream_key, "g1"); try { redisTemplate.opsForStream().createGroup(stream_key, "g1"); } catch (RedisSystemException e) { // 如果 group 已存在,拋出異常,可忽略 System.out.println("group已經(jīng)存在"); } for (int i = 0; i < 5; i++) { new Thread(new consumer()).start(); } } class consumer implements Runnable { @Override public void run() { while(true){ // 讀取消息 List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read( // 為g1消費(fèi)者組創(chuàng)建一個(gè)消費(fèi)則名字,可以為每個(gè)線程的名字,也可以為一個(gè)固定的名字, // 一條消息最多只能被組里面的一個(gè)消費(fèi)者消費(fèi),如果一條消息同時(shí)被兩個(gè)消費(fèi)者消費(fèi), // 那么這兩個(gè)消費(fèi)者應(yīng)該隸屬于不同的消費(fèi)者組,所以同一個(gè)名字或者不同的名字,對(duì)于同一個(gè) // 消費(fèi)組沒(méi)有太大區(qū)別 Consumer.from("g1", Thread.currentThread().getName()), // 創(chuàng)建一個(gè)讀取選項(xiàng),創(chuàng)建一個(gè)空的 StreamReadOptions 實(shí)例。這是配置讀取選項(xiàng)的起點(diǎn) // .count(1): 設(shè)置讀取操作返回的最大消息數(shù)量。意味著每次讀取操作最多只會(huì)返回一條消息。 //.block(Duration.ofSeconds(2)): 配置讀取操作為阻塞模式,并設(shè)置阻塞的超時(shí)時(shí)間為2s, // 也可以設(shè)置單位 StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 創(chuàng)建一個(gè)偏移量,ReadOffset.lastConsumed(): 這是指定讀取消息的偏移量。 // 表示從消費(fèi)者組中最后一次確認(rèn)消費(fèi)的消息之后開(kāi)始讀取新的消息。 StreamOffset.create( "stream_key", ReadOffset.lastConsumed())); // 讀取消息 if (list != null && !list.isEmpty()) { MapRecord<String, Object, Object> entries = list.get(0); // 模擬處理消息 System.out.println(entries); // 確認(rèn)消息 redisTemplate.opsForStream().acknowledge("stream_key","g1",entries.getId()); } } } } }
還可以使用更優(yōu)雅的實(shí)現(xiàn),使用 StreamMessageListenerContainer 可以創(chuàng)建一個(gè)更高級(jí)的消息監(jiān)聽(tīng)機(jī)制,它允許你注冊(cè) StreamListener,這樣你就可以實(shí)現(xiàn)基于事件的異步消息處理,而不是阻塞讀取。這種方式更適合生產(chǎn)環(huán)境,因?yàn)樗峁┝烁玫馁Y源管理和錯(cuò)誤處理機(jī)制。
到此這篇關(guān)于Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)的文章就介紹到這了,更多相關(guān)Springboot3 Redis消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
為什么JDK8中HashMap依然會(huì)死循環(huán)
這篇文章主要介紹了為什么JDK8中HashMap依然會(huì)死循環(huán),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09深入淺出的學(xué)習(xí)Java ThreadLocal
本文會(huì)基于實(shí)際場(chǎng)景介紹ThreadLocal如何使用以及內(nèi)部實(shí)現(xiàn)機(jī)制。 具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧2017-02-02Java正則校驗(yàn)密碼至少包含字母數(shù)字特殊符號(hào)中的2種實(shí)例代碼
正則表達(dá)式驗(yàn)證密碼功能在項(xiàng)目中經(jīng)常被使用到,但是很多朋友還是不大會(huì)使用密碼正則表達(dá)式進(jìn)行驗(yàn)證,下面這篇文章主要給大家介紹了關(guān)于Java正則校驗(yàn)密碼至少包含字母數(shù)字特殊符號(hào)中2種的相關(guān)資料,需要的朋友可以參考下2022-08-08SpringBoot項(xiàng)目實(shí)現(xiàn)jar包方式打包部署
SpringBoot默認(rèn)的打包方式就是jar包,本文就來(lái)介紹一下SpringBoot項(xiàng)目實(shí)現(xiàn)jar包方式打包部署,具有一定的參考價(jià)值,感興趣的可以了解一下2024-08-08Springmvc模式上傳和下載與enctype對(duì)比
這篇文章主要介紹了Springmvc模式上傳和下載與enctype對(duì)比,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12