当前位置: 代码网 > 科技>操作系统>Windows > zookeeper案例

zookeeper案例

2024年08月02日 Windows 我要评论
先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群 创建类对象该类为我们创建的服务端类: 获取zookeeper连接:自己创建连接方法: 让后server对象在main函数中调用注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建(3)业务逻辑(睡眠):服务端代码如下:客户端:(1)获取zookeeper的连接: 先创建客户端对

目录

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

(2)注册服务器到zookeeper集群:

(3)业务逻辑(睡眠):

服务端代码如下:

客户端:

(1)获取zookeeper的连接:

(2)监听/servers下边的子节点的增减:

客户端代码如下:

案例二:zookeeper 分布式锁

分布式锁是什么?

锁的实现:

构造函数:

加锁函数:

解锁函数:

整体代码:

测试类代码 :

curator 框架实现分布式锁案例:

实现步骤:

代码如下:


先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

        创建类对象

该类为我们创建的服务端类:

        distributeserver server = new distributeserver();

        获取zookeeper连接:

自己创建连接方法:

    private void getconnect() throws ioexception {
       zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            @override
            public void process(watchedevent watchedevent) {
            }
        });

    }

 让后server对象在main函数中调用

(2)注册服务器到zookeeper集群:

注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建

private void regestserver(string hostname) throws interruptedexception, keeperexception {
zk.create(parentnode+"/"+hostname,hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        system.out.println("服务器"+hostname+"已注册连接");
    }

(3)业务逻辑(睡眠):

    private void business() throws interruptedexception {
    thread.sleep(long.max_value);
    }

服务端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.ioexception;
/**
 * @date 2023/8/10 19:06
 * @author 
 */
public class distributeserver {
   private static string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessiontimeout=2000;
    private zookeeper zk =null;
    private string parentnode = "/servers";
    public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
        //获取zk连接
        //创建
        distributeserver server = new distributeserver();
        server.getconnect();
        //注册服务器到zk集群
        //注册是需要在/servers节点下创建所开启的服务器的路径
        server.regestserver(args[0]);
        //业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)
        server.business();
    }
    private void business() throws interruptedexception {
    thread.sleep(long.max_value);
    }
    private void regestserver(string hostname) throws interruptedexception, keeperexception {
zk.create(parentnode+"/"+hostname,hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        system.out.println("服务器"+hostname+"已注册连接");
    }
    private void getconnect() throws ioexception {
       zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            @override
            public void process(watchedevent watchedevent) {
            }
        });
    }
}

客户端:

(1)获取zookeeper的连接:

        先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑

 private void getconnect() throws ioexception {
       zk= new zookeeper(connectstring, sessiontimeout, new watcher() {
        @override
        public void process(watchedevent watchedevent) {
            try {
                getserverlist();
            } catch (interruptedexception e) {
                e.printstacktrace();
            } catch (keeperexception e) {
                e.printstacktrace();
            }
        }
    });
    }

(2)监听/servers下边的子节点的增减:

        构建方法client.getserverlist()来进行监听:

代码逻辑就是通过getchildren()方法获取指定目录下的所有子目录并开启监听

再进行遍历,把遍历结果封装到一个集合中,最后进行输出

 private void getserverlist() throws interruptedexception, keeperexception {
        list<string> children = zk.getchildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        arraylist<string> list = new arraylist<>();
        for (string child : children) {
            byte[] data = zk.getdata("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new string(data));
        }
        system.out.println(list);
    }

(3)业务逻辑同服务端不在赘述。

客户端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zookeeper;
import java.io.ioexception;
import java.util.arraylist;
import java.util.list;
/**
 * @date 2023/8/10 21:27
 * @author 
 * 客户端的监听功能
 */
public class distributeclient {
private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
   private int sessiontimeout=2000;
   private zookeeper zk=null;
   public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
        //获取zk连接
        distributeclient client = new distributeclient();
        client.getconnect();
        //监听/servers下边的子节点的增减
        client.getserverlist();
       //业务逻辑(睡眠)
       client.business();
    }
    private void business() throws interruptedexception {
   thread.sleep(long.max_value);
   }
    private void getserverlist() throws interruptedexception, keeperexception {
        list<string> children = zk.getchildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        arraylist<string> list = new arraylist<>();
        for (string child : children) {
            byte[] data = zk.getdata("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new string(data));
        }
        system.out.println(list);
    }
    private void getconnect() throws ioexception {
       zk= new zookeeper(connectstring, sessiontimeout, new watcher() {
        @override
        public void process(watchedevent watchedevent) {
            try {
                getserverlist();
            } catch (interruptedexception e) {
                e.printstacktrace();
            } catch (keeperexception e) {
                e.printstacktrace();
            }
        }
    });
    }
}

案例二:zookeeper 分布式锁

分布式锁是什么?

日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

锁的实现:

构造函数:

在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放countdownlatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。

加锁函数:

使用zookeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的list集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对list集合进行排序,再获取他的节点名称,通过indexof函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。

解锁函数:

解锁就是直接删除节点即可

整体代码:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;
import java.io.ioexception;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
/**
 * @date 2023/8/12 19:56
 * @author 
 */
public class distributedlock {
 final    private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    final  private int sessiontimeout=2000;
    final    private   zookeeper zk;
    private string waitpath;
    private string currentmodu;
    //为了程序的健壮性,创建该对象   等待操作
    final   private countdownlatch waitlach=new countdownlatch(1);
    final   private countdownlatch countdownlatch=new countdownlatch(1);
  public distributedlock() throws ioexception, interruptedexception, keeperexception {
        //获取连接
      zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
            @override
            public void process(watchedevent watchedevent) {
            //  connectlatch  如果正常连接zk  可以释放
                if (watchedevent.getstate()==event.keeperstate.syncconnected){
                    countdownlatch.countdown();
                }
                //检测到删除节点并且是前一个节点则释放waitlatch
                if (watchedevent.gettype()==event.eventtype.nodedeleted && watchedevent.getpath().equals(waitpath))
                {
                waitlach.countdown();
                }
            }
        });
      //等待是否正常连接  正常(已)连接会释放  否则阻塞
      countdownlatch.await();
        // 判断是否存在lock锁
        stat stat = zk.exists("/locks", false);
        if (stat==null)
        {
            //创建该节点
            string s = zk.create("/locks", "locks".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.container);
        }
    }
    //对zk加锁
    public void zklock()  {
        //创建临时的带序号的节点
        try {
            currentmodu = zk.create("/locks/" + "seq-", null, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
            list<string> children = zk.getchildren("/locks", false);
             //如果只有一个节点   则直接获取
            if(children.size()==1)
            {
                return;
            }
            else {
                //排序
                collections.sort(children);
                //直接从s后边开始   开始的下标就是length的长度
                string substring = currentmodu.substring("/locks/".length());
                //通过substring来获取在list集合中的下标位置
                int index = children.indexof(substring);
                if (index==-1)
                {
                    system.out.println("数据异常");
                }
                else if (index==0)
                {
                    return;
                }
                else {
                    //  需要监听上一个节点
                    waitpath="/locks/"+children.get(index-1);
                    zk.getdata(waitpath,true,new stat());
                    //等待监听
                    waitlach.await();
                    return;
                }
            }
        } catch (keeperexception e) {
            e.printstacktrace();
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        //判断创建的节点是否是最小序号的节点 如果是则获取锁  不是则监听他的前一个节点
    }
    //对zk解锁
    public void unzklock()
    {
//删除节点
        try {
            //-1  是版本号
            zk.delete(this.currentmodu,-1);
        } catch (interruptedexception  | keeperexception e) {
            e.printstacktrace();
        }
    }
}

测试类代码 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.keeperexception;
import java.io.ioexception;
/**
 * @date 2023/8/12 22:31
 * @author 唐晓聪
 */
public class distributedlocktest
{
    public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
        //创建两个客户端对象
        final    distributedlock lock1 = new distributedlock();
        final   distributedlock lock2 = new distributedlock();
        new thread(new runnable() {
            @override
            public void run() {
                try {  lock1.zklock();
                system.out.println("线程1启动获得锁");
                    thread.sleep(5*1000);
                    lock1.unzklock();
                    system.out.println("线程1释放锁");
                } catch (exception e) {
                    e.printstacktrace();
                }
            }
        }).start();
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock2.zklock();
                system.out.println("线程2启动获得锁");

                    thread.sleep(5*1000);
                    lock2.unzklock();
                    system.out.println("线程2释放锁");
                } catch (exception e) {
                    e.printstacktrace();
                }
            }
        }).start();
    }
}

curator 框架实现分布式锁案例:

该案例是直接使用api进行实现分布式锁

实现步骤:

创建分布式锁对象,new interprocessmutex(),参数1为所要连接的客户端,参数2为监听路径

参数1传入的为getcuratorframework()自定义函数,

该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端

创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。

代码如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.apache.curator.retry.exponentialbackoffretry;
/**
 * @date 2023/8/13 20:07
 * @author 
 */
public class curatorlocktest {
    public static void main(string[] args) {
        //创建分布式锁1
        //参数1   所连接的客户端 参数2 监听路径
        interprocessmutex lock1 = new interprocessmutex(getcuratorframework(), "/locks");
        //创建分布式锁2
        interprocessmutex lock2 = new interprocessmutex(getcuratorframework(), "/locks");
    //创建线程
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock1.acquire();
                    system.out.println("thread 1 acquire lock");
               lock1.acquire();
                    system.out.println("thread 1 again acquire lock");
                thread.sleep(5*1000);
                lock1.release();
                    system.out.println("thread 1 relax lock");
                    lock1.release();
                    system.out.println("thread 1 again relax lock");
                    system.out.println();
                } catch (exception e) {
                    e.printstacktrace();
                }
            }
        }).start();
        new thread(new runnable() {
            @override
            public void run() {
                try {
                    lock2.acquire();
                    system.out.println("thread 2 acquire lock");
                    lock2.acquire();
                    system.out.println("thread 2 again acquire lock");
                    thread.sleep(5*1000);
                    lock2.release();
                    system.out.println("thread 2 relax lock");
                    lock2.release();
                    system.out.println("thread 2 again relax lock");
                } catch (exception e) {
                    e.printstacktrace();
                }
            }
        }).start();
    }
    private static curatorframework getcuratorframework() {
        exponentialbackoffretry policy = new exponentialbackoffretry(3000, 3);
        //通过工厂类的方式进行建立连接
        curatorframework client = curatorframeworkfactory.builder().connectstring("hadoop102:2181,hadoop102:2181,hadoop104:2181")
                .connectiontimeoutms(2000)
                .sessiontimeoutms(2000)
                .retrypolicy(policy)//连接失败后  间隔多少秒下次间隔
                .build();
        client.start();
        system.out.println("zookeeper  success start  !!!!!");
        return client;
    }
}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com