一、zookeeper学习要点
zookeeper是一个基于观察者模式设计的分布式管理框架,为分布式框架提供协调服务的 apache 项目。
1、zookeeper工作机制
2、zookeeper特点
3、应用场景
4、选取机制(假设共5台服务器)
5、客户端向服务器端写数据的流程
二、api编写
6、服务器动态上下线监听案例
import org.apache.zookeeper.*;
import java.io.ioexception;
public class distributeserver {
private string connectstring = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessiontimeout=2000;
private zookeeper zk;
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
//服务端在zookeeper上进行注册
//1、获取zk链接
//用distributeserver类对整体进行封装
distributeserver server = new distributeserver();
server.getconnect();
//2、利用zk链接注册服务器信息(创建节点)
server.registserver(args[0]);//args[0] 手动传 实参
//3、启动业务功能(目前没有功能,我们们执行sleep()睡觉)
server.business();
}
//业务功能
private void business() throws interruptedexception {
thread.sleep(long.max_value);
}
//在zk注册服务器(就是在父节点下面创建新的子节点)
private void registserver(string hostname) throws interruptedexception, keeperexception {
string create = zk.create("/servers/"+hostname, hostname.getbytes(), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
//创建完之后提示 已经上线
system.out.println(hostname + " is online.");
}
//链接zk集群
private void getconnect() throws ioexception {
//把zk做全局变狼
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
}
});
}
}//distributeserver
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zookeeper;
import java.io.ioexception;
import java.util.arraylist;
import java.util.list;
//客户端监听服务器的上下线
public class distributecliient {
private string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessiontimeout=2000;
private zookeeper zk;
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
distributecliient client = new distributecliient();
//1、获取zk链接
client.getconnect();
//2、获取servers的子节点信息。从中得到服务器信息的列表
client.getserverslist();
//3、业务进程启动
client.business();
}
//业务进程
private void business() throws interruptedexception {
thread.sleep(long.max_value);
}
//获取服务器列表的信息
private void getserverslist() throws interruptedexception, keeperexception {
//1获取服务器节点信息,对父节点(指定的节点"/servers",需要先在zk集群上创建该父节点)进行监听
list<string> children = zk.getchildren("/servers", true);//监听
//2存储服务器的列表信息
arraylist<object> servers = new arraylist<>();
//3遍历所有节点,获取节点中的主机名称信息
for (string child : children) {
//不在监听,已经监听父节点,不返回状态信息
byte[] data = zk.getdata("/servers/" + child, false, null);
//添加到列表中,循环添加
servers.add(new string(data)); //注意类型转换
}
//4打印列表
system.out.println(servers);
}
//链接zk集群
private void getconnect() throws ioexception {
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
//多次监听
@override
public void process(watchedevent watchedevent) {
try {
getserverslist();
} catch (interruptedexception e) {
throw new runtimeexception(e);
} catch (keeperexception e) {
throw new runtimeexception(e);
}
}
});
}
}
7、zookeeper 分布式锁案例 (java api实现)
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;
import java.io.ioexception;
import java.util.collection;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
public class distrabutelocks {
private final string connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private final int sessiontimeout=2000;
private final zookeeper zk;
//增加代码的健壮性
private countdownlatch connectlatch = new countdownlatch(1);
private countdownlatch waitlatch =new countdownlatch(1);
private string lastnodepath; //当前节点的前一个节点的路径
private string currentnode;
//1、获取链接,连接上zk
public distrabutelocks() throws ioexception, interruptedexception, keeperexception {
//1)获取链接
zk = new zookeeper(connectstring, sessiontimeout, new watcher() {
@override
public void process(watchedevent watchedevent) {
//执行监听操作
//connectlatch如果连接上zk,需要释放,执行下面的操作,进行判断
//如果监听的状态是 链接 ,就释放
if (watchedevent.getstate() == event.keeperstate.syncconnected){
connectlatch.countdown();//释放
}
//waitlatch 监听到前一个节点删除, 需要释放
//监听到事件 节点删除 && 是前一个节点
if (watchedevent.gettype() == event.eventtype.nodedeleted && watchedevent.getpath().equals(lastnodepath))
waitlatch.countdown();//释放
}
});
//2)等待连接上zk,再进行后续操作,相当于阻塞,等待前序进程完成
connectlatch.await();
//3)判断根节点"/locks"是否存在,不需监听,获取到status(状态)
stat stat = zk.exists("/locks", false);
if (stat == null){
//需要创建根节点
zk.create("/locks", "locks".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent);
}
}
//2、对zk加锁
public void zklock(){
try {
//创建临时带序号的节点
currentnode = zk.create("/locks/seq-", null, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
//判断创建的节点是不是最小序号的节点,如果是获取到锁,如果不是需要监听当前序号的前一个节点
list<string> children = zk.getchildren("/locks", false);
//如果"/locks"下只有一个节点,直接获取锁,如果多个节点,找出最小的序号获取锁
if (children.size() == 1)
return;//直接返回就是,获取锁
else {
//先排序
collections.sort(children);
//获取当前节点currentnode的定位是第几个,先获取当前节点的名称
string thisnodename = currentnode.substring("/locks/".length());
int index = children.indexof(thisnodename);
//判断thisnodename是哪个位置
if (index == -1 ){
system.out.println("数据异常~~~~");
}else if (index == 0) {
return;//只有一个数据,就是当前节点获取锁
}else {
//不是第一个节点,需要监听前一个节点的状态
//前一个节点的路径
lastnodepath = "/locks/"+ children.get(index -1);
//获取前一个节点的数据变化,达到监听的效果
zk.getdata(lastnodepath,true,null);
//增加代码的健壮性,等待监听
waitlatch.await();
//接听结束
return;//获取锁
}
}
} catch (keeperexception e) {
throw new runtimeexception(e);
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
}
//3、解锁
public void unzklock(){
//删除节点
try {
zk.delete(currentnode,-1);
} catch (interruptedexception e) {
throw new runtimeexception(e);
} catch (keeperexception e) {
throw new runtimeexception(e);
}
}
}
2)分布式锁测试
import org.apache.zookeeper.keeperexception;
import java.io.ioexception;
//对distrabutelocks进行测试,创建两个节点,查看运行状态
public class lockstest {
public static void main(string[] args) throws ioexception, interruptedexception, keeperexception {
//先创建需要测试的distrabutelocks 类
final distrabutelocks locks1 = new distrabutelocks();
final distrabutelocks locks2 = new distrabutelocks();
final distrabutelocks locks3 = new distrabutelocks();
//线程1
new thread(new runnable() {
@override
public void run() {
try {
//加锁(创建节点)
locks1.zklock();
system.out.println("线程1 启动~~,获取到锁。");
//增加延时5s
thread.sleep(5*1000);
//延时过后释放锁
locks1.unzklock();
system.out.println("线程1 结束~~,释放锁。");
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
}
}).start();
//线程2
new thread(new runnable() {
@override
public void run() {
try {
//加锁(创建节点)
locks2.zklock();
system.out.println("线程2 启动~~,获取到锁。");
//增加延时5s
thread.sleep(5*1000);
//延时过后释放锁
locks2.unzklock();
system.out.println("线程2 结束~~,释放锁。");
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
}
}).start();
//线程3
new thread(new runnable() {
@override
public void run() {
try {
//加锁(创建节点)
locks3.zklock();
system.out.println("线程3 启动~~,获取到锁。");
//增加延时5s
thread.sleep(5*1000);
//延时过后释放锁
locks3.unzklock();
system.out.println("线程3 结束~~,释放锁。");
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
}
}).start();
}//main
}
8、curator 框架实现分布式锁案例
import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.apache.curator.retry.boundedexponentialbackoffretry;
import org.apache.curator.retry.exponentialbackoffretry;
public class curatorlocktest {
private static string nodepath= "/locks";
private static string connectadress="hadoop102:2181,hadoop103:2181,hadoop104:2181";
public static void main(string[] args) {
//1、创建分布式锁
//使用curator的api进行编写
//创建分布式锁1,使用getcuratorclient()方法创建客户端
interprocessmutex lock1 = new interprocessmutex(getcuratorclient(), nodepath);
//创建分布式锁2
interprocessmutex lock2 = new interprocessmutex(getcuratorclient(), nodepath);
//创建分布式锁3
interprocessmutex lock3 = new interprocessmutex(getcuratorclient(), nodepath);
//2、测试
//线程1
new thread(new runnable() {
@override
public void run() {
try {
lock1.acquire();//获取到锁
system.out.println("线程1获取到锁~~~");
lock1.acquire();//获取到锁
system.out.println("线程1再次获取到锁~~~");
thread.sleep(5*1000);//延时5s
lock1.release();//释放锁
system.out.println("线程1释放锁~~~");
lock1.release();//释放锁
system.out.println("线程1再次释放锁~~~");
} catch (exception e) {
throw new runtimeexception(e);
}
}
}).start();//启动线程
//线程2
new thread(new runnable() {
@override
public void run() {
try {
lock2.acquire();//获取到锁
system.out.println("线程2获取到锁~~~");
lock2.acquire();//获取到锁
system.out.println("线程2再次获取到锁~~~");
thread.sleep(5*1000);//延时5s
lock2.release();//释放锁
system.out.println("线程2释放锁~~~");
lock2.release();//释放锁
system.out.println("线程2再次释放锁~~~");
} catch (exception e) {
throw new runtimeexception(e);
}
}
}).start();//启动线程
//线程3
new thread(new runnable() {
@override
public void run() {
try {
lock3.acquire();//获取到锁
system.out.println("线程3获取到锁~~~");
lock3.acquire();//获取到锁
system.out.println("线程3再次获取到锁~~~");
thread.sleep(5*1000);//延时5s
lock3.release();//释放锁
system.out.println("线程3释放锁~~~");
lock3.release();//释放锁
system.out.println("线程3再次释放锁~~~");
} catch (exception e) {
throw new runtimeexception(e);
}
}
}).start();//启动线程
}
//编写获取curator客户端的方法
private static curatorframework getcuratorclient() {
exponentialbackoffretry policy = new exponentialbackoffretry(3000, 3);
//创建客户端
curatorframework client = curatorframeworkfactory.builder().connectstring(connectadress) //链接地址
.connectiontimeoutms(2000) //链接超时时间设置
.sessiontimeoutms(2000) //心跳时间设置
.retrypolicy(policy) //重新试链接次数
.build();//客户端创建
//启动客户端
client.start();
system.out.println("zookeeper 客户端启动成功!~~");
return client; //返回客户端
}
}
发表评论