当前位置: 代码网 > 服务器>网络>网络协议 > ZooKeeper基础学习和API编写

ZooKeeper基础学习和API编写

2024年07月31日 网络协议 我要评论
事务id(ZXID)在某个时刻,每台服务器的ZXID不一定一致,跟更新请求后的更新逻辑有关系,操作或则数据在某一时刻不是绝对一致的。服务器id(SID),用来唯一标识Zookeeper集群中的服务器,在集群中每台服务器SID唯一。通知系统:接受观察者的注册,一旦数据的状态发生变化,ZooKeeper将负责通知已经在ZooKeeper上注册的那些观察者做出相应的反应。每个服务器都保持一份相同的数据副本。此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。

一、zookeeper学习要点

        zookeeper是一个基于观察者模式设计的分布式管理框架,为分布式框架提供协调服务的 apache 项目。


1、zookeeper工作机制


2、zookeeper特点


3、应用场景


4、选取机制(假设共5台服务器


5、客户端向服务器端写数据的流程


二、api编写


6、服务器动态上下线监听案例

1)先在集群上创建/servers 节点
(2)服务器端向 zookeeper 注册代码


import org.apache.zookeeper.*;

import java.io.ioexception;

public class distributeserver {
    private string connectstring = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessiontimeout=2000;
    private zookeeper zk;

    public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {

        //服务端在zookeeper上进行注册

        //1、获取zk链接
        //用distributeserver类对整体进行封装
        distributeserver server = new distributeserver();
        server.getconnect();

        //2、利用zk链接注册服务器信息(创建节点)
        server.registserver(args[0]);//args[0] 手动传 实参
        //3、启动业务功能(目前没有功能,我们们执行sleep()睡觉)
        server.business();
    }


    //业务功能
    private void business() throws interruptedexception {
        thread.sleep(long.max_value);
    }

    //在zk注册服务器(就是在父节点下面创建新的子节点)
    private void registserver(string hostname) throws interruptedexception, keeperexception {
        string create = zk.create("/servers/"+hostname, hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
        //创建完之后提示 已经上线
        system.out.println(hostname + "  is online.");
    }

    //链接zk集群
    private void getconnect() throws ioexception {
        //把zk做全局变狼
        zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            @override
            public void process(watchedevent watchedevent) {

            }
        });

    }

}//distributeserver
3)客户端代码

import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zookeeper;

import java.io.ioexception;
import java.util.arraylist;
import java.util.list;

//客户端监听服务器的上下线
public class distributecliient {
    private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessiontimeout=2000;
    private zookeeper zk;

    public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
        distributecliient client = new distributecliient();
        //1、获取zk链接
        client.getconnect();
        //2、获取servers的子节点信息。从中得到服务器信息的列表
        client.getserverslist();
        //3、业务进程启动
        client.business();
    }

    //业务进程
    private void business() throws interruptedexception {
        thread.sleep(long.max_value);
    }

    //获取服务器列表的信息
    private void getserverslist() throws interruptedexception, keeperexception {
        //1获取服务器节点信息,对父节点(指定的节点"/servers",需要先在zk集群上创建该父节点)进行监听
        list<string> children = zk.getchildren("/servers", true);//监听
        //2存储服务器的列表信息
        arraylist<object> servers = new arraylist<>();

        //3遍历所有节点,获取节点中的主机名称信息
        for (string child : children) {
            //不在监听,已经监听父节点,不返回状态信息
            byte[] data = zk.getdata("/servers/" + child, false, null);
            //添加到列表中,循环添加
            servers.add(new string(data)); //注意类型转换
        }
        //4打印列表
        system.out.println(servers);
    }

    //链接zk集群
    private void getconnect() throws ioexception {
        zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            //多次监听
            @override
            public void process(watchedevent watchedevent) {
                try {
                    getserverslist();
                } catch (interruptedexception e) {
                    throw new runtimeexception(e);
                } catch (keeperexception e) {
                    throw new runtimeexception(e);
                }
            }
        });
    }
}

7、zookeeper 分布式锁案例 (java api实现)

1)分布式锁实现


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;

import java.io.ioexception;
import java.util.collection;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;

public class distrabutelocks {
    private final string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessiontimeout=2000;
    private final zookeeper zk;
    //增加代码的健壮性
    private countdownlatch connectlatch = new countdownlatch(1);
    private countdownlatch waitlatch =new countdownlatch(1);
    private string lastnodepath; //当前节点的前一个节点的路径
    private string currentnode;

    //1、获取链接,连接上zk
    public distrabutelocks() throws ioexception, interruptedexception, keeperexception {
        //1)获取链接
        zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            @override
            public void process(watchedevent watchedevent) {
                //执行监听操作
                //connectlatch如果连接上zk,需要释放,执行下面的操作,进行判断
                //如果监听的状态是 链接 ,就释放
                if (watchedevent.getstate() == event.keeperstate.syncconnected){
                    connectlatch.countdown();//释放
                }

                //waitlatch 监听到前一个节点删除, 需要释放
                //监听到事件    节点删除 && 是前一个节点
                if (watchedevent.gettype() == event.eventtype.nodedeleted && watchedevent.getpath().equals(lastnodepath))
                    waitlatch.countdown();//释放
            }
        });
        //2)等待连接上zk,再进行后续操作,相当于阻塞,等待前序进程完成
        connectlatch.await();

        //3)判断根节点"/locks"是否存在,不需监听,获取到status(状态)
        stat stat = zk.exists("/locks", false);
        if (stat == null){
            //需要创建根节点
            zk.create("/locks", "locks".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent);
        }
    }

    //2、对zk加锁
    public void zklock(){
        try {
            //创建临时带序号的节点
            currentnode = zk.create("/locks/seq-", null, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
            //判断创建的节点是不是最小序号的节点,如果是获取到锁,如果不是需要监听当前序号的前一个节点
            list<string> children = zk.getchildren("/locks", false);
            //如果"/locks"下只有一个节点,直接获取锁,如果多个节点,找出最小的序号获取锁
            if (children.size() == 1)
                return;//直接返回就是,获取锁
            else {
                //先排序
                collections.sort(children);
                //获取当前节点currentnode的定位是第几个,先获取当前节点的名称
                string thisnodename = currentnode.substring("/locks/".length());
                int index = children.indexof(thisnodename);
                //判断thisnodename是哪个位置
                if (index == -1 ){
                    system.out.println("数据异常~~~~");
                }else if (index == 0) {
                    return;//只有一个数据,就是当前节点获取锁
                }else {
                    //不是第一个节点,需要监听前一个节点的状态
                    //前一个节点的路径
                    lastnodepath = "/locks/"+ children.get(index -1);
                    //获取前一个节点的数据变化,达到监听的效果
                    zk.getdata(lastnodepath,true,null);
                    //增加代码的健壮性,等待监听
                    waitlatch.await();
                    //接听结束
                    return;//获取锁
                }
            }

        } catch (keeperexception e) {
            throw new runtimeexception(e);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        }
    }

    //3、解锁
    public void unzklock(){
        //删除节点
        try {
            zk.delete(currentnode,-1);
        } catch (interruptedexception e) {
            throw new runtimeexception(e);
        } catch (keeperexception e) {
            throw new runtimeexception(e);
        }
    }

}

2)分布式锁测试



import org.apache.zookeeper.keeperexception;

import java.io.ioexception;

//对distrabutelocks进行测试,创建两个节点,查看运行状态
public class lockstest {
    public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
        //先创建需要测试的distrabutelocks 类
       final distrabutelocks locks1 = new distrabutelocks();
       final distrabutelocks locks2 = new distrabutelocks();
       final distrabutelocks locks3 = new distrabutelocks();

       //线程1
       new thread(new runnable() {
           @override
           public void run() {
               try {
                   //加锁(创建节点)
                   locks1.zklock();
                   system.out.println("线程1 启动~~,获取到锁。");
                   //增加延时5s
                   thread.sleep(5*1000);
                   //延时过后释放锁
                   locks1.unzklock();
                   system.out.println("线程1 结束~~,释放锁。");

               } catch (interruptedexception e) {
                   throw new runtimeexception(e);
               }
           }
       }).start();

        //线程2
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    //加锁(创建节点)
                    locks2.zklock();
                    system.out.println("线程2 启动~~,获取到锁。");
                    //增加延时5s
                    thread.sleep(5*1000);

                    //延时过后释放锁
                    locks2.unzklock();
                    system.out.println("线程2 结束~~,释放锁。");

                } catch (interruptedexception e) {
                    throw new runtimeexception(e);
                }
            }
        }).start();

        //线程3
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    //加锁(创建节点)
                    locks3.zklock();
                    system.out.println("线程3 启动~~,获取到锁。");
                    //增加延时5s
                    thread.sleep(5*1000);

                    //延时过后释放锁
                    locks3.unzklock();
                    system.out.println("线程3 结束~~,释放锁。");

                } catch (interruptedexception e) {
                    throw new runtimeexception(e);
                }
            }
        }).start();

    }//main
}

8、curator 框架实现分布式锁案例

2) curator 案例实操


import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.apache.curator.retry.boundedexponentialbackoffretry;
import org.apache.curator.retry.exponentialbackoffretry;

public class curatorlocktest {
    private static string nodepath= "/locks";
    private static string connectadress="hadoop102:2181,hadoop103:2181,hadoop104:2181";

    public static void main(string[] args) {
        //1、创建分布式锁
        //使用curator的api进行编写

        //创建分布式锁1,使用getcuratorclient()方法创建客户端
        interprocessmutex lock1 = new interprocessmutex(getcuratorclient(), nodepath);
        //创建分布式锁2
        interprocessmutex lock2 = new interprocessmutex(getcuratorclient(), nodepath);
        //创建分布式锁3
        interprocessmutex lock3 = new interprocessmutex(getcuratorclient(), nodepath);


        //2、测试
        //线程1
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock1.acquire();//获取到锁
                    system.out.println("线程1获取到锁~~~");

                    lock1.acquire();//获取到锁
                    system.out.println("线程1再次获取到锁~~~");

                    thread.sleep(5*1000);//延时5s

                    lock1.release();//释放锁
                    system.out.println("线程1释放锁~~~");

                    lock1.release();//释放锁
                    system.out.println("线程1再次释放锁~~~");
                } catch (exception e) {
                    throw new runtimeexception(e);
                }
            }
        }).start();//启动线程

        //线程2
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock2.acquire();//获取到锁
                    system.out.println("线程2获取到锁~~~");

                    lock2.acquire();//获取到锁
                    system.out.println("线程2再次获取到锁~~~");

                    thread.sleep(5*1000);//延时5s

                    lock2.release();//释放锁
                    system.out.println("线程2释放锁~~~");

                    lock2.release();//释放锁
                    system.out.println("线程2再次释放锁~~~");
                } catch (exception e) {
                    throw new runtimeexception(e);
                }
            }
        }).start();//启动线程

        //线程3
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock3.acquire();//获取到锁
                    system.out.println("线程3获取到锁~~~");

                    lock3.acquire();//获取到锁
                    system.out.println("线程3再次获取到锁~~~");

                    thread.sleep(5*1000);//延时5s

                    lock3.release();//释放锁
                    system.out.println("线程3释放锁~~~");

                    lock3.release();//释放锁
                    system.out.println("线程3再次释放锁~~~");
                } catch (exception e) {
                    throw new runtimeexception(e);
                }
            }
        }).start();//启动线程

    }

    //编写获取curator客户端的方法
    private static curatorframework getcuratorclient() {
        exponentialbackoffretry policy = new exponentialbackoffretry(3000, 3);
        //创建客户端
        curatorframework client = curatorframeworkfactory.builder().connectstring(connectadress) //链接地址
                .connectiontimeoutms(2000) //链接超时时间设置
                .sessiontimeoutms(2000) //心跳时间设置
                .retrypolicy(policy) //重新试链接次数
                .build();//客户端创建


        //启动客户端
        client.start();
        system.out.println("zookeeper 客户端启动成功!~~");
        return client; //返回客户端
    }
}

参考:尚硅谷zookeeper学习

(0)

相关文章:

  • LTPI协议的理解——4、LTPI链路初始化以及运行

    整个LTPI协议实现过程中,我认为最复杂的也就是LTPI的链路建立的过程 以及异步信号(特别是IIC)的传输事务的控制和响应以及对IIC接口线的操作。我们前面已经大致提到了各个阶段…

    2024年08月01日 网络
  • 基于FPGA的DES算法实验报告

    DES算法用硬件实现容易,软件实现难,基于这点本次实验将会用硬件形式实现DES算法。DES算法使用一个有效位为56位的密钥对64位的明文进行加密,其加密过程主要包括IP置换、密钥置…

    2024年08月01日 网络
  • 【模块系列】STM32&TCS3472

    【模块系列】STM32&TCS3472

    手上正好有TCS3472模块,也正好想在加深一下自己对I2C协议的理解和应用,所以就写了这个代码库出来。参考的资料主要来源于TCS3472的数据手册,和ardu... [阅读全文]
  • 【STM32】定时器与PWM的LED控制

    定时器的定时时间主要取决于定时周期和预分频因子,计算公式为:定时时间=(ARR+1)×(预分频值PSC+1)/输入时钟频率或 T=(TIM_Period +1)*(TIM_Pres…

    2024年08月01日 网络
  • 【17】STM32·HAL库·CAN

    CAN(Controller Area Network),是ISO国际标准化的串行通信协议。为了满足汽车产业的“减少线束的数量”、“通过多个 LAN,进行大量数据的高速通信”的需求…

    2024年08月01日 网络
  • 工业设备物联网与数据采集中可能遇见的问题

    工业设备物联网与数据采集中可能遇见的问题

    数据中台可以接入PLC、仪器仪表、传感器、数控机床、工业机器人等设备数据,进行清洗、过滤、计算等标准化处理,从而为各种工业物联网平台、工业互联网平台和工业软件提... [阅读全文]

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com