Redis消息隊(duì)列的三種實(shí)現(xiàn)方式
前言
為什么要使用Redis的消息隊(duì)列?
成本低,對(duì)于RabbitMQ或是Kafka來(lái)說(shuō),已經(jīng)是重量級(jí)的消息隊(duì)列。
Redis的三種實(shí)現(xiàn)方式:
- List結(jié)構(gòu):一種有序的雙向鏈表
- PubSub發(fā)布訂閱:基于點(diǎn)對(duì)點(diǎn)的消息模型
- Stream:在Redis5.0之后提供的,比較完善的消息隊(duì)列模型
List實(shí)現(xiàn)消息隊(duì)列
我們可以利用Redis中List的命令LPUSH與RPOP來(lái)實(shí)現(xiàn)消息的發(fā)送與接收,但是需要注意的是,隊(duì)列中沒(méi)有消息時(shí),RPOP會(huì)返回null,不會(huì)向JVM中阻塞隊(duì)列一樣進(jìn)行阻塞并等待消息,因此這里應(yīng)該使用BRPOP來(lái)實(shí)現(xiàn)阻塞效果。
優(yōu)點(diǎn):利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限。
基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證。
可以滿(mǎn)足消息有序性。
缺點(diǎn):無(wú)法避免消息丟失。
只支持單消費(fèi)者。
PubSub消息隊(duì)列
是Redis2.0版本引入的消息傳遞模型,顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)的channel發(fā)送消息后,所有訂閱者都能收到相關(guān)信息。
PubSub消息隊(duì)列的基本命令
# 訂閱一個(gè)或多個(gè)頻道 SUBSCRIBE channel [channel] # 向一個(gè)頻道發(fā)送消息 PUBLISH channel msg # 訂閱與pattern格式匹配的所有頻道 PSUBSCRIBE pattern [pattern]
優(yōu)點(diǎn):采用發(fā)布訂閱模式,支持多生產(chǎn)者,多消費(fèi)者。
缺點(diǎn):不支持?jǐn)?shù)據(jù)持久化。
無(wú)法避免消息丟失。
消息堆積有上限,超出時(shí)數(shù)據(jù)丟失。
Stream消息隊(duì)列
Stream是Redis5.0之后引入新的數(shù)據(jù)類(lèi)型,支持持久化,因此相比于PubSub更加安全,可以通過(guò)Stream實(shí)現(xiàn)一個(gè)功能完善的消息隊(duì)列
發(fā)送消息的命令:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID filed value[filed value]
命令解釋?zhuān)?/p>
- key:隊(duì)列名稱(chēng)
- NOMKSTREAM:如果隊(duì)列不存在,是否自動(dòng)創(chuàng)建隊(duì)列,默認(rèn)是自動(dòng)創(chuàng)建
- MAXLEN|MINID [=|~] threshold [LIMIT count]:設(shè)置消息隊(duì)列的最大消息數(shù)量
- *|ID:消息的唯一ID,*代表由Redis自動(dòng)生成,格式是"時(shí)間戳-遞增數(shù)字"
- field value:發(fā)送到隊(duì)列的消息名稱(chēng)為Entry
讀取消息的第一種方法
命令如下
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
命令解釋?zhuān)?/p>
- COUNT count:每次讀取消息的最大數(shù)量
- BLOCK milliseconds:當(dāng)沒(méi)有消息時(shí)進(jìn)行阻塞,并指定阻塞時(shí)長(zhǎng),如果為0則指永久阻塞
- STREAMS key:要從哪個(gè)隊(duì)列讀取消息
- ID:起始ID,只返回大于該ID的消息,0表示從第一個(gè)消息開(kāi)始讀取,$表示從最新消息開(kāi)始
XREAD命令的特點(diǎn):
- 消息可回溯
- 一個(gè)消息可以被多個(gè)消費(fèi)者拿到
- 可以阻塞讀取
- 有消息漏讀的風(fēng)險(xiǎn)
讀取消息的第二種方法
將多個(gè)消費(fèi)者劃分到一個(gè)組(Consumer Group)當(dāng)中,監(jiān)聽(tīng)同一個(gè)隊(duì)列。特點(diǎn)如下
- 消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度
- 消息標(biāo)識(shí):消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)示記錄最后一個(gè)被處理的消息哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)示之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi)
- 消息確認(rèn):消費(fèi)者獲取消息后,消息處于pending狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過(guò)XACK來(lái)確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除。
創(chuàng)建消費(fèi)者組命令:
XGROUP CREATE key groupName ID [MKSTREAM]
命令解釋?zhuān)?/p>
- key:隊(duì)列名稱(chēng)
- groupName:消費(fèi)者組名稱(chēng)
- ID:起始ID標(biāo)識(shí),$代表隊(duì)列中最后一個(gè)消息,0則代表隊(duì)列中第一個(gè)消息
- MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建
# 刪除指定的消費(fèi)者組 XGROUP DESTORY key groupName # 給指定的消費(fèi)者組添加消費(fèi)者 XGROUP CREATECONSUMER key groupName consumername # 刪除消費(fèi)者組中的指定消費(fèi)者 XGROUP DELCONSUMER key groupname comsumername
從消費(fèi)者組中讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID[ID ...]
命令解釋?zhuān)?/p>
- group:消費(fèi)組名稱(chēng)
- consumer:消費(fèi)者名稱(chēng),如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
- count:本次查詢(xún)的最大數(shù)量
- BLOCK milliseconds:當(dāng)沒(méi)有消息時(shí)最長(zhǎng)等待時(shí)間
- NOACK:無(wú)需手動(dòng)ACK,獲取到消息后自動(dòng)確認(rèn)
- STREAMS key:指定隊(duì)列名稱(chēng)
- ID:獲取消息的起始ID。" > "表示從下一個(gè)未消費(fèi)的消息開(kāi)始。其它則是根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開(kāi)始
Group類(lèi)型的消息隊(duì)列特點(diǎn):
- 消息可回溯
- 可以多消費(fèi)者爭(zhēng)搶消息
- 可以阻塞讀取
- 沒(méi)有消息漏讀風(fēng)險(xiǎn)
- 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次
三種實(shí)現(xiàn)方式對(duì)比
LIST | PubSub | Stream | |
消息持久化 | 支持 | 不支持 | 支持 |
阻塞讀取 | 支持 | 支持 | 支持 |
消息堆積處理 | 受限于內(nèi)存空間,可以利用多消費(fèi)者加快處理 | 受限于消費(fèi)者緩沖區(qū) | 受限于隊(duì)列長(zhǎng)度,可以利用消費(fèi)者組提高消費(fèi)速度,減少堆積 |
消息確認(rèn)機(jī)制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
到此這篇關(guān)于Redis消息隊(duì)列的三種實(shí)現(xiàn)方式的文章就介紹到這了,更多相關(guān)Redis消息隊(duì)列 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- 一文詳解消息隊(duì)列中為什么不用redis作為隊(duì)列
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- redis?消息隊(duì)列完成秒殺過(guò)期訂單處理方法(一)
- 如何使用?redis?消息隊(duì)列完成秒殺過(guò)期訂單處理操作(二)
- Redis高階使用消息隊(duì)列分布式鎖排行榜等(高階用法)
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列使用小結(jié)
- python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
- 詳解Redis Stream做消息隊(duì)列
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
多維度深入分析Redis的5種基本數(shù)據(jù)結(jié)構(gòu)
此篇文章主要對(duì)Redis的5種基本數(shù)據(jù)類(lèi)型,即字符串(String)、列表(List)、散列(Hash)、集合(Set)、有序集合(Sorted?Set),從使用場(chǎng)景和底層結(jié)構(gòu)出發(fā),進(jìn)行多維度深入分析。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-11-11使用Redis實(shí)現(xiàn)實(shí)時(shí)排行榜功能
排行榜功能是一個(gè)很普遍的需求。使用 Redis 中有序集合的特性來(lái)實(shí)現(xiàn)排行榜是又好又快的選擇。接下來(lái)通過(guò)本文給大家介紹使用Redis實(shí)現(xiàn)實(shí)時(shí)排行榜功能,需要的朋友可以參考下2021-07-07通過(guò)docker和docker-compose安裝redis兩種方式詳解
這篇文章主要介紹了通過(guò)docker和docker-compose安裝redis的兩種方式,Docker安裝方式包括拉取鏡像、查看本地鏡像、運(yùn)行容器和測(cè)試連接,Docker Compose安裝方式包括目錄結(jié)構(gòu)、配置文件、啟動(dòng)和關(guān)閉容器、檢查啟動(dòng)情況以及查看CPU和內(nèi)存使用狀態(tài),需要的朋友可以參考下2024-12-12使用lua+redis解決發(fā)多張券的并發(fā)問(wèn)題
這篇文章主要介紹了使用lua+redis解決發(fā)多張券的并發(fā)問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01使用Jedis線(xiàn)程池returnResource異常注意事項(xiàng)
這篇文章主要介紹了使用Jedis線(xiàn)程池returnResource異常注意事項(xiàng),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03Linux環(huán)境下升級(jí)redis的詳細(xì)步驟記錄
這篇文章主要給大家介紹了關(guān)于Linux環(huán)境下升級(jí)redis的詳細(xì)步驟,描述了如何從舊版本升級(jí)到新版本Redis,包括備份舊數(shù)據(jù)、下載和安裝新版本、復(fù)制配置文件和數(shù)據(jù)、停止舊版本并啟動(dòng)新版本的過(guò)程,需要的朋友可以參考下2024-12-12