ZooKeeper命令及JavaAPI操作代碼
ZooKeeper數(shù)據(jù)模型
- ZooKeeper是一個(gè)樹形目錄服務(wù),其數(shù)據(jù)模型和Uiix的文件目錄樹很類似,擁有一個(gè)層次化結(jié)構(gòu)。
- 這里面的每一個(gè)節(jié)點(diǎn)都被稱為:ZNode,每個(gè)節(jié)點(diǎn)上都會保存自己的數(shù)據(jù)和節(jié)點(diǎn)信息。
- 節(jié)點(diǎn)可以擁有子節(jié)點(diǎn),同時(shí)也允許少量(1MB)數(shù)據(jù)存儲在該節(jié)點(diǎn)之下。
- 節(jié)點(diǎn)可以分為四大類:
- PEFSISTENT持久化節(jié)點(diǎn)
- EPHEMERAL臨時(shí)節(jié)點(diǎn):-e
- PERSISTENT_SEQUENTIAL持久化順序節(jié)點(diǎn):-s
- EPHEMERAL_SEQUENTIAL臨時(shí)順序節(jié)點(diǎn):-es
ZooKeeper服務(wù)端常用命令
- 啟動ZooKeeper服務(wù):
./zkServer.sh start
- 查看ZooKeeper服務(wù):
./zkServer.sh status
- 停止ZooKeeper服務(wù):
./zkServer.sh stop
- 重啟ZooKeeper服務(wù):
./zkServer.sh restart
ZooKeeper客戶端命令
- ./zkCli.sh -server localhost:2181連接服務(wù)端,如果是單機(jī)后面的可以省略不寫。
- ls [/] :查看指定節(jié)點(diǎn)下子節(jié)點(diǎn)
- create [/app] [hrbu]:創(chuàng)建一個(gè)名為/app1的子節(jié)點(diǎn),并存放數(shù)據(jù)。
- get [/app] :獲取節(jié)點(diǎn)下的數(shù)據(jù)。
- set [/app] [hrbu]:給指定節(jié)點(diǎn)設(shè)置數(shù)據(jù)
- delete [/app] :刪除指定節(jié)點(diǎn) ps:此命令無法刪除存在子節(jié)點(diǎn)的節(jié)點(diǎn),如果要刪除帶有子節(jié)點(diǎn)的節(jié)點(diǎn)可以是使用deleteall [/app] 命令。
- quit 斷開連接
- help 查看命令幫助
- create -e [/app] 創(chuàng)建臨時(shí)節(jié)點(diǎn),會話關(guān)閉就會刪除
- create -s [/app] 創(chuàng)建順序節(jié)點(diǎn)
- create -es [/app] 創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
- ls -s [/app] 查看節(jié)點(diǎn)的詳細(xì)信息
使用Curator API操作Zookeeper
建立連接
@Test public void testConnect() { //重試策略 ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10); //第一種方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.120:2181", 60 * 1000, 15 * 1000, retry); //第二種方式 CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.130.120:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retry).namespace("hrbu").build(); //開啟連接 client.start(); }
參數(shù)解讀
- connectString – list of servers to connect to (ZooKeeper的地址)
sessionTimeoutMs – session timeout (會話超時(shí)時(shí)間)
connectionTimeoutMs – connection timeout (連接超時(shí)時(shí)間)
retryPolicy – retry policy to use (重試策略)
會話超時(shí)時(shí)間和連接超時(shí)時(shí)間有默認(rèn)值。
第二種鏈?zhǔn)骄幊痰姆绞娇梢灾付ㄒ粋€(gè)工作空間,在此客戶端下的所有操作都會將此工作空間作為根目錄。
注意
如果使用的是云服務(wù)器需要將指定端口打開
firewall-cmd --zone=public --add-port=2181/tcp --permanent
開放端口
firewall-cmd --zone=public --list-ports
查看已經(jīng)開放的端口
systemctl restart firewalld
重啟防火墻生效
最后別忘了在服務(wù)器的安全組里面添加端口,將2181端口打開
添加節(jié)點(diǎn)
@Test public void testCreate1() throws Exception { //基本創(chuàng)建 CreateBuilder createBuilder = client.create(); //創(chuàng)建時(shí)不指定數(shù)據(jù),會將當(dāng)前客戶端ip存到里面 createBuilder.forPath("/app1"); //指定數(shù)據(jù) createBuilder.forPath("/app2", "hello".getBytes()); } @Test public void testCreate2() throws Exception { CreateBuilder createBuilder = client.create(); //設(shè)置節(jié)點(diǎn)類型,默認(rèn)的類型是持久化 //CreateMode是枚舉類型 createBuilder.withMode(CreateMode.EPHEMERAL).forPath("/app3"); } @Test public void testCreate3() throws Exception { CreateBuilder createBuilder = client.create(); //創(chuàng)建多級節(jié)點(diǎn),如果父節(jié)點(diǎn)不存在,則創(chuàng)建父節(jié)點(diǎn)。 createBuilder.creatingParentContainersIfNeeded().forPath("/app4/app4_1"); }
查詢節(jié)點(diǎn)
@Test public void testGet() throws Exception { //查詢數(shù)據(jù) byte[] bytes = client.getData().forPath("/app1"); System.out.println(new String(bytes)); //查詢子節(jié)點(diǎn) List<String> strings = client.getChildren().forPath("/app4"); strings.forEach(System.out::println); //查詢節(jié)點(diǎn)狀態(tài)信息 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app1"); System.out.println(stat); }
修改節(jié)點(diǎn)
@Test public void testSet() throws Exception { //修改數(shù)據(jù) client.setData().forPath("/app1","hrbu".getBytes()); //根據(jù)版本修改 int version = 0; Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app1"); version = stat.getVersion(); client.setData().withVersion(version).forPath("/app1", "HRBU".getBytes()); }
刪除節(jié)點(diǎn)
@Test public void testDelete() throws Exception { //刪除單個(gè)節(jié)點(diǎn) client.delete().forPath("/app4/app4_1"); //刪除帶有子節(jié)點(diǎn)的節(jié)點(diǎn) client.delete().deletingChildrenIfNeeded().forPath("/app4"); //強(qiáng)制刪除 client.delete().guaranteed().forPath("/app4"); //回調(diào) client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("執(zhí)行刪除操作"); } }).forPath("/app4"); }
Watch事件監(jiān)聽
- Zookeeper允許用戶在指定節(jié)點(diǎn)上注冊一些Watcher,并且在一些特定事件觸發(fā)的時(shí)候,ZooKeeper服務(wù)端會將事件通知到感興趣的客戶端上去,該機(jī)制是ZooKeeper實(shí)現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性。
- ZooKeeper中引入了Watcher機(jī)制來實(shí)現(xiàn)了發(fā)布/訂閱功能,能夠讓多個(gè)訂閱者同時(shí)監(jiān)聽某一個(gè)對象,當(dāng)一個(gè)對象自身狀態(tài)變化時(shí),會通知所有訂閱者。
- ZooKeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是使用并不是特別方便,需要開發(fā)人員自己反復(fù)注冊Watcher,比較繁瑣。
- Curator引入了Cache來時(shí)限對Zookeeper服務(wù)端事件的監(jiān)聽。
- ZooKeeper提供了三種Watcher:
- NodeCache:只是監(jiān)聽某一個(gè)特定的節(jié)點(diǎn)。
- PathChildrenCache:監(jiān)控一個(gè)Node的子節(jié)點(diǎn)。
- TreeCache:可以監(jiān)控整個(gè)樹上的所有節(jié)點(diǎn),類似于PathChildrenCache和NodeCache的組合。
NodeCache
@Test public void testNodeCache() throws Exception { //NodeCache:指定一個(gè)節(jié)點(diǎn)注冊監(jiān)聽器 //創(chuàng)建NodeCache對象 final NodeCache nodeCache = new NodeCache(client, "/app1"); //注冊監(jiān)聽 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("app1節(jié)點(diǎn)發(fā)生變化"); //獲取修改節(jié)點(diǎn)后的數(shù)據(jù) byte[] data = nodeCache.getCurrentData().getData(); System.out.println("變化后的節(jié)點(diǎn):"+new String(data)); } }); //開啟監(jiān)聽,如果為true,則開啟則開啟監(jiān)聽,加載緩沖數(shù)據(jù) nodeCache.start(true); }
PathChildrenCache
@Test public void testPathChildrenCache() throws Exception { //PathChildrenCache:監(jiān)聽某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn) //創(chuàng)建監(jiān)聽對象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/hrbu", true); //綁定監(jiān)聽器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("子節(jié)點(diǎn)發(fā)生變化"); System.out.println(pathChildrenCacheEvent); //監(jiān)聽子節(jié)點(diǎn)的數(shù)據(jù)變更,并且得到變更后的數(shù)據(jù) //獲取類型 PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); //判斷類型 if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { //獲取數(shù)據(jù) byte[] data = pathChildrenCacheEvent.getData().getData(); System.out.println(new String(data)); } } }); //開啟 pathChildrenCache.start(); }
TreeCache
@Test public void testTreeCache() throws Exception { //創(chuàng)建監(jiān)聽器 TreeCache treeCache = new TreeCache(client, "/"); //注冊監(jiān)聽 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("節(jié)點(diǎn)發(fā)生變化"); System.out.println(treeCacheEvent); } }); //開啟 treeCache.start(); }
分布式鎖實(shí)現(xiàn)
概述
- 我們在進(jìn)行單機(jī)應(yīng)用開發(fā),涉及并發(fā)同步的時(shí)候,,我們往往采用synchronized或者lock的方式來解決多線程間的代碼同步問題,這時(shí)候多線程的運(yùn)行都是在同一個(gè)JVM之下,沒有任何問題。
- 但當(dāng)我們的應(yīng)用時(shí)分布式集群工作的情況下,屬于多JVM下的工作環(huán)境,跨JVM之間已經(jīng)無法通過多線程的鎖解決同步問題。
- 那么就需要一種更加高級的鎖機(jī)制,來處理跨機(jī)器進(jìn)程之間的數(shù)據(jù)同步問題,這就是分布式鎖。
Zookeeper分布式鎖原理
- 核心思想:當(dāng)客戶端要獲取鎖,則創(chuàng)建節(jié)點(diǎn),使用完鎖,則刪除該節(jié)點(diǎn)。
- 1.客戶端獲取鎖時(shí),在lock節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)。
- 2.然后獲取lock下面的所有子節(jié)點(diǎn),客戶端獲取到所有的子節(jié)之后,如果發(fā)現(xiàn)自己創(chuàng)建的子節(jié)點(diǎn)序號最小,那么就認(rèn)為該客戶端獲取到了鎖。使用完鎖后,將該節(jié)點(diǎn)刪除。
- 3.如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點(diǎn)并非lock所有子節(jié)點(diǎn)中最小的,說明自己還沒有獲取到鎖,此時(shí)客戶端需要找到比自己小的那個(gè)節(jié)點(diǎn),同時(shí)對其注冊事件監(jiān)聽器,監(jiān)聽刪除事件。
- 4.如果發(fā)現(xiàn)比自己小的那個(gè)節(jié)點(diǎn)被刪除,則客戶端的Watcher會收到相應(yīng)通知,此時(shí)再次判斷自己創(chuàng)建的節(jié)點(diǎn)是否時(shí)lock子節(jié)點(diǎn)中序號最小的,如果是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取比自己小的一個(gè)節(jié)點(diǎn)并注冊監(jiān)聽。
Curator實(shí)現(xiàn)分布式鎖API
在Curator中有五種鎖方案:
- InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)
- InterProcessMutex:分布式可重入排它鎖
- InterProcessReadWriteLock:分布式讀寫鎖
- InterProcessMultiLock:將多個(gè)鎖作為單個(gè)實(shí)體管理的容器
- InterProcessSemaphoreV2:共享信號量
package com.hrbu.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.TimeUnit; public class Ticket12306 implements Runnable{ private int tickets = 10;//數(shù)據(jù)庫的票數(shù) private InterProcessMutex lock ; public Ticket12306(){ //重試策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("8.130.32.75:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); //開啟連接 client.start(); lock = new InterProcessMutex(client,"/lock"); } @Override public void run() { while(true){ //獲取鎖 try { lock.acquire(3, TimeUnit.SECONDS); if(tickets > 0){ System.out.println(Thread.currentThread()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { //釋放鎖 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.hrbu.curator; public class LockTest { public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); //創(chuàng)建客戶端 Thread t1 = new Thread(ticket12306,"攜程"); Thread t2 = new Thread(ticket12306,"飛豬"); t1.start(); t2.start(); } }
到此這篇關(guān)于ZooKeeper命令及JavaAPI操作的文章就介紹到這了,更多相關(guān)ZooKeeper JavaAPI操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis Plus 增刪改查的實(shí)現(xiàn)(小白教程)
本文主要介紹了Mybatis Plus 增刪改查,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解決方案
這篇文章主要介紹了SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解決方案,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08SpringBoot通過Filter實(shí)現(xiàn)整個(gè)項(xiàng)目接口的SQL注入攔截詳解
這篇文章主要介紹了SpringBoot通過Filter實(shí)現(xiàn)整個(gè)項(xiàng)目接口的SQL注入攔截詳解,SQL注入是比較常見的網(wǎng)絡(luò)攻擊方式之一,在客戶端在向服務(wù)器發(fā)送請求的時(shí)候,sql命令通過表單提交或者url字符串拼接傳遞到后臺持久層,最終達(dá)到欺騙服務(wù)器執(zhí)行惡意的SQL命令,需要的朋友可以參考下2023-12-12java Volatile與Synchronized的區(qū)別
這篇文章主要介紹了java Volatile與Synchronized的區(qū)別,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-12-12