Zookeeper综合
参考资料
超级详细的一篇博客,整合了ZK的各个特性
https://segmentfault.com/a/1190000014479433
zk的入门级官网材料
http://ifeve.com/zookeeper-talk-quick-start/
zk的配置文件详细说明(包括集群配置)
https://blog.csdn.net/u014394255/article/details/53980656
ibm官方中文教程,教你如何实现队列、选举和共享锁(含代码示例)
https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
zk的特点
- 内存存储数据,效率高。采取内存读取和日志加快照的方式,保证高速读写又能持久化。
- 节点命名唯一
- 监听机制
实战核心功能
- 注册中心和高可用的实现(Dubbo中的实现原理、Solr_Cloud中的实现)
- 分布式锁的实现
- 队列的实现(同步队列和FIFO队列)
- ZK的集群管理和选举功能
- 集群和高可用的实现
运用场景说明
- zookeeper在dubbo中的作用(Dubbo也可以选Redis作为注册中心)
https://blog.csdn.net/hdu09075340/article/details/71123605
创建根节点为dubbo,二级节点为暴露的接口名、三级节点为providers和consumers、四级节点为具体真正工作的provider和consumer。
- consumer通过zk的路径节点找到对应provider,从而实现RPC调用
- 如果provider发生变动,会被zk监听到,zk会通知到各个consumers
配置中心的实现
多个ZKClient对某个特定的ZNode绑定监听,如果ZNode的内容发送变化(也就是配置发生变化),调用各自watcher的process方法ZK的集群管理和选举的功能
- 选主:
启动一个ZKClinet对某节点下注册的所有ServerNode,排序选最小的一个作为Master,在该Master上绑定一个监听,当该Master挂了的时候会触发监听,重新选Master,实现了集群中一定有一个Master节点的功能 - Master管理:
对非Master节点都绑定监听,如果发生变化,修改或者宕机了,都会触发监听。对根目录节点绑定监听,如果创建了新的节点,也就是引入了新的机器,也触发策略。
Zookeeper通知机制
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。
ZK的数据存储机制(内存+快照+事务日志)
https://www.jianshu.com/p/8fba732af0cd
DataTree
DataTree是内存数据存储的核心,是一个树结构,代表了内存中一份完整的数据。DataTree不包含任何与网络、客户端连接及请求处理相关的业务逻辑,是一个独立的组件。DataNode
DataNode是数据存储的最小单元,其内部除了保存了结点的数据内容、ACL列表、节点状态之外,还记录了父节点的引用和子节点列表两个属性,其也提供了对子节点列表进行操作的接口。ZKDatabase
Zookeeper的内存数据库,管理Zookeeper的所有会话、DataTree存储和事务日志。ZKDatabase会定时向磁盘dump快照数据,同时在Zookeeper启动时,会通过磁盘的事务日志和快照文件恢复成一个完整的内存数据库。
安装和配置
单节点配置zoo.cfg
|
集群配置zoo.cfg
- 注意3个端口作用不同:
- clientPort是ZK和使用的客户端之间通讯的端口
- 集群中本机IP:Master通讯端口:Master通讯失败发起Follower重新选举的通讯端口
- 在上面配置的dataDir目录下创建myid文件,填写这个节点上的id号,就是server.A=B:C:D配置的A那个号码 #zk集群配置的时候,和zk集群服务器连接通讯的最长时间为5个心跳时间,而不是和用户的连接时间initLimit=5#Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度syncLimit=5#server.A=B:C:D#其中 A 是一个数字,表示这个是第几号服务器#B 是这个服务器的 ip 地址#C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口(上面的端口Y)#D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口(上面的端口Z)。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号server.1=192.168.238.133:2888:3888server.2=192.168.238.133:2889:3889server.3=192.168.238.133:2890:3890
常见命令和启动
|
zk客户端命令操作
先启动zkcli
JavaAPI操作zk
- 依赖zookeeper-3.4.5.jar包
- 创建、修改、读取、删除节点等操作
- 对zk进行初始化和设置监听器
- Node节点支持4种状态:PERSISTENT(持久化)、PERSISTENT_SEQUENTIAL(持久化并序号自增)、EPHEMERAL(临时,当前session有效)、EPHEMERAL_SEQUENTIAL(临时,当前session有效,序号自增) public class ZooKeeperClient {//同步互斥变量,用来阻塞等待ZooKeeper连接完成之后再进行ZooKeeper的操作命令private static CountDownLatch connectedSemaphore = new CountDownLatch( 1 );public static void main(String[] args) throws Exception {// 创建一个与服务器的连接ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",3000, new Watcher(){//这个是服务器连接完成回调的监听器// 监控所有被触发的事件public void process(WatchedEvent event) {System.out.println("已经触发了" + event.getType() + "事件!");if ( Event.KeeperState.SyncConnected == event.getState() ) {//连接完成的同步事件,互斥变量取消,下面的阻塞停止,程序继续执行connectedSemaphore.countDown();}}});//如果和ZooKeeper服务器的TCP连接还没完全建立,就阻塞等待connectedSemaphore.await();// 创建一个目录节点zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);// 创建一个子目录节点zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);System.out.println(new String(zk.getData("/testRootPath",false,null)));// 取出子目录节点列表System.out.println(zk.getChildren("/testRootPath",true));// 修改子目录节点数据zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]");// 创建另外一个子目录节点zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));// 删除子目录节点zk.delete("/testRootPath/testChildPathTwo",-1);zk.delete("/testRootPath/testChildPathOne",-1);// 删除父目录节点zk.delete("/testRootPath",-1);// 关闭连接zk.close();}}
Znode的特性
- 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
- znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
- znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
- znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
- znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
- znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍
分布式队列的实现
参考博客含源码
https://blog.csdn.net/evankaka/article/details/70806752
还有这篇高并发网的
http://ifeve.com/zookeeper%EF%BC%8Dcurator/
同步队列的实现
zkClient对根节点进行watcher,当根节点新增了数据的时候会触发监听,计算size()后,再重新绑定监听,知道size()达到某个值,触发同步操作
FIFO队列的实现
生产产品:来自不同进程的线程,往zk的同一个路径下面添加永久性的顺序节点,并且填充内容。需要check上限,防止内存爆了。因为zk自身维护节点的唯一性,所以高并发的时候,也不会产生2个一样的节点
消费产品:来自不同进程的线程,读取zk对应路径的所有节点,排序后,消费序号最小的节点,删除的时候再判断一下是否删除成功,删除失败就递归继续删除。这样即使高并发的时候也不会重复删除相同的节点。
FIFO代码分析
- 核心类:
- SendObject (id,content),记得要序列化才能存储
ProducerThread,带有queue参数,run方法里面 queue.offer
public ProducerThread(DistributedSimpleQueue<SendObject> queue) {this.queue = queue;}public void run() {for (int i = 0; i < 10000; i++) {try {Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下SendObject sendObject = new SendObject(String.valueOf(i), "content" + i);queue.offer(sendObject);System.out.println("发送一条消息成功:" + sendObject);} catch (Exception e) {}}}ConsumerThread,类似生产线程
核心方法
poll()
public T pollHuang(){List<String> list = zkClient.getChildren(root);if (list.size() == 0) {return null;}//将队列按照由小到大的顺序排序Collections.sort(list, new Comparator<String>() {public int compare(String lhs, String rhs) {return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));}});String nodeFullPath = root.concat("/").concat(list.get(0));T node = (T) zkClient.readData(nodeFullPath);//判断是否删除成功,避免高并发的时候,同时删除,即使删除失败也返回相同的值if(zkClient.delete(nodeFullPath)){return node;}else{pollHuang();}return null;}offer(E elemet)
public boolean offer(T element) throws Exception{if(zkClient.getChildren(root).size()>=MAX_SIZE){return false;}//构建数据节点的完整路径String nodeFullPath = root .concat( "/" ).concat( Node_NAME );try {//创建持久的节点,写入数据zkClient.createPersistentSequential(nodeFullPath , element);}catch (ZkNoNodeException e) {zkClient.createPersistent(root);offer(element);} catch (Exception e) {throw ExceptionUtil.convertToRuntimeException(e);}return true;}