一、zookeeper概述
1.1 zookeeper的定义
zookeeper是一个开源的分布式协调服务,主要用于分布式应用程序中的协调管理。它由apache软件基金会维护,是hadoop生态系统中的重要成员。zookeeper提供了一个高效且可靠的分布式锁服务,以及群集管理功能,在分布式系统中起到了“守护神”的作用。
1.2 zookeeper的核心理念
zookeeper基于以下关键概念构建:
-
数据模型:zookeeper的数据模型是一个层次结构,这个层次类似于一个文件系统,与liunx的文件系统类似,整体可以看作为一棵树。它由节点组成,节点也成为znode,每个节点可以有子节点。节点可以存储数据,但数据尺寸有限默认存储为1mb的数据。
-
节点(znode):zookeeper中的每个数据单元称为znode。znode有两种类型:持久(persistent)和临时(ephemeral)。持久节点在客户端断开连接后仍存在,而临时节点在客户端断开连接后会被自动删除。
-
观察者(watcher):客户端可以在znode上设置观察者,当znode的数据或子节点发生变化时,watcher会通知对应的客户端。
-
有序性(orderliness):zookeeper通过全局顺序来确保所有操作的顺序一致。
-
数据一致性 :每个server保存一份相同的数据拷贝,客户端无论请求到被集群中哪个server处理,得到的数据都是一致的。
-
集群服务:在zookeeper集群服务由一个领导者(leader),多个跟随者(follower)组成的集群。领导者(leader)负责进行投票的发起和决议,更新集群服务状态。跟随者用于接收客户请求并向客户端返回结果,在选举leader过程中参与投票。集群中只要有半数以上节点存活,zookeeper集群就能正常服务。
二、zookeeper的应用场景
2.1 分布式锁服务
zookeeper能够非常有效地实现分布式锁。这在需要同步或并发控制的分布式系统中尤为重要。通过利用zookeeper的临时znode特性,可以实现锁的自动释放,防止死锁。
2.2 统一配置管理
在分布式系统中,应用程序的配置管理成为一个复杂的问题。zookeeper提供了一种集中式管理配置的方式,所有的配置文件可以存储在zookeeper中,并且可以动态更新。当配置变化时,zookeeper可以通知到所有客户端,从而使应用程序能够立即响应变化。
2.3 命名服务
zookeeper可以作为分布式系统的命名服务,通过维护名称和元数据的映射关系,提供高效的名称解析能力。
2.4 集群管理
zookeeper能够监控集群中各个节点的状态,决定节点是否健康,而节点的加入和离开能够动态调整。
三、zookeeper的安全管理操作方法
3.1 基本安全措施
-
验证(authentication):zookeeper支持基于客户端和服务器之间的认证机制。通过设置用户和密码,可以限制对znode的访问。
3.1.1认证方式
- world:默认方式,开放的权限,意解为全世界都能随意访问。
- auth:已经授权且认证通过的用户才可以访问。
- digest:用户名:密码方式认证,实际业务开发中最常用的方式。
- ip白名单:授权指定的ip地址,和指定的权限点,控制访问。
-
acl(access control lists):通过设定acl,能够控制znode的读写权限。acl规则可以根据不同的需求设定,如只读、完全控制等。
3.1.2 acl授权流程
- 添加认证用户
addauth digest 用户名:密码
- 设置权限
setacl /path auth:用户名:密码:权限
- 查看acl设置
getacl /path
完整的操作如下代码
-- 添加授权用户
[zk: localhost:2181] addauth digest user1:123456
-- 创建节点
[zk: localhost:2181] create /testnode testnode
-- 节点授权
[zk: localhost:2181] setacl /testnode auth:user1:123456:cdrwa
-- 查看授权
[zk: localhost:2181] getacl /testnode
3.2 数据加密
在zookeeper的配置文件中,可以启用数据传输加密(例如ssl/tls)来保证数据在网络传输中的安全性。
3.3 安全配置示例
# zookeeper configuration
authprovider.1=org.apache.zookeeper.server.auth.saslauthenticationprovider
requireclientauthscheme=sasl
digest.authenticationhandler.sasl.clientallowedprotocols=gssapi:cram-md5
四、zookeeper与spring boot 2的整合
4.1 引入依赖
在spring boot 2项目中,首先需要引入curator依赖,这是用于简化zookeeper操作的一个高层次api。curator框架在zookeeper原生api接口上进行二次包装。提供zookeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等api封装。
<dependency>
<groupid>org.apache.curator</groupid>
<artifactid>curator-framework</artifactid>
<version>2.12.0</version>
</dependency>
<dependency>
<groupid>org.apache.curator</groupid>
<artifactid>curator-recipes</artifactid>
<version>2.12.0</version>
</dependency>
<dependency>
<groupid>org.apache.curator</groupid>
<artifactid>curator-client</artifactid>
<version>2.12.0</version>
</dependency>
4.2 springboot项目yml配置
在application.properties
中,配置zookeeper的连接信息:
zoo:
keeper:
#开启标志
enabled: true
#服务器地址
server: 127.0.0.1:2181
#命名空间,被称为znode
namespace: testnode
#权限控制,加密
digest: user1:123456
#会话超时时间
sessiontimeoutms: 3000
#连接超时时间
connectiontimeoutms: 60000
#最大重试次数
maxretries: 2
#初始休眠时间
basesleeptimems: 1000
4.3 编写配置类
编写zookeeper配置类,用于初始化zookeeper客户端:
@configuration
public class zookeeperconfig {
private static final logger logger = loggerfactory.getlogger(zookeeperconfig.class) ;
//注入zookeeper配置文件类,用于获取yml的配置项值
@autowired
private zookeeperparam zookeeperparam ;
private static curatorframework client = null ;
/**
* 初始化
*/
@postconstruct
public void init (){
//重试策略,初试时间1秒,重试10次
retrypolicy policy = new exponentialbackoffretry(
zookeeperparam.getbasesleeptimems(),
zookeeperparam.getmaxretries());
//通过工厂创建curator
client = curatorframeworkfactory.builder()
.connectstring(zookeeperparam.getserver()) //链接的服务的地址
.authorization("digest",zookeeperparam.getdigest().getbytes()) //认证方式
.connectiontimeoutms(zookeeperparam.getconnectiontimeoutms())
.sessiontimeoutms(zookeeperparam.getsessiontimeoutms())
.retrypolicy(policy).build();
//开启连接
client.start();
logger.info("zookeeper 初始化完成...");
}
public static curatorframework getclient (){
return client ;
}
public static void closeclient (){
if (client != null){
client.close();
}
}
}
4.4 示例代码
示例代码展示如何在spring boot 2项目中使用zookeeper:
zookeeper接口类
public interface zookeeperservice {
/**
* 判断节点是否存在
*/
boolean isexistnode (final string path) ;
/**
* 创建节点
*/
void createnode (createmode mode,string path ) ;
/**
* 设置节点数据
*/
void setnodedata (string path, string nodedata) ;
/**
* 创建节点
*/
void createnodeanddata (createmode mode, string path , string nodedata) ;
/**
* 获取节点数据
*/
string getnodedata (string path) ;
/**
* 获取节点下数据
*/
list<string> getnodechild (string path) ;
/**
* 是否递归删除节点
*/
void deletenode (string path,boolean recursive) ;
/**
* 获取读写锁
*/
interprocessreadwritelock getreadwritelock (string path) ;
}
zookeeper接口实现类impl
@service
public class zookeeperserviceimpl implements zookeeperservice {
private static final logger logger = loggerfactory.getlogger(zookeeperserviceimpl.class);
@override
public boolean isexistnode(string path) {
curatorframework client = zookeeperconfig.getclient();
client.sync() ;
try {
stat stat = client.checkexists().forpath(path);
return client.checkexists().forpath(path) != null;
} catch (exception e) {
logger.error("isexistnode error...", e);
e.printstacktrace();
}
return false;
}
@override
public void createnode(createmode mode, string path) {
curatorframework client = zookeeperconfig.getclient() ;
try {
// 递归创建所需父节点
client.create().creatingparentsifneeded().withmode(mode).forpath(path);
} catch (exception e) {
logger.error("createnode error...", e);
e.printstacktrace();
}
}
@override
public void setnodedata(string path, string nodedata) {
curatorframework client = zookeeperconfig.getclient() ;
try {
// 设置节点数据
client.setdata().forpath(path, nodedata.getbytes("utf-8"));
} catch (exception e) {
logger.error("setnodedata error...", e);
e.printstacktrace();
}
}
@override
public void createnodeanddata(createmode mode, string path, string nodedata) {
curatorframework client = zookeeperconfig.getclient() ;
try {
// 创建节点,关联数据
client.create().creatingparentsifneeded().withmode(mode)
.forpath(path,nodedata.getbytes("utf-8"));
} catch (exception e) {
logger.error("createnode error...", e);
e.printstacktrace();
}
}
@override
public string getnodedata(string path) {
curatorframework client = zookeeperconfig.getclient() ;
try {
// 数据读取和转换
byte[] databyte = client.getdata().forpath(path) ;
string data = new string(databyte,"utf-8") ;
if (stringutils.isnotempty(data)){
return data ;
}
}catch (exception e) {
logger.error("getnodedata error...", e);
e.printstacktrace();
}
return null;
}
@override
public list<string> getnodechild(string path) {
curatorframework client = zookeeperconfig.getclient() ;
list<string> nodechilddatalist = new arraylist<>();
try {
// 节点下数据集
nodechilddatalist = client.getchildren().forpath(path);
} catch (exception e) {
logger.error("getnodechild error...", e);
e.printstacktrace();
}
return nodechilddatalist;
}
@override
public void deletenode(string path, boolean recursive) {
curatorframework client = zookeeperconfig.getclient() ;
try {
if(recursive) {
// 递归删除节点
client.delete().guaranteed().deletingchildrenifneeded().forpath(path);
} else {
// 删除单个节点
client.delete().guaranteed().forpath(path);
}
} catch (exception e) {
logger.error("deletenode error...", e);
e.printstacktrace();
}
}
@override
public interprocessreadwritelock getreadwritelock(string path) {
curatorframework client = zookeeperconfig.getclient() ;
// 写锁互斥、读写互斥
interprocessreadwritelock readwritelock = new interprocessreadwritelock(client, path);
return readwritelock ;
}
}
zookeeper业务api场景使用
@api("zookeeper接口使用实例")
@restcontroller
public class zookeepercontroller {
@autowired
private zookeeperservice zookeeperservice ;
@apioperation(value="查询节点数据")
@getmapping("/getnodedata")
public httpresult getnodedata (string path) {
return httpresult.create(httpstatus.success,zookeeperservice.getnodedata(path));
}
@apioperation(value="判断节点是否存在")
@getmapping("/isexistnode")
public httpresult isexistnode (final string path){
return httpresult.create(httpstatus.success,zookeeperservice.isexistnode(path));
}
@apioperation(value="创建节点")
@getmapping("/createnode")
public httpresult createnode (createmode mode, string path ){
zookeeperservice.createnode(mode,path) ;
return httpresult.create(httpstatus.success);
}
@apioperation(value="设置节点数据")
@getmapping("/setnodedata")
public httpresult setnodedata (string path, string nodedata) {
zookeeperservice.setnodedata(path,nodedata) ;
return httpresult.create(httpstatus.success);
}
@apioperation(value="创建并设置节点数据")
@getmapping("/createnodeanddata")
public httpresult createnodeanddata (createmode mode, string path , string nodedata){
zookeeperservice.createnodeanddata(mode,path,nodedata) ;
return httpresult.create(httpstatus.success);
}
@apioperation(value="递归获取节点数据")
@getmapping("/getnodechild")
public httpresult getnodechild (string path) {
return httpresult.create(httpstatus.success,zookeeperservice.getnodechild(path));
}
@apioperation(value="是否递归删除节点")
@getmapping("/deletenode")
public httpresult deletenode (string path,boolean recursive) {
zookeeperservice.deletenode(path,recursive) ;
return httpresult.create(httpstatus.success);
}
}
接口返回httpresult统一类
import com.fasterxml.jackson.annotation.jsoninclude;
import io.swagger.annotations.apimodel;
import io.swagger.annotations.apimodelproperty;
import lombok.data;
/**
* @author lqzhang
* @date 2020/5/19
*/
@data
@apimodel(value = "httpresult", description = "统一返回数据结构")
public class httpresult<t> {
@apimodelproperty(value = "返回状态码")
private integer code;
@apimodelproperty(value = "返回信息")
private string msg;
@apimodelproperty(value = "返回数据")
@jsoninclude(value = jsoninclude.include.non_null)
private t data;
public static <e> httpresult<e> create(httpstatus httpstatus) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(httpstatus.getcode());
httpresult.setmsg(httpstatus.getmessage());
return httpresult;
}
public static <e> httpresult<e> create(httpstatus httpstatus, string msg) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(httpstatus.getcode());
httpresult.setmsg(msg);
return httpresult;
}
public static <e> httpresult<e> create(httpstatus httpstatus, e data) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(httpstatus.getcode());
httpresult.setmsg(httpstatus.getmessage());
httpresult.setdata(data);
return httpresult;
}
public static <e> httpresult<e> create(httpstatus httpstatus, string msg, e data) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(httpstatus.getcode());
httpresult.setmsg(msg);
httpresult.setdata(data);
return httpresult;
}
public static <e> httpresult<e> create(integer code, string msg, e data) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(code);
httpresult.setmsg(msg);
httpresult.setdata(data);
return httpresult;
}
public static <e> httpresult<e> success() {
return success(null);
}
public static <e> httpresult<e> success(e data) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(200);
httpresult.setmsg("操作成功");
httpresult.setdata(data);
return httpresult;
}
public static <e> httpresult<e> fail() {
return fail(httpstatus.fail.getmessage());
}
public static <e> httpresult<e> fail(string message) {
httpresult<e> httpresult = new httpresult<>();
httpresult.setcode(httpstatus.fail.getcode());
httpresult.setmsg(message);
return httpresult;
}
}
/**
* 请求结果状态枚举常量类
*
* @author lqzhang
*/
public enum httpstatus {
success(200, "请求成功"),
no_data(201, "没有查询到对应的数据"),
fail(203, "请求异常"),
param_error(204, "参数名错误或参数为空,请检查"),
no_login(205, "没有授权"),
save_error(206, "操作失败"),
no_data_in_auth(207, "权限范围内没有查询到数据"),
arrears(208, "账户可用余额已不足,请充值"),
unbound_phone(210, "用户账户未绑定微信手机"),
user_not_exits(211, "用户不存在"),
password_error(212, "密码错误"),
token_expired(403, "当前登录凭证已失效,请重新登录"),
service_not_opened(215, "该功能未开通,联系管理员开通使用"),;
/**
* 状态码
*/
private int code;
/**
* 状态信息
*/
private string message;
httpstatus(int code, string message) {
this.code = code;
this.message = message;
}
public int getcode() {
return code;
}
public void setcode(int code) {
this.code = code;
}
public string getmessage() {
return message;
}
public void setmessage(string message) {
this.message = message;
}
public int getstatustypecode(httpstatus httpstatus) {
return httpstatus.getcode();
}
public string getstatustypemessage(httpstatus httpstatus) {
return httpstatus.getmessage();
}
public static httpstatus getstatustypebycode(int code) {
httpstatus httpstatus = null;
for (httpstatus status : values()) {
if (status.getcode() == code) {
httpstatus = status;
break;
}
}
return httpstatus;
}
public static httpstatus getstatustypebymessage(string message) {
httpstatus httpstatus = null;
for (httpstatus status : values()) {
if (status.getmessage().equals(message)) {
httpstatus = status;
break;
}
}
return httpstatus;
}
}
结论
zookeeper作为分布式系统中的重要组件,提供了多种功能和强大的协调能力。在实际应用中,可以利用zookeeper实现分布式锁、统一配置管理、命名服务及集群管理等功能。通过与spring boot 2的整合,能更好地在应用中利用zookeeper这些功能,以提升系统的可用性和可靠性。希望通过本文的介绍,您对zookeeper有更加深入的了解,并能够在实际项目中加以应用。
发表评论