1、Executor.java
package com.neohope.zookeeper.test;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* Created by Hansen
*/
public class Executor implements Runnable, DataMonitor.DataMonitorListener
{
DataMonitor dm;
/**
* 构造函数
* @param hostPort host:port
* @param znode /xxx/yyy/zzz
*/
public Executor(String hostPort, String znode) throws KeeperException, IOException {
dm = new DataMonitor(hostPort, znode, null, this);
}
/**
* 线程函数,等待DataMonitor退出
* @see java.lang.Runnable
*/
@Override
public void run() {
try {
synchronized (this) {
while (!dm.bEnd) {
wait();
}
}
} catch (InterruptedException e) {
}
}
/**
* 关闭zk连接
* @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
*/
@Override
public void znodeConnectionClosing(int rc) {
synchronized (this) {
notifyAll();
}
System.out.println("Connection is closing: "+ rc);
}
/**
* znode节点状态或连接状态发生变化
* @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
*/
@Override
public void znodeStatusUpdate(byte[] data) {
if (data == null) {
System.out.println("data is null");
} else {
try {
String s = new String(data,"UTF-8");
System.out.println("data is "+s);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
/**
* 入口函数
* @param args
*/
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String znode = "/neohope/test";
try {
new Executor(hostPort, znode).run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、DataMonitor.java
package com.neohope.zookeeper.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Arrays;
/**
* Created by Hansen
*/
public class DataMonitor implements Watcher, AsyncCallback.StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
DataMonitorListener listener;
boolean bEnd;
byte prevData[];
/**
* 构造函数,并开始监视
* @param hostPort host:port
* @param znode /xxx/yyy/zzz
* @param chainedWatcher 传递事件到下一个Watcher
* @param listener 回调对象
*/
public DataMonitor(String hostPort, String znode, Watcher chainedWatcher,
DataMonitorListener listener) throws IOException {
this.zk = new ZooKeeper(hostPort, 30000, this);
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// 检查节点状态
zk.exists(znode, true, this, null);
}
/**
* exists回调函数
* @param event 发生的事件
* @see org.apache.zookeeper.Watcher
*/
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// 连接状态发生变化
switch (event.getState()) {
case SyncConnected:
// 不需要做任何事情
break;
case Expired:
// 连接超时,关闭连接
System.out.println("SESSIONEXPIRED ending");
bEnd = true;
listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
}
} else {
//节点状态发生变化
if (path != null && path.equals(znode)) {
//检查节点状态
zk.exists(znode, true, this, null);
}
}
//传递事件
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
/**
* exists回调函数
* @param rc zk返回值
* @param path 路径
* @param ctx Context
* @param stat 状态
*
* @see org.apache.zookeeper.AsyncCallback.StatCallback
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists = false;
if(rc== KeeperException.Code.OK.intValue()) {
//节点存在
exists = true;
}
else if(rc== KeeperException.Code.NONODE.intValue()){
//节点没有找到
exists = false;
}
else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){
//Session过期
bEnd = true;
System.out.println("SESSIONEXPIRED ending");
listener.znodeConnectionClosing(rc);
return;
}
else if( rc==KeeperException.Code.NOAUTH.intValue())
{
//授权问题
bEnd = true;
System.out.println("NOAUTH ending");
listener.znodeConnectionClosing(rc);
return;
}
else
{
//重试
zk.exists(znode, true, this, null);
return;
}
//获取数据
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
//调用listener
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.znodeStatusUpdate(b);
prevData = b;
}
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* znode节点状态或连接状态发生变化
*/
void znodeStatusUpdate(byte data[]);
/**
* 关闭zonde连接
*
* @param rc ZooKeeper返回值
*/
void znodeConnectionClosing(int rc);
}
}
3、运行Executor
4、运行zkCli.cmd
zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit
5、观察Executor的输出