进度信息记录于zookeeper 路径的哪个路径下

zookeeper(11)
  上一篇博文讲解了Zookeeper的典型应用场景,在大数据时代,各种分布式系统层出不穷,其中,有很多系统都直接或间接使用了Zookeeper,用来解决诸如配置管理、分布式通知/协调、集群管理和Master选举等一系列分布式问题。
二、 Hadoop
  Hadoop的核心是HDFS(Hadoop Distributed File System)和MapReduce,分别提供了对海量数据的存储和计算能力,后来,Hadoop又引入了全新MapReduce框架YARN(Yet Another Resource Negotiator)。在Hadoop中,Zookeeper主要用于实现HA(High
Availability),这部分逻辑主要集中在Hadoop Common的HA模块中,HDFS的NameNode与YARN的ResourceManager都是基于此HA模块来实现自己的HA功能,YARN又使用了Zookeeper来存储应用的运行状态。
  YARN是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。其可以支持MapReduce模型,同时也支持Tez、Spark、Storm、Impala、Open MPI等。
  YARN主要由ResourceManager(RM)、NodeManager(NM)、ApplicationManager(AM)、Container四部分构成。其中,ResourceManager为全局资源管理器,负责整个系统的资源管理和分配。由YARN体系架构可以看到ResourceManager的单点问题,ResourceManager的工作状况直接决定了整个YARN架构是否可以正常运转。
  ResourceManager HA
  为了解决ResourceManager的单点问题,YARN设计了一套Active/Standby模式的ResourceManager HA架构。
  由上图可知,在运行期间,会有多个ResourceManager并存,并且其中只有一个ResourceManager处于Active状态,另外一些(允许一个或者多个)则处于Standby状态,当Active节点无法正常工作时,其余处于Standby状态的节点则会通过竞争选举产生新的Active节点。
  主备切换
  ResourceManager使用基于Zookeeper实现的ActiveStandbyElector组件来确定ResourceManager的状态。具体步骤如下
  1. 创建锁节点。在Zookeeper上会有一个类似于/yarn-leader-election/pseudo-yarn-rm-cluster的锁节点,所有的ResourceManager在启动时,都会去竞争写一个Lock子节点(/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock),子节点类型为临时节点,利用Zookeeper的特性,创建成功的那个ResourceManager切换为Active状态,其余的为Standby状态。
  2. 注册Watcher监听。所有Standby状态的ResourceManager都会向/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock节点注册一个节点变更监听,利用临时节点的特性,能够快速感知到Active状态的ResourceManager的运行情况。
  3. 主备切换。当Active的ResourceManager无法正常工作时,其创建的Lock节点也会被删除,此时,其余各个Standby的ResourceManager都会收到通知,然后重复步骤1。
  隔离(Fencing)
  在分布式环境中,经常会出现诸如单机假死(机器由于网络闪断或是其自身由于负载过高,常见的有GC占用时间过长或CPU负载过高,而无法正常地对外进行及时响应)情况。假设RM集群由RM1和RM2两台机器构成,某一时刻,RM1发生了假死,此时,Zookeeper认为RM1挂了,然后进行主备切换,RM2会成为Active状态,但是在随后,RM1恢复了正常,其依然认为自己还处于Active状态,这就是分布式脑裂现象,即存在多个处于Active状态的RM工作,可以使用隔离来解决此类问题。
  YARN引入了Fencing机制,借助Zookeeper的数据节点的ACL权限控制机制来实现不同RM之间的隔离。在上述主备切换时,多个RM之间通过竞争创建锁节点来实现主备状态的确定,此时,只需要在创建节点时携带Zookeeper的ACL信息,目的是为了独占该节点,以防止其他RM对该节点进行更新。
  还是上述案例,若RM1出现假死,Zookeeper会移除其创建的节点,此时RM2会创建相应的锁节点并切换至Active状态,RM1恢复之后,会试图去更新Zookeeper相关数据,但是此时其没有权限更新Zookeeper的相关节点数据,因为节点不是由其创建的,于是就自动切换至Standby状态,这样就避免了脑裂现象的出现。
  ResourceManager状态存储
  在ResourceManager中,RMStateStore可以存储一些RM的内部状态信息,包括Application以及Attempts信息、Delegation Token及Version Information等,值得注意的是,RMStateStore的绝大多数状态信息都是不需要持久化存储的(如资源使用情况),因为其很容易从上下文信息中重构,,在存储方案设计中,提供了三种可能的实现。
  1. 基于内存实现,一般用于日常开发测试。
  2. 基于文件系统实现,如HDFS。
  3. 基于Zookeeper实现。
  由于存储的信息不是特别大,Hadoop官方建议基于Zookeeper来实现状态信息的存储,在Zookeeper中,ResourceManager的状态信息都被存储在/rmstore这个根节点下,其数据结构如下。
  在RMAppRoot节点下存储的是与各个Application相关的信息,RMDTSecretManagerRoot存储的是与安全相关的Token信息。每个Active状态的ResourceManager在初始化节点都会从Zookeeper上读取到这些信息,并根据这些状态信息继续后续的处理。
  HBase(Hadoop Database)是一个基于Hadoop的文件系统设计的面向海量数据的高可靠、高性能、面向列、可伸缩的分布式存储系统,其针对数据写入具有强一致性,索引列也实现了强一致性,其采用了Zookeeper服务来完成对整个系统的分布式协调工作,其架构如下
  HBase架构中,Zookeeper是串联起HBase集群与Client的关键所在,Zookeeper在HBase中的系统容错、RootRegion管理、Region状态管理、分布式SplitLog任务管理、Replication管理都扮演重要角色。
  系统容错
  在HBase启动时,每个RegionServer服务器都会到Zookeeper的/hbase/rs节点下创建一个信息节点,例如/hbase/rs/[Hostname],同时,HMaster会对这个节点进行监听,当某个RegionServer挂掉时,Zookeeper会因为在一段时间内无法接收其心跳信息(Session失效),而删除掉该RegionServer服务器对应的节点,与此同时,HMaster则会接收到Zookeeper的NodeDelete通知,从而感知到某个节点断开,并立即开始容错工作,HMaster会将该RegionServer所处理的数据分片(Region)重新路由到其他节点上,并记录到Meta信息中供客户端查询。
  RootRegion管理
  数据存储的位置信息是记录在元数据分片上的,即在RootRegion上,每次客户端发起新的请求,就会查询RootRegion来确定数据的位置,而RootRegion自身的位置则记录在Zookeeper上(默认情况下在/hbase/root-region-server节点中)。当RootRegion发生变化时,如Region手工移动、Balance或者是RootRegion所在服务器发生了故障,就能通过Zookeeper来感知到这一变化并做出一系列相应的容灾措施,从而保障客户端总能够拿到正确的RootRegion信息。
  Region状态管理
  Region是HBase中数据的物理切片,每个Region记录了全局数据的一小部分,并且不同的Region之间的数据是相互不重复的。但对于一个分布式系统来说,Region是会发生变更的,原因可能是系统故障、负载均衡、配置修改、Region分裂合并等,一旦Region发生变动,它必然经历离线和重新在线的过程。在离线期间,数据是不能被访问的,并且Region的状态变化必须让全局知晓,否则可能会出现某些事务性的异常,而对于HBase集群而言,Region的数量可能会多达10万级别,因此这样规模的Region状态管理只有依靠Zookeeper才能做到。
  分布式SplitLog任务管理
  当某台RegionServer服务器挂掉后,由于总有一部分新写入的数据还没有持久化到HFile中(在内存中),因此在迁移该RegionServer的服务时,应该从HLog中恢复这部分还在内存中的数据,此时HMaster需要遍历该RegionServer服务器的HLog(SplitLog),并按照Region切分成小块移动到新的地址,并进行数据的Replay。由于单个RegionServer的日志量相对庞大(可能存在上千个Region,上GB的日志),而用户往往希望系统能够快速完成日志的恢复工作,因此,需要将处理HLog的任务分配给多台RegionServer服务器共同处理,而这又需要一个持久化组件来辅助HMaster完成任务的分配,当前做法如下,HMaster会在Zookeeper上创建一个splitlog节点(默认为/hbase/splitlog节点),将"哪个RegionServer处理哪个Region"的信息以列表的形式存放在该节点上,然后由各个RegionServer服务器自行到该节点上去领取任务并在任务执行成功或失败后再更新该节点的信息以通知HMaster继续后续步骤,Zookeeper起到了相互通知和信息持久化的角色。
  Replication管理
  Replication是实现HBase中主备集群间的实时同步的重要模块,与传统关系型数据库不同的是,HBase的Replication是多对多的,且每个节点随时都有可能挂掉,因此其会复杂得多。HBase也借助了Zookeeper来完成Replication功能,做法是在Zookeeper上记录一个replication节点(默认是/hbase/replication节点),然后把不同的RegionServer服务器对应的HLog文件名称记录到相应的节点上,HMaster集群会将新增的数据推送给Slave集群,并同时将推送信息记录到Zookeeper上(称为断点记录),然后重复上述步骤,当服务器挂掉时,由于Zookeeper上已经保存了断点信息,因此只要有HMaster能够根据这些信息来协同用来推送HLog数据的主节点服务器就可以进行继续复制操作。
  kafka是一个吞吐量极高的分布式消息系统,其整体设计是典型的发布与订阅系统模式,在Kafka集群中,没有中心主节点概念,所有服务器都是对等的,因此,可以在不做任何配置更改的情况下实现服务器的添加与删除,同样,消息的生产者和消费者也能够随意重启和机器的上下线。
  生产者(Producer):消息产生的源头,负责生成消息并发送到Kafka服务器。
  消费者(Consumer):消息的使用方,负责消费Kafka服务器上的消息。
  主题(Topic):由用户定义并配置在Kafka服务端,用于建立生产者和消费者之间的订阅关系,生产者发送消息到指定Topic下,消费者从这个Topic中消费消息。
  消息分区(Partition):一个Topic下会分为多个分区,如"kafka-test"这个Topic可以分为10个分区,分别由两台服务器提供,那么通常可以配置让每台服务器提供5个分区,假设服务器ID为0和1,那么分区为0-0、0-1、0-2、0-3、0-4和1-0、 1-1、1-2、1-3、1-4。消息分区机制和分区的数量与消费者的负载均衡机制有很大的关系。
  服务器(Broker):用于存储信息,在消息中间件中通常被称为Broker。
  消费者分组(Group):归组同类消费者,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者组成了消费者分组,拥有同一个分组名称,通常也被称为消费者集群。
  偏移量(Offset):消息存储在Kafka的Broker上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量。
  Broker注册
  Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点/brokers/ids。每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]。Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
  Topic注册
  在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如/borkers/topics。Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3-&2,这个节点表示Broker
ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
  生产者负载均衡
  由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
  ① 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。
  ② 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
  消费者负载均衡
  与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
  消费分区与消费者的关系
  对于每个消费者分组,Kafka都会为其分配一个全局唯一的Group ID,同一个消费者分组内部的所有消费者共享该ID。同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。在Kafka中,规定了每个消息分区有且只能同时有一个消费者进行消费,因此,需要在Zookeeper上记录消息分区与消费者之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到对应消息分区的临时节点上,例如/consumers/[group_id]/owners/[topic]/[broker_id-partition_id],其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消费分区上消息消费者的Consumer
  消息消费进度Offset记录
  在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],节点内容就是Offset的值。
  消费者注册
  消费者服务器在初始化启动时加入消费者分组的步骤如下
  ① 注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
  ② 对消费者分组中的消费者的变化注册监听。每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。
  ③ 对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。
  ④ 进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。
  负载均衡
  Kafka借助Zookeeper上记录的Broker和消费者信息,采用消费者均衡算法进行负载均衡,其具体步骤如下。假设一个消息分组的每个消费者记为C1,C2,Ci,...,Cn。那么对于消费者Ci,其对应的消息分区分配策略如下:
  1. 设置Pr为指定Topic所有的消息分区。
  2. 设置Cg为统一消费者分组中的所有消费者。
  3. 对Pr进行排序,使分布在同一个Broker服务器上的分区尽量靠在一起。
  4. 对Cg进行排序。
  5. 设置i为Ci在Cg中的位置索引,同时设置N = size (Pr) / size (Cg)。
  6. 将编号为i * N ~ (i + 1) * N - 1的消息分区分配给Ci。
  7. 重新更新Zookeeper上消息分区与消费者Ci的关系。
转载:http://www.cnblogs.com/leesf456/p/6063694.html> 博客详情
摘要: https://github.com/Netflix/curator/wiki
Netflix curator客户端库官方地址
Zookeeper应用场景
ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得zookeeper能够应用于很多场景。需要注意的是,ZK并不是生来就为这些场景设计,都是后来众多开发者根据框架的特性,摸索出来的典型使用方法。因此,我们也可以根据自己的需要来设计相应的场景实现。正如前文所提到的,ZooKeeper&实现的任何功能都离不开ZooKeeper的数据结构,任何功能的实现都是利用"Znode结构特性+节点关联的数据"来实现的,好吧那么我们就看一下ZooKeeper数据结构有哪些特性。ZooKeeper数据结构如下图所示:
图2.1 ZooKeeper数据结构
Zookeeper&这种数据结构有如下这些特点:
①&每个子目录项如&NameService&都被称作为&znode,这个&znode&是被它所在的路径唯一标识,如Server1&这个&znode&的标识为&/NameService/Server1;
②&znode&可以有子节点目录,并且每个&znode&可以存储数据,注意&EPHEMERAL&类型的目录节点不能有子节点目录;
③&znode&是有版本的,每个&znode&中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据;
④&znode&可以是临时节点,一旦创建这个&znode&的客户端与服务器失去联系,这个&znode&也将自动删除,Zookeeper&的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为&session,如果&znode&是临时节点,这个&session&失效,znode&也就删除了;
⑤&znode&的目录名可以自动编号,如&App1&已经存在,再创建的话,将会自动命名为&App2;
⑥&znode&可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是&Zookeeper&的核心特性,Zookeeper&的很多功能都是基于这个特性实现的。
1.0 Zookeeper节点类型
ZooKeeper 节点是有生命周期的,这取决于节点的类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。
持久节点(PERSISTENT) 所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。 持久顺序节点(PERSISTENT_SEQUENTIAL) 这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。 临时节点(EPHEMERAL) 和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。 临时顺序节点(EPHEMERAL_SEQUENTIAL) 可以用来实现分布式锁 & & & &客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。 & & &客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。
1.1数据发布与订阅
(1)&典型场景描述
发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,地址列表等就非常适合使用。集中式的配置管理在应用集群中是非常常见的,一般商业公司内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自配置的需求,并且在配置变更时能够通知到集群中的每一个机器。
①&索引信息和集群中机器节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。
②&系统日志(经过处理后的)存储,这些日志通常2-3天后被清除。
③&应用中用到的一些配置信息集中管理,在应用启动的时候主动来获取一次,并且在节点上注册一个Watcher,以后每次配置有更新,实时通知到应用,获取最新配置信息。
④&业务逻辑中需要用到的一些全局变量,比如一些消息中间件的消息队列通常有个offset,这个offset存放在zk上,这样集群中每个发送者都能知道当前的发送进度。
⑤&系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息。以前通常是暴露出接口,例如JMX接口,有了ZK后,只要将这些信息存放到ZK节点上即可。
(3)&应用举例
例如:同一个应用系统需要多台&PC Server&运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的&PC Server,这样非常麻烦而且容易出错。将配置信息保存在&Zookeeper&的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到&Zookeeper&的通知,然后从&Zookeeper&获取新的配置信息应用到系统中。ZooKeeper配置管理服务如下图所示:
图2.2 配置管理结构图
Zookeeper很容易实现这种集中式的配置管理,比如将所需要的配置信息放到/Configuration&节点上,集群中所有机器一启动就会通过Client对/Configuration这个节点进行监控【zk.exist("/Configuration″,true)】,并且实现Watcher回调方法process(),那么在zookeeper上/Configuration节点下数据发生变化的时候,每个机器都会收到通知,Watcher回调方法将会被执行,那么应用再取下数据即可【zk.getData("/Configuration″,false,null)】。
1.2统一命名服务(Name Service)
(1)&场景描述
分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了&JNDI,没错&Zookeeper&的&Name Service&与&JNDI&能够完成的功能是差不多的,它们都是将有层次的目录结构关联到一定资源上,但是Zookeeper的Name Service&更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。
在分布式系统中,通过使用命名服务,客户端应用能够根据指定的名字来获取资源服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,进程对象等等,这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。Name Service&已经是Zookeeper&内置的功能,你只要调用&Zookeeper&的&API&就能实现。如调用&create&接口就可以很容易创建一个目录节点。
(3)&应用举例
阿里开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表。在Dubbo实现中:&服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。&服务消费者启动的时候,订阅/dubbo/serviceName/providers目录下的提供者URL地址,并向/dubbo/{serviceName} /consumers目录下写入自己的URL地址。 注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。
1.3分布通知/协调(Distribution of notification/coordination)
(1)&典型场景描述
ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理。
①&另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过ZK上某个节点关联,大大减少系统耦合。
②&另一种系统调度模式:某系统由控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。
③&另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到ZK来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。
总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合。
1.4分布式锁(Distribute Lock)
(1)&场景描述
&&&&分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性,即用户只要完全相信每时每刻,zk集群中任意节点(一个zk server)上的相同znode的数据是一定是相同的。锁服务可以分为两类,一个是保持独占,另一个是控制时序。全局时序和独占
&&&&&&&&保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把ZK上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建&/distribute_lock&节点,最终成功创建的那个客户端也即拥有了这把锁。
&&&&& & 全局时序,就是所有试图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里&/distribute_lock&已经预先存在,客户端在它下面创建临时有序节点。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。
共享锁在同一个进程中很容易实现,但是在跨进程或者在不同&Server&之间就不好实现了。Zookeeper&却很容易实现这个功能,实现方式也是需要获得锁的&Server&创建一个&EPHEMERAL_SEQUENTIAL&目录节点,然后调用&getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用&exists(String path, boolean watch)&方法并监控Zookeeper&上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
图 2.3 ZooKeeper实现Locks的流程图
1.5&集群管理(Cluster Management)
(1)&典型场景描述
集群机器监控:
这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报"我还活着"。 这种做法可行,但是存在两个比较明显的问题:
①&集群中机器有变动的时候,牵连修改的东西比较多。
②&有一定的延时。利用ZooKeeper中两个特性,就可以实施另一种集群机器存活性监控系统:
①&客户端在节点&x&上注册一个Watcher,那么如果&x&的子节点变化了,会通知该客户端。
②&创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。
Master选举:
Master选举则是zookeeper中最为经典的使用场景了,在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑,例如一些耗时的计算,网络I/O处,往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。
利用ZooKeeper中两个特性,就可以实施另一种集群中Master选举:
①&利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建&/Master&节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选举了。
②另外,这种场景演化一下,就是动态Master选举。这就要用到&EPHEMERAL_SEQUENTIAL类型节点的特性了,这样每个节点会自动被编号。允许所有请求都能够创建成功,但是得有个创建顺序,每次选取序列号最小的那个机器作为Master&。
在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此间索引数据一致。因此让集群中的Master来迚行全量索引的生成,然后同步到集群中其它机器。另外,Master选丼的容灾措施是,可以随时迚行手动挃定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选出一个HMaster来运行,从而避免了HMaster的单点问题的存活状态,同时,一旦HMaster出现问题,会重新选出一个HMaster来运行,从而避免了HMaster的单点问题。
(3)&应用举例
集群监控:
应用集群中,我们常常需要让每一个机器知道集群中或依赖的其他某一个集群中哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器,Zookeeper&能够很容易的实现集群管理的功能,如有多台&Server&组成一个服务集群,那么必须要一个"总管"知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台&Server,同样也必须让"总管"知道,这就是ZooKeeper的集群监控功能。
图2.4 集群管理结构图
比如我在zookeeper服务器端有一个znode叫/Configuration,那么集群中每一个机器启动的时候都去这个节点下创建一个EPHEMERAL类型的节点,比如server1创建/Configuration&/Server1,server2创建/Configuration&/Server1,然后Server1和Server2都watch /Configuration&这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。因为EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端连接断掉或者session过期就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消 失,然后集群中所有对/Configuration进行watch的客户端都会收到通知,然后取得最新列表即可。
Master选举:
Zookeeper&不仅能够维护当前的集群中机器的服务状态,而且能够选出一个"总管",让这个总管来管理集群,这就是&Zookeeper&的另一个功能&Leader Election。Zookeeper&如何实现&Leader Election,也就是选出一个Master Server。和前面的一样每台&Server&创建一个&EPHEMERAL&目录节点,不同的是它还是一个SEQUENTIAL&目录节点,所以它是个&EPHEMERAL_SEQUENTIAL&目录节点。之所以它是EPHEMERAL_SEQUENTIAL&目录节点,是因为我们可以给每台&Server&编号,我们可以选择当前是最小编号的Server&为&Master,假如这个最小编号的&Server&死去,由于是&EPHEMERAL&节点,死去的&Server&对应的节点也被删除,所以当前的节点列表中又出现一个最小编号的节点,我们就选择这个节点为当前&Master。这样就实现了动态选择&Master,避免了传统意义上单&Master&容易出现单点故障的问题。
package org.zk.leader.
import org.apache.log4j.xml.DOMC
import org.apache.zookeeper.WatchedE
import org.apache.zookeeper.W
import org.apache.zookeeper.ZooK
import java.io.IOE
* TestMainClient
* Author By: sunddenly工作室
* Created Date:
public class WatcherClient implements Watcher {
protected static ZooKeeper zk =
protected static I
int sessionTimeout = 10000;
protected S
public TestMainClient(String connectString) {
if(zk == null){
String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
DOMConfigurator.configure(configFile);
System.out.println("创建一个新的连接:");
zk = new ZooKeeper(connectString, sessionTimeout, this);
mutex = new Integer(-1);
} catch (IOException e) {
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
清单&3 Leader Election代码
package org.zk.leader.
import org.apache.log4j.L
import org.apache.zookeeper.CreateM
import org.apache.zookeeper.KeeperE
import org.apache.zookeeper.WatchedE
import org.apache.zookeeper.ZooD
import org.apache.zookeeper.data.S
import java.net.InetA
import java.net.UnknownHostE
* LeaderElection
* Author By: sunddenly工作室
* Created Date:
public class LeaderElection extends WatcherClient {
public static final Logger logger = Logger.getLogger(LeaderElection.class);
public LeaderElection(String connectString, String root) {
super(connectString);
this.root =
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
logger.error(e);
} catch (InterruptedException e) {
logger.error(e);
void findLeader() throws InterruptedException, UnknownHostException, KeeperException {
byte[] leader =
leader = zk.getData(root + "/leader", true, null);
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException) {
logger.error(e);
if (leader != null) {
following();
String newLeader =
byte[] localhost = InetAddress.getLocalHost().getAddress();
newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
if (e instanceof KeeperException.NodeExistsException) {
logger.error(e);
if (newLeader != null) {
leading();
mutex.wait();
public void process(WatchedEvent event) {
if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
System.out.println("得到通知");
super.process(event);
following();
void leading() {
System.out.println("成为领导者");
void following() {
System.out.println("成为组成员");
public static void main(String[] args) {
String connectString = "localhost:2181";
LeaderElection le = new LeaderElection(connectString, "/GroupMembers");
le.findLeader();
} catch (Exception e) {
logger.error(e);
1.6&队列管理
Zookeeper&可以处理两种类型的队列:
①&当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
②&队列按照&FIFO&方式进行入队和出队操作,例如实现生产者和消费者模型。
(1)&同步队列用&Zookeeper&实现的实现思路如下:
创建一个父目录&/synchronizing,每个成员都监控标志(Set Watch)位目录&/synchronizing/start&是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建&/synchronizing/member_i&的临时目录节点,然后每个成员获取&/ synchronizing&目录的所有目录节点,也就是&member_i。判断&i&的值是否已经是成员的个数,如果小于成员个数等待&/synchronizing/start&的出现,如果已经相等就创建&/synchronizing/start。
清单 4 Synchronizing 代码
package org.zk.
import java.net.InetA
import java.net.UnknownHostE
import java.util.L
import org.apache.log4j.L
import org.apache.zookeeper.CreateM
import org.apache.zookeeper.KeeperE
import org.apache.zookeeper.WatchedE
import org.apache.zookeeper.W
import org.apache.zookeeper.ZooK
import org.apache.zookeeper.ZooDefs.I
import org.apache.zookeeper.data.S
import org.zk.leader.election.TestMainC
* Synchronizing
* Author By: sunddenly工作室
* Created Date:
public class Synchronizing extends WatcherClient {
public static final Logger logger = Logger.getLogger(Synchronizing.class);
* 构造函数
* @param connectString 服务器连接
* @param root 根目录
* @param size 队列大小
Synchronizing(String connectString, String root, int size) {
super(connectString);
this.root =
this.size =
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
} catch (KeeperException e) {
logger.error(e);
} catch (InterruptedException e) {
logger.error(e);
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
logger.error(e);
* 加入队列
* @throws KeeperException
* @throws InterruptedException
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List&String& list = zk.getChildren(root, false);
if (list.size() & size) {
mutex.wait();
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
System.out.println("得到通知");
super.process(event);
doAction();
* 执行其他任务
private void doAction(){
System.out.println("同步队列已经得到同步,可以开始执行后面的任务了");
public static void main(String args[]) {
//启动Server
String connectString = "localhost:2181";
int size = 1;
Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);
b.addQueue();
} catch (KeeperException e){
logger.error(e);
} catch (InterruptedException e){
logger.error(e);
(2)&FIFO&队列用&Zookeeper&实现思路如下:
实现的思路也非常简单,就是在特定的目录下创建&SEQUENTIAL&类型的子目录&/queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过&getChildren( )&方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证&FIFO。
下面是生产者和消费者这种队列形式的示例代码
清单 5 FIFOQueue 代码
import org.apache.log4j.L
import org.apache.zookeeper.CreateM
import org.apache.zookeeper.KeeperE
import org.apache.zookeeper.WatchedE
import org.apache.zookeeper.ZooD
import org.apache.zookeeper.data.S
import java.nio.ByteB
import java.util.L
* FIFOQueue
* Author By: sunddenly工作室
* Created Date:
public class FIFOQueue extends WatcherClient{
public static final Logger logger = Logger.getLogger(FIFOQueue.class);
* Constructor
* @param connectString
* @param root
FIFOQueue(String connectString, String root) {
super(connectString);
this.root =
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
} catch (KeeperException e) {
logger.error(e);
} catch (InterruptedException e) {
logger.error(e);
* @param i
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
* @throws KeeperException
* @throws InterruptedException
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat =
while (true) {
synchronized (mutex) {
List&String& list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue & min) min = tempV
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
public void process(WatchedEvent event) {
super.process(event);
public static void main(String args[]) {
//启动Server
TestMainServer.start();
String connectString = "localhost:"+TestMainServer.CLIENT_PORT;
FIFOQueue q = new FIFOQueue(connectString, "/app1");
Integer max = new Integer(5);
System.out.println("Producer");
for (i = 0; i & i++)
q.produce(10 + i);
} catch (KeeperException e){
logger.error(e);
} catch (InterruptedException e){
logger.error(e);
for (i = 0; i & i++) {
int r = q.consume();
System.out.println("Item: " + r);
} catch (KeeperException e){
logger.error(e);
} catch (InterruptedException e){
logger.error(e);
1.7&Zookeeper客户端Curator公用库
&&&&Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架.。
&&&&获取一个链接:
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
这将会使用默认的值创建一个到ZK集群的链接,唯一需要特别指定单参数是重试机制,从例子上看,你需要使用:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
使用Curator的好处是Curator帮助我们管理客户端到ZK的链接,并且在出现网络链接的问题的时候将会执行指定的重试机制。
Curator提供的Recipes:
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
try { // do some work inside of the critical section here
}catch(Exception e){
}finally {
lock.release();
领导者选举
LeaderSelectorListener listener = new LeaderSelectorListener()
public void takeLeadership(CuratorFramework client) throws Exception {
//这个方法将会在当前节点处于Leader角色的时候调用
public void stateChanged(CuratorFramework client, ConnectionState newState){
//当到ZK的连接断开的时候会回调该方法
LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();
selector.start();
官方wiki:&
1.8.节点监听
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import javax.annotation.R
import org.apache.curator.framework.CuratorF
import org.apache.curator.framework.recipes.cache.ChildD
import org.apache.curator.framework.recipes.cache.TreeC
import org.apache.curator.framework.recipes.cache.TreeCacheE
import org.apache.curator.framework.recipes.cache.TreeCacheL
import org.slf4j.L
import org.slf4j.LoggerF
import org.springframework.stereotype.C
@Component
public class ZkWatcher{
private Logger LOG=LoggerFactory.getLogger(ZkWatcher.class);
private ZkClient zkC
* 设置节点的监听器,需要实现ZKNodeListener接口
* @param nodePath
需要监听的zk节点
* @param listener
ZKNodeListener接口
* @throws Exception
public void setNodeListener(String nodePath,final ZKNodeListener listener) throws Exception{
CuratorFramework client=zkClient.getClient();
ExecutorService executor=Executors.newCachedThreadPool();
TreeCache treeCache = new TreeCache(client,nodePath);
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data!=null){
switch (event.getType()) {
case NODE_ADDED:
//节点添加事件
listener.onNodeAdded(data.getPath(), new String(data.getData()));
case NODE_UPDATED:
//节点更新事件
listener.onNodeUpdated(data.getPath(), new String(data.getData()));
case NODE_REMOVED:
//节点删除事件
listener.onNodeRemoved(data.getPath(), new String(data.getData()));
},executor);
//开始监听
treeCache.start();
public interface ZKNodeListener {
* zk 节点添加 回调方法
* @param path 节点路径
* @param data
public void onNodeAdded(String path,String data);
* zk 节点更新 回调方法
* @param path 节点路径
* @param data
public void onNodeUpdated(String path,String data);
* zk 节点删除 回调方法
* @param path 节点路径
* @param data
public void onNodeRemoved(String path,String data);
//zkClient ------------------------------------------------
@Component
public class ZkClient implements InitializingBean{
private Logger LOG=LoggerFactory.getLogger(ZkClient.class);
// ip:port,ip:port,ip:port
@Value("${zookeeper.connection.url}")
private String zookeeperConnectionS
private CuratorF
public void afterPropertiesSet() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
LOG.info("connected the zookeeper");
public CuratorFramework getClient(){
return this.
ZooKeeper实际应用
假设我们的集群有:
(1)&20个搜索引擎的服务器:每个负责总索引中的一部分的搜索任务。
①&搜索引擎的服务器中的15个服务器现在提供搜索服务。
②&5个服务器正在生成索引。
这20个搜索引擎的服务器,经常要让正在提供搜索服务的服务器停止提供服务开始生成索引,或生成索引的服务器已经把索引生成完成可以搜索提供服务了。
(2)&一个总服务器:负责向这20个搜索引擎的服务器发出搜索请求并合并结果集。
(3)&一个备用的总服务器:负责当总服务器宕机时替换总服务器。
(4)&一个web的cgi:向总服务器发出搜索请求。
使用Zookeeper可以保证:
(1)&总服务器:自动感知有多少提供搜索引擎的服务器,并向这些服务器发出搜索请求。
(2)&备用的总服务器:宕机时自动启用备用的总服务器。
(3)&web的cgi:能够自动地获知总服务器的网络地址变化。
(4)&实现如下:
①&提供搜索引擎的服务器都在Zookeeper中创建znode,zk.create("/search/nodes/node1",&"hostname".getBytes(),&Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);
②总服务器可以从Zookeeper中获取一个znode的子节点的列表,zk.getChildren("/search/nodes", true);
③&总服务器遍历这些子节点,并获取子节点的数据生成提供搜索引擎的服务器列表;
④&当总服务器接收到子节点改变的事件信息,重新返回第二步;
⑤&总服务器在Zookeeper中创建节点,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
⑥备用的总服务器监控Zookeeper中的"/search/master"节点。当这个znode的节点数据改变时,把自己启动变成总服务器,并把自己的网络地址数据放进这个节点。
⑦&web的cgi从Zookeeper中"/search/master"节点获取总服务器的网络地址数据,并向其发送搜索请求。
⑧&web的cgi监控Zookeeper中的"/search/master"节点,当这个znode的节点数据改变时,从这个节点获取总服务器的网络地址数据,并改变当前的总服务器的网络地址。
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥

我要回帖

更多关于 查看zookeeper路径 的文章

 

随机推荐