apache zookeeper使用方法實例詳解
本文涉及了Apache Zookeeper使用方法實例詳解的相關(guān)知識,接下來我們就看看具體內(nèi)容。
簡介
Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子項目發(fā)展而來,現(xiàn)在已經(jīng)成為了 Apache 的頂級項目。Zookeeper 為分布式系統(tǒng)提供了高效可靠且易于使用的協(xié)同服務(wù),它可以為分布式應(yīng)用提供相當(dāng)多的服務(wù),諸如統(tǒng)一命名服務(wù),配置管理,狀態(tài)同步和組服務(wù)等。 Zookeeper 接口簡單,開發(fā)人員不必過多地糾結(jié)在分布式系統(tǒng)編程難于處理的同步和一致性問題上,你可以使用 Zookeeper 提供的現(xiàn)成(off-the-shelf)服務(wù)來實現(xiàn)分布式系統(tǒng)的配置管理,組管理,Leader 選舉等功能。
英文原文地址:http://zookeeper.apache.org/doc/current/javaExample.html
一個簡單的 Zookeeper Watch 客戶端
為了介紹 Zookeeper Java API 的基本用法,本文將帶你如何一步一步實現(xiàn)一個功能簡單的 Zookeeper 客戶端。該 Zookeeper 客戶端會監(jiān)視一個你指定 Zookeeper 節(jié)點 Znode, 當(dāng)被監(jiān)視的節(jié)點發(fā)生變化時,客戶端會啟動或者停止某一程序。
基本要求
該客戶端具備四個基本要求:
(1)客戶端所帶參數(shù):
(2)Zookeeper 服務(wù)地址。
(3)被監(jiān)視的 Znode 節(jié)點名稱。
(4)可執(zhí)行程序及其所帶的參數(shù)
客戶端會獲取被監(jiān)視 Znode 節(jié)點的數(shù)據(jù)并啟動你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點發(fā)生改變,客戶端重新獲取其內(nèi)容并再次啟動你所指定的可執(zhí)行程序。如果被監(jiān)視的 Znode 節(jié)點消失,客戶端會殺死可執(zhí)行程序。
程序設(shè)計
一般而言,Zookeeper 應(yīng)用程序分為兩部分,其中一部分維護(hù)與服務(wù)器端的連接,另外一部分監(jiān)視 Znode 節(jié)點的數(shù)據(jù)。在本程序中,Executor 類負(fù)責(zé)維護(hù) Zookeeper 連接,DataMonitor 類監(jiān)視 Zookeeper 目錄樹中的數(shù)據(jù), 同時,Executor 包含了主線程和程序主要的執(zhí)行邏輯,它負(fù)責(zé)少量的用戶交互,以及與可執(zhí)行程序的交互,該可執(zhí)行程序接受你向它傳入的參數(shù),并且會根據(jù)被監(jiān)視的 Znode 節(jié)點的狀態(tài)變化停止或重啟。
Executor類
Executor 對象是本例程最基本的“容器”,它包括Zookeeper 對象和DataMonitor對象。
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
回憶一下 Executor 的任務(wù)是根據(jù) Zookeeper 中 Znode 節(jié)點狀態(tài)改變所觸發(fā)的事件來啟動和停止你在命令行指定的可執(zhí)行程序, 在上面的代碼你可以看到,Executor 類在其構(gòu)造函數(shù)中實例化 Zookeeper 對象時,將其自身的引用作為 Watch 參數(shù)傳遞給 Zookeeper 的構(gòu)造函數(shù),同時它也將其自身的引用作為 DataMonitorListener 參數(shù)傳遞給 DataMonitor 的構(gòu)造函數(shù)。Executor 本身實現(xiàn)了以下接口:
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...
Watcher 接口是在ZooKeeper Java API 中定義的。 ZooKeeper 用它來與“容器”(此處“容器”與上面的 Executor 類相似)進(jìn)行通信,Watcher 只支持一個方法,即process(), ZooKeeper 用該函數(shù)來處理主線程可能感興趣的事件,例如 Zookeeper 連接或會話的狀態(tài),本例中的“容器” Executor只是簡單地把事件向下傳遞給 DataMonitor,具體如何處理事件是由 DataMonitor 決定的。本文只是簡單地描述了如何使用 Watcher,通常情況下,Executor 或 與 Executor 類似的對象擁有 與Zookeeper 服務(wù)端的連接,但它可以將事件傳遞給其他對象,并有其它的對象處理該事件。
public void process(WatchedEvent event) {
dm.process(event);
}
DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一個自定義的接口,可以說是專門為本程序設(shè)計的。DataMonitor 對象使用該接口和“容器”(即 Executor 類)進(jìn)行通信,DataMonitorListener 接口如下:
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
該接口在 DataMonitor 中定義,Executor 類實現(xiàn)該接口,當(dāng) Executor.exists() 被調(diào)用的時候,Executor 決定是否啟動或停止事先指定的應(yīng)用程序(回憶一下前文所說的,當(dāng) Znode 消失時 Zookeeper 客戶端會殺死該可執(zhí)行程序)。
當(dāng) Executor.closing() 被調(diào)用的時候,Executor 會根據(jù) Zookeeper 連接永久性地消失來決定是否關(guān)閉自己。
你或許已經(jīng)猜到,DataMonitor 對象根據(jù) Zookeeper 狀態(tài)變化來調(diào)用這些方法吧?
以下是 Executor 類中實現(xiàn) DataMonitorListener.exists() 和 DataMonitorListener.closing()的代碼:
public void exists( byte[] data ) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
DataMonitor 類
DataMonitor 類是本程序 Zookeeper 邏輯的核心, 它差不多是異步的,并由事件驅(qū)動的。DataMonitor 構(gòu)造函數(shù)如下:
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
調(diào)用 ZooKeeper.exists() 檢查指定的 Znode 是否存在,并設(shè)置監(jiān)視,傳遞自身引用作為回調(diào)對象,在某種意義上,在 watch 觸發(fā)時就會引起真實的處理流程。
當(dāng) ZooKeeper.exists() 操作在服務(wù)器端完成時,ZooKeeper API 會在客戶端調(diào)用 completion callback:
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
上述代碼首先檢查 Znode 是否存在,以及其他重大的不可恢復(fù)的錯誤。如果文件(或者Znode)存在,它將從 Znode 獲取數(shù)據(jù),如果狀態(tài)發(fā)生變化再調(diào)用 Executor 的 exists() 回調(diào)函數(shù)。注意,getData 函數(shù)本省必須要做任何的異常處理,因為本身就有監(jiān)視可以處理任何錯誤:如果節(jié)點在調(diào)用 ZooKeeper.getData() 之前被刪除,ZooKeeper.exists() 就會觸發(fā)回調(diào)函數(shù),如果存在通信錯誤,在連接上的監(jiān)視會在該連接重建之前觸發(fā)相應(yīng)的事件,同時引發(fā)相應(yīng)的處理。
最后,DataMonitor 處理監(jiān)視事件的代碼如下:
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
如果客戶端 Zookeeper 程序在會話失效時(Expired event)重新建立了通信信道(SyncConnected event) ,所有的會話監(jiān)視會自動和服務(wù)器進(jìn)行重連, (Zookeeper 3.0.0以上版本會重置之前設(shè)置的監(jiān)視). 更多編程指南請參見 ZooKeeper Watches 。 當(dāng) DataMonitor 獲得了指定 Znode 的事件后,它將調(diào)用 ZooKeeper.exists() 來決定究竟發(fā)生了什么。
完整的程序:
Executor.java:
/**
* A simple example program to use DataMonitor to start and
* stop executables based on a znode. The program watches the
* specified znode and saves the data that corresponds to the
* znode in the filesystem. It also starts the specified program
* with the specified arguments when the znode exists and kills
* the program if the znode goes away.
*/
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
/***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
*/
public void process(WatchedEvent event) {
dm.process(event);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
}
}
}
public void exists(byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
DataMonitor.java:
/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
byte prevData[];
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}
總結(jié)
本文關(guān)于Apache Zookeeper使用方法實例詳解的介紹就到這里,希望對大家有所幫助。如果有什么問題可以留言,小編會及時回復(fù)大家的,感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
Spring定時任務(wù)關(guān)于@EnableScheduling的用法解析
關(guān)于request.getRequestDispatcher().forward()的妙用及DispatcherType
java利用JEXL實現(xiàn)動態(tài)表達(dá)式編譯

