目录
先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群
案例一:服务器动态上下线
服务端:
(1)先获取zookeeper连接
创建类对象
该类为我们创建的服务端类:
distributeserver server = new distributeserver();
获取zookeeper连接:
自己创建连接方法:
private void getconnect() throws ioexception {
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
}
});
}
让后server对象在main函数中调用
(2)注册服务器到zookeeper集群:
注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建
private void regestserver(string hostname) throws interruptedexception, keeperexception {
zk.create(parentnode+"/"+hostname,hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
// 需要创建有序的临时节点所以-e(暂时) -s(有序)
system.out.println("服务器"+hostname+"已注册连接");
}
(3)业务逻辑(睡眠):
private void business() throws interruptedexception {
thread.sleep(long.max_value);
}
服务端代码如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.ioexception;
/**
* @date 2023/8/10 19:06
* @author
*/
public class distributeserver {
private static string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessiontimeout=2000;
private zookeeper zk =null;
private string parentnode = "/servers";
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
//获取zk连接
//创建
distributeserver server = new distributeserver();
server.getconnect();
//注册服务器到zk集群
//注册是需要在/servers节点下创建所开启的服务器的路径
server.regestserver(args[0]);
//业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)
server.business();
}
private void business() throws interruptedexception {
thread.sleep(long.max_value);
}
private void regestserver(string hostname) throws interruptedexception, keeperexception {
zk.create(parentnode+"/"+hostname,hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
// 需要创建有序的临时节点所以-e(暂时) -s(有序)
system.out.println("服务器"+hostname+"已注册连接");
}
private void getconnect() throws ioexception {
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
}
});
}
}
客户端:
(1)获取zookeeper的连接:
先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑
private void getconnect() throws ioexception {
zk= new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
try {
getserverlist();
} catch (interruptedexception e) {
e.printstacktrace();
} catch (keeperexception e) {
e.printstacktrace();
}
}
});
}
(2)监听/servers下边的子节点的增减:
构建方法client.getserverlist()来进行监听:
代码逻辑就是通过getchildren()方法获取指定目录下的所有子目录并开启监听
再进行遍历,把遍历结果封装到一个集合中,最后进行输出
private void getserverlist() throws interruptedexception, keeperexception {
list<string> children = zk.getchildren("/servers", true);
//该方法会获取指定路径下的所有子节点
//true 会走初始化中的watch 也可以自己创建watch
//把所有的服务器都封装到一个集合
arraylist<string> list = new arraylist<>();
for (string child : children) {
byte[] data = zk.getdata("/servers" +"/"+ child, false, null);
//上边已经便利到一个服务器对象,再进行添加
list.add(new string(data));
}
system.out.println(list);
}
(3)业务逻辑同服务端不在赘述。
客户端代码如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zookeeper;
import java.io.ioexception;
import java.util.arraylist;
import java.util.list;
/**
* @date 2023/8/10 21:27
* @author
* 客户端的监听功能
*/
public class distributeclient {
private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessiontimeout=2000;
private zookeeper zk=null;
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
//获取zk连接
distributeclient client = new distributeclient();
client.getconnect();
//监听/servers下边的子节点的增减
client.getserverlist();
//业务逻辑(睡眠)
client.business();
}
private void business() throws interruptedexception {
thread.sleep(long.max_value);
}
private void getserverlist() throws interruptedexception, keeperexception {
list<string> children = zk.getchildren("/servers", true);
//该方法会获取指定路径下的所有子节点
//true 会走初始化中的watch 也可以自己创建watch
//把所有的服务器都封装到一个集合
arraylist<string> list = new arraylist<>();
for (string child : children) {
byte[] data = zk.getdata("/servers" +"/"+ child, false, null);
//上边已经便利到一个服务器对象,再进行添加
list.add(new string(data));
}
system.out.println(list);
}
private void getconnect() throws ioexception {
zk= new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
try {
getserverlist();
} catch (interruptedexception e) {
e.printstacktrace();
} catch (keeperexception e) {
e.printstacktrace();
}
}
});
}
}
案例二:zookeeper 分布式锁
分布式锁是什么?
日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
锁的实现:
构造函数:
在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放countdownlatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。
加锁函数:
使用zookeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的list集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对list集合进行排序,再获取他的节点名称,通过indexof函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。
解锁函数:
解锁就是直接删除节点即可
整体代码:
package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;
import java.io.ioexception;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
/**
* @date 2023/8/12 19:56
* @author
*/
public class distributedlock {
final private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
final private int sessiontimeout=2000;
final private zookeeper zk;
private string waitpath;
private string currentmodu;
//为了程序的健壮性,创建该对象 等待操作
final private countdownlatch waitlach=new countdownlatch(1);
final private countdownlatch countdownlatch=new countdownlatch(1);
public distributedlock() throws ioexception, interruptedexception, keeperexception {
//获取连接
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
// connectlatch 如果正常连接zk 可以释放
if (watchedevent.getstate()==event.keeperstate.syncconnected){
countdownlatch.countdown();
}
//检测到删除节点并且是前一个节点则释放waitlatch
if (watchedevent.gettype()==event.eventtype.nodedeleted && watchedevent.getpath().equals(waitpath))
{
waitlach.countdown();
}
}
});
//等待是否正常连接 正常(已)连接会释放 否则阻塞
countdownlatch.await();
// 判断是否存在lock锁
stat stat = zk.exists("/locks", false);
if (stat==null)
{
//创建该节点
string s = zk.create("/locks", "locks".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.container);
}
}
//对zk加锁
public void zklock() {
//创建临时的带序号的节点
try {
currentmodu = zk.create("/locks/" + "seq-", null, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
list<string> children = zk.getchildren("/locks", false);
//如果只有一个节点 则直接获取
if(children.size()==1)
{
return;
}
else {
//排序
collections.sort(children);
//直接从s后边开始 开始的下标就是length的长度
string substring = currentmodu.substring("/locks/".length());
//通过substring来获取在list集合中的下标位置
int index = children.indexof(substring);
if (index==-1)
{
system.out.println("数据异常");
}
else if (index==0)
{
return;
}
else {
// 需要监听上一个节点
waitpath="/locks/"+children.get(index-1);
zk.getdata(waitpath,true,new stat());
//等待监听
waitlach.await();
return;
}
}
} catch (keeperexception e) {
e.printstacktrace();
} catch (interruptedexception e) {
e.printstacktrace();
}
//判断创建的节点是否是最小序号的节点 如果是则获取锁 不是则监听他的前一个节点
}
//对zk解锁
public void unzklock()
{
//删除节点
try {
//-1 是版本号
zk.delete(this.currentmodu,-1);
} catch (interruptedexception | keeperexception e) {
e.printstacktrace();
}
}
}
测试类代码 :
package com.tangxiaocong.case2;
import org.apache.zookeeper.keeperexception;
import java.io.ioexception;
/**
* @date 2023/8/12 22:31
* @author 唐晓聪
*/
public class distributedlocktest
{
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
//创建两个客户端对象
final distributedlock lock1 = new distributedlock();
final distributedlock lock2 = new distributedlock();
new thread(new runnable() {
@override
public void run() {
try { lock1.zklock();
system.out.println("线程1启动获得锁");
thread.sleep(5*1000);
lock1.unzklock();
system.out.println("线程1释放锁");
} catch (exception e) {
e.printstacktrace();
}
}
}).start();
new thread(new runnable() {
@override
public void run() {
try {
lock2.zklock();
system.out.println("线程2启动获得锁");
thread.sleep(5*1000);
lock2.unzklock();
system.out.println("线程2释放锁");
} catch (exception e) {
e.printstacktrace();
}
}
}).start();
}
}
curator 框架实现分布式锁案例:
该案例是直接使用api进行实现分布式锁
实现步骤:
先创建分布式锁对象,new interprocessmutex(),参数1为所要连接的客户端,参数2为监听路径
参数1传入的为getcuratorframework()自定义函数,
该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端
创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。
代码如下:
package com.tangxiaocong.case3;
import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.apache.curator.retry.exponentialbackoffretry;
/**
* @date 2023/8/13 20:07
* @author
*/
public class curatorlocktest {
public static void main(string[] args) {
//创建分布式锁1
//参数1 所连接的客户端 参数2 监听路径
interprocessmutex lock1 = new interprocessmutex(getcuratorframework(), "/locks");
//创建分布式锁2
interprocessmutex lock2 = new interprocessmutex(getcuratorframework(), "/locks");
//创建线程
new thread(new runnable() {
@override
public void run() {
try {
lock1.acquire();
system.out.println("thread 1 acquire lock");
lock1.acquire();
system.out.println("thread 1 again acquire lock");
thread.sleep(5*1000);
lock1.release();
system.out.println("thread 1 relax lock");
lock1.release();
system.out.println("thread 1 again relax lock");
system.out.println();
} catch (exception e) {
e.printstacktrace();
}
}
}).start();
new thread(new runnable() {
@override
public void run() {
try {
lock2.acquire();
system.out.println("thread 2 acquire lock");
lock2.acquire();
system.out.println("thread 2 again acquire lock");
thread.sleep(5*1000);
lock2.release();
system.out.println("thread 2 relax lock");
lock2.release();
system.out.println("thread 2 again relax lock");
} catch (exception e) {
e.printstacktrace();
}
}
}).start();
}
private static curatorframework getcuratorframework() {
exponentialbackoffretry policy = new exponentialbackoffretry(3000, 3);
//通过工厂类的方式进行建立连接
curatorframework client = curatorframeworkfactory.builder().connectstring("hadoop102:2181,hadoop102:2181,hadoop104:2181")
.connectiontimeoutms(2000)
.sessiontimeoutms(2000)
.retrypolicy(policy)//连接失败后 间隔多少秒下次间隔
.build();
client.start();
system.out.println("zookeeper success start !!!!!");
return client;
}
}
发表评论