Barrier主要用于ZooKeeper中的同步。
1、BarrierTest.java
package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Demonstrates a double barrier built on ZooKeeper child nodes.
 *
 * <p>Each participant creates an ephemeral child under a shared root node,
 * waits until the number of children reaches its own threshold, does its
 * work, deletes its child, and finally waits until the root has no children.
 *
 * <p>All instances share one static {@link ZooKeeper} session and one static
 * monitor object; the first instance constructed is registered as the
 * session's default watcher.
 *
 * Created by Hansen
 */
public class BarrierTest implements Watcher, Runnable {

    /** Shared session, created lazily by the first constructed instance. */
    static ZooKeeper zk = null;

    /**
     * Monitor used to block participants until a watch event arrives.
     * Initialized eagerly: the watcher may be invoked as soon as the
     * ZooKeeper constructor starts its threads, i.e. before the constructor
     * of this class has finished.
     */
    static final Object mutex = new Object();

    final String root;
    final int size;
    final String name;

    /**
     * Connects to ZooKeeper if no shared session exists yet and makes sure
     * the barrier root node exists.
     *
     * @param hostPort ZooKeeper connect string, e.g. {@code host:port}
     * @param root     absolute path of the barrier root node
     * @param name     name of the child node this participant creates
     * @param size     number of children that must exist before
     *                 {@link #enter()} returns
     */
    BarrierTest(String hostPort, String root, String name, int size) {
        this.root = root;
        this.name = name;
        this.size = size;

        // Create the shared connection on first use.
        if (zk == null) {
            try {
                System.out.println("Begin Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }

        // Create the barrier root node (and any missing parents).
        if (zk != null) {
            try {
                ensurePath(root);
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating barrier: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates every missing component of {@code path} as an empty persistent
     * node. A plain create of the leaf fails when a parent node is missing,
     * so the components are created from the top down.
     *
     * @param path absolute node path
     * @throws KeeperException      if the server reports an error other than
     *                              the node already existing
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void ensurePath(String path) throws KeeperException, InterruptedException {
        int slash = 0;
        do {
            slash = path.indexOf('/', slash + 1);
            String prefix = (slash < 0) ? path : path.substring(0, slash);
            Stat s = zk.exists(prefix, false);
            if (s == null) {
                try {
                    zk.create(prefix, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException ignored) {
                    // Another client created the node between exists() and create().
                }
            }
        } while (slash >= 0);
    }

    /**
     * Watcher callback: wakes every participant blocked on the shared monitor
     * so that each of them re-reads the children of the root node.
     *
     * @param event the event that occurred
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            // All participants wait on the same monitor, so notify() could
            // leave some of them blocked after a single watch event.
            mutex.notifyAll();
        }
    }

    /**
     * Creates this participant's ephemeral child node and blocks until the
     * root node has at least {@code size} children.
     *
     * @return {@code true} once the threshold has been reached
     * @throws KeeperException      if a ZooKeeper operation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    boolean enter() throws KeeperException, InterruptedException {
        zk.create(root + "/" + name, "Hi".getBytes(Charset.forName("UTF-8")),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Begin enter barier:" + name);
        while (true) {
            synchronized (mutex) {
                // The watch is set while holding the monitor, so the
                // notification cannot slip in between the check and wait().
                List<String> list = zk.getChildren(root, true);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    System.out.println("Finished enter barier:" + name);
                    return true;
                }
            }
        }
    }

    /**
     * Placeholder for the work performed between entering and leaving the
     * barrier; it only prints begin/finish messages.
     *
     * @return always {@code true}
     */
    boolean doSomeThing() {
        System.out.println("Begin doSomeThing:" + name);
        // do your job here
        System.out.println("Finished doSomeThing:" + name);
        return true;
    }

    /**
     * Deletes this participant's child node and blocks until the root node
     * has no children left.
     *
     * @return {@code true} once the root node is empty
     * @throws KeeperException      if a ZooKeeper operation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    boolean leave() throws KeeperException, InterruptedException {
        zk.delete(root + "/" + name, -1);
        System.out.println("Begin leave barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    System.out.println("Finished leave barier:" + name);
                    return true;
                }
            }
        }
    }

    /**
     * Thread body: enters the barrier, performs the work, then leaves the
     * barrier.
     *
     * <p>The work is skipped when entering failed, but leaving is still
     * attempted so that this participant's node does not keep the others
     * waiting. An interruption is remembered and the interrupt flag is
     * restored before returning.
     *
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        if (zk == null) {
            System.out.println("No ZooKeeper connection, cannot use barrier: " + name);
            return;
        }

        boolean interrupted = false;

        // Enter the barrier.
        boolean entered = false;
        try {
            entered = this.enter();
            if (!entered) {
                System.out.println("Error when entering the barrier");
            }
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            interrupted = true;
            ex.printStackTrace();
        }

        // Do the synchronized work only if the barrier was actually entered.
        if (entered) {
            try {
                doSomeThing();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        // Leave the barrier.
        try {
            this.leave();
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            interrupted = true;
            ex.printStackTrace();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point: starts three participants on the same barrier root, with
     * thresholds 1, 2 and 3 respectively, then waits for a key press.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        String hostPort = "127.0.0.1:2181";
        String root = "/neohope/barrier";

        try {
            new Thread(new BarrierTest(hostPort, root, "001", 1)).start();
            new Thread(new BarrierTest(hostPort, root, "002", 2)).start();
            new Thread(new BarrierTest(hostPort, root, "003", 3)).start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            System.in.read();
        } finally {
            // Close the shared session so the server releases it immediately.
            if (zk != null) {
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
2、运行结果(三个线程的阈值size分别为1、2、3,各线程在子节点数达到自己的阈值时即通过enter;打印Finished enter barier时该线程的进入同步已经结束,随后各线程并发执行,所以这些输出会与Begin doSomeThing交错出现)
Begin enter barier:001 Begin enter barier:003 Begin enter barier:002 Finished enter barier:001 Begin doSomeThing:001 Finished doSomeThing:001 Finished enter barier:002 Begin doSomeThing:002 Finished doSomeThing:002 Finished enter barier:003 Begin doSomeThing:003 Finished doSomeThing:003 Begin leave barier:002 Begin leave barier:001 Begin leave barier:003 Finished leave barier:002 Finished leave barier:003 Finished leave barier:001