ZooKeeper配置集群

以在同一台机器上的三个节点的集群为例:

1、在每个节点的zoo.cfg增加下面的配置(只给出了变动的部分)

dataDir=D:/Publish/ZooKeeper/node01
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node02
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node03
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888

2、在每个dataDir增加一个myid文件,内容分别为1,2,3

3、现在可以启动哦

4、如果是在不同的服务器上,则dataDir、clientPort及2888:3888都不需要变动,localhost换成对应的计算机名称或ip即可。我这里是在一台电脑上运行的,所以要避免路径及端口冲突。

ZooKeeper Queue(Java)

Queue实现了生产者——消费者模式。

1、QueueTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class QueueTest implements Watcher {
    static ZooKeeper zk = null;
    static Object mutex;
    private String root;

    /**
     * 构造函数
     * @param hostPort
     * @param name
     */
    QueueTest(String hostPort, String name) {
        this.root = name;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }

            // 创建root节点
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out.println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 添加任务队列
     * @param i
     * @return
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        String s = "element"+i;
        zk.create(root + "/element", s.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);

        return true;
    }


    /**
     * 从任务队列获取任务
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException {
        Stat stat = null;

        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    //首先进行排序,找到id最小的任务编号
                    Integer min = Integer.MAX_VALUE;
                    for (String s : list) {
                        Integer tempValue = new Integer(s.substring(7));
                        if (tempValue < min) min = tempValue;
                    }

                    //从节点获取任务,处理,并删除节点
                    System.out.println("Processing task: " + root + "/element" + padLeft(min));
                    byte[] buff = zk.getData(root + "/element" + padLeft(min), false, stat);
                    System.out.println("The value in task is: " + new String(buff));
                    zk.delete(root + "/element" + padLeft(min), -1);

                    return min;
                }
            }
        }
    }

    /**
     * 格式化数字字符串
     * @param num
     */
    public static String padLeft(int num) {
        return String.format("%010d", num);
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) {
        String hostPort = "localhost:2181";
        String root = "/neohope/queue";
        int max = 10;
        QueueTest q = new QueueTest(hostPort, root);

        for (int i = 0; i < max; i++) {
            try {
                q.produce(i);
            } catch (KeeperException e) {

            } catch (InterruptedException e) {
            }
        }

        for (int i = 0; i < max; i++) {
            try {
                int r = q.consume();
                System.out.println("Item: " + r);
            } catch (KeeperException ex) {
                ex.printStackTrace();
                break;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
                break;
            }
        }
    }
}

2、尝试运行一下。

ZooKeeper Barrier(Java)

Barrier主要用于ZooKeeper中的同步。

1、BarrierTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class BarrierTest implements Watcher, Runnable {
    static ZooKeeper zk = null;
    static Object mutex;
    String root;
    int size;
    String name;

    /**
     * 构造函数
     *
     * @param hostPort
     * @param root
     * @param name
     * @param size
     */
    BarrierTest(String hostPort, String root, String name, int size) {
        this.root = root;
        this.name = name;
        this.size = size;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Begin Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }

        // 创建barrier节点
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: "
                                + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean enter() throws KeeperException, InterruptedException{
        zk.create(root + "/" + name, "Hi".getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);

        System.out.println("Begin enter barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);

                if (list.size() < size) {
                    mutex.wait();
                } else {
                    System.out.println("Finished enter barier:" + name);
                    return true;
                }
            }
        }
    }


    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean doSomeThing()
    {
        System.out.println("Begin doSomeThing:" + name);
        //do your job here
        System.out.println("Finished doSomeThing:" + name);
        return true;
    }

    /**
     * 删除自己的节点,并等待其他节点被删除
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */

    boolean leave() throws KeeperException, InterruptedException{
        zk.delete(root + "/" + name, -1);

        System.out.println("Begin leave barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    System.out.println("Finished leave barier:" + name);
                    return true;
                }
            }
        }
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        //进入barrier
        try {
            boolean flag = this.enter();
            if (!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        //处理同步业务
        try {
            doSomeThing();
            Thread.sleep(1000);
        } catch (InterruptedException e) {

        }

        //离开barrier
        try {
            this.leave();
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) throws IOException {
        String hostPort = "localhost:2181";
        String root = "/neohope/barrier";

        try {
            new Thread(new BarrierTest("127.0.0.1:2181", root,"001", 1)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"002", 2)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"003", 3)).start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.in.read();
    }
}

2、运行结果(由于Finished enter barier时,第一次同步已经结束了,所以是与Begin doSomeThing混在一起的)

Begin enter barier:001
Begin enter barier:003
Begin enter barier:002

Finished enter barier:001
Begin doSomeThing:001
Finished doSomeThing:001
Finished enter barier:002
Begin doSomeThing:002
Finished doSomeThing:002
Finished enter barier:003
Begin doSomeThing:003
Finished doSomeThing:003

Begin leave barier:002
Begin leave barier:001
Begin leave barier:003
Finished leave barier:002
Finished leave barier:003
Finished leave barier:001

ZooKeeper DataPublisher(Java)

1、DataPublisher.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

/**
 * Created by Hansen
 */
public class DataPublisher {

    public void publishTest(String hostPort,String znode) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                //do nothing
            }});

        //删掉节点
        Stat stat =zk.exists(znode, false);
        if(stat!=null)
        {
            zk.delete(znode, -1);
        }

        //开始测试
        zk.create(znode,"test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] buff =zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.setData(znode,"test02".getBytes(), -1);
        buff = zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.delete(znode, -1);
        zk.close();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        DataPublisher publisher = new DataPublisher();
        publisher.publishTest(hostPort,znode);
    }
}

2、与Zookeeper Watcher配合使用,试一下。

ZooKeeper Watcher(Java)

1、Executor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * Created by Hansen
 */
public class Executor implements Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;

    /**
     * 构造函数
     * @param hostPort  host:port
     * @param znode      /xxx/yyy/zzz
     */
    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        dm = new DataMonitor(hostPort, znode, null, this);
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!dm.bEnd) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    /**
     * 关闭zk连接
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeConnectionClosing(int rc) {
        synchronized (this) {
            notifyAll();
        }

        System.out.println("Connection is closing: "+ rc);
    }

    /**
     * znode节点状态或连接状态发生变化
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeStatusUpdate(byte[] data) {
        if (data == null) {
            System.out.println("data is null");
        } else {
            try {
                String s = new String(data,"UTF-8");
                System.out.println("data is "+s);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String[] args) throws IOException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、DataMonitor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Arrays;

/**
 * Created by Hansen
 */
public class DataMonitor implements Watcher, AsyncCallback.StatCallback {
    ZooKeeper zk;
    String znode;
    Watcher chainedWatcher;
    DataMonitorListener listener;

    boolean bEnd;
    byte prevData[];

    /**
     * 构造函数,并开始监视
     * @param hostPort          host:port
     * @param znode              /xxx/yyy/zzz
     * @param chainedWatcher   传递事件到下一个Watcher
     * @param listener          回调对象
     */
    public DataMonitor(String hostPort, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) throws IOException {
        this.zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;

        // 检查节点状态
        zk.exists(znode, true, this, null);
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // 连接状态发生变化
            switch (event.getState()) {
                case SyncConnected:
                    // 不需要做任何事情
                    break;
                case Expired:
                    // 连接超时,关闭连接
                    System.out.println("SESSIONEXPIRED ending");
                    bEnd = true;
                    listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            //节点状态发生变化
            if (path != null && path.equals(znode)) {
                //检查节点状态
                zk.exists(znode, true, this, null);
            }
        }

        //传递事件
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    /**
     * exists回调函数
     * @param rc     zk返回值
     * @param path   路径
     * @param ctx    Context
     * @param stat   状态
     *
     * @see org.apache.zookeeper.AsyncCallback.StatCallback
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists = false;
        if(rc== KeeperException.Code.OK.intValue()) {
            //节点存在
            exists = true;
        }
        else if(rc== KeeperException.Code.NONODE.intValue()){
            //节点没有找到
            exists = false;
        }
        else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){
            //Session过期
            bEnd = true;
            System.out.println("SESSIONEXPIRED ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else if( rc==KeeperException.Code.NOAUTH.intValue())
        {
            //授权问题
            bEnd = true;
            System.out.println("NOAUTH ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else
        {
            //重试
            zk.exists(znode, true, this, null);
            return;
        }

        //获取数据
        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        //调用listener
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.znodeStatusUpdate(b);
            prevData = b;
        }
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * znode节点状态或连接状态发生变化
         */
        void znodeStatusUpdate(byte data[]);

        /**
         * 关闭zonde连接
         *
         * @param rc ZooKeeper返回值
         */
        void znodeConnectionClosing(int rc);
    }
}

3、运行Executor

4、运行zkCli.cmd

zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit

5、观察Executor的输出

ZooKeeper增删改查(Shell)

1、语法

ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

2、示例

zkCli.cmd -server 127.0.0.1:2181

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 2] create /zktest test01
Created /zktest

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /
[zktest, zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test01
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x4
mtime = Sun May 01 09:34:09 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 5] set /zktest test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 7] delete /zktest

[zk: 127.0.0.1:2181(CONNECTED) 8] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 9] quit
Quitting...

使用Dubbo实现RPC简单示例02

简单的Dubbo RPC调用描述如下:
dubbo-arch

复杂的Dubbo RPC调用描述如下:
dubbo-arch-ext

在这里我们先用zookeeper来做registry,做一个简单的例子:

一、首先是zookeeper:
1、到zookeeper进行下载
2、解压
3、拷贝conf/zoo_sample.cfg到conf/zoo.cfg,然后按需要修改配置
4、双击bin/zkServer.cmd

二、然后是接口定义,服务端和接口端都会用到,最好打一个jar包,防止错误修改:
IJustATest.java

package com.neohope.dubbo.test;

public interface IJustATest {
    public String SayHelloTo(String name);
    public int Add(int a, int b);
}

三、再就是服务端实现:
1、新建java程序,添加mvn功能,引用dubbo-x.x.x.jar

2、服务实现类MyDubboService.java

package com.neohope.dubbo.test;

public class MyDubboService implements IJustATest{

    public String SayHelloTo(String name) {
        return "Hello " + name;
    }

    public int Add(int a, int b) {
        return a+b;
    }
}

3、服务注册
ServceTest.java

package com.neohope.dubbo.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ServceTest {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"applicationContext.xml"});
        context.start();

        System.in.read();
    }
}

4、spring配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://code.alibabatech.com/schema/dubbo
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd
            ">

    <!-- 具体的实现bean -->
    <bean id="myDubboService" class="com.neohope.dubbo.test.MyDubboService" />

    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="neo_service_provider"  />

    <!-- 使用zookeeper注册中心暴露服务地址 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />

    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20880" />

    <!-- 声明需要暴露的服务接口 -->
    <dubbo:service interface="com.neohope.dubbo.test.IJustATest" ref="myDubboService" />

</beans>

5、编译运行

四、最后是客户端实现:
1、新建java程序,添加mvn功能,引用dubbo-x.x.x.jar

2、服务调用
ServceTest.java

package com.neohope.dubbo.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ClientTest {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
        context.start();

        IJustATest proxy = (IJustATest) context.getBean("myDubboService");
        System.out.println(proxy.SayHelloTo("neohope")) ;
        System.out.println(proxy.Add(1,2)) ;

        System.in.read();
    }
}

3、spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd  
            http://code.alibabatech.com/schema/dubbo  
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
            ">

    <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
    <dubbo:application name="neo_service_consumer" />

    <!-- 使用zookeeper注册中心暴露服务地址 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />

    <!-- 生成远程服务代理,可以像使用本地bean一样使用demoService -->
    <dubbo:reference id="myDubboService" interface="com.neohope.dubbo.test.IJustATest" />

</beans>

4、编译运行

使用Dubbo实现RPC简单示例01

由于需要用到dubbo-admin,所以直接下载源码进行编译的

1、到github下载源码
dubbo

2、用mvn生成eclipse工程

mvn eclipse:eclipse

3、导入后,进行编译
如果不是为了看代码方便,直接mvn编译也不错哦

4、将spring版本从2升级到3,我用的是3.2.16.RELEASE
如果考虑到后面的dubbo-admin的话,可以使用citrus-webx-all-3.1.6的相同版本,3.2.7.RELEASE

	<properties>
		<spring_version>3.2.16.RELEASE</spring_version>
	</properties>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework</groupId>
				<artifactId>spring-context</artifactId>
				<version>${spring_version}</version>
			</dependency>
		</dependencies>
	</dependencyManagement>

5、准备将netty3升级到netty4,发现API差距太大,只好后面再搞了哦

6、dubbo-addmin要修改一下依赖

<!--升级citrus-webx-all到3.1.6,但不要升级到3.2.x版本,一堆的错-->
<dependency>
	<groupId>com.alibaba.citrus</groupId>
	<artifactId>citrus-webx-all</artifactId>
	<version>3.1.6</version>
</dependency>

<!--添加依赖包-->
<dependency>
        <groupId>org.apache.velocity</groupId>
        <artifactId>velocity</artifactId>
        <version>1.7</version>
</dependency>

<!--如果你和我一样,用的spring版本与citrus-webx-all不一致,要手工排除一套spring依赖包-->

7、这样就全部编译通过了哦

8、后面准备手工merge一下dubbox的部分代码,可惜他们也没能升级netty4

使用ProtocolBuffer实现RPC简单示例03

接第01部分,本节用来说明Java语言的代码实现。

使用Protoc.exe生成java代码之后,会生成两个java文件,Client与Server都需要包含这两个文件。

首先是Server端:
1、新建一个java项目,引用以下jar包
protobuf-java-3.0.0-beta-2.jar
grpc-all-0.13.1.jar
guava-18.0.jar
netty-all-4.1.0.CR1.jar
okhttp-2.5.0.jar
okio-1.6.0.jar

2、项目中添加生成的两个java文件。

3、新建一个类MyGRPCServer,实现JustATestGrpc.JustATest接口

package com.neohope.protobuf.grpc.test;

import io.grpc.stub.StreamObserver;

public class MyGRPCServer implements JustATestGrpc.JustATest{

    @Override
    public void add(JustATestOuterClass.AddRequest request, StreamObserver<JustATestOuterClass.AddResponse> responseObserver) {
        JustATestOuterClass.AddResponse rsp =  JustATestOuterClass.AddResponse.newBuilder().setC(request.getA() + request.getB()).build();

        responseObserver.onNext(rsp);
        responseObserver.onCompleted();
    }

    @Override
    public void sayHelloTo(JustATestOuterClass.Person request, StreamObserver<JustATestOuterClass.HelloResponse> responseObserver) {
        JustATestOuterClass.HelloResponse rsp =  JustATestOuterClass.HelloResponse.newBuilder().setRsp("Hello "+ request.getName()).build();

        responseObserver.onNext(rsp);
        responseObserver.onCompleted();
    }
}

4、修改TestServer.java

package com.neohope.protobuf.grpc.test;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;

import java.io.IOException;
import java.util.Scanner;

public class TestServer {
    public static void main(String[] args) throws IOException {
        Server server = NettyServerBuilder.forPort(1900)
                .addService(JustATestGrpc.bindService(new MyGRPCServer()))
                .build()
                .start();

        Scanner scanner =new Scanner(System.in);
        scanner.nextLine();
    }
}

5、编译运行
然后是Client端:
1、新建一个java项目,引用以下jar包
protobuf-java-3.0.0-beta-2.jar
grpc-all-0.13.1.jar
guava-18.0.jar
netty-all-4.1.0.CR1.jar
okhttp-2.5.0.jar
okio-1.6.0.jar

2、项目中添加生成的两个java文件。

3、修改TestClient.java

package com.neohope.protobuf.grpc.test;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class TestClient {

    public static void main(String[] args)
    {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 1900)
                .usePlaintext(true)
                .build();

        JustATestGrpc.JustATestBlockingStub blockingStub = JustATestGrpc.newBlockingStub(channel);

        JustATestOuterClass.AddRequest areq = JustATestOuterClass.AddRequest.newBuilder().setA(1).setB(2).build();
        JustATestOuterClass.AddResponse arsp = blockingStub.add(areq);
        System.out.println(arsp.getC());

        JustATestOuterClass.Person preq = JustATestOuterClass.Person.newBuilder().setAge(30).setName("neohope").setSex(JustATestOuterClass.Person.SexType.MALE).build();
        JustATestOuterClass.HelloResponse prsp = blockingStub.sayHelloTo(preq);
        System.out.println(prsp.getRsp());

        channel.shutdown();
    }
}

4、编译运行

使用ProtocolBuffer实现RPC简单示例02

接第01部分,本节用来说明C#语言的代码实现。

使用Protoc.exe生成cs代码之后,会生成两个cs文件,Client与Server都需要包含这两个文件。

首先是Server端:
1、新建一个Console项目,引用Protoc程序集中以下几个dll文件,并添加生成的CS文件
Google.Protobuf.dll
Grpc.Core.dll
System.Interactive.Async.dll

2、新建一个类MyGRPCServer,实现JustATest.IJustATest接口

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Com.Neohope.Protobuf.Grpc.Test;
using Grpc.Core;

namespace TestProtoBufferCliet
{
    class MyGRPCServer : JustATest.IJustATest
    {
        public Task<AddResponse> Add(AddRequest request, ServerCallContext context)
        {
            AddResponse rsp  = new AddResponse();
            rsp.C = request.A + request.B;
            return Task.FromResult(rsp);
        }

        public Task<HelloResponse> SayHelloTo(Person request, ServerCallContext context)
        {
            HelloResponse rsp = new HelloResponse();
            rsp.Rsp = "Hello " + request.Name;
            return Task.FromResult(rsp);
        }
    }
}

3、修改Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Com.Neohope.Protobuf.Grpc.Test;
using Grpc.Core;
using TestProtoBufferCliet;


namespace TestProtoBuffer
{
    class Program
    {
        static void Main(string[] args)
        {
            Server server = new Server
            {
                Services = { JustATest.BindService(new MyGRPCServer()) },
                Ports = { new ServerPort("localhost", 1900, ServerCredentials.Insecure) }
            };
            server.Start();

            
        }
    }
}

4、编译运行
其中,非托管dll要如下放置

.
│  yourprogram.exe
│  
└─nativelibs
    ├─windows_x64
    │      grpc_csharp_ext.dll
    │      
    └─windows_x86
            grpc_csharp_ext.dll
            

然后是Client端:
1、新建一个Console项目,引用Protoc程序集中以下几个dll文件,并添加生成的CS文件
Google.Protobuf.dll
Grpc.Core.dll
System.Interactive.Async.dll

2、修改Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Com.Neohope.Protobuf.Grpc.Test;
using Grpc.Core;

namespace TestProtoBufferCliet
{
    class Program
    {
        static void Main(string[] args)
        {
            Channel channel = new Channel("127.0.0.1:1900", ChannelCredentials.Insecure);
            JustATest.JustATestClient client = JustATest.NewClient(channel);

            Person p = new Person();
            p.Name = "neohope";
            p.Age = 30;
            p.Sex = Person.Types.SexType.MALE;
            HelloResponse prsp = client.SayHelloTo(p);
            Console.WriteLine(prsp.Rsp);

            AddRequest areq = new AddRequest();
            areq.A = 1;
            areq.B = 2;
            AddResponse arsp = client.Add(areq);
            Console.WriteLine(arsp.C);

            channel.ShutdownAsync().Wait();
            Console.ReadLine();
        }
    }
}

3、编译运行
其中,非托管dll要如下放置

.
│  yourprogram.exe
│  
└─nativelibs
    ├─windows_x64
    │      grpc_csharp_ext.dll
    │      
    └─windows_x86
            grpc_csharp_ext.dll