Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐
1.zset為什么可以做消息隊(duì)列
zset做消息隊(duì)列的特性有:
- 有序性:zset中所有元素都被自動排序。這讓zset很適合用于有序的消息隊(duì)列,因?yàn)榭梢愿鶕?jù)一個(gè)或多個(gè)標(biāo)準(zhǔn)(比如消息的到達(dá)時(shí)間或優(yōu)先級)按需檢索消息。
- 元素唯一性:zset的每個(gè)元素都是獨(dú)一無二的,這對于實(shí)現(xiàn)某些消息需求(比如冪等性)是非常有幫助的。
- 成員和分?jǐn)?shù)之間的映射關(guān)系:有序集合中的每個(gè)成員都有一個(gè)分?jǐn)?shù),這樣就可以將相同的數(shù)據(jù)劃分到不同的 queue 中,以及為每個(gè) queue 設(shè)置不同的延時(shí)。
- 高效的添加刪除操作:因?yàn)閦set會自動維護(hù)元素之間的順序,所以在添加或刪除元素時(shí)無需進(jìn)行手動排序,從而能提升操作速度。
綜上所述,Redis的zset天然支持按照時(shí)間順序的消息隊(duì)列,可以利用其成員唯一性的特性來保證消息不被重復(fù)消費(fèi),在實(shí)現(xiàn)高吞吐率等方面也有很大的優(yōu)勢。
2.zset實(shí)現(xiàn)消息隊(duì)列的步驟
Redis的zset有序集合是可以用來實(shí)現(xiàn)消息隊(duì)列的,一般是按照時(shí)間戳作為score的值,將消息內(nèi)容作為value存入有序集合中。
實(shí)現(xiàn)步驟:
- 客戶端將消息推送到Redis的有序集合中。
- 有序集合中,每個(gè)成員都有一個(gè)分?jǐn)?shù)(score)。在這里,我們可以設(shè)成消息的時(shí)間戳,也就是當(dāng)時(shí)的時(shí)間。
- 當(dāng)需要從消息隊(duì)列中獲取消息時(shí),客戶端獲取有序集合前N個(gè)元素并進(jìn)行操作。一般來說,N取一個(gè)適當(dāng)?shù)臄?shù)值,比如10。
需要注意的是,Redis的zset是有序集合,它的元素是有序的,并且不能有重復(fù)元素。因此,如果需要處理有重復(fù)消息的情況,需要在消息體中加入某些唯一性標(biāo)識來保證不會重復(fù)。
3.使用jedis實(shí)現(xiàn)消息隊(duì)列示例
Java可以通過Redis的Java客戶端包Jedis來使用Redis,Jedis提供了豐富的API來操作Redis,下面是一段實(shí)現(xiàn)用Redis的zset類型實(shí)現(xiàn)的消息隊(duì)列的代碼。
import redis.clients.jedis.Jedis; import java.util.Set; public class RedisMessageQueue { ? ? private Jedis jedis; //Redis連接對象 ? ? private String queueName; //隊(duì)列名字 ? ? /** ? ? ?* 構(gòu)造函數(shù) ? ? ?* @param host Redis主機(jī)地址 ? ? ?* @param port Redis端口 ? ? ?* @param password Redis密碼 ? ? ?* @param queueName 隊(duì)列名字 ? ? ?*/ ? ? public RedisMessageQueue(String host, int port, String password, String queueName){ ? ? ? ? jedis = new Jedis(host, port); ? ? ? ? jedis.auth(password); ? ? ? ? this.queueName = queueName; ? ? } ? ? /** ? ? ?* 發(fā)送消息 ? ? ?* @param message 消息內(nèi)容 ? ? ?*/ ? ? public void sendMessage(String message){ ? ? ? ? //獲取當(dāng)前時(shí)間戳 ? ? ? ? long timestamp = System.currentTimeMillis(); ? ? ? ? //將消息添加到有序集合中 ? ? ? ? jedis.zadd(queueName, timestamp, message); ? ? } ? ? /** ? ? ?* 接收消息 ? ? ?* @param count 一次接收的消息數(shù)量 ? ? ?* @return 返回接收到的消息 ? ? ?*/ ? ? public String[] receiveMessage(int count){ ? ? ? ? //設(shè)置最大輪詢時(shí)間 ? ? ? ? long timeout = 5000; ? ? ? ? //獲取當(dāng)前時(shí)間戳 ? ? ? ? long start = System.currentTimeMillis(); ? ? ? ? while (true) { ? ? ? ? ? ? //獲取可用的消息數(shù)量 ? ? ? ? ? ? long size = jedis.zcount(queueName, "-inf", "+inf"); ? ? ? ? ? ? if (size == 0) { ? ? ? ? ? ? ? ? //如果無消息,休眠50ms后繼續(xù)輪詢 ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? Thread.sleep(50); ? ? ? ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? //計(jì)算需要獲取的消息數(shù)量count與當(dāng)前可用的消息數(shù)量size的最小值 ? ? ? ? ? ? ? ? count = (int) Math.min(count, size); ? ? ? ? ? ? ? ? //獲取消息 ? ? ? ? ? ? ? ? Set<String> messages = jedis.zrange(queueName, 0, count - 1); ? ? ? ? ? ? ? ? String[] results = messages.toArray(new String[0]); ? ? ? ? ? ? ? ? //移除已處理的消息 ? ? ? ? ? ? ? ? jedis.zremrangeByRank(queueName, 0, count - 1); ? ? ? ? ? ? ? ? return results; ? ? ? ? ? ? } ? ? ? ? ? ? //檢查是否超時(shí) ? ? ? ? ? ? if (System.currentTimeMillis() - start > timeout) { ? ? ? ? ? ? ? ? return null; //超時(shí)返回空 ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? /** ? ? ?* 銷毀隊(duì)列 ? ? ?*/ ? ? public void destroy(){ ? ? ? ? jedis.del(queueName); ? ? ? ? jedis.close(); ? ? } }
使用示例:
public static void main(String[] args) { ? ? //創(chuàng)建消息隊(duì)列 ? ? RedisMessageQueue messageQueue = new RedisMessageQueue("localhost", 6379, "password", "my_queue"); ? ? //生產(chǎn)者發(fā)送消息 ? ? messageQueue.sendMessage("message1"); ? ? messageQueue.sendMessage("message2"); ? ? //消費(fèi)者接收消息 ? ? String[] messages = messageQueue.receiveMessage(10); ? ? System.out.println(Arrays.toString(messages)); //輸出:[message1, message2] ? ? //銷毀隊(duì)列 ? ? messageQueue.destroy(); }
在實(shí)際應(yīng)用中,可以結(jié)合線程池或者消息監(jiān)聽器等方式,將消息接收過程放置于獨(dú)立的線程中,以提高消息隊(duì)列的處理效率。
4.+inf與-inf
+inf 是 Redis 中用于表示正無窮大的一種特殊值,也就是無限大。在使用 Redis 的 zset 集合時(shí),+inf 通常用作 ZREVRANGEBYSCORE 命令的上限值,表示查找 zset 集合中最大的分?jǐn)?shù)值。+inf 后面的 -inf 表示 zset 中最小的分?jǐn)?shù)值。這兩個(gè)值一起可以用來獲取 zset 集合中的所有元素或一個(gè)特定范圍內(nèi)的元素。例如:
# 獲取 zset 集合中所有元素 ZREVRANGE queue +inf -inf WITHSCORES # 獲取 zset 集合中第1到第10個(gè)元素(分?jǐn)?shù)從大到小排列) ZREVRANGE queue +inf -inf WITHSCORES LIMIT 0 9 # 獲取 zset 集合中分?jǐn)?shù)在 1581095012 到當(dāng)前時(shí)間之間的元素 ZREVRANGEBYSCORE queue +inf 1581095012 WITHSCORES
在這些命令中,+inf 代表了一個(gè)最大的分?jǐn)?shù)值,-inf 代表了一個(gè)最小的分?jǐn)?shù)值,用于確定查詢的分?jǐn)?shù)值范圍。
5.redis使用list與zset做消息隊(duì)列有什么區(qū)別
Redis 使用 List 和 ZSET 都可以實(shí)現(xiàn)消息隊(duì)列,但是二者有以下不同之處:
- 數(shù)據(jù)結(jié)構(gòu)不同:List 是一個(gè)有序的字符串列表,ZSET 則是一個(gè)有序集合,它們的底層實(shí)現(xiàn)機(jī)制不同。
- 存儲方式不同:List 只能存儲字符串類型的數(shù)據(jù),而 ZSET 則可以存儲帶有權(quán)重的元素,即除了元素值外,還可以為每個(gè)元素指定一個(gè)分?jǐn)?shù)。
- 功能不同: List 操作在元素添加、刪除等方面比較方便,而 ZSET 在處理數(shù)據(jù)排序和范圍查找等方面比 List 更加高效。
- 應(yīng)用場景不同: 對于需要精細(xì)控制排序和分值的場景可以選用 ZSET,而對于只需要簡單的隊(duì)列操作,例如先進(jìn)先出,可以直接采用 List。
綜上所述,List 和 ZSET 都可以用于消息隊(duì)列的實(shí)現(xiàn),但如果需要更好的性能和更高級的排序功能,建議使用 ZSET。而如果只需要簡單的隊(duì)列操作,則 List 更加適合。
到此這篇關(guān)于Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)Redis ZSET消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- 一文詳解消息隊(duì)列中為什么不用redis作為隊(duì)列
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- redis?消息隊(duì)列完成秒殺過期訂單處理方法(一)
- 如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作(二)
- Redis高階使用消息隊(duì)列分布式鎖排行榜等(高階用法)
- Redis消息隊(duì)列的三種實(shí)現(xiàn)方式
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列使用小結(jié)
- python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
- 詳解Redis Stream做消息隊(duì)列
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
Redis遠(yuǎn)程字典服務(wù)器?hash類型示例詳解
這篇文章主要介紹了Redis遠(yuǎn)程字典服務(wù)器?hash類型示例詳解,本文通過示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08關(guān)于Redis?bigkeys命令會阻塞問題的解決
這篇文章主要介紹了關(guān)于Redis?bigkeys命令會阻塞問題的解決,今天分享一次Redis引發(fā)的線上事故,避免再次踩雷,實(shí)現(xiàn)快速入門,需要的朋友可以參考下2023-03-03Redis分布式鎖與Redlock算法實(shí)現(xiàn)
在Redis中,可以使用多種方式實(shí)現(xiàn)分布式鎖,如使用SETNX命令或RedLock算法,本文就來介紹一下Redis分布式鎖與Redlock算法實(shí)現(xiàn),感興趣的可以了解一下2023-12-12