命令基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path [watch] | 使用 ls 命令来查看当前znode中所包含的内容 |
ls2 path [watch] | 查看当前节点数据并能看到更新次数等数据 |
create | 普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get path [watch] | 获得节点的值 |
set | 设置节点的具体值 |
stat | 查看节点状态 |
delete | 删除节点 |
rmr | 递归删除节点 |
- 启动客户端
[zhangyong@hadoop101 zookeeper-3.4.10]$ bin/zkcli.sh
2.显示所有操作命令
[zk: localhost:2181(connected) 1] help
3.查看当前znode中所包含的内容
[zk: localhost:2181(connected) 2] ls /
[zookeeper]
4.查看当前节点详细数据
[zk: localhost:2181(connected) 3] ls2 /
[zookeeper]
czxid = 0x0
ctime = thu jan 01 08:00:00 cst 1970
mzxid = 0x0
mtime = thu jan 01 08:00:00 cst 1970
pzxid = 0x0
cversion = -1
dataversion = 0
aclversion = 0
ephemeralowner = 0x0
datalength = 0
5.分别创建2个普通节点
[zk: localhost:2181(connected) 3] create /zhangyong "hahaha"
created /zhangyong
[zk: localhost:2181(connected) 4] create /zhangyong/zhangsan "lisi"
created /zhangyong/zhangsan
6.获得节点的值
[zk: localhost:2181(connected) 5] get /zhangyong
hahaha
czxid = 0x100000003
ctime = wed aug 29 00:03:23 cst 2018
mzxid = 0x100000003
mtime = wed aug 29 00:03:23 cst 2018
pzxid = 0x100000004
cversion = 1
dataversion = 0
aclversion = 0
ephemeralowner = 0x0
datalength = 7
numchildren = 1
[zk: localhost:2181(connected) 6] get /zhangyong/zhangsan
lisi
czxid = 0x100000004
ctime = wed aug 29 00:04:35 cst 2018
mzxid = 0x100000004
mtime = wed aug 29 00:04:35 cst 2018
pzxid = 0x100000004
cversion = 0
dataversion = 0
aclversion = 0
ephemeralowner = 0x0
datalength = 6
numchildren = 0
7.创建短暂节点
[zk: localhost:2181(connected) 7] create -e /zhangyong/zhangsan "zhang"
created /zhangyong/zhangsan
(1)在当前客户端是能查看到的
[zk: localhost:2181(connected) 3] ls /zhangyong
[zhangsan, zhangsan]
(2)退出当前客户端然后再重启客户端
[zk: localhost:2181(connected) 12] quit
[zhangyong@hadoop101 zookeeper-3.4.10]$ bin/zkcli.sh
(3)再次查看根目录下短暂节点已经删除
[zk: localhost:2181(connected) 0] ls /zhangyong
[zhangyong]
8.创建带序号的节点
(1)先创建一个普通的根节点/zhangyong/weiguo
[zk: localhost:2181(connected) 1] create /zhangyong/weiguo "caoyu"
created /zhangyong/weiguo
(2)创建带序号的节点
[zk: localhost:2181(connected) 2] create -s /sanguo/weiguo/xiaoqiao "jinlian"
created /sanguo/weiguo/xiaoqiao0000000000
[zk: localhost:2181(connected) 3] create -s /sanguo/weiguo/daqiao "jinlian"
created /sanguo/weiguo/daqiao0000000001
[zk: localhost:2181(connected) 4] create -s /sanguo/weiguo/diaocan "jinlian"
created /sanguo/weiguo/diaocan0000000002
如果原来没有序号节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从2开始,以此类推。
9.修改节点数据值
[zk: localhost:2181(connected) 6] set /sanguo/weiguo "simayi"
10.节点的值变化监听
(1)在hadoop104主机上注册监听/sanguo节点数据变化
[zk: localhost:2181(connected) 26] [zk: localhost:2181(connected) 8] get /sanguo watch
(2)在hadoop103主机上修改/sanguo节点的数据
[zk: localhost:2181(connected) 1] set /sanguo "xisi"
(3)观察hadoop103主机收到数据变化的监听
watcher::
watchedevent state:syncconnected type:nodedatachanged path:/sanguo
11.节点的子节点变化监听(路径变化)
(1)在hadoop103主机上注册监听/sanguo节点的子节点变化
[zk: localhost:2181(connected) 1] ls /sanguo watch
[aa0000000001, server101]
(2)在hadoop102主机/sanguo节点上创建子节点
[zk: localhost:2181(connected) 2] create /sanguo/jin "simayi"
created /sanguo/jin
(3)观察hadoop103主机收到子节点变化的监听
watcher::
watchedevent state:syncconnected type:nodechildrenchanged path:/sanguo
12.删除节点
[zk: localhost:2181(connected) 4] delete /sanguo/jin
13.递归删除节点
[zk: localhost:2181(connected) 15] rmr /sanguo/shuguo
14.查看节点状态
[zk: localhost:2181(connected) 17] stat /sanguo
czxid = 0x100000003
ctime = wed aug 29 00:03:23 cst 2018
mzxid = 0x100000011
mtime = wed aug 29 00:21:23 cst 2018
pzxid = 0x100000014
cversion = 9
dataversion = 1
aclversion = 0
ephemeralowner = 0x0
datalength = 4
numchildren = 1
3.3 api应用
- idea环境搭建一个maven工程
- 添加pom文件
<dependencies>
<dependency>
<groupid>junit</groupid>
<artifactid>junit</artifactid>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupid>org.apache.logging.log4j</groupid>
<artifactid>log4j-core</artifactid>
<version>2.8.2</version>
</dependency>
<!-- zookeeper3.4.10 -->
<dependency>
<groupid>org.apache.zookeeper</groupid>
<artifactid>zookeeper</artifactid>
<version>3.4.10</version>
</dependency>
<dependency>
<groupid>junit</groupid>
<artifactid>junit</artifactid>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
- 拷贝log4j.properties文件到项目根目录
- 需要在项目的resources目录下,创建“log4j.properties”,在文件中填入。
log4j.rootlogger=info, stdout
log4j.appender.stdout=org.apache.log4j.consoleappender
log4j.appender.stdout.layout=org.apache.log4j.patternlayout
log4j.appender.stdout.layout.conversionpattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.fileappender
log4j.appender.logfile.file=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.patternlayout
log4j.appender.logfile.layout.conversionpattern=%d %p [%c] - %m%n
- 编写java代码
/\*\*
\* @author zhangyong
\* @date 2020/3/18 0:18
\* @version 1.0
\*/
public class zkcli {
private static string connectstring = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static int sessiontimeout = 2000;
private zookeeper zkclient = null;
/\*\*
\* 创建zookeeper客户端
\* @throws exception
\*/
public void init() throws exception {
zkclient = new zookeeper (connectstring, sessiontimeout, new watcher () {
@override
public void process(watchedevent event) {
// 收到事件通知后的回调函数(用户的业务逻辑)
system.out.println (event.gettype () + "--" + event.getpath ());
// 再次启动监听
try {
zkclient.getchildren ("/", true);
} catch (exception e) {
e.printstacktrace ();
}
}
});
}
/\*\*
\* 创建子节点
\* @throws exception
\*/
@test
public void create() throws exception {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
string nodecreated = zkclient.create ("/zhangyong", "zhangrui".getbytes (), zoodefs.ids.open_acl_unsafe, createmode.persistent);
}
/\*\*
\* 获取子节点并监听节点变化
\* @throws exception
\*/
@test
public void getchildren() throws exception {
list<string> children = zkclient.getchildren ("/", true);
for (string child : children) {
system.out.println (child);
}
// 延时阻塞
thread.sleep (long.max_value);
}
/\*\*
\* 判断znode是否存在
\* @throws exception
\*/
@test
public void exist() throws exception {
stat stat = zkclient.exists("/zhangyong", false);
system.out.println(stat == null ? "not exist" : "exist");
}
}
3.4 监听服务器节点动态上下线案例(扩展)
- 需求
- 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
- 客户端能实时洞察到服务器上下线的变化
- 具体实现
(1)先在集群上创建/servers节点
[zk: localhost:2181(connected) 10] create /servers "servers"
created /servers
(2)服务器端向zookeeper注册代码
package com.zhangyong.zookeeper;
/\*\*
\* @author zhangyong
\* @date 2020/3/18 0:18
\* @version 1.0
\*/
public class distributeserver {
private static string connectstring = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static int sessiontimeout = 2000;
private zookeeper zk = null;
private string parentnode = "/servers";
// 创建到zk的客户端连接
public void getconnect() throws ioexception {
zk = new zookeeper (connectstring, sessiontimeout, new watcher () {
@override
public void process(watchedevent event) {
}
});
}
// 注册服务器
public void registserver(string hostname) throws exception {
string create = zk.create (parentnode + "/server", hostname.getbytes (), ids.open_acl_unsafe, createmode.ephemeral_sequential);
system.out.println (hostname + " is online " + create);
}
// 业务功能
public void business(string hostname) throws exception {
system.out.println (hostname + " is working ...");
thread.sleep (long.max_value);
}
public static void main(string[] args) throws exception {
// 1获取zk连接
distributeserver server = new distributeserver ();
server.getconnect ();
// 2 利用zk连接注册服务器信息
server.registserver (args[0]);
// 3 启动业务功能
server.business (args[0]);
}
}
(2)客户端代码
/\*\*
\* @author zhangyong
\* @date 2020/3/18 0:18
\* @version 1.0
\*/
public class distributeclient {
private static string connectstring = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static int sessiontimeout = 2000;
private zookeeper zk = null;
private string parentnode = "/servers";
// 创建到zk的客户端连接
public void getconnect() throws ioexception {
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent event) {
// 再次启动监听
try {
getserverlist();
} catch (exception e) {
e.printstacktrace();
}
}
});
}
// 获取服务器列表信息
public void getserverlist() throws exception {
// 1获取服务器子节点信息,并且对父节点进行监听
list<string> children = zk.getchildren(parentnode, true);
// 2存储服务器信息列表
arraylist<string> servers = new arraylist<>();
// 3遍历所有节点,获取节点中的主机名称信息
for (string child : children) {
byte[] data = zk.getdata(parentnode + "/" + child, false, null);
servers.add(new string(data));
}
// 4打印服务器列表信息
system.out.println(servers);
}
// 业务功能
public void business() throws exception{
system.out.println("client is working ...");
thread.sleep(long.max_value);
}
public static void main(string[] args) throws exception {
// 1获取zk连接
distributeclient client = new distributeclient();
client.getconnect();
// 2获取servers的子节点信息,从中获取服务器信息列表
client.getserverlist();
// 3业务进程启动
client.business();
}
}
第4章 zookeeper内部原理
4.1 节点类型
- 持久(persistent):客户端和服务器端断开连接后,创建的节点不删除
- 短暂(ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
- 说明:创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
- 注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
(1)持久化目录节点
- 客户端与zookeeper断开连接后,该节点依旧存在
(2)持久化顺序编号目录节点
- 客户端与zookeeper断开连接后,该节点依旧存在,只是zookeeper给该节点名称进行顺序编号
(3)临时目录节点
- 客户端与zookeeper断开连接后,该节点被删除
(4)临时顺序编号目录节点
- 客户端与zookeeper断开连接后,该节点被删除,只是zookeeper给该节点名称进行顺序编号。
4.2 stat结构体
-
czxid-创建节点的事务zxid
- 每次修改zookeeper状态都会收到一个zxid形式的时间戳,也就是zookeeper事务id。
- 事务id是zookeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
-
ctime - znode被创建的毫秒数(从1970年开始)
-
mzxid - znode最后更新的事务zxid
-
mtime - znode最后修改的毫秒数(从1970年开始)
-
pzxid-znode最后更新的子节点zxid
-
cversion - znode子节点变化号,znode子节点修改次数
-
dataversion - znode数据变化号
-
aclversion - znode访问控制列表的变化号
-
ephemeralowner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
-
datalength- znode的数据长度
-
numchildren - znode子节点数量
4.3 监听器原理
-
监听原理详解:
- 首先要有一个main()线程
- 在main线程中创建zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
- 通过connect线程将注册的监听事件发送给zookeeper。
- 在zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
- zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
- listener线程内部调用了process()方法。
-
常见的监听
- 监听节点数据的变化 get path [watch]
- 监听子节点增减的变化 ls path [watch]
4.4 paxos算法
- paxos算法一种基于消息传递且具有高度容错特性的一致性算法。
- 分布式系统中的节点通信存在两种模型:共享内存(shared memory)和消息传递(messages passing)。基于消息传递通信模型的分布式系统,不可避免的会发生以下错误:进程可能会慢、被杀死或者重启,消息可能会延迟、丢失、重复,在基础 paxos 场景中,先不考虑可能出现消息篡改即拜占庭错误的情况。paxos 算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。
- 在一个paxos系统中,首先将所有节点划分为proposers,acceptors,和learners。(注意:每个节点都可以身兼数职)。
- 一个完整的paxos算法流程分为三个阶段:
- paxos算法流程中的每条消息描述如下:
- prepare: proposer生成全局唯一且递增的proposal id (可使用时间戳加server id),向所有acceptors发送prepare请求,这里无需携带提案内容,只携带proposal id即可。
- promise: acceptors收到prepare请求后,做出“两个承诺,一个应答”。
两个承诺:
a. 不再接受proposal id小于等于(注意:这里是<= )当前请求的prepare请求。
b. 不再接受proposal id小于(注意:这里是< )当前请求的propose请求。
一个应答:
c. 不违背以前做出的承诺下,回复已经accept过的提案中proposal id最大的那个提案的value和proposal id,没有则返回空值。
- propose: proposer 收到多数acceptors的promise应答后,从应答中选择proposal id最大的提案的value,作为本次要发起的提案。如果所有应答的提案value均为空值,则可以自己随意决定提案value。然后携带当前proposal id,向所有acceptors发送propose请求。
- accept: acceptor收到propose请求后,在不违背自己之前做出的承诺下,接受并持久化当前proposal id和提案value。
- learn: proposer收到多数acceptors的accept后,决议形成,将形成的决议发送给所有learners。
- paxos算法缺陷:在网络复杂的情况下,一个应用paxos算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。
4.5 选举机制
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
ose: proposer 收到多数acceptors的promise应答后,从应答中选择proposal id最大的提案的value,作为本次要发起的提案。如果所有应答的提案value均为空值,则可以自己随意决定提案value。然后携带当前proposal id,向所有acceptors发送propose请求。
4. accept: acceptor收到propose请求后,在不违背自己之前做出的承诺下,接受并持久化当前proposal id和提案value。
5. learn: proposer收到多数acceptors的accept后,决议形成,将形成的决议发送给所有learners。
- paxos算法缺陷:在网络复杂的情况下,一个应用paxos算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。
4.5 选举机制
[外链图片转存中…(img-slevatcr-1714281120992)]
[外链图片转存中…(img-ncyxkoru-1714281120992)]
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
发表评论