常见分布式锁4:zookeeper 瞬时znode节点 + watcher监听机制,ChatGPT回复的解决死锁的方案

news2024/9/26 5:21:56

原文地址在这里

临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:

多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;

其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;

下一个序号的线程得到通知,继续执行;

以此类推,创建节点的时候,就确认了线程执行的顺序。

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.14</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:

当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:

/**
 * 自己本身就是一个 watcher,可以得到通知
 * AutoCloseable 实现自动关闭,资源不使用的时候
 */
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
​
    private ZooKeeper zooKeeper;
​
    /**
     * 记录当前锁的名字
     */
    private String znode;
​
    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }
​
    public boolean getLock(String businessCode) {
        try {
            //创建业务 根节点
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
​
            //创建瞬时有序节点  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
​
            //获取业务节点下 所有的子节点
            List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //获取序号最小的(第一个)子节点
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endsWith(firstNode)){
                return true;
            }
            //如果不是第一个子节点,则监听前一个节点
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
​
    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
        log.info("我已经释放了锁!");
    }
​
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

这段代码实现了一个基于 ZooKeeper 的分布式锁,以下是它的实现步骤:

  1. 首先创建 ZooKeeper 客户端,并实现 Watcher 接口,在自身上注册监听器。

  2. 在 ZooKeeper 上创建一个业务根节点,例如 /businessCode,表示该业务下所有的分布式锁。

  3. 使用 ZooKeeper 的临时有序节点创建子节点,例如 /businessCode/businessCode_00001,表示当前节点占用了分布式锁,并记录在 znode 中。

  4. 获取业务节点下所有的子节点,并按节点名称排序。如果当前节点的名称是所有子节点中最小的,则获取分布式锁。

  5. 如果当前节点不是所有子节点中最小的,则监听前一个子节点的删除事件,等待前一个子节点释放锁。

  6. 当前一个子节点被删除时,重新执行第四步,即再次检查当前节点是否为所有子节点中最小的。

  7. 如果当前节点成功获取到锁,则执行业务操作;否则等待锁释放,重新获取锁。

  8. 业务操作完成后,执行 close() 方法释放锁,删除当前节点。

总体来说,这段代码实现了一种基本的分布式锁机制,通过 ZooKeeper 的临时有序节点和 Watcher 监听机制来实现。需要注意的是,此实现方式还可能存在死锁问题,当持有锁的节点出现网络故障或宕机时,会导致整个分布式锁失效。因此,在实际应用中,还需要综合考虑各种异常情况,确保分布式锁的正确性和高可用性。

 针对死锁ChatGPT3.5的解决方案:

那段代码中存在潜在的死锁问题,即如果持有锁的节点出现异常,可能导致整个分布式锁失效。为了解决这个问题,可以考虑以下两种方案:

  1. 引入超时机制

在获取锁的过程中,可以引入超时机制,例如在等待前一个节点释放锁的过程中,设定一个超时时间,如果超过了这个时间仍未收到通知,则当前节点主动删除自己的临时节点,释放锁,并尝试重新获取锁。

  1. 心跳检测机制

对于已经获取到分布式锁的节点,可以在一定时间间隔内向 ZooKeeper 发送心跳消息,以检测自身是否还处于活跃状态。如果该节点长时间没有发送心跳消息或者 ZooKeeper 无法接收到该节点的心跳消息,就认为该节点出现了异常,需要主动删除自身的临时节点,释放锁,并尝试重新获取锁。

在前面提到的两种方案中,我选择了第一种方案,即引入超时机制来解决死锁问题。具体实现方式如下:

在获取锁的过程中,我们可以增加一个等待时间的参数 waitTime,表示等待前一个节点释放锁的最长时间。在设置监听器之后,使用 CountDownLatch 等待前一个节点的删除事件通知,当出现以下情况之一时,即可跳出等待循环:

  1. 在等待时间内成功获取到锁,即当前节点为业务节点下所有临时节点中的最小节点。

  2. 在等待时间内等待到超时时间,说明前一节点无法正常释放锁,因此当前节点放弃等待,主动删除自己的临时节点,重新尝试获取锁。

修改后的代码如下所示:

public class DistributedLock implements AutoCloseable {

    private static final String ROOT_NODE = "/businessCode"; // 业务根节点
    private final ZooKeeper zkClient; // ZooKeeper 客户端
    private final String lockName; // 分布式锁名称
    private String currentLockNodePath; // 当前节点的 znode 路径

    public DistributedLock(String zkServers, String lockName) throws IOException {
        this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});
        this.lockName = lockName;
        ensureRootNodeExists();
    }

    // 确保业务根节点存在
    private void ensureRootNodeExists() {
        try {
            if (zkClient.exists(ROOT_NODE, false) == null) {
                zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);
        }
    }

    // 获取锁
    public void lock(long waitTime) {
        try {
            // 创建当前节点
            currentLockNodePath = zkClient.create(
                    ROOT_NODE + "/" + lockName + "_",
                    null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            // 等待前一个节点释放锁
            List<String> children = zkClient.getChildren(ROOT_NODE, false);
            String[] sortedNodes = children.stream().sorted().toArray(String[]::new);
            int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));
            if (currentNodeIndex == 0) {
                // 当前节点为业务节点下所有临时节点中的最小节点,获取锁
                return;
            }

            // 等待前一个节点的删除事件通知,如果超时仍未收到,则主动删除自己的临时节点
            CountDownLatch latch = new CountDownLatch(1);
            String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];
            Stat stat = zkClient.exists(previousNodePath, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });
            if (stat != null) {
                if (latch.await(waitTime, TimeUnit.MILLISECONDS)) {
                    // 等待成功,重新执行获取锁操作
                    lock(waitTime);
                } else {
                    // 超时未等到通知,主动删除自己的临时节点,放弃获取锁
                    zkClient.delete(currentLockNodePath, -1);
                    throw new RuntimeException("Failed to acquire lock within wait time: " + waitTime);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to acquire lock: " + lockName, e);
        }
    }

    // 释放锁
    @Override
    public void close() {
        try {
            System.out.println("Releasing lock: " + lockName);
            zkClient.delete(currentLockNodePath, -1);
            zkClient.close();
        } catch (Exception e) {
            throw new RuntimeException("Failed to release lock: " + lockName, e);
        }
    }
}

lock() 方法中增加了一个参数 waitTime,表示等待前一个节点释放锁的最长时间。在方法最后,如果超时未收到通知,则主动删除当前节点,放弃获取锁,并抛出异常提示用户。

需要注意的是,在使用该类时,需要确保所有节点设置的 waitTime 值是一致的。另外,为了让线程能够跳出等待循环,应该每隔一段时间检查一下当前的线程状态,例如通过 Thread.currentThread().isInterrupted() 判断线程是否被打断,如果被打断则立即跳出循环。

下面是使用心跳检测机制的代码实例:

public class DistributedLock implements AutoCloseable {

    private static final String ROOT_NODE = "/businessCode"; // 业务根节点
    private static final long HEARTBEAT_RATE = 1000L; // 心跳检测频率,单位毫秒
    private final ZooKeeper zkClient; // ZooKeeper 客户端
    private final String lockName; // 分布式锁名称
    private volatile boolean locked = false; // 是否已经获取到了锁
    private Thread heartbeatThread; // 心跳线程
    private String currentLockNodePath; // 当前节点的 znode 路径

    public DistributedLock(String zkServers, String lockName) throws IOException {
        this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});
        this.lockName = lockName;
        ensureRootNodeExists();
    }

    // 确保业务根节点存在
    private void ensureRootNodeExists() {
        try {
            if (zkClient.exists(ROOT_NODE, false) == null) {
                zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);
        }
    }

    // 获取锁
    public void lock() {
        try {
            // 创建当前节点
            currentLockNodePath = zkClient.create(
                    ROOT_NODE + "/" + lockName + "_",
                    null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            // 等待前一个节点释放锁
            while (!locked) {
                List<String> children = zkClient.getChildren(ROOT_NODE, false);
                String[] sortedNodes = children.stream().sorted().toArray(String[]::new);
                int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));
                if (currentNodeIndex == 0) {
                    // 当前节点为业务节点下所有临时节点中的最小节点,获取锁
                    locked = true;
                    startHeartbeatThread(); // 启动心跳线程
                    break;
                }

                // 等待前一个节点的删除事件通知
                String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];
                CountDownLatch latch = new CountDownLatch(1);
                Stat stat = zkClient.exists(previousNodePath, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        latch.countDown();
                    }
                });
                if (stat != null) {
                    latch.await();
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to acquire lock: " + lockName, e);
        }
    }

    // 释放锁
    @Override
    public void close() {
        try {
            System.out.println("Releasing lock: " + lockName);
            zkClient.delete(currentLockNodePath, -1);
            stopHeartbeatThread(); // 停止心跳线程
            zkClient.close();
        } catch (Exception e) {
            throw new RuntimeException("Failed to release lock: " + lockName, e);
        }
    }

    // 启动心跳线程
    private void startHeartbeatThread() {
        heartbeatThread = new Thread(() -> {
            while (true) {
                try {
                    zkClient.setData(currentLockNodePath, null, -1);
                    Thread.sleep(HEARTBEAT_RATE);
                } catch (Exception e) {
                    System.out.println("Failed to send heartbeat signal: " + lockName);
                }
            }
        });
        heartbeatThread.start();
    }

    // 停止心跳线程
    private void stopHeartbeatThread() {
        if (heartbeatThread != null) {
            heartbeatThread.interrupt();
            heartbeatThread = null;
        }
    }
}

在该代码实例中,我们创建了一个布尔型变量 locked,表示当前是否已经获取到了锁。在获取锁的过程中,如果当前节点为业务节点下所有临时节点中的最小节点,则设置 lockedtrue,同时启动心跳线程。心跳线程每隔一段时间就向 ZooKeeper 发送一次空数据以保持会话,从而保证自己的临时节点不会过期。

在释放锁的过程中,停止心跳线程即可。需要注意的是,心跳线程的 while 循环不能被打断,因为一旦被打断,线程就会退出,从而不再发送心跳信号,导致临时节点过期。因此,在捕获 InterruptedException 异常时只是简单地输出日志,并继续下一轮循环。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/430317.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows 环境下安装ITOP

该文章修改自旧版本的教程&#xff0c;如有侵权或其他问题请及时联系 windows 环境下安装ITOP 1、安装环境的下载 安装的相关文件列表&#xff0c;自用的程序安装包是版本3.0.2-1&#xff0c;环境是3.2.6版本&#xff1b;也可以直接通过下面链接找到最新的版本进行下载 1.1…

GitLab与jekins结合构建持续集成(cl)环境(2)

目录 GItlab配置邮箱 绑定邮箱 创建群组 添加人员 创建一个项目 添加文件 新建分支 如何拉取代码 Git bash 演示 Git GUI演示 安装jenkins 更改插件镜像源 配置jenkins使用gitlab更新代码 安装jekins插件 配置jenkins免密拉取gatlab代码 jenkins创建项目 将代码…

一种vivado联合vcs仿真以及verdi查看波形的方法

上一篇中提到vivado仿真xilinx官方的axi vip耗时过长、且每次缩放波形时加载慢的问题。后来用了正点原子的AXI DDR例程&#xff0c;将AXI DDR换成了AXI RAM进行读写测试&#xff0c;用以学习了解AXI的工作方式。详见此文读写AXI4接口RAM的简单示例_给米PHY的博客-CSDN博客。 在…

力扣题库刷题笔记20-有效的括号

1、题目如下&#xff1a; 2、个人Python代码实现如下&#xff1a; 第一次读题就理解错了题意&#xff0c;以为是只判断小括号闭合&#xff0c;大括号、中括号只是干扰元素。再次读题后&#xff0c;代码实现如下&#xff1a; 以上代码仍旧是没有理解清楚题意&#xff0c;以为是只…

Linux Shell 实现一键部署二进制docker+docker_compose

docker 前言 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows 机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间不会有任何接口。 d…

协议篇之以太网UDP协议

协议篇之以太网UDP协议一、写在前面二、TCP/IP协议分层三、UDP协议数据报格式2.1 MAC层/物理层&#xff1a;2.2 IP层/网络层&#xff1a;2.3 UDP层/传输层&#xff1a;2.4 应用层&#xff1a;四、总结四、写在后面一、写在前面 TCP/IP协议是指一个协议簇&#xff0c;可以理解为…

测试包的更新

有的项目有配了ci自动打包更新&#xff0c;开发有权限&#xff0c;就不用测试更新&#xff1b;有的是在阿里云上&#xff0c;测试没有权限&#xff0c;也是开发更新&#xff1b;测试自己的测试服务器&#xff0c;部分开发没有上传下载的权限&#xff0c;所以需要测试来进行更新…

CentOS 8自动化安装MongoDB并安装和实验master-slave集群、副本集群(Replica Set)、分片集群(Sharding)

文章目录CentOS 8自动化安装MongoDB安装Master-Slave集群安装并测试副本集(Replica Set)集群安装副本集(Replica Set)集群实验测试安装并测试分片集群&#xff08;Sharding&#xff09;注意实验使用的是ARM架构的CentOS 8 虚拟机 CentOS 8自动化安装MongoDB 首先&#xff0c;更…

分布式事务-概念-实现方式

分布式事务 文章目录分布式事务一、分布式事务相关概念1.分布式事务架构图2.理解本地事务相关概念3.理解分布式事务相关概念1.CAP理论2.刚性事务&#xff08;CP&#xff09;与柔性事务&#xff08;AP&#xff09;3.基于AP模型衍生下的BASE理论4 .如何从大方向选择分布式事务&am…

MySQL正则表达式 | 事务详解

目录 一、正则表达式 实例操作 二、事务 事务控制语句 MYSQL 事务处理主要有两种方法 SQL测试代码 PHP中使用事务实例 使用保留点 SAVEPOINT 一、正则表达式 MySQL可以通过 LIKE ...% 来进行模糊匹配。 MySQL 同样也支持其他正则表达式的匹配&#xff0c; MySQL中使用…

【嵌入式Linux内核驱动】GPIO子系统

GPIO子系统 总体框架 通用功能 可以设为输出&#xff1a;让它输出高低电平&#xff1b;可以设为输入&#xff0c;读取引脚当前电平&#xff1b;可以用来触发中断 通用属性 Active-High and Active-LowOpen Drain and Open Source 开漏和开源 GPIOLIB向上提供的gpiod相关接…

数据结构与算法之手撕排序算法

前言 为什么要学习排序算法&#xff1f; 根据统计&#xff0c;早起大型机CPU资源的四分之一都花在了数据排序上面。排序算法作为最基础的算法&#xff0c;各种操作系统、编程语言都提供了内置的实现。既然排序实现随处可见&#xff0c;我们为什么还要自己动手实现呢&#xff1…

文章伪原创-文章伪原创工具

伪原创原理 文章伪原创的原理是将一篇原始文章进行修改、改写、调整或者替换一些词句等方式&#xff0c;生成与原始文章类似但又不完全相同的新文章。 文章伪原创的实现需要用到自然语言处理技术和相关的算法。具体的过程包括以下几个步骤&#xff1a; 分析原始文章&#xff…

YML是一种数据存储格式

读取yml配置信息 Value("${province}") private String province; Value("${user.sname}") private String name1; Value("${user1[1].name}") private String name; Value("${server.port}") private int port; server:port: 8099 #…

大数据现在找工作难么

大数据行业工作好找还是难找不是光靠嘴说出来的结合实际&#xff0c;看看市场上的招聘需求和岗位要求就大致知道了 要想符合企业用人规范&#xff0c;学历&#xff0c;工作经验&#xff0c;掌握技能都是非常重要的~ 先来看几个招聘网站的报告数据&#xff1a; Boss直聘发布的…

Linux应用编程(系统信息与系统资源)

在应用程序当中&#xff0c;有时往往需要去获取到一些系统相关的信息&#xff0c;譬如时间、日期、以及其它一些系统相关信息&#xff0c;本章将向大家介绍如何通过 Linux 系统调用或 C 库函数获取系统信息&#xff0c;譬如获取系统时间、日期以及设置系统时间、日期等&#xf…

java拦截器怎么实现

Java拦截器是一种对象拦截器&#xff0c;它可以拦截任何的类、方法和字段。拦截器还可以用于检查类是否已经加载以及对字段的访问权限是否符合规范。下面就让我们来了解一下 java拦截器怎么实现吧。 在 Java中&#xff0c;可以通过重写方法和代码块来实现拦截功能&#xff0c;但…

通俗讲解什么是Socket通讯

Socket通讯原理 1、什么是Socket&#xff1f; Socket&#xff0c;即套接字。就是两台主机之间逻辑连接的端点。&#xff08;通俗来说&#xff1a;网络上的两个程序通过一个双向的通信连接实现数据的交换&#xff0c;这个连接的一端称为一个socket&#xff09;。 Socket是一套…

【python】类和对象 | 一些混淆的知识点再复盘 | 魔术方法(特殊方法)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、魔术方法二、构造方法三、__ getitem __方法四、__ len __方法前言 参考视频&#xff1a;视频 一、魔术方法 首先看一看chatgpt对魔术方法&#xff08;特…

一图看懂 xlsxwriter 模块:用于创建 Excel .xlsx 文件, 资料整理+笔记(大全)

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 一图看懂 xlsxwriter 模块&#xff1a;用于创建 Excel .xlsx 文件, 资料整理笔记&#xff08;大全&#xff09;摘要模块图类关系图模块全展开【xlsxwriter】统计常量模块1 xlsxwrite…