当前位置: 代码网 > 服务器>软件设计>开源 > 常见分布式锁4:zookeeper 瞬时znode节点 + watcher监听机制,ChatGPT回复的解决死锁的方案

常见分布式锁4:zookeeper 瞬时znode节点 + watcher监听机制,ChatGPT回复的解决死锁的方案

2024年08月02日 开源 我要评论
zookeeper 瞬时znode节点 + watcher监听机制

原文地址在这里

临时节点具备数据自动删除的功能。当client与zookeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:

多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;

其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;

下一个序号的线程得到通知,继续执行;

以此类推,创建节点的时候,就确认了线程执行的顺序。

<dependency>
  <groupid>org.apache.zookeeper</groupid>
  <artifactid>zookeeper</artifactid>
  <version>3.4.14</version>
  <exclusions>
    <exclusion>
      <groupid>org.slf4j</groupid>
      <artifactid>slf4j-log4j12</artifactid>
    </exclusion>
  </exclusions>
</dependency>

zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getchildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里lock 实现了watcher和autocloseable:

当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:

/**
 * 自己本身就是一个 watcher,可以得到通知
 * autocloseable 实现自动关闭,资源不使用的时候
 */
@slf4j
public class zklock implements autocloseable, watcher {
​
    private zookeeper zookeeper;
​
    /**
     * 记录当前锁的名字
     */
    private string znode;
​
    public zklock() throws ioexception {
        this.zookeeper = new zookeeper("localhost:2181",
                10000,this);
    }
​
    public boolean getlock(string businesscode) {
        try {
            //创建业务 根节点
            stat stat = zookeeper.exists("/" + businesscode, false);
            if (stat==null){
                zookeeper.create("/" + businesscode,businesscode.getbytes(),
                        zoodefs.ids.open_acl_unsafe,
                        createmode.persistent);
            }
​
            //创建瞬时有序节点  /order/order_00000001
            znode = zookeeper.create("/" + businesscode + "/" + businesscode + "_", businesscode.getbytes(),
                    zoodefs.ids.open_acl_unsafe,
                    createmode.ephemeral_sequential);
​
            //获取业务节点下 所有的子节点
            list<string> childrennodes = zookeeper.getchildren("/" + businesscode, false);
            //获取序号最小的(第一个)子节点
            collections.sort(childrennodes);
            string firstnode = childrennodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endswith(firstnode)){
                return true;
            }
            //如果不是第一个子节点,则监听前一个节点
            string lastnode = firstnode;
            for (string node:childrennodes){
                if (znode.endswith(node)){
                    zookeeper.exists("/"+businesscode+"/"+lastnode,true);
                    break;
                }else {
                    lastnode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (exception e) {
            e.printstacktrace();
        }
        return false;
    }
​
    @override
    public void close() throws exception {
        zookeeper.delete(znode,-1);
        zookeeper.close();
        log.info("我已经释放了锁!");
    }
​
    @override
    public void process(watchedevent event) {
        if (event.gettype() == event.eventtype.nodedeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

这段代码实现了一个基于 zookeeper 的分布式锁,以下是它的实现步骤:

  1. 首先创建 zookeeper 客户端,并实现 watcher 接口,在自身上注册监听器。

  2. 在 zookeeper 上创建一个业务根节点,例如 /businesscode,表示该业务下所有的分布式锁。

  3. 使用 zookeeper 的临时有序节点创建子节点,例如 /businesscode/businesscode_00001,表示当前节点占用了分布式锁,并记录在 znode 中。

  4. 获取业务节点下所有的子节点,并按节点名称排序。如果当前节点的名称是所有子节点中最小的,则获取分布式锁。

  5. 如果当前节点不是所有子节点中最小的,则监听前一个子节点的删除事件,等待前一个子节点释放锁。

  6. 当前一个子节点被删除时,重新执行第四步,即再次检查当前节点是否为所有子节点中最小的。

  7. 如果当前节点成功获取到锁,则执行业务操作;否则等待锁释放,重新获取锁。

  8. 业务操作完成后,执行 close() 方法释放锁,删除当前节点。

总体来说,这段代码实现了一种基本的分布式锁机制,通过 zookeeper 的临时有序节点和 watcher 监听机制来实现。需要注意的是,此实现方式还可能存在死锁问题,当持有锁的节点出现网络故障或宕机时,会导致整个分布式锁失效。因此,在实际应用中,还需要综合考虑各种异常情况,确保分布式锁的正确性和高可用性。

 针对死锁chatgpt3.5的解决方案:

那段代码中存在潜在的死锁问题,即如果持有锁的节点出现异常,可能导致整个分布式锁失效。为了解决这个问题,可以考虑以下两种方案:

  1. 引入超时机制

在获取锁的过程中,可以引入超时机制,例如在等待前一个节点释放锁的过程中,设定一个超时时间,如果超过了这个时间仍未收到通知,则当前节点主动删除自己的临时节点,释放锁,并尝试重新获取锁。

  1. 心跳检测机制

对于已经获取到分布式锁的节点,可以在一定时间间隔内向 zookeeper 发送心跳消息,以检测自身是否还处于活跃状态。如果该节点长时间没有发送心跳消息或者 zookeeper 无法接收到该节点的心跳消息,就认为该节点出现了异常,需要主动删除自身的临时节点,释放锁,并尝试重新获取锁。

在前面提到的两种方案中,我选择了第一种方案,即引入超时机制来解决死锁问题。具体实现方式如下:

在获取锁的过程中,我们可以增加一个等待时间的参数 waittime,表示等待前一个节点释放锁的最长时间。在设置监听器之后,使用 countdownlatch 等待前一个节点的删除事件通知,当出现以下情况之一时,即可跳出等待循环:

  1. 在等待时间内成功获取到锁,即当前节点为业务节点下所有临时节点中的最小节点。

  2. 在等待时间内等待到超时时间,说明前一节点无法正常释放锁,因此当前节点放弃等待,主动删除自己的临时节点,重新尝试获取锁。

修改后的代码如下所示:

public class distributedlock implements autocloseable {

    private static final string root_node = "/businesscode"; // 业务根节点
    private final zookeeper zkclient; // zookeeper 客户端
    private final string lockname; // 分布式锁名称
    private string currentlocknodepath; // 当前节点的 znode 路径

    public distributedlock(string zkservers, string lockname) throws ioexception {
        this.zkclient = new zookeeper(zkservers, 3000, event -> {});
        this.lockname = lockname;
        ensurerootnodeexists();
    }

    // 确保业务根节点存在
    private void ensurerootnodeexists() {
        try {
            if (zkclient.exists(root_node, false) == null) {
                zkclient.create(root_node, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
            }
        } catch (exception e) {
            throw new runtimeexception("failed to ensure root node exists: " + root_node, e);
        }
    }

    // 获取锁
    public void lock(long waittime) {
        try {
            // 创建当前节点
            currentlocknodepath = zkclient.create(
                    root_node + "/" + lockname + "_",
                    null,
                    zoodefs.ids.open_acl_unsafe,
                    createmode.ephemeral_sequential);

            // 等待前一个节点释放锁
            list<string> children = zkclient.getchildren(root_node, false);
            string[] sortednodes = children.stream().sorted().toarray(string[]::new);
            int currentnodeindex = arrays.binarysearch(sortednodes, currentlocknodepath.substring(currentlocknodepath.lastindexof("/") + 1));
            if (currentnodeindex == 0) {
                // 当前节点为业务节点下所有临时节点中的最小节点,获取锁
                return;
            }

            // 等待前一个节点的删除事件通知,如果超时仍未收到,则主动删除自己的临时节点
            countdownlatch latch = new countdownlatch(1);
            string previousnodepath = root_node + "/" + sortednodes[currentnodeindex - 1];
            stat stat = zkclient.exists(previousnodepath, event -> {
                if (event.gettype() == watcher.event.eventtype.nodedeleted) {
                    latch.countdown();
                }
            });
            if (stat != null) {
                if (latch.await(waittime, timeunit.milliseconds)) {
                    // 等待成功,重新执行获取锁操作
                    lock(waittime);
                } else {
                    // 超时未等到通知,主动删除自己的临时节点,放弃获取锁
                    zkclient.delete(currentlocknodepath, -1);
                    throw new runtimeexception("failed to acquire lock within wait time: " + waittime);
                }
            }
        } catch (exception e) {
            throw new runtimeexception("failed to acquire lock: " + lockname, e);
        }
    }

    // 释放锁
    @override
    public void close() {
        try {
            system.out.println("releasing lock: " + lockname);
            zkclient.delete(currentlocknodepath, -1);
            zkclient.close();
        } catch (exception e) {
            throw new runtimeexception("failed to release lock: " + lockname, e);
        }
    }
}

lock() 方法中增加了一个参数 waittime,表示等待前一个节点释放锁的最长时间。在方法最后,如果超时未收到通知,则主动删除当前节点,放弃获取锁,并抛出异常提示用户。

需要注意的是,在使用该类时,需要确保所有节点设置的 waittime 值是一致的。另外,为了让线程能够跳出等待循环,应该每隔一段时间检查一下当前的线程状态,例如通过 thread.currentthread().isinterrupted() 判断线程是否被打断,如果被打断则立即跳出循环。

下面是使用心跳检测机制的代码实例:

public class distributedlock implements autocloseable {

    private static final string root_node = "/businesscode"; // 业务根节点
    private static final long heartbeat_rate = 1000l; // 心跳检测频率,单位毫秒
    private final zookeeper zkclient; // zookeeper 客户端
    private final string lockname; // 分布式锁名称
    private volatile boolean locked = false; // 是否已经获取到了锁
    private thread heartbeatthread; // 心跳线程
    private string currentlocknodepath; // 当前节点的 znode 路径

    public distributedlock(string zkservers, string lockname) throws ioexception {
        this.zkclient = new zookeeper(zkservers, 3000, event -> {});
        this.lockname = lockname;
        ensurerootnodeexists();
    }

    // 确保业务根节点存在
    private void ensurerootnodeexists() {
        try {
            if (zkclient.exists(root_node, false) == null) {
                zkclient.create(root_node, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
            }
        } catch (exception e) {
            throw new runtimeexception("failed to ensure root node exists: " + root_node, e);
        }
    }

    // 获取锁
    public void lock() {
        try {
            // 创建当前节点
            currentlocknodepath = zkclient.create(
                    root_node + "/" + lockname + "_",
                    null,
                    zoodefs.ids.open_acl_unsafe,
                    createmode.ephemeral_sequential);

            // 等待前一个节点释放锁
            while (!locked) {
                list<string> children = zkclient.getchildren(root_node, false);
                string[] sortednodes = children.stream().sorted().toarray(string[]::new);
                int currentnodeindex = arrays.binarysearch(sortednodes, currentlocknodepath.substring(currentlocknodepath.lastindexof("/") + 1));
                if (currentnodeindex == 0) {
                    // 当前节点为业务节点下所有临时节点中的最小节点,获取锁
                    locked = true;
                    startheartbeatthread(); // 启动心跳线程
                    break;
                }

                // 等待前一个节点的删除事件通知
                string previousnodepath = root_node + "/" + sortednodes[currentnodeindex - 1];
                countdownlatch latch = new countdownlatch(1);
                stat stat = zkclient.exists(previousnodepath, event -> {
                    if (event.gettype() == watcher.event.eventtype.nodedeleted) {
                        latch.countdown();
                    }
                });
                if (stat != null) {
                    latch.await();
                }
            }
        } catch (exception e) {
            throw new runtimeexception("failed to acquire lock: " + lockname, e);
        }
    }

    // 释放锁
    @override
    public void close() {
        try {
            system.out.println("releasing lock: " + lockname);
            zkclient.delete(currentlocknodepath, -1);
            stopheartbeatthread(); // 停止心跳线程
            zkclient.close();
        } catch (exception e) {
            throw new runtimeexception("failed to release lock: " + lockname, e);
        }
    }

    // 启动心跳线程
    private void startheartbeatthread() {
        heartbeatthread = new thread(() -> {
            while (true) {
                try {
                    zkclient.setdata(currentlocknodepath, null, -1);
                    thread.sleep(heartbeat_rate);
                } catch (exception e) {
                    system.out.println("failed to send heartbeat signal: " + lockname);
                }
            }
        });
        heartbeatthread.start();
    }

    // 停止心跳线程
    private void stopheartbeatthread() {
        if (heartbeatthread != null) {
            heartbeatthread.interrupt();
            heartbeatthread = null;
        }
    }
}

在该代码实例中,我们创建了一个布尔型变量 locked,表示当前是否已经获取到了锁。在获取锁的过程中,如果当前节点为业务节点下所有临时节点中的最小节点,则设置 lockedtrue,同时启动心跳线程。心跳线程每隔一段时间就向 zookeeper 发送一次空数据以保持会话,从而保证自己的临时节点不会过期。

在释放锁的过程中,停止心跳线程即可。需要注意的是,心跳线程的 while 循环不能被打断,因为一旦被打断,线程就会退出,从而不再发送心跳信号,导致临时节点过期。因此,在捕获 interruptedexception 异常时只是简单地输出日志,并继续下一轮循环。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com