基于zookeeper实现分布式锁

news2024/9/28 17:36:47

目录

zookeeper知识点复习

相关概念

java客户端操作

实现思路分析 

基本实现

初始化链接

代码落地 

优化:性能优化

 实现阻塞锁

监听实现阻塞锁

优化:可重入锁

zk分布式锁小结 


zookeeper知识点复习

Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的
功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐、低延迟同时还要保持一致性和可用性,实际上非常困难。因此zookeeper提供了这些功能,开发者在zookeeper之上构建自己的各种分布式系统。

相关概念

Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。并且每个节点都是唯一的。

znode节点有四种类型:

  • PERSISTENT:永久节点。客户端与zookeeper断开连接后,该节点依旧存在
  • EPHEMERAL:临时节点。客户端与zookeeper断开连接后,该节点被删除
  • PERSISTENT_SEQUENTIAL:永久节点、序列化。客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
  • EPHEMERAL_SEQUENTIAL:临时节点、序列化。客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

创建这四种节点:

 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时zookeeper会通知客户端。当前zookeeper有如下四种事件:
1. 节点创建
2. 节点删除
3. 节点数据修改
4. 子节点变更

java客户端操作

1. 引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>

2. 常用api及其方法

初始化zookeeper客户端类,负责建立与zkServer的会话 

       new ZooKeeper(connectString, 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("获取链接成功!!");
            }
        });

创建一个节点,1-节点路径 2-节点内容 3-访问控制控制 4-节点类型 


 String fullPath = zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);

判断一个节点是否存在

Stat stat = zooKeeper.exists(rootPath, false);
if (stat != null) {...}

查询一个节点的内容 

 Stat stat = new Stat();
 byte[] data = zooKeeper.getData(path, false, stat);

更新一个节点 

zooKeeper.setData(rootPath, new byte[]{}, stat.getVersion() + 1);

删除一个节点 

 zooKeeper.delete(path, stat.getVersion());

查询一个节点的子节点列表 

List<String> children = zooKeeper.getChildren(rootPath, false);

关闭链接 

if (zooKeeper != null) {
       zooKeeper.close();
   }

实现思路分析 

 分布式锁的步骤:
1. 获取锁:create一个节点
2. 删除锁:delete一个节点
3. 重试:没有获取到锁的请求重试
参照redis分布式锁的特点:
        1. 互斥 排他
        2. 防死锁:

        1. 可自动释放锁(临时节点) :获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;由于创建的是临时节点,客户端宕机后,过了一定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。
        2. 可重入锁:借助于ThreadLocal

3. 防误删:宕机自动释放临时节点,不需要设置过期时间,也就不存在误删问题。
4. 加锁/解锁要具备原子性
5. 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
6. 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。

基本实现

实现思路:
1. 多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁
2. 执行业务逻辑
3. 完成业务流程后,删除节点释放锁。

初始化链接

由于zookeeper获取链接是一个耗时过程,这里可以在项目启动时,初始化链接,并且只初始化一次。借助于spring特性,代码实现如下:

@Component
public class zkClient {
    private static final String connectString = "192.168.107.135";

    private static final String ROOT_PATH = "/distributed";

    private ZooKeeper zooKeeper;

    @PostConstruct
    public void init() throws IOException {
        this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("zookeeper 获取链接成功");
            }
        });
        //创建分布式锁根节点
        try {
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 初始化分布式对象方法
     */
    public ZkDistributedLock getZkDistributedLock(String lockname){
        return new ZkDistributedLock(zooKeeper,lockname);
    }
}

代码落地 

public class ZkDistributedLock {
    public static final String ROOT_PATH = "/distribute";
    private String path;
    private ZooKeeper zooKeeper;


    public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
        this.zooKeeper = zooKeeper;
        this.path = ROOT_PATH + "/" + lockname;
    }

    public void lock() {
        try {
            zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(200);
            lock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public  void  unlock(){
        try {
            this.zooKeeper.delete(path,0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }
}

改造StockService的checkAndLock方法:

    @Autowired
    private zkClient client;
    
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        ZkDistributedLock lock = this.client.getZkDistributedLock("lock");
        lock.lock();
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0) {
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }
        lock.unlock();
    }

Jmeter压力测试:

 性能一般,mysql数据库的库存余量为0(注意:所有测试之前都要先修改库存量为5000)

基本实现存在的问题:
        1. 性能一般(比mysql略好)
        2. 不可重入

接下来首先来提高性能

优化:性能优化

基本实现中由于无限自旋影响性能:

试想:每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。这里借助于zk的临时序列化节点,实现分布式锁: 

 实现阻塞锁

代码实现:

public class ZkDistributedLock {
    public static final String ROOT_PATH = "/distribute";
    private String path;
    private ZooKeeper zooKeeper;


    public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
        this.zooKeeper = zooKeeper;
        try {
            this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
                    null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        String preNode = getpreNode(path);
        //如果该节点没有前一个节点,说明该节点是最小的节点
        if (StringUtils.isEmpty(preNode)) {
            return;
        }
        //重新检查是否获取到锁
        try {
            Thread.sleep(20);
            lock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getpreNode(String path) {
        //获取当前节点的序列化序号
        Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
        //获取根路径下的所有序列化子节点
        try {
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
            //判空处理
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }
            //获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                //获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }
            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void unlock() {
        try {
            this.zooKeeper.delete(path, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }
}

主要修改了构造方法和lock方法:

 并添加了getPreNode获取前置节点的方法。

测试结果如下:

 性能反而更弱了。

原因:虽然不用反复争抢创建节点了,但是会自选判断自己是最小的节点,这个判断逻辑反而更复杂更 耗时。

解决方案:监听实现阻塞锁

监听实现阻塞锁

对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”;在这种羊群效应中,zookeeper需要通知1000个 客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在 设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表 为/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序号为1的客户端监听 序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。

所以调整后的分布式锁算法流程如下:

  • 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点 为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推;
  • 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子 节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通 知后重复此步骤直至获得锁;
  • 执行业务代码;
  • 完成业务流程后,删除对应的子节点释放锁。

改造ZkDistributedLock的lock方法:

    public void lock() {
        String preNode = getpreNode(path);
        //如果该节点没有前一个节点,说明该节点是最小的节点
        if (StringUtils.isEmpty(preNode)) {
            return;
        } else {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            try {
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
                    countDownLatch.countDown();
                }) == null) {
                    return;
                }
                countDownLatch.await();
                return;

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock();
        }
    }

压力测试效果如下:

 由此可见性能提高不少仅次于redis的分布式锁

优化:可重入锁

引入ThreadLocal线程局部变量保证zk分布式锁的可重入性。

在对应的线程的存储数据

public class ZkDistributedLock {
    public static final String ROOT_PATH = "/distribute";
    private String path;
    private ZooKeeper zooKeeper;
    private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();


    public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
        this.zooKeeper = zooKeeper;
        try {
            this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
                    null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        Integer flag = THREAD_LOCAL.get();
        if (flag != null && flag > 0) {
            THREAD_LOCAL.set(flag + 1);
            return;
        }
        String preNode = getpreNode(path);
        //如果该节点没有前一个节点,说明该节点是最小的节点
        if (StringUtils.isEmpty(preNode)) {
            return;
        } else {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            try {
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
                    countDownLatch.countDown();
                }) == null) {
                    return;
                }
                countDownLatch.await();
                THREAD_LOCAL.set(1);
                return;

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock();
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getpreNode(String path) {
        //获取当前节点的序列化序号
        Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
        //获取根路径下的所有序列化子节点
        try {
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
            //判空处理
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }
            //获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                //获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }
            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void unlock() {
        try {
            THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
            if (THREAD_LOCAL.get() == 0) {
                this.zooKeeper.delete(path, 0);
                THREAD_LOCAL.remove();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }
}

zk分布式锁小结 

参照redis分布式锁的特点:
1. 互斥 排他:zk节点的不可重复性,以及序列化节点的有序性
2. 防死锁:
        1. 可自动释放锁:临时节点
        2. 可重入锁:借助于ThreadLocal
3. 防误删:临时节点
4. 加锁/解锁要具备原子性
5. 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
6. 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。
7. 公平锁:有序性节点

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/487068.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】多路转接--select、poll、epoll,非阻塞等待

1.IO的概念 IO等拷贝数据 等&#xff1a;发送缓冲区满了或者接受缓冲区没有数据&#xff0c;就需要等待 高效IO就是&#xff1a;减少单位时间内,"等"的比重 2. 阻塞IO和非阻塞IO 2.1.阻塞IO 阻塞等待会在read的地方等待 #include <iostream> #include &l…

JavaScript实现输入数字,输出是几月份的代码

以下为实现输入数字&#xff0c;输出是几月份的代码和运行截图 目录 前言 一、实现输入数字&#xff0c;输出是几月份的 1.1 运行流程及思想 1.2 代码段 1.3 JavaScript语句代码 1.4 运行截图 前言 1.若有选择&#xff0c;您可以在目录里进行快速查找&#xff1b; 2.本…

1699_simulink代码生成配置初级方案

全部学习汇总&#xff1a; GreyZhang/g_matlab: MATLAB once used to be my daily tool. After many years when I go back and read my old learning notes I felt maybe I still need it in the future. So, start this repo to keep some of my old learning notes servral …

数据库篇:初始化、建表、配置及调用

微信小程序云开发实战-答题积分赛小程序 数据库篇:初始化、建表、配置及调用 开通云开发服务 点击【云开发】,开通云开发服务; 开通服务完成后,方可继续往下操作; 题库数据表初始化 创建数据表 点击【数据库】,然后点击【+】创建数据表;

彻底告别手动配置任务,魔改xxl-job!

分析 改造 1、接口调用 2、创建新注解 3、自动注册核心 4、自动装配 测试 测试后 XXL-Job是一款非常优秀的任务调度中间件&#xff0c;其轻量级、使用简单、支持分布式等优点&#xff0c;被广泛应用在我们的项目中&#xff0c;解决了不少定时任务的调度问题。不仅如此&a…

RabbitMQ 简单模型

MQ引言 1.1 什么是MQ ​ MQ(Message Quene) : 翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息&#xff0c;消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的&#xff0c;而且只关心消息的发送和接收&#xff0c;没有业务逻辑的侵…

chatGPT+Midjourney制作绘画本

chatGPTMidjourney制作绘画本 灵感来源&#xff1a;https://www.bilibili.com/video/BV1N24y1F7ga/?spm_id_from888.80997.embed_other.whitelist&vd_source6dd97671c42eb7cf111063714216bd0b 最终效果&#xff1a; 绘本故事 故事塑造能力弱的人可以使用chatGPT来帮助编…

wait/waitpid函数等待子进程状态发生改变

&#x1f38a;【进程通信与并发】专题正在持续更新中&#xff0c;进程&#xff0c;线程&#xff0c;IPC&#xff0c;线程池等的创建原理与运用✨&#xff0c;欢迎大家前往订阅本专题&#xff0c;获取更多详细信息哦&#x1f38f;&#x1f38f;&#x1f38f; &#x1fa94;本系列…

【自看】2023前端面试上岸手册——VUE部分

目录 Vue 的基本原理双向数据绑定的原理MVVM、MVC、MVP 的区别slot 是什么&#xff1f;有什么作用&#xff1f;原理是什么&#xff1f;\$nextTick 原理及作用Vue 单页应用与多页应用的区别Vue 中封装的数组方法有哪些&#xff0c;其如何实现页面更新Vue data 中某一个属性的值发…

商品管理系统【控制台+MySQL】(Java课设)

系统类型 控制台类型Mysql数据库存储数据 使用范围 适合作为Java课设&#xff01;&#xff01;&#xff01; 部署环境 jdk1.8Mysql8.0Idea或eclipsejdbc 运行效果 本系统源码地址&#xff1a;https://download.csdn.net/download/qq_50954361/87738976 更多系统资源库地…

辅助驾驶功能开发-功能规范篇(16)-2-领航辅助系统NAP-匝道跟车基础功能

书接上回 2.3.3匝道辅助驾驶 匝道辅助驾驶功能根据导航引导在ODD范围内辅助驾驶车辆进出匝道,主动变道并入或开出主路,并可根据导航路线引导车辆通过跨高速连接路。 前置条件: 1)驾驶员设置导航目的地及导航路线 2)开启辅助驾驶功能,系统进入NOA功能 2.3.3.1.上下匝道…

如何设计一个可扩展的优惠券功能

本文主要分享了如何设计一个可扩展的优惠券功能。 一、功能特性介绍 1.每个条件的代码独立&#xff0c;相当于单独的实现类实现接口&#xff0c;就能通过配置添加到优惠券条件校验当中&#xff0c;支持多种条件灵活组合 2.新增一种使用条件可以不修改核心流程代码&#xff0…

Angular 与PDF之二:打印预览的实现

如何在angular中实现打印和预览pdf的功能, 使用print.js这个包就可实现这个功能 Print.js介绍 Print.js可以打印pdf文件&#xff0c;html元素&#xff0c;图片。官网 https://printjs.crabbly.com/ Print.js使用 首先新建一个angular项目&#xff0c;在项目里下载print.js n…

[JS每M日N练] [格物] - 你所不知道的toString

文章目录 导读Object.prototype.toString常见类型转换结果Object.toString ! Object.prototype.toString对Object.prototype.toString.call(obj)的理解 .toString.toString TypeError误区tostring被改写了定义在原型链的什么位置上方法重写 文章小结参考资料 导读 开发过程中经…

同时使用注解和 xml 的方式引用 dubbo 服务产生的异常问题排查实战

文章目录 一、现象二、问题排查三、结论四、解决方案 一、现象 使用 nacos 作注册中心的线上 dubbo 消费端应用每隔 1 分钟就会抛出以下异常&#xff08;为使描述简单化&#xff0c;文章中使用本地 demo 来复现&#xff09;&#xff0c;该异常表示无法连接到 172.17.0.1:20881…

JavaWeb( 二 ) URL

1.4.URL统一资源定位符 URL代表Uniform Resource Locator 统一资源定位符&#xff0c;也叫 URL地址 。是用于标识和定位Web上资源的地址&#xff0c;通常用于在Web浏览器中访问网站和文件。 URL由若干部分组成&#xff0c;scheme:// host : port / path 例如&#xff1a; htt…

Contest3111 - 计科2101~2104算法设计与分析上机作业07

问题 A: 有重复元素的排列问题 题目描述 设R{ r 1 , r 2 , …, r n }是要进行排列的n个元素。其中元素r 1 , r 2 , …, r n 可能相同。试设计一个算法&#xff0c; 列出R的所有不同排列。给定n 以及待排列的n 个元素。计算出这n 个元素的所有不同排列。 输入 第1 行是元素个…

android四大组件之一-Activity实现原理分析

前言&#xff1a; 这篇文章是我花费时间最久的一篇文章&#xff0c;整整的两个月。整个流程繁琐是一个方面的原因&#xff0c;另外一个原因是我想尽可能的把整个流程的逻辑尽可能详细的一一描述出来&#xff0c;以及结合到我们项目中遇到的一些问题来进行解释&#xff0c;毕竟…

【五一创作】VS+Qt主界面内嵌自定义控件的四种方法以及不同自定义控件数据交互

前言 在Qt界面开发过程中&#xff0c;一个主界面或者主窗口看成是各个控件排列组合后的集合&#xff0c;对于一些项目而言&#xff0c;有些常用的控件可以封装成自己想要的控件样式并且复用&#xff0c;比如说&#xff0c;log显示控件&#xff0c;图像/视频显示控件等&#xf…

【ros2】ros melodic迁移到ros2 dashing过程中碰到的问题及解决方法

序言 总结踩坑经历&#xff0c;以利他人 1. error: forming pointer to reference type … & 报错原因&#xff1a; ros2回调函数的参数不能是引用形式 &&#xff0c;需要去除& 解决方法&#xff1a; 如果是指针引用&#xff0c;直接去除引用 void Callback(con…