Contents
  1. 1. 参考资料
  2. 2. zk的特点
  3. 3. 实战核心功能
  4. 4. 运用场景说明
  5. 5. Zookeeper通知机制
  6. 6. ZK的数据存储机制(内存+快照+事务日志)
  7. 7. 安装和配置
    1. 7.1. 单节点配置zoo.cfg
    2. 7.2. 集群配置zoo.cfg
    3. 7.3. 常见命令和启动
  8. 8. zk客户端命令操作
  9. 9. JavaAPI操作zk
  10. 10. Znode的特性
  11. 11. 分布式队列的实现
    1. 11.1. 同步队列的实现
    2. 11.2. FIFO队列的实现
    3. 11.3. FIFO代码分析
    4. 11.4. 核心方法

参考资料

超级详细的一篇博客,整合了ZK的各个特性
https://segmentfault.com/a/1190000014479433

zk的入门级官网材料
http://ifeve.com/zookeeper-talk-quick-start/

zk的配置文件详细说明(包括集群配置)
https://blog.csdn.net/u014394255/article/details/53980656

ibm官方中文教程,教你如何实现队列、选举和共享锁(含代码示例)
https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/

zk的特点

  1. 内存存储数据,效率高。采取内存读取和日志加快照的方式,保证高速读写又能持久化。
  2. 节点命名唯一
  3. 监听机制

实战核心功能

  1. 注册中心和高可用的实现(Dubbo中的实现原理、Solr_Cloud中的实现)
  2. 分布式锁的实现
  3. 队列的实现(同步队列和FIFO队列)
  4. ZK的集群管理和选举功能
  5. 集群和高可用的实现

运用场景说明

  1. zookeeper在dubbo中的作用(Dubbo也可以选Redis作为注册中心)
    https://blog.csdn.net/hdu09075340/article/details/71123605
    创建根节点为dubbo,二级节点为暴露的接口名、三级节点为providers和consumers、四级节点为具体真正工作的provider和consumer。
  • consumer通过zk的路径节点找到对应provider,从而实现RPC调用
  • 如果provider发生变动,会被zk监听到,zk会通知到各个consumers
  1. 配置中心的实现
    多个ZKClient对某个特定的ZNode绑定监听,如果ZNode的内容发送变化(也就是配置发生变化),调用各自watcher的process方法

  2. ZK的集群管理和选举的功能

  • 选主:
    启动一个ZKClinet对某节点下注册的所有ServerNode,排序选最小的一个作为Master,在该Master上绑定一个监听,当该Master挂了的时候会触发监听,重新选Master,实现了集群中一定有一个Master节点的功能
  • Master管理:
    对非Master节点都绑定监听,如果发生变化,修改或者宕机了,都会触发监听。对根目录节点绑定监听,如果创建了新的节点,也就是引入了新的机器,也触发策略。

Zookeeper通知机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。

ZK的数据存储机制(内存+快照+事务日志)

https://www.jianshu.com/p/8fba732af0cd

  1. DataTree
    DataTree是内存数据存储的核心,是一个树结构,代表了内存中一份完整的数据。DataTree不包含任何与网络、客户端连接及请求处理相关的业务逻辑,是一个独立的组件。

  2. DataNode
    DataNode是数据存储的最小单元,其内部除了保存了结点的数据内容、ACL列表、节点状态之外,还记录了父节点的引用和子节点列表两个属性,其也提供了对子节点列表进行操作的接口。

  3. ZKDatabase
    Zookeeper的内存数据库,管理Zookeeper的所有会话、DataTree存储和事务日志。ZKDatabase会定时向磁盘dump快照数据,同时在Zookeeper启动时,会通过磁盘的事务日志和快照文件恢复成一个完整的内存数据库。

安装和配置

单节点配置zoo.cfg

# 心跳时间2s
tickTime=2000
#Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里
dataDir=/tmp/zookeeper
#ZK的日志路径,默认是log4j.properties,可以配置的
#在conf/log4j.properties
dataLogDir=/Users/apple/zookeeper/logs
#客户端连接 Zookeeper 服务器的端口
clientPort=2181

集群配置zoo.cfg

  1. 注意3个端口作用不同:
  • clientPort是ZK和使用的客户端之间通讯的端口
  • 集群中本机IP:Master通讯端口:Master通讯失败发起Follower重新选举的通讯端口
  1. 在上面配置的dataDir目录下创建myid文件,填写这个节点上的id号,就是server.A=B:C:D配置的A那个号码
    #zk集群配置的时候,和zk集群服务器连接通讯的最长时间为5个心跳时间,而不是和用户的连接时间
    initLimit=5
    #Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度
    syncLimit=5
    #server.A=B:C:D
    #其中 A 是一个数字,表示这个是第几号服务器
    #B 是这个服务器的 ip 地址
    #C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口(上面的端口Y)
    #D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口(上面的端口Z)。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号
    server.1=192.168.238.133:2888:3888
    server.2=192.168.238.133:2889:3889
    server.3=192.168.238.133:2890:3890

常见命令和启动

bin/zkServer.sh start
bin/zkServer.sh stop
bin/zkServer.sh status
zkServer.sh : ZooKeeper服务器的启动、停止和重启脚本;
zkCli.sh : ZooKeeper的简易客户端;
zkEnv.sh : 设置ZooKeeper的环境变量;
zkCleanup.sh : 清理ZooKeeper历史数据,包括事务日志文件和快照数据文件

zk客户端命令操作

先启动zkcli

# 创建/testInput节点,设置值为inputData
create /testInput “inputData”
# 获取/testInput中的值
get /testInput
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port

JavaAPI操作zk

  • 依赖zookeeper-3.4.5.jar包
  • 创建、修改、读取、删除节点等操作
  • 对zk进行初始化和设置监听器
  • Node节点支持4种状态:PERSISTENT(持久化)、PERSISTENT_SEQUENTIAL(持久化并序号自增)、EPHEMERAL(临时,当前session有效)、EPHEMERAL_SEQUENTIAL(临时,当前session有效,序号自增)
    public class ZooKeeperClient {
    //同步互斥变量,用来阻塞等待ZooKeeper连接完成之后再进行ZooKeeper的操作命令
    private static CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
    public static void main(String[] args) throws Exception {
    // 创建一个与服务器的连接
    ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",3000, new Watcher(){//这个是服务器连接完成回调的监听器
    // 监控所有被触发的事件
    public void process(WatchedEvent event) {
    System.out.println("已经触发了" + event.getType() + "事件!");
    if ( Event.KeeperState.SyncConnected == event.getState() ) {
    //连接完成的同步事件,互斥变量取消,下面的阻塞停止,程序继续执行
    connectedSemaphore.countDown();
    }
    }
    });
    //如果和ZooKeeper服务器的TCP连接还没完全建立,就阻塞等待
    connectedSemaphore.await();
    // 创建一个目录节点
    zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    // 创建一个子目录节点
    zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    System.out.println(new String(zk.getData("/testRootPath",false,null)));
    // 取出子目录节点列表
    System.out.println(zk.getChildren("/testRootPath",true));
    // 修改子目录节点数据
    zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
    System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]");
    // 创建另外一个子目录节点
    zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
    // 删除子目录节点
    zk.delete("/testRootPath/testChildPathTwo",-1);
    zk.delete("/testRootPath/testChildPathOne",-1);
    // 删除父目录节点
    zk.delete("/testRootPath",-1);
    // 关闭连接
    zk.close();
    }
    }

Znode的特性

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍

分布式队列的实现

参考博客含源码
https://blog.csdn.net/evankaka/article/details/70806752

还有这篇高并发网的
http://ifeve.com/zookeeper%EF%BC%8Dcurator/

同步队列的实现

zkClient对根节点进行watcher,当根节点新增了数据的时候会触发监听,计算size()后,再重新绑定监听,知道size()达到某个值,触发同步操作

FIFO队列的实现

  1. 生产产品:来自不同进程的线程,往zk的同一个路径下面添加永久性的顺序节点,并且填充内容。需要check上限,防止内存爆了。因为zk自身维护节点的唯一性,所以高并发的时候,也不会产生2个一样的节点

  2. 消费产品:来自不同进程的线程,读取zk对应路径的所有节点,排序后,消费序号最小的节点,删除的时候再判断一下是否删除成功,删除失败就递归继续删除。这样即使高并发的时候也不会重复删除相同的节点。

FIFO代码分析

  1. 核心类:
  • SendObject (id,content),记得要序列化才能存储
  • ProducerThread,带有queue参数,run方法里面 queue.offer

    public ProducerThread(DistributedSimpleQueue<SendObject> queue) {
    this.queue = queue;
    }
    public void run() {
    for (int i = 0; i < 10000; i++) {
    try {
    Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下
    SendObject sendObject = new SendObject(String.valueOf(i), "content" + i);
    queue.offer(sendObject);
    System.out.println("发送一条消息成功:" + sendObject);
    } catch (Exception e) {
    }
    }
    }
  • ConsumerThread,类似生产线程

    核心方法

  1. poll()

    public T pollHuang(){
    List<String> list = zkClient.getChildren(root);
    if (list.size() == 0) {
    return null;
    }
    //将队列按照由小到大的顺序排序
    Collections.sort(list, new Comparator<String>() {
    public int compare(String lhs, String rhs) {
    return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
    }
    });
    String nodeFullPath = root.concat("/").concat(list.get(0));
    T node = (T) zkClient.readData(nodeFullPath);
    //判断是否删除成功,避免高并发的时候,同时删除,即使删除失败也返回相同的值
    if(zkClient.delete(nodeFullPath)){
    return node;
    }else{
    pollHuang();
    }
    return null;
    }
  2. offer(E elemet)

    public boolean offer(T element) throws Exception{
    if(zkClient.getChildren(root).size()>=MAX_SIZE){
    return false;
    }
    //构建数据节点的完整路径
    String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
    try {
    //创建持久的节点,写入数据
    zkClient.createPersistentSequential(nodeFullPath , element);
    }catch (ZkNoNodeException e) {
    zkClient.createPersistent(root);
    offer(element);
    } catch (Exception e) {
    throw ExceptionUtil.convertToRuntimeException(e);
    }
    return true;
    }
Contents
  1. 1. 参考资料
  2. 2. zk的特点
  3. 3. 实战核心功能
  4. 4. 运用场景说明
  5. 5. Zookeeper通知机制
  6. 6. ZK的数据存储机制(内存+快照+事务日志)
  7. 7. 安装和配置
    1. 7.1. 单节点配置zoo.cfg
    2. 7.2. 集群配置zoo.cfg
    3. 7.3. 常见命令和启动
  8. 8. zk客户端命令操作
  9. 9. JavaAPI操作zk
  10. 10. Znode的特性
  11. 11. 分布式队列的实现
    1. 11.1. 同步队列的实现
    2. 11.2. FIFO队列的实现
    3. 11.3. FIFO代码分析
    4. 11.4. 核心方法