leader和follower服务器启动期交互过程
- 完成leader选举后,每个服务器根据自己的角色创建相应服务器实例,并开始进入各自角色主流程
- leader服务器启动follower接收器learnercnxacceptor,负责接收所有非leader服务器的连接请求
- learner服务器根据投票选举结果找到当前集群中的leader服务器,与其建立连接
- leader接收来自其他机器的连接创建请求后,创建一个learnerhandler实例。每个learnerhandler实例都对应了一个leader与learner的服务器之间的连接,负责消息、数据同步
- learner向leader发起注册:将含有当前服务器sid和服务器处理的最新zxid信息的learnerinfo发送给leader服务器
- leader接收到注册信息后解析出sid和zxid,根据zxid解析出learner对应的epoch_of_learner_parse,与自己的epoch_of_leader_self进行比较,如果epoch_of_learner_parse>epoch_of_leader_self,则更新 epoch_of_leader_self=epoch_of_learner_parse+1。learnerhandler会进行等待,直到过半的learner向leader注册完毕,同时更新 epoch_of_leader 之后,leader就可以确定当前集群的epoch
- leader将最终的epoch以leaderinfo消息的形式发送给learner,同时等待learner的响应
- follower从leaderinfo消息中解析出epoch和zxid向leader返回ackepoch响应
- leader收到反馈响应ackepoch后与follower进行数据同步
- 如果过半的learner完成了数据同步,就启动leader和learner服务器实例
leader和follower启动
接上面步骤10,启动步骤如下:
- 创建并启动会话管理器
- 初始化zookeeper的请求处理链:根据服务器角色的不同生成不同的请求处理链
- 注册jmx服务
至此,集群版的zk服务器启动完毕
leader选举过程
leader选举是zookeeper中最重要的技术之一,也是保证分布式数据一致性的关键
服务器启动时期的leader选举
以3台机器组成的集群为例:server1首先启动,此时无法完成leader选举
- server2启动后,与server1进行leader选举,由于是初始化阶段,都会投票给自己,于是server1投票内容 (myid, zxid) 为 (1,0),server2投票 (2,0),各自将这个投票发送给集群中的其他所有机器
- 每个服务器接收来自其他各服务器的投票,并判断投票的有效性:检查是否是本轮投票,是否来自looking状态的服务器
- 收到其他服务器的投票后与自己的投票进行pk,pk规则有:
- 优先检查zxid,zxid较大的服务器优先作为leader
- zxid相同时比较myid,myid较大的作为leader
- 统计投票:每次投票后服务器会统计所有投票,判断是否有过半(> n/2 + 1)的机器接收到相同的投票信息来决定leader服务器 此时3台服务器已有 2台(server1 server2)达成一致,超过半数,将选举出leader - server2
- 改变服务器状态:确定了leader后服务器需要更新自己的状态,follower变更为following,leader会变更为 leading 状态
服务器运行期间的leader选举
leader服务器宕机后进入新一轮的leader选举
- 变更状态:leader宕机后剩下的非observer服务器都会将自己的状态变更为looking,开始进入leader选举流程
- 每个server发出一个投票:生成投票信息(myid, zxid)在第一轮投票中,每个服务器都会投自己,后续的判断过程与服务器启动时期的leader选举相同
leader选举算法 - fastleaderelection
zookeeper提供了3种leader选举算法:leaderelection、udp版本的fastleaderelection、tcp版本的fastleaderelection。
术语解释:
sid - 服务器id,唯一标识zookeeper集群中的机器的数字,与myid一致
zxid - 事务id,用于唯一标识一次服务器状态的变更,某一时刻,集群中的每台服务器的zxid不一定完全一致
vote - 投票
quorum - 过半机器数,quorum = n/2 + 1
zookeeper集群中服务器出现下述两种情况之一就会进入leader选举:集群初始化启动阶段;leader宕机/断网
而一台机器进入leader选举流程时,当前集群也可能会处于两种状态:
- 集群中本来就存在leader,此时试图发起选举会被告知当前服务器的leader信息,直接与leader建立连接并同步状态
- 集群中不存在leader:所有机器进入looking状态进行投票选举leader
【选举案例】集群有5台机器,sid分别为 1 2 3 4 5,zxid分别为 9 9 9 8 8,在某一时刻sid为 1 2 的机器宕机退出,集群此时开始进行leader选举
第一次投票时,由于还无法检测到集群中其他机器的状态信息,每台机器都将投自己,于是sid为 3 4 5的机器分别投票(sid,zxid) (3,9) (4,8) (5,8)
每台机器发出自己的投票后也会收到来自集群中其他机器的投票,每台机器都会对比收到的投票,决定是否替换。假设机器自己的投票是 (self_sid, self_zxid) 接收到的投票是 (vote_sid, vote_zxid),对比的规则是:
- 如果 vote_zxid > self_zxid 则认可当前投票,并再次将更新后的投票发送出去
- 如果 vote_zxid < self_zxid 则不作变更
- 如果 vote_zxid = self_zxid && vote_sid > self_sid,就认可当前接收到的投票,并改为 (vote_sid, vote_zxid) 投递出去
- 如果 vote_zxid = self_zxid && vote_sid < self_sid,则不作变更
sid为 3 4 5的机器对投票进行对比,会统一更新为投票 (3,9) ,此时quorum = 3 >= (5/2 + 1) 超过半数,选举服务器3作为leader
zxid越大的机器,数据也就越新,这样可以保证数据的恢复(更少的数据丢失),所以适合作为leader服务器
leader选举的实现细节
在quorumpeer.serverstate 类中定义了4种服务器状态
public enum serverstate {
looking, // 寻找leader状态,当前集群中没有leader,需要进入leader选举流程
following, // 当前服务器的角色是follower
leading, // 当前服务器角色是leader
observing // 当前服务器角色是 observer
}
org.apache.zookeeper.server.quorum.vote 数据结构的定义
public class vote {
private final int version;
private final long id; // 选举的leader的sid
private final long zxid;
//逻辑时钟,用于判断多个投票是否在同一轮选举周期中。该值在服务端是一个自增序列,每次进入新一轮投票后,都会对该值+1
private final long electionepoch;// 被推举的leader的epoch
private final long peerepoch;//当前服务器的状态
quorumcnxmanager 网络io
每个服务器启动时会启动一个quorumcnxmanager,负责各服务器的底层leader选举过程中的网络通信。
quorumcnxmanager内部维护了一系列按sid分组的消息队列:
recvqueue:消息接收队列,存放从其他服务器接收到的消息
queuesendmap:消息发送队列,保存待发送的消息。此map的key是sid,分别为集群中的每台机器分配了一个单独队列,从而保证各台机器之间的消息发送互不影响
senderworkermap:发送器集合,同样按sid分组,每个senderworker消息发送器对应一台远程zookeeper服务器
lastmessagesent:最近发送过的消息,为每个sid记录最近发送过的消息
选举时集群中的机器是如何建立连接的:
为了能够进行互相投票,zookeeper集群中的机器需要两两建立网络连接。
quorumcnxmanager启动时会创建一个serversocket监听leader选举的通信端口(默认3888),接收其他服务器的tcp连接请求并交给receiveconnection函数来处理。为了避免两台机器之间重复创建tcp连接,zookeeper设计一种建立tcp连接的规则:只允许sid大的服务器主动和其他服务器建立连接,否则断开连接。如果服务器收到tcp连接请求发现比自己的sid值小,会断开这个连接并主动与发起连接的远程服务器建立连接。
建立连接后就会根据外部服务器的sid创建对应的消息发送器 sendworker 和 消息接收器recvworker 并启动
fastleaderelection选举算法的核心
zookeeper对于选票的管理
- sendqueue:选票发送队列,保存待发送的选票
- recvqueue:选票接收队列,保存接收到的外部选票
- fastleaderelection.messenger.workerreceiver:选票接收器,不断从quorumcnxmanager中取出其他服务器发出的选举消息,并转成vote,保存到recvqueueu。如果接收到的外部投票选举轮次小于当前服务器(validvoter方法返回false),直接忽略改选票同时发出自己的投票。如果当前的服务器并不是looking状态(if (self.getpeerstate() == quorumpeer.serverstate.looking)),就将leader信息以投票的形式发出。 选票接收器接收到的消息如果来自observer就会忽略该消息,并将自己当前的投票发送出去
- workersender 选票发送器,会不断从sendqueue队列中获取待发送的选票,并将其传递到底层quorumcnxmanager中
fastleaderelection#lookforleader方法中揭示了选举算法的流程,该方法在服务器状态变成looking时触发
选举算法流程
- 自增选举轮次 logicalclock ++ fastleaderelection中的 atomiclong logicalclock 字段标记当前leader的选举轮次,zookeeper在开始新一轮投票时,会首先对logicalclock进行自增操作
- 初始化选票 初始化选票vote的属性:将自己推荐为leader(id=服务器自身sid,zxid=当前服务器最新zxid,electionepoch=当前服务器的选举轮次,peerepoch=被推举的服务器的选举轮次,state=looking)
- 将初始化好的选票放入sendqueue中,由workersender负责发出
- 服务器不断从 recvqueue 接收外部投票,如果服务器发现无法获取到任何投票会检查与其他服务器的连接,修复连接后重新发出
- 处理外部投票,根据选举轮次判断进行不同的处理:
- 外部投票选举轮次 > 内部轮次:立即更新自己的选举轮次logicalclock,清空所有已收到的投票,使用初始化的投票进行pk以确定是否变更内部投票,最终将内部投票发送出去
- 外部投票选举轮次 < 内部轮次:忽略外部投票,返回步骤4
- 两边一致,绝大多数场景,选举轮次一致时开始进行选票pk
- 选票pk:收到其他服务器有效的外部投票后,进行选票pk,执行fastleaderelection.totalorderpredicate方法,选票pk的目的是确定当前服务器是否需要变更投票,主要从 logicalclock、zxid、sid三个维度判断,符合下述任意一个条件就进行投票变更:
- 外部投票推举的leader服务器的 logicalclock > 内部投票的,需要进行内部投票变更
- logicalclock一致的,对比两者的zxid,外部投票zxid > 内部的,进行内部投票变更
- 两者的zxid一致就对比sid,外部的大就进行投票变更
- 变更投票:如果需要变更投票就使用外部投票的选票信息覆盖内部投票,变更完成后再将这个变更后的内部投票发出去
- 选票归档:无论是否进行了投票变更,外部投票都会存入recvset中进行归档,recvset中按照服务器对应的sid来区分{(1,vote1),(2,vote2),…}
- 统计投票:统计集群中是否已经有过半的机器认可了当前的内部投票,否则返回步骤4
- 更新服务器状态:如果此时已经确定可以终止投票,就更新服务器状态:根据过半机器认可的投票对应的服务器是否是自己确定是否成为leader,并将状态切换为leading/following/observing
上述10个步骤就是fastleaderelection的选举流程,步骤4~9会经过几轮循环,直到leader选举产生。在步骤9如果已经有过半服务器认可了当前选票,此时zookeeper并不会立即进入步骤10,而是等待一段时间(默认200ms)来确定是否有新的更优的投票。
服务器角色介绍
leader
工作内容:事务请求的唯一调度和处理者,保证集群事务处理的顺序性;集群内部各服务器的调度者;
zookeeper使用责任链模式来处理客户端请求
- preprequestprocessor是leader服务器的请求预处理器,在zk中,将创建删除节点/更新数据/创建会话等会改变服务器状态的请求称为事务请求,对于事务请求,预处理器会进行一系列预处理,如创建请求事务头、事务体、会话检查、acl检查和版本检查
- proposalrequestprocessor leader的事务投票处理器,也是leader服务器事务处理流程的发起者。
- 对于非事务请求:直接将请求流转到commitprocessor,不作其他处理
- 对于事务请求:除了交给commitprocessor,还会根据对应请求类型创建对应的proposal,并发送给所有follower服务器发起一次集群内的事务投票。proposalrequestprocessor还会将事务请求交给syncrequestprocessor进行事务日志的记录
- syncrequestprocessor 事务日志处理器,将事务请求记录到事务日志文件中,触发zookeeper进行数据快照
- ackrequestprocessor 是leader特有的处理器,负责在syncrequestprocessor处理器完成事务日志记录后向proposal的投票收集器发送ack反馈,通知投票收集器当前服务器已完成对该proposal的事务日志记录
- commitprocessor 事务提交处理器
- tobecommitprocessor 该处理类中有一个tobeapplied队列(concurrentlinkedqueue tobeapplied)存储被commitprocessor处理过的可被提交的proposal,等待finalrequestprocessor处理完提交的请求后从队列中移除
- finalrequestprocessor 进行客户端请求返回前的收尾工作:创建客户端请求的响应、将事务应用到内存数据库
learnerhandler:leader服务器会与每一个follower/observer服务器建立一个tcp长链接,同时为每个follower/observer服务器创建learnerhandler。learnerhandler是zk集群中的learner服务器的管理器,负责follower/observer服务器和leader服务器之间的网络通信:数据同步、请求转发、proposal提议的投票。
follower
follower的职责:处理客户端非事务请求,转发事务请求给leader服务器;参与事务请求proposal的投票;参与leader选举投票;
follower不需要负责事务请求的投票处理(所以不需要proposalrequestprocessor),所以其请求处理链简单一些
- followerrequestprocessor 识别出当前请求是否是事务请求,如果是事务请求,follower就会将请求转发给leader服务器,leader服务器收到请求后提交给请求处理器链,按正常事务请求进行处理
- sendackrequestprocessor follower服务器上另一个和leader服务器有差异的请求处理器,与leader服务器上的ackrequestprocessor类似,sendackrequestprocessor同样承担了事务日志记录反馈的角色,在完成事务日志记录后,会向leader服务器发送ack消息表明自身完成了事务日志的记录工作。两者的一个区别是:ackrequestprocessor在leader服务器上,因此ack反馈是一个本地操作,而sendackrequestprocessor在follower上,需要通过ack消息的形式向leader服务器进行反馈。
observer
观察zookeeper集群的最新状态并将这些状态变更同步过来,observer服务器在工作原理上与follower基本一致,对于非事务请求可以进行独立的处理,对于事务请求同样需要转发到leader服。与follower的一大区别是:observer不参与任何形式的投票,包括leader选举和事务请求proposal的投票。
集群内消息通信
zk集群各服务器间消息类型分为4类:数据同步型、服务器初始化型、请求处理型、会话管理型
数据同步消息
learner与leader进行数据同步使用的消息,分为4种(消息类型定义在leader.java中,使用常量数字标记):
- diff, 13 leader发送给learner,通知learner进行diff方式的数据同步
- trunc, 14 leader --> learner 触发learner服务器进行内存数据库的回滚操作
- snap, 15 leader --> learner 通知learner,leader即将与其进行全量数据同步
- uptodate, 12 leader --> learner 通知learner完成了数据同步,可以对外提供服务
服务器初始化型消息
整个集群或某些机器初始化时,leader与learner之间相互通信所使用的消息类型:
- observerinfo,16: observer在启动时发送消息给leader,用于向leader注册observer身份,消息中包含当前observer服务器的sid和已经处理的最新zxid
- followerinfo,11:follower启动时发送包含sid和已处理的最新zxid的注册消息到leader
- leaderinfo,17:上述两种情形下,leader服务器会返回包含最新epoch值的leaderinfo返回给observer或follower
- ackepoch,18:learner在收到leaderinfo消息时会将自己的最新zxid和epoch以ackepoch消息的形式发送给leader
- newleader,10:足够多的follower连接上leader服务器,或是leader服务器完成数据同步后,leader向learner发送的阶段性标识信息,包含当前最新zxid
请求处理型
请求处理过程中leader和learner之间互相通信所使用的消息:
- request,1:learner收到事务请求时需要将请求转发给leader,该请求使用request消息的形式进行转发
- proposal,2:在处理事务请求时,leader服务器会将事务请求以proposal消息的形式创建投票发送给集群中的所有的follower进行事务日志的记录
- ack,3:follower完成事务日志的记录后会以ack消息的形式反馈给leader
- commit,4:leader通知集群中的所有follower,可以进行事务请求的提交了,leader在收到过半follower发来的ack消息后,进入事务请求的最终提交流程——生成commit消息,告知所有follower进行事务请求的提交,这是一个2pc的过程
- inform,8:leader发起事务投票并通知提交事务,只需要proposal和commit消息给follower就可以了,而observer不参与事务投票,无法接收commit消息,但需要知道事务提交的内容,所以zk设计了inform消息发给observer,消息中会携带事务请求的内容
- sync,7:leader通知learner服务器已完成sync操作
会话管理型
zk服务器在进行会话管理过程中,与learner服务器之间通信所使用的消息:
- ping,5:zk客户端随机选择一个服务器进行连接,所以leader服务器无法直接收到所有客户端的心跳检测,所以需要委托learner维护所有客户端的心跳检测。leader定时向learner发送ping消息就是要求learner将一段时间内保持心跳检测的客户端列表同样以ping消息的形式返回给leader,这样leader就能获取到全部客户端的活跃状态并进行会话激活了。
- revalidate,6:客户端发生重连后(可能切换了服务器)新连接的服务器需要向leader发送revalidate消息以确定客户端会话是否已经超时。
客户端请求的处理
会话创建请求
zk服务端对于会话创建的处理,可以分为请求接收、会话创建、预处理、事务处理、事务应用和会话响应。
zookeeper源码分析(3)— 一次会话的创建过程 - 简书— 一次会话的创建过程 - 简书")
请求接收
- io层接收来自客户端的请求,nioservercnxn实例维护每一个客户端连接,负责客户端与服务端通信,并将请求内容从底层网络io中读取出来
- 判断是否是客户端“会话创建”请求:检查当前请求对应的nioservercnxn实体是否已经初始化,未初始化时第一个请求必定是会话创建请求
- 反序列化connectrequest请求,确定是会话创建请求后就可以反序列化得到一个connectrequest请求实体
- 判断是否是readonly客户端,如果zk服务器是以readonly模式启动,所有来自非readonly型客户端的请求将无法处理。所以服务端需要从connectrequest中检查是否是readonly客户端,以此来决定是否接受此“会话创建”请求
- 检查客户端zxid:出现客户端zxid比服务端还大这种反常情形时,服务端不接受此会话创建请求
- 协商sessiontimeout:客户端有自己设置的sessiontimeout值,传到服务端后,服务端要根据自身配置进行检查限定,通常的规则是 2 * tickettime ~ 20 * tickertime 之间
- 判断是否需要重新创建会话:解析客户端传入的sessionid进行判断
会话创建
- 为客户端生成sessionid:每个zk服务器启动时都会初始化一个会话管理器sessiontracker,同时初始化一个基准sessionid,这个基准sessionid的生成需要保证后续客户端在此基础上不断+1能够全局唯一。sessionid生成算法见客户端介绍:会话session > sesssionid的生成原理。
- 注册会话:将会话信息保存到sessiontracker的本地字段中:concurrenthashmap<long, sessionimpl> sessionsbyid、concurrentmap<long, integer> sessionswithtimeout
- 会话激活:服务端根据配置的tickettime和会话超时时间比对计算下一次会话超时时间(使用了分桶策略)sessionswithtimeout
- 生成会话密码:随机数,生成代码见 zookeeperserver#generatepasswd
预处理
- preprequestprocessor处理请求(责任链模式)
- 创建请求事务头:对于事务请求,zk会为其创建请求事务头,后续请求处理器都是基于该请求头标识当前请求是否是事务请求,请求事务头包含:clientid(唯一标识请求所属客户端)cxid(客户端操作序列号)zxid(事务请求对应的zxid)time(服务端开始处理事务请求的时间)type(事务请求的类型:zoodefs.opcode.create、delete、setdata和createsession等)
- 创建请求事务体createsessiontxn
- 注册与激活会话:额外处理非leader转发的会话创建请求
事务处理
- proposalrequestprocessor处理请求:preprequestprocessor将请求交给下一级处理器,提案proposal是zk中对因事务请求展开的投票流程中的事务操作的包装,该处理器就是处理提案的,处理流程有:
- sync流程:syncrequestprocessor处理器记录事务日志。完成事务日志记录后,每个follower都会向leader发送ack消息,表明自身完成了事务日志的记录,以便leader服务器统计每个事务请求的投票情况
- proposal流程:zk的实现中,每个事务请求都需要集群中过半机器投票认可才能真正应用到zk的内存数据库中,这个投票与统计的过程就叫 proposal流程:
- 发起投票:对于事务请求,leader服务器会发起一轮事务投票,发起事务投票之前会检查服务端zxid是否可用,如果不可用会抛出xidrolloverexception
- 生成提议proposal:如果服务端zxid可用,就可以开始事务投票了,zk会将之前创建的请求头和事务体,以及zxid和请求本身序列化到proposal对象中
- 广播提议:leader服务器会以zxid作为key,将提议放入投票箱concurrentmap<long, proposal> outstandingproposals中,同时将该提议广播给所有follower服务器
- 收集投票:follower服务器接收到leader发来的提议后,会进入sync流程进行事务日志的记录,执行完后发送ack消息给leader,leader根据ack消息统计proposal的投票情况。当过半机器通过时,就进入proposal的commit阶段
- commit proposal前将请求放入 tobeapplied 队列中
- 广播commit消息:leader会向observer广播包含proposal内容的inform消息,而对于follower服务器则需只发送zxid(上文有介绍)
- commit流程:
- 将请求交给commitprocessor.java处理器,放入 linkedblockingqueue queuedrequests 中,独立线程会取出处理
- 标记toppending:如果是事务请求(write类型),就会将toppending标记为当前请求,用于确保事务请求的顺序性,便于commitprocessor检测当前集群中是否正在进行事务请求的投票
- 等待proposal投票:commit流程处理时,leader根据当前事务请求生成proposal广播给所有follower,此时commit流程需要等待
- 投票通过,提议获得过半机器认可,zk会将请求放入committedrequests队列中,同时唤醒commit流程
- 提交请求:将请求放入toprocess队列中,交给finalrequestprocessor处理
事务应用
- finalrequestprocessor检查outstandingchanges队列中请求的有效性,如果队列中的请求落后于当前正在处理的请求,则从队列中移除
- 之前的请求处理逻辑中仅仅是将事务请求记录到了事务日志中,内存数据库中的状态尚未变更。因此需要将事务变更应用到内存数据库中。对于会话创建这种“事务请求”,只需向sessiontracker进行会话注册
- 完成内容应用后将事务请求放入队列 commitproposal,这个队列保存最近被提交的事务请求,以便集群间机器进行数据的快速同步
会话响应
此时客户端请求在zk服务端已经完成了所有链路的处理
- 统计处理:计算请求在服务器端处理所花费的时间,统计客户端连接的一些基本信息:lastzxid-最新的zxid;lastop-最后一次和服务端的操作;lastlatency-最后一次请求处理所花费的时间;
- 创建响应connectresponse 会话创建成功后的响应,包含:当前客户端与服务端之间的通信协议版本号;会话超时时间;sessionid;会话密码;
- 序列化 connectresponse
- io层发送响应给客户端
setdata请求的处理
预处理
- io层接收来自客户端的请求
- 判断是否是客户端会话创建类请求,setdata请求到来时已完成了会话创建
- preprequestprocessor处理器进行处理
- 创建请求事务头
- 检查会话是否超时,超时向客户端抛出sessionexpiredexception
- 反序列化请求,创建changerecord记录。zk对于客户端请求,反序列化生成特定setdatarequest请求,请求中包含数据节点path、更新数据内容data、期望的数据节点版本version。
- acl权限检查,没有权限会抛出noauthexception
- 数据版本检查(乐观锁)
- 创建事务请求体setdatatxn
- 保存事务操作到outstandingchanges队列中
事务处理:无论对于会话创建还是setdata请求,事务处理流程都是一致的:由proposalrequestprocessor处理器发起,通过sync、proposal、commit 3个子流程相互协作完成
事务应用
- finalrequestprocessor处理
- zk将请求事务头和事务体交给内存数据库 zkdatabase进行事务应用,返回processtxnresult,包含了数据节点内容更新后的stat
- 将事务请求放入 commitproposal 队列
请求响应
- 统计处理
- 创建响应体setdataresponse,包含当前数据节点的最新状态stat
- 创建响应头,方便客户端对响应进行解析,包括当前响应对应的事务zxid和请求处理是否成功的标识err
- 序列化响应
- io层发送响应给客户端
事务请求转发
所有非leader服务器如果收到了来自客户端的事务请求,必须将其转发给leader服务器处理。follower或observer服务器中,第一个请求处理器分别是followerrequestprocessor和observerrequestprocessor,都会检查当前请求是否是事务请求,对于事务请求会议request消息的形式转发给leader,leader解析出原始请求后提交到自己的请求处理链中。
getdata请求
预处理:
io层接收来自客户端的请求;判断是否是客户端会话创建请求;preprequestprocessor处理;会话检查;
由于getdata请求是非事务请求,因此不需要事务预处理逻辑:创建请求事务头、changerecord、事务体、数据节点版本的检查;
非事务处理:
反序列化getdatareqeuest(得到path和watcher注册情况);获取数据节点(zk从内存数据库中获取节点及acl信息);acl检查;获取数据内容和stat,注册watcher;
请求响应:
创建响应体getdataresponse,获取数据成功后的响应,包含当前数据节点内容和状态stat;创建响应头;统计处理;序列化响应;io层发送响应给客户端;
zk底层数据与存储
内存数据库存储了zk树结构的数据,包括节点路径、节点数据和acl信息,zk会定时将这些数据存储到磁盘上
datatree类
org.apache.zookeeper.server.datatree维护了树形结构数据,内部没有任何网络或客户端连接逻辑,所以可以单独进行调试。
datanode是数据存储的最小单元,内含数据节点内容 byte[] data、acl列表对应的map key long acl、节点状态对象statpersisted stat、子节点列表set children、父节点parent的引用?
datatree维护了两个数据结构:path与datanode组成的concurrenthashmap<string, datanode> nodes、datanode树;
另外为方便及时访问和清理临时节点,额外维护字段 map<long, hashset> ephemerals = new concurrenthashmap<long, hashset>()
zkdatabase
zookeeper的内存数据库,管理zk的所有会话、datatree存储和事务日志。zkdatabase会定时向磁盘dump快照数据,zk服务器启动时会通过磁盘上的事务日志和快照文件进行恢复
事务日志
zoo.cfg配置文件中的datadir目录默认用于存储事务日志文件,datalogdir可以配置为事务日志单独分配一个文件存储目录。
如果配置了 datalogdir = /home/admin/zkdata/zk_log,zk运行时会在该目录下创建 version-2 子目录,该目录的命名跟随zk的事务日志版本号。这样运行一段时间后在 /home/admin/zkdata/zk_log/version-2 出现了日志文件
-rw-rw-r-- 1 admin admin 67108880 02-23 16:10 log.2c01631713
-rw-rw-r-- 1 admin admin 67108880 02-23 17:07 log.2c0164334d
-rw-rw-r-- 1 admin admin 67108880 02-23 18:19 log.2d01654af8
-rw-rw-r-- 1 admin admin 67108880 02-23 19:28 log.2d0166a224
- 这些日志文件大小固定都是64mb
- 文件后缀名是十六进制数字,即zxid - 64位数字,前32位表示ecpoch,这些日志文件只有两个epoch:2c、2d;后32位是操作序列号。每个日志文件达到设定的大小后,创建新日志文件并以当时的zxid为文件名,这样方便根据日志文件后缀名寻找zxid的存储位置
- 事务日志文件内容的解析命令
java logformatter log.2d0166a224
事务日志文件头信息,输出事务日志的dbid和日志版本号
zookeeper transactional log file with dbid 0 txnlog format version 2
客户端会话创建的事务操作日志,分别记录了 事务操作时间、客户端会话id、cxid客户端操作序列号、zxid、操作类型和会话超时时间
…11:07:41 session 0x1446994y6273434 cxid 0x0 zxid 0x300000002 createsession 30000
节点创建的事务操作日志,记录了操作类型、节点路径、节点数据内容、节点acl信息、是否是临时节点(f表示永久节点,t表示临时节点)、节点版本号
…11:08:40 session 0x1446995520000 cxid 0x2 zxid 0x3000003 create '/test_log,#7631,v{s{31,s{'world,'anyone}}},f,2
- 事务日志文件不会记录读操作
filetxnlog类
在其public synchronized boolean append(txnheader hdr, record txn, txndigest digest)方法中进行了日志的写入,会传递事务头txnheader和事务体record,日志写入步骤:
- 判断filetxnlog是否已关联上一个可写的事务日志文件,如果没有关联上,就会用与事务操作关联的zxid作为后缀创建一个事务日志文件,同时构建事务日志文件头信息(包含魔数magic、事务日志格式版本version、dbid)并写入日志文件。改日志文件的文件流会放入 queue streamstoflush = new arraydeque<>() 中
- 对日志文件进行磁盘空间预分配(日志文件不断追加写入会触发底层磁盘空白块的seek,为提高io效率,所以预分配磁盘空间),通常一个日志文件预分配64mb,已分配空间不足4kb时会再次分配,预分配的文件使用0填充,预分配的文件大小使用 zookeeper.preallocsize 进行设置
- 事务序列化,包括事务头txnheader和事务体record的序列化,事务体又分为会话创建事务createsessiontxn、节点创建事务createtxn、节点删除事务deletetxn、节点数据更新事务setdatatxn
- 为保证事务日志文件的完整性和准确性,zk在将事务日志写入文件前,会根据序列化步骤产生的字节数组计算checksum,zk默认使用adler32算法计算checksum
- 将序列化后的事务头、事务体和checksum写入文件流,由于zk使用的是bufferedoutputstream,写入的数据并没有立刻到达磁盘
- 事务日志刷入磁盘:将缓冲数据输入磁盘,从streamtoflush 中获取文件流,调filechannel.force(bool metadata) 进行磁盘文件的强制写入,该方法基于底层的fsync接口,通过 zookeeper.forcesync 来设置
trunc日志截断
在运行过程中如果出现非leader服务器的zxid(peerlastzxid)大于leader服务器的,此时leader服务器会发出trunc给这台服务器,清除掉所有包含或大于peerlastzxid的事务日志,保持与leader服务器的同步
sanpshot数据快照
记录zk服务器上某一时刻全量内存数据内容,写入到指定磁盘文件中,同样位于指定的datadir目录下。与事务日志文件不同的是,快照文件没有采用预分配的方式,所以快照文件的大小就可以反映当前zk服务器内存中的全量数据大小。
快照数据的解析可使用命令 java snapshotformatter snapshot.30000000007
快照文件记录的是数据节点的元信息,不包含节点的数据内容
filesnap类
zk定期将内存数据库全量dump到本地文件,这个文件就是snapshot,通过snapcount配置在事务日志记录多少次后进行快照写入。数据快照的流程是
- 理论上在snapcount后进行数据快照,但考虑到数据快照对zk服务器性能的影响,需要避免所有机器在同一时刻进行进行snapshot。zk采用了过半随机策略,满足下述条件就进行数据快照:logcount > (snapshot / 2 + randroll) ,logcount表示当前已经记录的事务日志数量,randroll为 1 ~ snapcount/2 之间的随机数,所以上述条件相当于:如果配置snapcount=1000,则zk会在500~1000次事务日志后进行一次快照
- 切换事务日志文件:当前事务日志文件已经写入了snapcount条事务日志,需要创建一个新的事务日志文件
- 创建数据快照异步线程,不影响zookeeper主流程
- 从zkdatabase获取全量数据datatree和会话信息
- 根据当前已提交的最大zxid确定快照文件名称
- 先序列化文件头信息(包含魔数、版本号和dbid信息)对会话信息和datatree分别进行序列化,生成checksum写入快照文件
服务器初始化流程
初始化流程 zookeeper–数据初始化过程-蒲公英云 主要包括从快照文件中加载数据和从事务日志文件中进行数据订正两个过程
- 初始化filetxnsnaplog:filetxnsnaplog是zookeeper事务日志和快照数据访问层,用于衔接上层业务与底层数据存储,底层数据又分为snapshot和txn两部分,所以filetxnsnaplog内部又分为filetxnlog和filesnap的初始化
- 初始化zkdatabase:datatree是zookeeper内存数据的核心模型,在每个zookeeper服务器内部是单例。zkdatabase初始化时也会初始化datatree:创建一些zk的默认节点,如/ /zookeeper /zookeeper/quota。初始化阶段还会创建一个会话超时时间记录器 sessionswithtimeouts
- 创建playbacklistener监听器,接收事务使用过程中的回调。在zk数据恢复后期事务订正过程中会回调这个监听器进行数据校正
- 完成内存数据库的初始化后从磁盘的快照文件中恢复数据
- 获取至多100个最近的快照文件
- 反序列化这些二进制文件生成datatree对象和sessionwithtimeouts集合,同时进行文件的checksum校验。当最新的快照文件不可用时才会进行逐个文件解析,直到这100个文件全部解析完,否则服务启动失败
- 获取最新的zxid,从加载解析成功的快照文件的文件名中可以得到一个最新的zxid
- 完成快照文件加载后,此时zk服务器内存中已经有一份近似全量的数据,就可以通过事务日志更新增量数据了
- 从事务日志中获取比快照文件最大zxid zxid_for_snap 大的事务操作
- 将这些事务操作逐个应用到之前基于快照数据文件恢复出来的datatree 和 sessionswithtimeouts 中。当一个事务被应用到内存数据库中后,zk会回调playbacklistener监听器,将这一事务操作记录转换成proposal,保存到zkdatabase.committedlog中,以便follower进行快速同步
- 所有事务被完整应用后,此时就能获取到上次服务器正常运行时提交的最大事务id
- 校验当前leader周期 epoch 纪元号:上一步骤得到的zxid中解析到epochofzxid,同时与磁盘上的currentepoch、acceptedepoch文件中的epoch值进行校验
leader应用完事务日志后的数据同步
playbacklistener是一个事务应用监听器,leader在事务应用后会回调该listener的 ontxnloaded 方法
public interface playbacklistener {
void ontxnloaded(txnheader hdr, record rec, txndigest digest);
}
playbacklistener会将这些事务通过 zkdatabase#addcommittedproposal方法转存到 zkdatabase.committedlog 中,便于集群间同步。
事务同步的过程就是leader向learner同步事务的过程,learner向leader注册的最后阶段会发送一个ackepoch数据包,leader会从该数据包中解析出该learner的 currentepoch 和 lastzxid。
数据同步初始化
数据同步代码位于learnerhandler和learner两个类中。leader依据内存数据库zkdatabase中的committedlog进行初始化,生成3个zxid值:
- peerlastzxid:learner服务器最后处理的zxid
- mincommittedlog:leader服务器提案缓存队列committedlog中的最小zxid
- maxcommittedlog:leader服务器提案缓存队列committedlog中的最大zxid
zk集群数据同步分为4类:
- diff 差异化同步(触发条件 mincommittedlog <= peerlastzxid <= maxcommittedlog)leader向learner发送diff指令通知leader进入差异化同步状态,针对每个 zxid > peerlastzxid 的提案,leader会发出 proposal内容 和 commit指令 两个数据包。与通常的leader和follower之间的事务请求的提交过程一致。 leader在发送完差异数据后,会将learner加入到 forwardingfollowers 或 observinglearners 队列中,随后向learner发出newleader指令,通知其差异化同步完毕。 此后learner反馈一个ack消息给leader,leader接收到过半learner的ack消息后,会向所有已完成数据同步的learner发送uptodate指令,通知learner已经完成了数据同步。此时集群中过半机器完成了数据同步,具备了对外提供服务的能力。 learner在接收到这个来自leader的uptodate指令后,会终止数据同步流程,向leader再次反馈一个ack消息。
@startuml
autonumber
leader <- learner : followerinfo/observerinfo
leader -> learner : leaderinfo
leader <- learner : ackepoch
note right
1-3 注册
end note
leader -> learner : diff
leader -> learner : proposal
leader -> learner : commit
note right
4-9 差异化同步
end note
leader -> learner : proposal
leader -> learner : commit
leader -> learner : newleader
leader <- learner : ack
leader -> learner : uptodate 等待过半
leader <- learner : ack
note right
10-12 同步确认
end note
- trunc+diff 先回滚再差异化同步。适用场景:a b c三台机器,某一时刻b是leader,当前的leader_epoch=5,被过半数机器提交的zxid包括 0x500000001、0x500000002,b服务器在处理完0x500000003事务时将该事务写入本地事务日志。但在b向其他follower发送proposal进行投票时,b服务器宕机,proposal没有被同步出去。在新一轮选举后新leader a服务器将leader_epoch变更为6,之后ac两台服务器对外提供服务,又提交了0x600000001、0x600000002两个事务,此时服务器b恢复加入集群。此时peerlastzxid、mincommittedlog、maxcommittedlog的值分别是 0x500000003、0x500000001、0x600000002,满足 mincommittedlog <= peerlastzxid <= maxcommittedlog。 当leader服务器发现某个learner包含了一条自己没有的事务记录,就需要让该learner进行事务回滚,回滚到leader上存在的且是最接近peerlastzxid的zxid(上述例子中就是 0x500000002),然后再进行差异化同步
- trunc 仅回滚同步
- snap 全量同步。适用场景: peerlastzxid < mincommittedlog 或 leader服务器上没有committedlog,peerlastzxid != lastprocessedzxid leader服务器上数据恢复后得到的最大zxid。这两种场景下,leader都无法使用committedlog与learner进行数据同步,因此只能进行全量同步
zookeeper技术内幕总结
zookeeper以树作为其内存数据模型,树上的每一个节点是最小的数据单元,即znode。znode有一个递增的版本号,以此可以实现分布式数据的原子性更新。
zookeeper序列化层使用从hadoop中遗留下来的jute组件,该组件不是性能最好的序列化框架,但在zookeeper中够用。
zookeeper的客户端和服务端之间会建立起tcp长链接进行网络通信,基于该tcp连接衍生出的会话概念,是客户端和服务端之间所有请求与响应交互的基础。在会话的生命周期中,会出现连接断开、重连或是会话失效等一系列问题。leader服务器会负责管理每个会话的生命周期,包括会话的创建、心跳检测和销毁等。
服务器启动阶段会进行磁盘数据的恢复,恢复完成后会进行leader选举,一旦选举产生leader后,就立即开始进行集群间的数据同步——在整个过程中,zookeeper都处于不可用状态,直到数据同步完毕(集群中绝大部分机器数据和leader一致),zookeeper才可以对外提供服务。在运行期间,如果leader服务器所在机器宕机或是和集群中绝大部分机器断开连接,就会触发新一轮leader选举。
一个正常运行的zookeeper集群,其机器角色通常由leader、follower和observer组成。zookeeper对于客户端请求的处理严格按照zab协议规范进行。每个服务器在启动初始化阶段都会组装一个请求处理链,leader服务器能够处理所有类型的客户端请求,而follower或是observer服务器只能处理非事务请求。对于每个事务请求,leader都会为其分配一个全局唯一且递增的zxid,以此保证事务处理的顺序性。在事务请求的处理过程中,leader和follower都会进行事务日志的记录。zookeeper通过jdk的file接口简单实现了自己的数据库存储系统,底层数据存储包括事务日志和快照数据两部分,这些都是zookeeper实现数据一致性非常关键的部分。
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
己的数据库存储系统,底层数据存储包括事务日志和快照数据两部分,这些都是zookeeper实现数据一致性非常关键的部分。
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、oppo等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
[外链图片转存中…(img-i4liavi6-1712540612539)]
[外链图片转存中…(img-xhmazeyl-1712540612539)]
[外链图片转存中…(img-ta488tta-1712540612539)]
[外链图片转存中…(img-zzxcdpzu-1712540612540)]
[外链图片转存中…(img-nql3xr5e-1712540612540)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加vx:vip204888 (备注大数据获取)
[外链图片转存中…(img-f74u9a9u-1712540612540)]
发表评论