package com.neohope.zookeeper.
test
;
import
org.apache.zookeeper.*;
import
org.apache.zookeeper.data.Stat;
import
java.io.IOException;
import
java.nio.charset.Charset;
import
java.util.List;
/**
* Created by Hansen
*/
public class BarrierTest implements Watcher, Runnable {
static ZooKeeper zk = null;
static Object mutex;
String root;
int size;
String name;
/**
* 构造函数
*
* @param hostPort
* @param root
* @param name
* @param size
*/
BarrierTest(String hostPort, String root, String name, int size) {
this.root = root;
this.name = name;
this.size = size;
//
创建连接
if
(zk == null) {
try {
System.out.println(
"Begin Starting ZK:"
);
zk = new ZooKeeper(hostPort, 30000, this);
mutex = new Object();
System.out.println(
"Finished starting ZK: "
+ zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
//
创建barrier节点
if
(zk != null) {
try {
Stat s = zk.exists(root,
false
);
if
(s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println(
"Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println(
"Interrupted exception"
);
}
}
}
/**
* exists回调函数
* @param event 发生的事件
* @see org.apache.zookeeper.Watcher
*/
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/**
* 新建节点,并等待其他节点被新建
*
* @
return
* @throws KeeperException
* @throws InterruptedException
*/
boolean enter() throws KeeperException, InterruptedException{
zk.create(root +
"/"
+ name,
"Hi"
.getBytes(Charset.forName(
"UTF-8"
)), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
System.out.println(
"Begin enter barier:"
+ name);
while
(
true
) {
synchronized (mutex) {
List<String> list = zk.getChildren(root,
true
);
if
(list.size() < size) {
mutex.wait();
}
else
{
System.out.println(
"Finished enter barier:"
+ name);
return
true
;
}
}
}
}
/**
* 新建节点,并等待其他节点被新建
*
* @
return
* @throws KeeperException
* @throws InterruptedException
*/
boolean doSomeThing()
{
System.out.println(
"Begin doSomeThing:"
+ name);
//do
your job here
System.out.println(
"Finished doSomeThing:"
+ name);
return
true
;
}
/**
* 删除自己的节点,并等待其他节点被删除
*
* @
return
* @throws KeeperException
* @throws InterruptedException
*/
boolean leave() throws KeeperException, InterruptedException{
zk.delete(root +
"/"
+ name, -1);
System.out.println(
"Begin leave barier:"
+ name);
while
(
true
) {
synchronized (mutex) {
List<String> list = zk.getChildren(root,
true
);
if
(list.size() > 0) {
mutex.wait();
}
else
{
System.out.println(
"Finished leave barier:"
+ name);
return
true
;
}
}
}
}
/**
* 线程函数,等待DataMonitor退出
* @see java.lang.Runnable
*/
@Override
public void run() {
//
进入barrier
try {
boolean flag = this.enter();
if
(!flag) System.out.println(
"Error when entering the barrier"
);
} catch (KeeperException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//
处理同步业务
try {
doSomeThing();
Thread.
sleep
(1000);
} catch (InterruptedException e) {
}
//
离开barrier
try {
this.leave();
} catch (KeeperException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/**
* 入口函数
* @param args
*/
public static void main(String args[]) throws IOException {
String hostPort =
"localhost:2181"
;
String root =
"/neohope/barrier"
;
try {
new Thread(new BarrierTest(
"127.0.0.1:2181"
, root,
"001"
, 1)).start();
new Thread(new BarrierTest(
"127.0.0.1:2181"
, root,
"002"
, 2)).start();
new Thread(new BarrierTest(
"127.0.0.1:2181"
, root,
"003"
, 3)).start();
} catch (Exception e) {
e.printStackTrace();
}
System.
in
.
read
();
}
}