欢迎来到徐庆高(Tea)的个人博客网站
磨难很爱我,一度将我连根拔起。从惊慌失措到心力交瘁,我孤身一人,但并不孤独无依。依赖那些依赖我的人,信任那些信任我的人,帮助那些给予我帮助的人。如果我愿意,可以分裂成无数面镜子,让他们看见我,就像看见自己。察言观色和模仿学习是我的领域。像每个深受创伤的人那样,最终,我学会了随遇而安。
当前位置: 日志文章 > 详细内容

Java使用Curator进行ZooKeeper操作的详细教程

2025年04月01日 Java
1、简述apache curator 是一个基于 zookeeper 的 java 客户端库,它极大地简化了使用 zookeeper 的开发工作。curator 提供了高层次的 api,封装了很多复杂

1、简述

apache curator 是一个基于 zookeeper 的 java 客户端库,它极大地简化了使用 zookeeper 的开发工作。curator 提供了高层次的 api,封装了很多复杂的 zookeeper 操作,例如连接管理、分布式锁、leader 选举等。

在分布式系统中,zookeeper 通常被用来作为协调服务,而 curator 则为我们提供了更简洁易用的接口,减少了开发的复杂性。本文将介绍 curator 的核心功能及实践样例。

2、核心功能

apache curator是一个比较完善的zookeeper客户端框架,通过封装的一套高级api 简化了zookeeper的操作。curator主要解决了三类问题:

  • 封装zookeeper client与zookeeper server之间的连接处理
  • 提供了一套fluent风格的操作api
  • 提供zookeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装

curator 提供了以下核心组件:

2.1 curatorframework

curatorframework 是 curator 的核心类,用于与 zookeeper 服务交互。

2.2 recipes

curator 提供了多种常见分布式模式的实现,包括:

  • 分布式锁 (interprocessmutex)
  • 分布式队列 (distributedqueue)
  • leader 选举 (leaderselector)
  • 节点缓存 (nodecache)
  • 路径缓存 (pathchildrencache)
  • 树缓存 (treecache)

3、示例实践

curator中提供了zookeeper各种应用场景(recipe,如共享锁服务、master选举机制和分布式计算器等)的抽象封装。

3.1 依赖引入

在使用 curator 前,需要在项目中引入相关的依赖:

<!-- zookeeper支持 -->
<dependency>
   <groupid>org.apache.zookeeper</groupid>
   <artifactid>zookeeper</artifactid>
   <version>3.6.4</version>
</dependency>
<!-- curator-recipes -->
<dependency>
   <groupid>org.apache.curator</groupid>
   <artifactid>curator-recipes</artifactid>
   <version>5.5.0</version>
</dependency>
<!-- curator-framework -->
<dependency>
   <groupid>org.apache.curator</groupid>
   <artifactid>curator-framework</artifactid>
   <version>5.5.0</version>
</dependency>

3.2 初始化 curatorframework

以下代码展示了如何初始化 curatorframework:

import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.retry.exponentialbackoffretry;

public class curatorexample {
    public static void main(string[] args) {
        // 创建 curatorframework 实例
        curatorframework client = curatorframeworkfactory.builder()
                .connectstring("127.0.0.1:2181") // zookeeper 地址
                .sessiontimeoutms(5000)
                .connectiontimeoutms(3000)
                .retrypolicy(new exponentialbackoffretry(1000, 3))
                .build();

        // 启动客户端
        client.start();

        system.out.println("curatorframework 已启动");

        // 关闭客户端
        client.close();
    }
}

3.3 分布式锁

分布式锁是分布式系统中的一个重要功能,用于协调多进程/线程间的访问。

import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.retry.exponentialbackoffretry;
import org.apache.curator.framework.recipes.locks.interprocessmutex;

import java.util.concurrent.timeunit;

public class distributedlockexample {
    public static void main(string[] args) throws exception {
        // 初始化 curatorframework
        curatorframework client = curatorframeworkfactory.builder()
                .connectstring("127.0.0.1:2181")
                .retrypolicy(new exponentialbackoffretry(1000, 3))
                .build();
        client.start();

        // 创建分布式锁
        interprocessmutex lock = new interprocessmutex(client, "/distributed-lock");

        // 尝试获取锁
        if (lock.acquire(10, timeunit.seconds)) {
            try {
                system.out.println("成功获取锁,执行任务...");
                thread.sleep(5000); // 模拟任务
            } finally {
                lock.release();
                system.out.println("锁已释放");
            }
        } else {
            system.out.println("未能获取锁");
        }

        client.close();
    }
}

3.4 leader 选举

curator 的 leaderselector 提供了简单易用的 leader 选举功能。

import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.retry.exponentialbackoffretry;
import org.apache.curator.framework.recipes.leader.leaderselector;
import org.apache.curator.framework.recipes.leader.leaderselectorlisteneradapter;

public class leaderelectionexample {
    public static void main(string[] args) throws interruptedexception {
        curatorframework client = curatorframeworkfactory.builder()
                .connectstring("127.0.0.1:2181")
                .retrypolicy(new exponentialbackoffretry(1000, 3))
                .build();
        client.start();

        // 创建 leaderselector
        leaderselector leaderselector = new leaderselector(client, "/leader-election", new leaderselectorlisteneradapter() {
            @override
            public void takeleadership(curatorframework client) throws exception {
                system.out.println("成为 leader,执行任务...");
                thread.sleep(3000); // 模拟任务
                system.out.println("任务完成,释放 leader 权限");
            }
        });

        leaderselector.autorequeue(); // 自动重新排队参与选举
        leaderselector.start();

        thread.sleep(integer.max_value); // 保持主线程运行
        client.close();
    }
}

3.5 节点缓存

nodecache 用于监听特定节点的数据变更。

import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.cache.nodecache;
import org.apache.curator.retry.exponentialbackoffretry;

public class nodecacheexample {
    public static void main(string[] args) throws exception {
        curatorframework client = curatorframeworkfactory.builder()
                .connectstring("127.0.0.1:2181")
                .retrypolicy(new exponentialbackoffretry(1000, 3))
                .build();
        client.start();

        // 创建 nodecache
        nodecache nodecache = new nodecache(client, "/test-node");
        nodecache.getlistenable().addlistener(() -> {
            system.out.println("节点数据变更,新的数据为:" + new string(nodecache.getcurrentdata().getdata()));
        });

        nodecache.start();

        // 创建节点并修改数据
        client.create().orsetdata().forpath("/test-node", "initial-data".getbytes());
        thread.sleep(1000);
        client.setdata().forpath("/test-node", "updated-data".getbytes());

        thread.sleep(5000); // 保持运行观察结果
        client.close();
    }
}

4、总结

curator 提供了强大的 zookeeper 封装功能,极大地简化了开发流程。在分布式系统中,通过 curator 可以实现诸如分布式锁、leader 选举和节点监听等功能,帮助开发者快速构建稳定的分布式服务。

以上就是java使用curator进行zookeeper操作的详细教程的详细内容,更多关于java curator进行zookeeper操作的资料请关注代码网其它相关文章!