当前位置: 代码网 > it编程>编程语言>Java > Springboot整合Zookeeper分布式组件实例

Springboot整合Zookeeper分布式组件实例

2024年08月01日 Java 我要评论
Zookeeper是一个开源的分布式协调服务,主要用于分布式应用程序中的协调管理。它由Apache软件基金会维护,是Hadoop生态系统中的重要成员。Zookeeper提供了一个高效且可靠的分布式锁服务,以及群集管理功能,在分布式系统中起到了“守护神”的作用。

一、zookeeper概述

1.1 zookeeper的定义

zookeeper是一个开源的分布式协调服务,主要用于分布式应用程序中的协调管理。它由apache软件基金会维护,是hadoop生态系统中的重要成员。zookeeper提供了一个高效且可靠的分布式锁服务,以及群集管理功能,在分布式系统中起到了“守护神”的作用。

1.2 zookeeper的核心理念

zookeeper基于以下关键概念构建:

  • 数据模型:zookeeper的数据模型是一个层次结构,这个层次类似于一个文件系统,与liunx的文件系统类似,整体可以看作为一棵树。它由节点组成,节点也成为znode,每个节点可以有子节点。节点可以存储数据,但数据尺寸有限默认存储为1mb的数据。

  • 节点(znode):zookeeper中的每个数据单元称为znode。znode有两种类型:持久(persistent)和临时(ephemeral)。持久节点在客户端断开连接后仍存在,而临时节点在客户端断开连接后会被自动删除。

  • 观察者(watcher):客户端可以在znode上设置观察者,当znode的数据或子节点发生变化时,watcher会通知对应的客户端。

  • 有序性(orderliness):zookeeper通过全局顺序来确保所有操作的顺序一致。

  • 数据一致性 :每个server保存一份相同的数据拷贝,客户端无论请求到被集群中哪个server处理,得到的数据都是一致的。

  • 集群服务:在zookeeper集群服务由一个领导者(leader),多个跟随者(follower)组成的集群。领导者(leader)负责进行投票的发起和决议,更新集群服务状态。跟随者用于接收客户请求并向客户端返回结果,在选举leader过程中参与投票。集群中只要有半数以上节点存活,zookeeper集群就能正常服务。

二、zookeeper的应用场景

2.1 分布式锁服务

zookeeper能够非常有效地实现分布式锁。这在需要同步或并发控制的分布式系统中尤为重要。通过利用zookeeper的临时znode特性,可以实现锁的自动释放,防止死锁。

2.2 统一配置管理

在分布式系统中,应用程序的配置管理成为一个复杂的问题。zookeeper提供了一种集中式管理配置的方式,所有的配置文件可以存储在zookeeper中,并且可以动态更新。当配置变化时,zookeeper可以通知到所有客户端,从而使应用程序能够立即响应变化。

2.3 命名服务

zookeeper可以作为分布式系统的命名服务,通过维护名称和元数据的映射关系,提供高效的名称解析能力。

2.4 集群管理

zookeeper能够监控集群中各个节点的状态,决定节点是否健康,而节点的加入和离开能够动态调整。

三、zookeeper的安全管理操作方法

3.1 基本安全措施

  • 验证(authentication):zookeeper支持基于客户端和服务器之间的认证机制。通过设置用户和密码,可以限制对znode的访问。

3.1.1认证方式
  • world:默认方式,开放的权限,意解为全世界都能随意访问。
  • auth:已经授权且认证通过的用户才可以访问。
  • digest:用户名:密码方式认证,实际业务开发中最常用的方式。
  • ip白名单:授权指定的ip地址,和指定的权限点,控制访问。

  • acl(access control lists):通过设定acl,能够控制znode的读写权限。acl规则可以根据不同的需求设定,如只读、完全控制等。

3.1.2 acl授权流程
  • 添加认证用户

addauth digest 用户名:密码

  • 设置权限

setacl /path auth:用户名:密码:权限

  • 查看acl设置

getacl /path

完整的操作如下代码

-- 添加授权用户
[zk: localhost:2181] addauth digest user1:123456
-- 创建节点
[zk: localhost:2181] create /testnode testnode 
-- 节点授权
[zk: localhost:2181] setacl /testnode  auth:user1:123456:cdrwa
-- 查看授权
[zk: localhost:2181] getacl /testnode 

3.2 数据加密

在zookeeper的配置文件中,可以启用数据传输加密(例如ssl/tls)来保证数据在网络传输中的安全性。

3.3 安全配置示例

# zookeeper configuration 
authprovider.1=org.apache.zookeeper.server.auth.saslauthenticationprovider 
requireclientauthscheme=sasl 
digest.authenticationhandler.sasl.clientallowedprotocols=gssapi:cram-md5

四、zookeeper与spring boot 2的整合

4.1 引入依赖

在spring boot 2项目中,首先需要引入curator依赖,这是用于简化zookeeper操作的一个高层次api。curator框架在zookeeper原生api接口上进行二次包装。提供zookeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等api封装。

<dependency>
    <groupid>org.apache.curator</groupid>
    <artifactid>curator-framework</artifactid>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupid>org.apache.curator</groupid>
    <artifactid>curator-recipes</artifactid>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupid>org.apache.curator</groupid>
    <artifactid>curator-client</artifactid>
    <version>2.12.0</version>
</dependency>

4.2 springboot项目yml配置

application.properties中,配置zookeeper的连接信息:

zoo:
  keeper:
    #开启标志
    enabled: true
    #服务器地址
    server: 127.0.0.1:2181
    #命名空间,被称为znode
    namespace: testnode
    #权限控制,加密
    digest: user1:123456
    #会话超时时间
    sessiontimeoutms: 3000
    #连接超时时间
    connectiontimeoutms: 60000
     #最大重试次数
    maxretries: 2
    #初始休眠时间
    basesleeptimems: 1000

4.3 编写配置类

编写zookeeper配置类,用于初始化zookeeper客户端:

@configuration
public class zookeeperconfig {
    private static final logger logger = loggerfactory.getlogger(zookeeperconfig.class) ;
    
    //注入zookeeper配置文件类,用于获取yml的配置项值
    @autowired
    private zookeeperparam zookeeperparam ;

    private static curatorframework client = null ;
    /**
     * 初始化
     */
    @postconstruct
    public void init (){
        //重试策略,初试时间1秒,重试10次
        retrypolicy policy = new exponentialbackoffretry(
                zookeeperparam.getbasesleeptimems(),
                zookeeperparam.getmaxretries());
        //通过工厂创建curator
        client = curatorframeworkfactory.builder()
                .connectstring(zookeeperparam.getserver()) //链接的服务的地址
                .authorization("digest",zookeeperparam.getdigest().getbytes()) //认证方式
                .connectiontimeoutms(zookeeperparam.getconnectiontimeoutms())
                .sessiontimeoutms(zookeeperparam.getsessiontimeoutms())
                .retrypolicy(policy).build();
        //开启连接
        client.start();
        logger.info("zookeeper 初始化完成...");
    }
    public static curatorframework getclient (){
        return client ;
    }
    public static void closeclient (){
        if (client != null){
            client.close();
        }
    }
}

4.4 示例代码

示例代码展示如何在spring boot 2项目中使用zookeeper:

zookeeper接口类
public interface zookeeperservice {
    /**
     * 判断节点是否存在
     */
    boolean isexistnode (final string path) ;
    /**
     * 创建节点
     */
    void createnode (createmode mode,string path ) ;
    /**
     * 设置节点数据
     */
    void setnodedata (string path, string nodedata) ;
    /**
     * 创建节点
     */
    void createnodeanddata (createmode mode, string path , string nodedata) ;
    /**
     * 获取节点数据
     */
    string getnodedata (string path) ;
    /**
     * 获取节点下数据
     */
    list<string> getnodechild (string path) ;
    /**
     * 是否递归删除节点
     */
    void deletenode (string path,boolean recursive) ;
    /**
     * 获取读写锁
     */
    interprocessreadwritelock getreadwritelock (string path) ;
}
zookeeper接口实现类impl
@service
public class zookeeperserviceimpl implements zookeeperservice {
    private static final logger logger = loggerfactory.getlogger(zookeeperserviceimpl.class);
    @override
    public boolean isexistnode(string path) {
        curatorframework client = zookeeperconfig.getclient();
        client.sync() ;
        try {
            stat stat = client.checkexists().forpath(path);
            return client.checkexists().forpath(path) != null;
        } catch (exception e) {
            logger.error("isexistnode error...", e);
            e.printstacktrace();
        }
        return false;
    }
    @override
    public void createnode(createmode mode, string path) {
        curatorframework client = zookeeperconfig.getclient() ;
        try {
            // 递归创建所需父节点
            client.create().creatingparentsifneeded().withmode(mode).forpath(path);
        } catch (exception e) {
            logger.error("createnode error...", e);
            e.printstacktrace();
        }
    }
    @override
    public void setnodedata(string path, string nodedata) {
        curatorframework client = zookeeperconfig.getclient() ;
        try {
            // 设置节点数据
            client.setdata().forpath(path, nodedata.getbytes("utf-8"));
        } catch (exception e) {
            logger.error("setnodedata error...", e);
            e.printstacktrace();
        }
    }
    @override
    public void createnodeanddata(createmode mode, string path, string nodedata) {
        curatorframework client = zookeeperconfig.getclient() ;
        try {
            // 创建节点,关联数据
            client.create().creatingparentsifneeded().withmode(mode)
                  .forpath(path,nodedata.getbytes("utf-8"));
        } catch (exception e) {
            logger.error("createnode error...", e);
            e.printstacktrace();
        }
    }
    @override
    public string getnodedata(string path) {
        curatorframework client = zookeeperconfig.getclient() ;
        try {
            // 数据读取和转换
            byte[] databyte = client.getdata().forpath(path) ;
            string data = new string(databyte,"utf-8") ;
            if (stringutils.isnotempty(data)){
                return data ;
            }
        }catch (exception e) {
            logger.error("getnodedata error...", e);
            e.printstacktrace();
        }
        return null;
    }
    @override
    public list<string> getnodechild(string path) {
        curatorframework client = zookeeperconfig.getclient() ;
        list<string> nodechilddatalist = new arraylist<>();
        try {
            // 节点下数据集
            nodechilddatalist = client.getchildren().forpath(path);
        } catch (exception e) {
            logger.error("getnodechild error...", e);
            e.printstacktrace();
        }
        return nodechilddatalist;
    }
    @override
    public void deletenode(string path, boolean recursive) {
        curatorframework client = zookeeperconfig.getclient() ;
        try {
            if(recursive) {
                // 递归删除节点
                client.delete().guaranteed().deletingchildrenifneeded().forpath(path);
            } else {
                // 删除单个节点
                client.delete().guaranteed().forpath(path);
            }
        } catch (exception e) {
            logger.error("deletenode error...", e);
            e.printstacktrace();
        }
    }
    @override
    public interprocessreadwritelock getreadwritelock(string path) {
        curatorframework client = zookeeperconfig.getclient() ;
        // 写锁互斥、读写互斥
        interprocessreadwritelock readwritelock = new interprocessreadwritelock(client, path);
        return readwritelock ;
    }
}
zookeeper业务api场景使用
@api("zookeeper接口使用实例")
@restcontroller
public class zookeepercontroller {
    @autowired
    private zookeeperservice zookeeperservice ;
    @apioperation(value="查询节点数据")
    @getmapping("/getnodedata")
    public httpresult getnodedata (string path) {
       
        return httpresult.create(httpstatus.success,zookeeperservice.getnodedata(path));
    }
    @apioperation(value="判断节点是否存在")
    @getmapping("/isexistnode")
    public httpresult isexistnode (final string path){
        
        return httpresult.create(httpstatus.success,zookeeperservice.isexistnode(path));
    }
    @apioperation(value="创建节点")
    @getmapping("/createnode")
    public httpresult createnode (createmode mode, string path ){
        zookeeperservice.createnode(mode,path) ;
        return httpresult.create(httpstatus.success);
    }
    @apioperation(value="设置节点数据")
    @getmapping("/setnodedata")
    public httpresult setnodedata (string path, string nodedata) {
        zookeeperservice.setnodedata(path,nodedata) ;
        return httpresult.create(httpstatus.success);
    }
    @apioperation(value="创建并设置节点数据")
    @getmapping("/createnodeanddata")
    public httpresult createnodeanddata (createmode mode, string path , string nodedata){
        zookeeperservice.createnodeanddata(mode,path,nodedata) ;
        return httpresult.create(httpstatus.success);
    }
    @apioperation(value="递归获取节点数据")
    @getmapping("/getnodechild")
    public httpresult getnodechild (string path) {
        return httpresult.create(httpstatus.success,zookeeperservice.getnodechild(path));
    }
    @apioperation(value="是否递归删除节点")
    @getmapping("/deletenode")
    public httpresult deletenode (string path,boolean recursive) {
        zookeeperservice.deletenode(path,recursive) ;
        return httpresult.create(httpstatus.success);
    }
}
接口返回httpresult统一类
import com.fasterxml.jackson.annotation.jsoninclude;

import io.swagger.annotations.apimodel;
import io.swagger.annotations.apimodelproperty;
import lombok.data;

/**
 * @author lqzhang
 * @date 2020/5/19
 */
@data
@apimodel(value = "httpresult", description = "统一返回数据结构")
public class httpresult<t> {

    @apimodelproperty(value = "返回状态码")
    private integer code;

    @apimodelproperty(value = "返回信息")
    private string msg;

    @apimodelproperty(value = "返回数据")
    @jsoninclude(value = jsoninclude.include.non_null)
    private t data;

    public static <e> httpresult<e> create(httpstatus httpstatus) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(httpstatus.getcode());
        httpresult.setmsg(httpstatus.getmessage());
        return httpresult;
    }

    public static <e> httpresult<e> create(httpstatus httpstatus, string msg) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(httpstatus.getcode());
        httpresult.setmsg(msg);
        return httpresult;
    }

    public static <e> httpresult<e> create(httpstatus httpstatus, e data) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(httpstatus.getcode());
        httpresult.setmsg(httpstatus.getmessage());
        httpresult.setdata(data);
        return httpresult;
    }

    public static <e> httpresult<e> create(httpstatus httpstatus, string msg, e data) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(httpstatus.getcode());
        httpresult.setmsg(msg);
        httpresult.setdata(data);
        return httpresult;
    }

    public static <e> httpresult<e> create(integer code, string msg, e data) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(code);
        httpresult.setmsg(msg);
        httpresult.setdata(data);
        return httpresult;
    }

    public static <e> httpresult<e> success() {
        return success(null);
    }

    public static <e> httpresult<e> success(e data) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(200);
        httpresult.setmsg("操作成功");
        httpresult.setdata(data);
        return httpresult;
    }

    public static <e> httpresult<e> fail() {
        return fail(httpstatus.fail.getmessage());
    }

    public static <e> httpresult<e> fail(string message) {
        httpresult<e> httpresult = new httpresult<>();
        httpresult.setcode(httpstatus.fail.getcode());
        httpresult.setmsg(message);
        return httpresult;
    }
}
/**
 * 请求结果状态枚举常量类
 *
 * @author lqzhang
 */
public enum httpstatus {

    success(200, "请求成功"),

    no_data(201, "没有查询到对应的数据"),

    fail(203, "请求异常"),

    param_error(204, "参数名错误或参数为空,请检查"),

    no_login(205, "没有授权"),

    save_error(206, "操作失败"),

    no_data_in_auth(207, "权限范围内没有查询到数据"),

    arrears(208, "账户可用余额已不足,请充值"),

    unbound_phone(210, "用户账户未绑定微信手机"),

    user_not_exits(211, "用户不存在"),

    password_error(212, "密码错误"),

    token_expired(403, "当前登录凭证已失效,请重新登录"),

    service_not_opened(215, "该功能未开通,联系管理员开通使用"),;

    /**
     * 状态码
     */
    private int code;
    /**
     * 状态信息
     */
    private string message;

    httpstatus(int code, string message) {
        this.code = code;
        this.message = message;
    }

    public int getcode() {
        return code;
    }

    public void setcode(int code) {
        this.code = code;
    }

    public string getmessage() {
        return message;
    }

    public void setmessage(string message) {
        this.message = message;
    }

    public int getstatustypecode(httpstatus httpstatus) {
        return httpstatus.getcode();
    }

    public string getstatustypemessage(httpstatus httpstatus) {
        return httpstatus.getmessage();
    }

    public static httpstatus getstatustypebycode(int code) {
        httpstatus httpstatus = null;
        for (httpstatus status : values()) {
            if (status.getcode() == code) {
                httpstatus = status;
                break;
            }
        }
        return httpstatus;
    }

    public static httpstatus getstatustypebymessage(string message) {
        httpstatus httpstatus = null;
        for (httpstatus status : values()) {
            if (status.getmessage().equals(message)) {
                httpstatus = status;
                break;
            }
        }
        return httpstatus;
    }

}

结论

zookeeper作为分布式系统中的重要组件,提供了多种功能和强大的协调能力。在实际应用中,可以利用zookeeper实现分布式锁、统一配置管理、命名服务及集群管理等功能。通过与spring boot 2的整合,能更好地在应用中利用zookeeper这些功能,以提升系统的可用性和可靠性。希望通过本文的介绍,您对zookeeper有更加深入的了解,并能够在实际项目中加以应用。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com