在Java中操作Zookeeper的示例代碼詳解
依賴(lài)
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
連接到zkServer
//連接字符串,zkServer的ip、port,如果是集群逗號(hào)分隔
String connectStr = "192.168.1.9:2181";
//zookeeper就是一個(gè)zkCli
ZooKeeper zooKeeper = null;
try {
//初始次數(shù)為1。后面要在內(nèi)部類(lèi)中使用,三種寫(xiě)法:1、寫(xiě)成外部類(lèi)成員變量,不用加final;2、作為函數(shù)局部變量,放在try外面,寫(xiě)成final;3、寫(xiě)在try中,不加final
CountDownLatch countDownLatch = new CountDownLatch(1);
//超時(shí)時(shí)間ms,監(jiān)聽(tīng)器
zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果狀態(tài)變成已連接
if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("連接成功");
//次數(shù)-1
countDownLatch.countDown();
}
}
});
//等待,次數(shù)為0時(shí)才會(huì)繼續(xù)往下執(zhí)行。等待監(jiān)聽(tīng)器監(jiān)聽(tīng)到連接成功,才能操作zk
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
//...操作zk。后面的demo都是寫(xiě)在此處的
//關(guān)閉連接
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
檢測(cè)節(jié)點(diǎn)是否存在
// 檢測(cè)節(jié)點(diǎn)是否存在
// 同步方式
Stat exists = null;
try {
//如果存在,返回節(jié)點(diǎn)狀態(tài)Stat;如果不存在,返回null。第二個(gè)參數(shù)是watch
exists = zooKeeper.exists("/mall",false);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
if (exists==null){
System.out.println("節(jié)點(diǎn)不存在");
}
else {
System.out.println("節(jié)點(diǎn)存在");
}
//異步回調(diào)
zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() {
//第二個(gè)是path znode路徑,第三個(gè)是ctx 后面?zhèn)魅雽?shí)參,第四個(gè)是znode的狀態(tài)
public void processResult(int i, String s, Object o, Stat stat) {
//如果節(jié)點(diǎn)不存在,返回的stat是null
if (stat==null){
System.out.println("節(jié)點(diǎn)不存在");
}
else{
System.out.println("節(jié)點(diǎn)存在");
}
}
// 傳入ctx,Object類(lèi)型
},null);
操作后,服務(wù)端會(huì)返回處理結(jié)果,返回void、null也算處理結(jié)果。
同步指的是當(dāng)前線(xiàn)程阻塞,等待服務(wù)端返回?cái)?shù)據(jù),收到返回的數(shù)據(jù)才繼續(xù)往下執(zhí)行;
異步回調(diào)指的是,把對(duì)結(jié)果(返回的數(shù)據(jù))的處理寫(xiě)在回調(diào)函數(shù)中,當(dāng)前線(xiàn)程不等待返回的數(shù)據(jù),繼續(xù)往下執(zhí)行,收到返回的數(shù)據(jù)時(shí)自動(dòng)調(diào)用回調(diào)函數(shù)來(lái)處理。
如果處理返回?cái)?shù)據(jù)的代碼之后的操作,不依賴(lài)返回?cái)?shù)據(jù)、對(duì)返回?cái)?shù)據(jù)的處理,那么可以把返回?cái)?shù)據(jù)的處理寫(xiě)成回調(diào)函數(shù)。
創(chuàng)建節(jié)點(diǎn)
//創(chuàng)建節(jié)點(diǎn)
//同步方式
try {
//數(shù)據(jù)要寫(xiě)成byte[],不攜帶數(shù)據(jù)寫(xiě)成null;默認(rèn)acl權(quán)限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一個(gè)是節(jié)點(diǎn)類(lèi)型,P是永久,E是臨時(shí),S是有序
zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("已創(chuàng)建節(jié)點(diǎn)/mall");
//如果節(jié)點(diǎn)已存在,會(huì)拋出異常
} catch (KeeperException | InterruptedException e) { System.out.println("創(chuàng)建節(jié)點(diǎn)/mall失敗,請(qǐng)檢查節(jié)點(diǎn)是否已存在");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){
//第二個(gè)path,第三個(gè)ctx,第四個(gè)節(jié)點(diǎn)狀態(tài)
public void processResult(int i, String s, Object o, String s1, Stat stat) {
//回調(diào)方式不拋出異常,返回的stat是創(chuàng)建節(jié)點(diǎn)的狀態(tài),如果節(jié)點(diǎn)已存在,返回的stat是null
if (stat==null){
System.out.println("創(chuàng)建節(jié)點(diǎn)/mall失敗,請(qǐng)檢查節(jié)點(diǎn)是否已存在");
}
else {
System.out.println("節(jié)點(diǎn)創(chuàng)建成功");
}
}
//ctx實(shí)參
},null);
刪除節(jié)點(diǎn)
//刪除節(jié)點(diǎn)
//同步方式
try {
//第二個(gè)參數(shù)是版本號(hào),-1表示可以是任何版本
zooKeeper.delete("/mall1",-1);
System.out.println("成功刪除節(jié)點(diǎn)/mall");
} catch (InterruptedException | KeeperException e) {
System.out.println("刪除節(jié)點(diǎn)/mall失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() {
//第二個(gè)是path,第三個(gè)是ctx
public void processResult(int i, String s, Object o) {
}
//
//ctx實(shí)參
},null);
delete()只能刪除沒(méi)有子節(jié)點(diǎn)的znode,如果該znode有子節(jié)點(diǎn)會(huì)拋出異常。
沒(méi)有提供遞歸刪除子節(jié)點(diǎn)的方法,如果要?jiǎng)h除帶有子節(jié)點(diǎn)的znode,需要自己實(shí)現(xiàn)遞歸刪除??梢韵萭etChildren()獲取子節(jié)點(diǎn)列表,遍歷列表依次刪除子節(jié)點(diǎn),再刪除父節(jié)點(diǎn)。
獲取子節(jié)點(diǎn)列表
//獲取子節(jié)點(diǎn)列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"]
//同步方式
List<String> children = null;
try {
//第二個(gè)參數(shù)是watch
children = zooKeeper.getChildren("/mall", false);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("子節(jié)點(diǎn)列表:" + children);
//異步
zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() {
//第二個(gè)起依次是:path、ctx、返回的子節(jié)點(diǎn)列表
public void processResult(int i, String s, Object o, List<String> list) {
System.out.println("子節(jié)點(diǎn)列表:" + list);
}
//ctx實(shí)參
}, null);
只獲取子節(jié)點(diǎn),不獲取孫節(jié)點(diǎn)。
watch都是:可以寫(xiě)boolean,要添加監(jiān)聽(tīng)就寫(xiě)true,不監(jiān)聽(tīng)寫(xiě)false;也可以寫(xiě)Watcher對(duì)象,new一個(gè)Watcher對(duì)象表示要監(jiān)聽(tīng),null表示不監(jiān)聽(tīng)。
獲取節(jié)點(diǎn)數(shù)據(jù)
//獲取節(jié)點(diǎn)數(shù)據(jù),返回byte[]
//同步方式
byte[] data = null;
try {
//第二個(gè)參數(shù)是watch,第三個(gè)是stat
data = zooKeeper.getData("/mall", false, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
//調(diào)用new String()時(shí)要判斷data是否為null,如果是null會(huì)拋NPE
if (data==null){
System.out.println("該節(jié)點(diǎn)沒(méi)有數(shù)據(jù)");
}
else{
System.out.println("節(jié)點(diǎn)數(shù)據(jù):"+new String(data));
}
//異步回調(diào)
zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() {
//第二個(gè)起依次是:path、ctx、返回的節(jié)點(diǎn)數(shù)據(jù)、節(jié)點(diǎn)狀態(tài)
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
//不必判斷bytes是否是null,如果節(jié)點(diǎn)沒(méi)有數(shù)據(jù),不會(huì)調(diào)用回調(diào)函數(shù);執(zhí)行到此,說(shuō)明bytes不是null
System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + new String(bytes) );
}
//ctx實(shí)參
}, null);
設(shè)置|修改節(jié)點(diǎn)數(shù)據(jù)
//設(shè)置|更新節(jié)點(diǎn)據(jù)
//同步方式
try {
//最后一個(gè)參數(shù)是版本號(hào)
zooKeeper.setData("/mall", "1234".getBytes(), -1);
System.out.println("設(shè)置節(jié)點(diǎn)數(shù)據(jù)成功");
} catch (KeeperException | InterruptedException e) {
System.out.println("設(shè)置節(jié)點(diǎn)數(shù)據(jù)失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() {
//第二個(gè)是path,第三個(gè)是ctx
public void processResult(int i, String s, Object o, Stat stat) {
}
// ctx
},null);
設(shè)置acl權(quán)限
//設(shè)置acl權(quán)限
//第一個(gè)參數(shù)指定權(quán)限,第二個(gè)是Id對(duì)象
ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd"));
List<ACL> aclList = new ArrayList<>();
aclList.add(acl);
//如果List中只有一個(gè)ACL對(duì)象,也可以這樣寫(xiě)
//List<ACL> aclList = Collections.singletonList(auth);
//驗(yàn)證權(quán)限,需寫(xiě)在設(shè)置權(quán)限之前。如果之前沒(méi)有設(shè)置權(quán)限,也需要先驗(yàn)證本次即將設(shè)置的用戶(hù)
zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());
//方式一 setAcl
try {
//第二個(gè)參數(shù)是List<ACL>,第三個(gè)參數(shù)是版本號(hào)
zooKeeper.setACL("/mall", aclList, -1);
System.out.println("設(shè)置權(quán)限成功");
} catch (KeeperException | InterruptedException e) {
System.out.println("設(shè)置權(quán)限失敗");
e.printStackTrace();
}
//方式二 在創(chuàng)建節(jié)點(diǎn)時(shí)設(shè)置權(quán)限
try {
zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT);
System.out.println("已創(chuàng)建節(jié)點(diǎn)并設(shè)置權(quán)限");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
設(shè)置權(quán)限之后,連接zkServer進(jìn)行操作時(shí),都需要先驗(yàn)證用戶(hù)。
此處未寫(xiě)對(duì)應(yīng)的異步回調(diào)。
查看acl權(quán)限
//查看acl權(quán)限
//設(shè)置權(quán)限之后,以后操作時(shí)需要先驗(yàn)證用戶(hù),一次session中驗(yàn)證一次即可
zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());
//同步方式
try {
List<ACL> aclList = zooKeeper.getACL("/mall", null);
System.out.println("acl權(quán)限:"+aclList);
} catch (KeeperException | InterruptedException e) {
System.out.println("獲取acl權(quán)限失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() {
//第二個(gè)起:path、ctx、獲取到的List<ACL>、節(jié)點(diǎn)狀態(tài)
public void processResult(int i, String s, Object o, List<ACL> list, Stat stat) {
//就算沒(méi)有手動(dòng)設(shè)置acl權(quán)限,默認(rèn)也是有值的
System.out.println("acl權(quán)限:"+list);
}
//ctx實(shí)參
},null);
添加監(jiān)聽(tīng)器
//添加監(jiān)聽(tīng) 方式一
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.getData("/mall", new Watcher() {
public void process(WatchedEvent watchedEvent) {
//watcher會(huì)監(jiān)聽(tīng)該節(jié)點(diǎn)所有的事件,不管發(fā)生什么事件都會(huì)調(diào)用process()來(lái)處理,需要先判斷一下事件類(lèi)型
if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
System.out.println("節(jié)點(diǎn)數(shù)據(jù)改變了");
//會(huì)一直監(jiān)聽(tīng),如果只監(jiān)聽(tīng)一次數(shù)據(jù)改變,將下面這句代碼取消注釋即可
//countDownLatch.countDown();
}
}
}, null);
//默認(rèn)watcher是一次性的,如果要一直監(jiān)聽(tīng),需要借助CountDownLatch
countDownLatch.await();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
ZooKeeper類(lèi)的exists()、getData()、getChildren()方法都具有添加監(jiān)聽(tīng)的功能,用法類(lèi)似。
watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)
getType是獲取事件類(lèi)型,getState是獲取連接狀態(tài)。
上面這種方式,會(huì)遞歸監(jiān)聽(tīng)子孫節(jié)點(diǎn),子孫節(jié)點(diǎn)的數(shù)據(jù)改變也算NodeDataChanged,子孫節(jié)點(diǎn)的創(chuàng)建|刪除也算NodeCreated|NodeDeleted。
//添加監(jiān)聽(tīng) 方式二
try {
CountDownLatch countDownLatch1 = new CountDownLatch(1);
zooKeeper.addWatch("/mall", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
System.out.println("節(jié)點(diǎn)數(shù)據(jù)改變了");
//如果只監(jiān)聽(tīng)一次數(shù)據(jù)改變,將下面這句代碼注釋掉
//countDownLatch1.countDown();
}
}
//監(jiān)聽(tīng)模式,PERSISTENT是不監(jiān)聽(tīng)子孫節(jié)點(diǎn),PERSISTENT_RECURSIVE是遞歸監(jiān)聽(tīng)子孫節(jié)點(diǎn)
}, AddWatchMode.PERSISTENT_RECURSIVE);
countDownLatch1.await();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
countDownLatch1.await();要阻塞線(xiàn)程,最好啟動(dòng)一條新線(xiàn)程來(lái)監(jiān)聽(tīng)。
只有設(shè)置了監(jiān)聽(tīng)的zkCli,該節(jié)點(diǎn)發(fā)生事件時(shí)才會(huì)收到zkServer的通知。
watch只保存在zkServer的內(nèi)存中(zk依賴(lài)jdk,運(yùn)行在jvm上,堆中的session對(duì)象),不持久化到硬盤(pán),就是說(shuō)設(shè)置的監(jiān)聽(tīng)只在本次會(huì)話(huà)期間有效,zkCli關(guān)閉連接,zkServer在指定時(shí)間后(默認(rèn)連續(xù)沒(méi)有收到10個(gè)心跳),zkServer會(huì)自動(dòng)刪除相關(guān)session,watcher丟失。
移除監(jiān)聽(tīng)
//移除監(jiān)聽(tīng) 方式一
try {
zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT);
System.out.println("已移除監(jiān)聽(tīng)");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
就是上面添加監(jiān)聽(tīng)的哪些方法,watch|watcher參數(shù),如果是boolean類(lèi)型,設(shè)置為false即可關(guān)閉監(jiān)聽(tīng);如果是Watcher類(lèi)型,可以設(shè)置null覆蓋掉之前設(shè)置的監(jiān)聽(tīng)。
//移除監(jiān)聽(tīng) 方式二
try {
//第二個(gè)參數(shù)是Watcher,原來(lái)添加的那個(gè)Watcher監(jiān)聽(tīng)對(duì)象,不能是null
//第三個(gè)參數(shù)指定要移除監(jiān)聽(tīng)的哪部分,Any是移除整個(gè)監(jiān)聽(tīng),Data是移除對(duì)數(shù)據(jù)的監(jiān)聽(tīng),Children是移除對(duì)子節(jié)點(diǎn)的遞歸監(jiān)聽(tīng)
//最后一個(gè)參數(shù)指定未連接到zkServe時(shí),是否移除本地監(jiān)聽(tīng)部分
zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
監(jiān)聽(tīng)由2部分組成,一部分在zkServer上,事件發(fā)生時(shí)通知對(duì)應(yīng)的zkCli;一部分在zkCli,收到zkServer的通知時(shí)做出一些處理。
最后一個(gè)參數(shù)指定未連接到zkServer,是否移除本地(zkCli)監(jiān)聽(tīng)部分,true——移除,false——不移除。
比如說(shuō)沒(méi)有連接到zkServer,移除本地監(jiān)聽(tīng),10個(gè)心跳內(nèi)連上了zkServer,zkServer的監(jiān)聽(tīng)部分仍在,發(fā)生事件時(shí)仍會(huì)通知此zkCli,但zkCli本地監(jiān)聽(tīng)已經(jīng)移除了,對(duì)通知不會(huì)做出處理。
第一種方式會(huì)移除整個(gè)監(jiān)聽(tīng),不需要傳入監(jiān)聽(tīng)對(duì)象watcher;
第二種方式功能更全,可以指定移除監(jiān)聽(tīng)的哪個(gè)部分,但需要傳入watcher對(duì)象,添加監(jiān)聽(tīng)時(shí)要用一個(gè)變量來(lái)保存watcher對(duì)象。
到此這篇關(guān)于在Java中操作Zookeeper的示例代碼詳解的文章就介紹到這了,更多相關(guān)Java操作Zookeeper內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java多種經(jīng)典排序算法(含動(dòng)態(tài)圖)
排序算法是老生常談的了,但是在面試中也有會(huì)被問(wèn)到,例如有時(shí)候,在考察算法能力的時(shí)候,不讓你寫(xiě)算法,就讓你描述一下,某個(gè)排序算法的思想以及時(shí)間復(fù)雜度或空間復(fù)雜度。我就遇到過(guò),直接問(wèn)快排的,所以這次我就總結(jié)梳理一下經(jīng)典的十大排序算法以及它們的模板代碼2021-04-04
Java統(tǒng)計(jì)代碼的執(zhí)行時(shí)間的N種方法
在日常開(kāi)發(fā)中經(jīng)常需要測(cè)試一些代碼的執(zhí)行時(shí)間,但又不想使用向 JMH(Java?Microbenchmark Harness,Java 微基準(zhǔn)測(cè)試套件)這么重的測(cè)試框架,所以本文就匯總了一些 Java 中比較常用的執(zhí)行時(shí)間統(tǒng)計(jì)方法,總共包含以下 6 種,需要的朋友可以參考下2022-08-08
Java設(shè)計(jì)模式之外觀(guān)模式(Facade模式)介紹
這篇文章主要介紹了Java設(shè)計(jì)模式之外觀(guān)模式(Facade模式)介紹,外觀(guān)模式(Facade)的定義:為子系統(tǒng)中的一組接口提供一個(gè)一致的界面,需要的朋友可以參考下2015-03-03
Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解
這篇文章主要介紹了Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫(kù)連接池
這篇文章主要介紹了SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫(kù)連接池,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03

