Redis Lettuce連接redis集群實(shí)現(xiàn)過(guò)程詳細(xì)講解
前言
Lettuce連接redis集群使用的都是集群專(zhuān)用類(lèi),像RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection等等;
Lettuce對(duì)rediscluster的支持:
- 支持所有Cluster命令;
- 基于鍵哈希槽的路由節(jié)點(diǎn);
- 對(duì)集群命令高級(jí)抽象;
- 在多個(gè)集群節(jié)點(diǎn)上執(zhí)行命令;
- 處理MOVED和ASK重定向;
- 通過(guò)槽位和ip端口直接連接集群節(jié)點(diǎn);
- SSL和身份驗(yàn)證;
- 定期和自適應(yīng)集群拓?fù)涓拢?/li>
- 發(fā)布訂閱;
啟動(dòng)時(shí)只需至少一個(gè)可以連接的集群節(jié)點(diǎn)就可以,能夠自動(dòng)拓?fù)涑黾喝抗?jié)點(diǎn);也可以使用ReadFrom設(shè)置讀取數(shù)據(jù)來(lái)源,跟主從模式一樣;
雖然redis本身的多鍵命令要求key必須都在同一個(gè)槽位,但Lettuce對(duì)一部分命令多了優(yōu)化,可以對(duì)多鍵命令進(jìn)行跨槽位執(zhí)行,通過(guò)將對(duì)不同槽位鍵的操作命令分解為多條命令,單個(gè)命令以fork/join方式并發(fā)運(yùn)行,最后將結(jié)果合并返回;
可以跨槽位的命令有
- DEL:刪除鍵,返回刪除數(shù)量;
- EXISTS:統(tǒng)計(jì)跨槽位的存在的鍵的數(shù)量;
- MGET:獲取所有給定鍵的值,順序按照鍵的順序返回;
- MSET:批量保存鍵值對(duì),總是返回OK;
- TOUCH:改變給定鍵的最后訪問(wèn)時(shí)間,返回改變的鍵的數(shù)量;
- UNLINK:刪除鍵并在另一個(gè)不同的線程中回收內(nèi)存,返回刪除數(shù)量;
提供跨槽位命令的api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;
可以在多個(gè)集群節(jié)點(diǎn)上執(zhí)行的命令有
- CLIENT SETNAME:在所有已知的集群節(jié)點(diǎn)上設(shè)置客戶(hù)端的名稱(chēng),總是返回OK;
- KEYS:返回所有master上存儲(chǔ)的key;
- DBSIZE:返回所有master上存儲(chǔ)的key的數(shù)量;
- FLUSHALL:清空master上的所有數(shù)據(jù),總是返回OK;
- FLUSHDB:清空master上的所有數(shù)據(jù),總是返回OK;
- RANDOMKEY:從隨機(jī)master上返回隨機(jī)的key;
- SCAN:根據(jù)ReadFrom設(shè)置掃描整個(gè)集群的鍵空間;
- SCRIPT FLUSH:從所有的集群節(jié)點(diǎn)腳本緩存中刪除所有腳本;
- SCRIPT LOAD:在所有的集群節(jié)點(diǎn)上加載lua腳本;
- SCRIPT KILL:在所有集群節(jié)點(diǎn)上殺死腳本;(即使腳本沒(méi)有運(yùn)行調(diào)用也不會(huì)失敗)
- SHUTDOWN:將數(shù)據(jù)集同步保存到磁盤(pán),然后關(guān)閉集群所有節(jié)點(diǎn);
關(guān)于發(fā)布訂閱
普通用戶(hù)空間的發(fā)布訂閱,redis集群會(huì)發(fā)送到每個(gè)節(jié)點(diǎn),發(fā)布者和訂閱者不需要在同一個(gè)節(jié)點(diǎn),普通訂閱發(fā)布消息可以在集群拓?fù)涓淖儠r(shí)重新連接。對(duì)于鍵空間事件,只會(huì)發(fā)到自己的節(jié)點(diǎn),不會(huì)擴(kuò)散到其他節(jié)點(diǎn),要訂閱鍵空間事件可以去適當(dāng)?shù)亩鄠€(gè)節(jié)點(diǎn)上訂閱,或者使用RedisClusterClient消息傳播和NodeSelectionAPI獲得一個(gè)托管連接集合;
注意:由于主從同步,鍵會(huì)被復(fù)制到多個(gè)從節(jié)點(diǎn)上,特別是鍵過(guò)期事件,會(huì)在主從節(jié)點(diǎn)上都產(chǎn)生過(guò)期事件,如果訂閱從節(jié)點(diǎn),可能會(huì)收到多條相同的過(guò)期事件;訂閱是通過(guò)NodeSelectionAPI或者單個(gè)節(jié)點(diǎn)調(diào)用subscribe(…)發(fā)出的,訂閱對(duì)于新增的節(jié)點(diǎn)無(wú)效;
測(cè)試Demo:(redis版本7.0.2,Lettuce版本6.1.8)
集群節(jié)點(diǎn):虛擬機(jī) 192.168.1.31,端口 9001-9006,集群節(jié)點(diǎn)已設(shè)置notify-keyspace-events AK;
package testlettuce; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.KeyScanCursor; import io.lettuce.core.KeyValue; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisURI; import io.lettuce.core.ScanCursor; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.Executions; import io.lettuce.core.cluster.api.sync.NodeSelection; import io.lettuce.core.cluster.api.sync.NodeSelectionCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands; import io.lettuce.core.protocol.DecodeBufferPolicies; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; public class TestLettuceCluster { /** * @param args */ public static void main(String[] args) { List<RedisURI> nodeList = new ArrayList<>(); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build()); RedisClusterClient clusterClient = RedisClusterClient.create(nodeList); ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))//設(shè)置自適應(yīng)拓?fù)渌⑿鲁瑫r(shí),每次超時(shí)刷新一次,默認(rèn)30s; .closeStaleConnections(false)//刷新拓?fù)鋾r(shí)是否關(guān)閉失效連接,默認(rèn)true,isPeriodicRefreshEnabled()為true時(shí)生效; .dynamicRefreshSources(true)//從拓?fù)渲邪l(fā)現(xiàn)新節(jié)點(diǎn),并將新節(jié)點(diǎn)也作為拓?fù)涞脑垂?jié)點(diǎn),動(dòng)態(tài)刷新可以發(fā)現(xiàn)全部節(jié)點(diǎn)并計(jì)算每個(gè)客戶(hù)端的數(shù)量,設(shè)置false則只有初始節(jié)點(diǎn)為源和計(jì)算客戶(hù)端數(shù)量; .enableAllAdaptiveRefreshTriggers()//啟用全部觸發(fā)器自適應(yīng)刷新拓?fù)洌J(rèn)關(guān)閉; .enablePeriodicRefresh(Duration.ofSeconds(5L))//開(kāi)啟定時(shí)拓?fù)渌⑿虏⒃O(shè)置周期; .refreshTriggersReconnectAttempts(3)//長(zhǎng)連接重新連接嘗試n次才拓?fù)渌⑿? .build(); ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() .autoReconnect(true)//在連接丟失時(shí)開(kāi)啟或關(guān)閉自動(dòng)重連,默認(rèn)true; .cancelCommandsOnReconnectFailure(true)//允許在重連失敗取消排隊(duì)命令,默認(rèn)false; .decodeBufferPolicy(DecodeBufferPolicies.always())//設(shè)置丟棄解碼緩沖區(qū)的策略,以回收內(nèi)存;always:解碼后丟棄,最大內(nèi)存效率;alwaysSome:解碼后丟棄一部分;ratio(n)基于比率丟棄,n/(1+n),通常用1-10對(duì)應(yīng)50%-90%; .disconnectedBehavior(DisconnectedBehavior.DEFAULT)//設(shè)置連接斷開(kāi)時(shí)命令的調(diào)用行為,默認(rèn)啟用重連;DEFAULT:?jiǎn)⒂脮r(shí)重連中接收命令,禁用時(shí)重連中拒絕命令;ACCEPT_COMMANDS:重連中接收命令;REJECT_COMMANDS:重連中拒絕命令; // .maxRedirects(5)//當(dāng)鍵從一個(gè)節(jié)點(diǎn)遷移到另一個(gè)節(jié)點(diǎn),集群重定向次數(shù),默認(rèn)5; // .nodeFilter(nodeFilter)//設(shè)置節(jié)點(diǎn)過(guò)濾器 // .pingBeforeActivateConnection(true)//激活連接前設(shè)置PING,默認(rèn)true; // .protocolVersion(ProtocolVersion.RESP3)//設(shè)置協(xié)議版本,默認(rèn)RESP3; // .publishOnScheduler(false)//使用專(zhuān)用的調(diào)度器發(fā)出響應(yīng)信號(hào),默認(rèn)false,啟用時(shí)數(shù)據(jù)信號(hào)將使用服務(wù)的多線程發(fā)出; // .requestQueueSize(requestQueueSize)//設(shè)置每個(gè)連接請(qǐng)求隊(duì)列大小; // .scriptCharset(scriptCharset)//設(shè)置Lua腳本編碼為byte[]的字符集,默認(rèn)StandardCharsets.UTF_8; // .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//設(shè)置低級(jí)套接字的屬性 // .sslOptions(SslOptions.builder().build())//設(shè)置ssl屬性 // .suspendReconnectOnProtocolFailure(false)//當(dāng)重新連接遇到協(xié)議失敗時(shí)暫停重新連接(SSL驗(yàn)證,連接失敗前PING),默認(rèn)值為false; // .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//設(shè)置超時(shí)來(lái)取消和終止命令; .topologyRefreshOptions(clusterTopologyRefreshOptions)//設(shè)置拓?fù)涓略O(shè)置 .validateClusterNodeMembership(true)//在允許連接到集群節(jié)點(diǎn)之前,驗(yàn)證集群節(jié)點(diǎn)成員關(guān)系,默認(rèn)值為true; .build(); clusterClient.setDefaultTimeout(Duration.ofSeconds(5L)); clusterClient.setOptions(clusterClientOptions); StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect(); clusterConn.setReadFrom(ReadFrom.ANY);//設(shè)置從哪些節(jié)點(diǎn)讀取數(shù)據(jù); RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync(); clusterCmd.set("a", "A"); clusterCmd.set("b", "B"); clusterCmd.set("c", "C"); clusterCmd.set("d", "D"); System.out.println("get a=" + clusterCmd.get("a")); System.out.println("get b=" + clusterCmd.get("b")); System.out.println("get c=" + clusterCmd.get("c")); System.out.println("get d=" + clusterCmd.get("d")); //跨槽位命令 Map<String, String> kvmap = new HashMap<>(); kvmap.put("a", "AA"); kvmap.put("b", "BB"); kvmap.put("c", "CC"); kvmap.put("d", "DD"); clusterCmd.mset(kvmap);//Lettuce做了優(yōu)化,支持一些命令的跨槽位命令; System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d")); //選定部分節(jié)點(diǎn)操作 NodeSelection<String, String> replicas = clusterCmd.replicas(); NodeSelectionCommands<String, String> replicaseCmd = replicas.commands(); Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL); executions.forEach(s -> {System.out.println(s.getKeys());}); //訂閱發(fā)布消息 StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub(); pubSubConn.addListener(new RedisPubSubListener<String, String>() { @Override public void message(String channel, String message) { System.out.println("[message]ch:" + channel + ",msg:" + message); } @Override public void message(String pattern, String channel, String message) { } @Override public void subscribed(String channel, long count) { System.out.println("[subscribed]ch:" + channel); } @Override public void psubscribed(String pattern, long count) { } @Override public void unsubscribed(String channel, long count) { } @Override public void punsubscribed(String pattern, long count) { } }); pubSubConn.sync().subscribe("TEST_Ch");//(回調(diào)內(nèi)部使用阻塞調(diào)用或者lettuce同步api調(diào)用,需使用異步訂閱) clusterCmd.publish("TEST_Ch", "MSGMSGMSG"); //響應(yīng)式訂閱,可以監(jiān)聽(tīng)ChannelMessage和PatternMessage,使用鏈?zhǔn)竭^(guò)濾處理計(jì)算等操作 RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive(); pubsubReactive.subscribe("TEST_Ch2").subscribe(); pubsubReactive.observeChannels() .filter(chmsg -> {return chmsg.getMessage().contains("tom");}) .doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());}) .subscribe(); clusterCmd.publish("TEST_Ch2", "send to jerry"); clusterCmd.publish("TEST_Ch", "tom MSG"); clusterCmd.publish("TEST_Ch2", "this is tom"); //keySpaceEvent事件 StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub(); clusterPubSubConn.setNodeMessagePropagation(true);//啟用禁用節(jié)點(diǎn)消息傳播到該listener,例如只能在本節(jié)點(diǎn)通知的鍵事件通知; RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() { @Override public void unsubscribed(String channel, long count) { System.out.println("unsubscribed_ch:" + channel); } @Override public void subscribed(String channel, long count) { System.out.println("subscribed_ch:" + channel); } @Override public void punsubscribed(String pattern, long count) { System.out.println("punsubscribed_pattern:" + pattern); } @Override public void psubscribed(String pattern, long count) { System.out.println("psubscribed_pattern:" + pattern); } @Override public void message(String pattern, String channel, String message) { System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message); } @Override public void message(String channel, String message) { System.out.println("message_ch:" + channel + " msg:" + message); } }; clusterPubSubConn.addListener(listener); PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all(); NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands(); clusterCmd.setex("a", 1, "A"); pubsubAsyncCmd.psubscribe("__keyspace@0__:*"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } }
運(yùn)行結(jié)果:
另外,還有一個(gè)cluster專(zhuān)用的Listener:RedisClusterPubSubListener,可以從listener里獲得發(fā)布消息的節(jié)點(diǎn)信息:
RedisClusterPubSubListener<String, String> clusterListener = new RedisClusterPubSubListener<String, String>() { @Override public void message(RedisClusterNode node, String channel, String message) { } @Override public void message(RedisClusterNode node, String pattern, String channel, String message) { } @Override public void subscribed(RedisClusterNode node, String channel, long count) { } @Override public void psubscribed(RedisClusterNode node, String pattern, long count) { } @Override public void unsubscribed(RedisClusterNode node, String channel, long count) { } @Override public void punsubscribed(RedisClusterNode node, String pattern, long count) { } };
到此這篇關(guān)于Redis Lettuce連接redis集群實(shí)現(xiàn)過(guò)程詳細(xì)講解的文章就介紹到這了,更多相關(guān)Redis Lettuce連接redis集群內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java安全之Tomcat6 Filter內(nèi)存馬問(wèn)題
這篇文章主要介紹了Java安全之Tomcat6 Filter內(nèi)存馬,通過(guò)本文探討下Tomcat6與Tomcat8之間的區(qū)別,主要看下tomcat6和tomcat8之間createFilterChain不相同的地方 看到ApplicationFilterFactory#createFilterChain,需要的朋友可以參考下2022-10-10使用Springboot 打jar包實(shí)現(xiàn)分離依賴(lài)lib和配置
這篇文章主要介紹了使用Springboot 打jar包實(shí)現(xiàn)分離依賴(lài)lib和配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化
這篇文章主要介紹了詳解RabbitMQ延遲隊(duì)列的基本使用和優(yōu)化,延遲隊(duì)列中的元素都是帶有時(shí)間屬性的。延遲隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列,需要的朋友可以參考下2023-05-05Spring 中使用Quartz實(shí)現(xiàn)任務(wù)調(diào)度
這篇文章主要介紹了Spring 中使用Quartz實(shí)現(xiàn)任務(wù)調(diào)度,Spring中使用Quartz 有兩種方式,感興趣的小伙伴們可以參考一下。2017-02-02Spring?@DateTimeFormat日期格式化時(shí)注解場(chǎng)景分析
這篇文章主要介紹了Spring?@DateTimeFormat日期格式化時(shí)注解場(chǎng)景分析,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05Java使用Junit4.jar進(jìn)行單元測(cè)試的方法
今天通過(guò)本文給大家介紹Java使用Junit4.jar進(jìn)行單元測(cè)試的方法,本文通過(guò)圖文實(shí)例相結(jié)合給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-11-11