java連接zookeeper的實(shí)現(xiàn)示例
API
ZooKeeper官方提供了Java API,可以通過(guò)Java代碼來(lái)連接zookeeper服務(wù)進(jìn)行操作??梢赃B接、創(chuàng)建節(jié)點(diǎn)、獲取節(jié)點(diǎn)數(shù)據(jù)、監(jiān)聽節(jié)點(diǎn)變化等操作,具體有以下幾個(gè)重要的類:
- ZooKeeper:ZooKeeper類是Java API的核心類,用于與ZooKeeper服務(wù)器建立連接,并提供了一系列方法來(lái)操作ZooKeeper的節(jié)點(diǎn)。
- Watcher:Watcher是ZooKeeper的一個(gè)回調(diào)接口,當(dāng)節(jié)點(diǎn)發(fā)生變化時(shí)會(huì)調(diào)用相應(yīng)的方法進(jìn)行通知。
- CreateMode:CreateMode枚舉類定義了節(jié)點(diǎn)的類型,包括永久節(jié)點(diǎn)、臨時(shí)節(jié)點(diǎn)、順序節(jié)點(diǎn)和臨時(shí)順序節(jié)點(diǎn)。
- Stat:Stat類表示節(jié)點(diǎn)的元數(shù)據(jù)信息,比如修改版本、數(shù)據(jù)長(zhǎng)度、子節(jié)點(diǎn)數(shù)量等。
添加依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.2</version>
</dependency>
操作例子
String host = "localhost:2181";
//建立連接
zooKeeper = new ZooKeeper(host, 2000, null);
String path = "/test";
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("Node changed: " + watchedEvent.getPath());
System.out.println(watchedEvent);
}
};
//獲取節(jié)點(diǎn)狀態(tài) 如果不存在返回null
Stat stat = zooKeeper.exists(path, false);
if(null != stat){
System.out.println(stat.getCzxid()+"-"+stat.getAversion());
}
//創(chuàng)建節(jié)點(diǎn) 包含版本、時(shí)間、數(shù)據(jù)長(zhǎng)度等信息
zooKeeper.create(path,"123".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//添加watcher
zooKeeper.addWatch(path, watcher, AddWatchMode.PERSISTENT);
//獲取節(jié)點(diǎn)數(shù)據(jù)
byte[] data = zooKeeper.getData(path, false, null);
System.out.println(new String(data));
zooKeeper.create(path+"/1","child1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//獲取子節(jié)點(diǎn)
List<String> children = zooKeeper.getChildren(path, false);
System.out.println("childs size:"+children.size());
//刪除子節(jié)點(diǎn)
zooKeeper.delete(path+"/1",-1);
zooKeeper.close();
zkClient
zkClient封裝了zookeeper的官方api,簡(jiǎn)化了一些繁瑣的操作,并提供了一些額外的功能,提高了開發(fā)效.
添加依賴
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
zkclient對(duì)節(jié)點(diǎn)數(shù)據(jù)的操作進(jìn)行了序列化, 這里先準(zhǔn)備一個(gè)string類型的序列化類。需要實(shí)現(xiàn)ZkSerializer接口
public class ZkStringSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return String.valueOf(o).getBytes();
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes);
}
}
基本操作
ZkClient zkClient = new ZkClient("localhost:2181");
//自定義序列化 否則報(bào)錯(cuò)
zkClient.setZkSerializer(new ZkStringSerializer());
String path = "/test";
//判斷節(jié)點(diǎn)是否存在
boolean exist = zkClient.exists(path);
System.out.println(exist);
if(!exist){//創(chuàng)建節(jié)點(diǎn)
zkClient.create(path,"123", CreateMode.PERSISTENT);
}
//讀取節(jié)點(diǎn)數(shù)據(jù)
System.out.println((String) zkClient.readData(path));
zkClient.writeData(path,"456");//設(shè)置節(jié)點(diǎn)數(shù)據(jù)
System.out.println((String) zkClient.readData(path));
zkClient.delete(path);//刪除節(jié)點(diǎn)
zkClient.close();
節(jié)點(diǎn)變化事件
String path = "/test";
/**
* 節(jié)點(diǎn)變化事件
* 只監(jiān)聽節(jié)點(diǎn)增減,不監(jiān)聽數(shù)據(jù)變化事件
*/
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> children) throws Exception {
System.out.println("節(jié)點(diǎn)"+parentPath+"發(fā)生變化");
System.out.println(children);
}
});
//節(jié)點(diǎn)操作,觀察handleChildChange接收到對(duì)應(yīng)事件
Thread.sleep(2000);
zkClient.createPersistent(path);
Thread.sleep(2000);
zkClient.createPersistent(path+"/child1");
Thread.sleep(2000);
zkClient.writeData(path+"/child1","123");
Thread.sleep(2000);
zkClient.delete(path+"/child1");
Thread.sleep(2000);
zkClient.delete(path);
Thread.sleep(100000);
節(jié)點(diǎn)數(shù)據(jù)變化事件
String path = "/test";
/**
* 節(jié)點(diǎn)變化事件,只檢測(cè)當(dāng)前節(jié)點(diǎn),感知不到其子節(jié)點(diǎn)
* 節(jié)點(diǎn)被刪除或節(jié)點(diǎn)數(shù)據(jù)變化
*/
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("節(jié)點(diǎn):"+s+"數(shù)據(jù)變?yōu)?"+o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("節(jié)點(diǎn):"+s+"刪除");
}
});
Thread.sleep(2000);
zkClient.createPersistent(path);
Thread.sleep(2000);
zkClient.createPersistent(path+"/child1");
Thread.sleep(2000);
zkClient.delete(path+"/child1");
Thread.sleep(2000);
zkClient.writeData(path,"123");
Thread.sleep(2000);
zkClient.delete(path);
Thread.sleep(100000);
}
Curator
curator是另一個(gè)java連接zookeeper類庫(kù)。功能更加強(qiáng)大。提供了連接重試、分布式鎖、選舉、隊(duì)列等多種實(shí)際場(chǎng)景的用例。這里先簡(jiǎn)單搞個(gè)使用例子。
添加依賴
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
curator-framework是基礎(chǔ)的依賴,一些特定的使用方式需要添加不同的依賴,有curator-recipes、curator-x-discovery、curator-x-async等。
基本操作
//創(chuàng)建連接
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/test";
client.checkExists().forPath(path);//判斷是否存在
client.create().forPath(path, "123".getBytes());//創(chuàng)建節(jié)點(diǎn)
byte[] data = client.getData().forPath(path);//獲取數(shù)據(jù)
System.out.println(new String(data));
client.setData().forPath(path, "456".getBytes());//設(shè)置數(shù)據(jù)
client.delete().forPath(path);//刪除節(jié)點(diǎn)
client.close();
節(jié)點(diǎn)監(jiān)聽
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/test";
NodeCache nodeCache = new NodeCache(client,path);
//添加監(jiān)聽
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData data = nodeCache.getCurrentData();
if (data != null) {
System.out.println("Node changed: " + data.getPath() + ", value: " + new String(data.getData()));
} else {
System.out.println("Node deleted: " + nodeCache.getPath());
}
}
});
nodeCache.start();
client.create().forPath(path);
client.setData().forPath(path, "123".getBytes());
client.delete().forPath(path);
client.close();
這里NodeCache被標(biāo)識(shí)@Deprecated,也不知道被什么方式代替了,后面再研究。先簡(jiǎn)單使用。
到此這篇關(guān)于java連接zookeeper的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)java連接zookeeper內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java substring方法實(shí)現(xiàn)原理解析
這篇文章主要介紹了Java substring方法實(shí)現(xiàn)原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
Maven中央倉(cāng)庫(kù)發(fā)布的實(shí)現(xiàn)方法
最近做了個(gè)項(xiàng)目,希望能夠上傳到maven中央倉(cāng)庫(kù),給更多的人使用,于是就產(chǎn)生了這次項(xiàng)目發(fā)布經(jīng)歷。感興趣的可以一起來(lái)參考一下2021-06-06
Java實(shí)現(xiàn)讀取SFTP服務(wù)器指定目錄文件的方法
SFTP是一種在安全通道上傳輸文件的協(xié)議,它是基于SSH(Secure Shell)協(xié)議的擴(kuò)展,用于在客戶端和服務(wù)器之間進(jìn)行加密的文件傳輸,這篇文章主要介紹了Java實(shí)現(xiàn)讀取SFTP服務(wù)器指定目錄文件,感興趣的朋友跟隨小編一起看看吧2023-08-08
Spring--國(guó)內(nèi)Java程序員用得最多的框架
前幾年面試最常問的且可以順利拿到高薪的技能是Spring,隨著Spring體系的壯大,除非你在簡(jiǎn)歷上添加Spring Boot和Spring Cloud的技能,才可以打動(dòng)面試官,而現(xiàn)在,除非是Spring全家桶的實(shí)戰(zhàn)經(jīng)驗(yàn),否則難以讓面試官高看2021-06-06

