zookeeper之基本使用及实现分布式锁

news2025/1/10 23:30:50

写在前面

本文一起看下zk的基本用法。

安装 。

1:数据结构

采用类似于linux系统的文件系统存储结构,但不同于Linux系统文件,zk每个节点都可以存储数据,结构如下图:

在这里插入图片描述

节点类型分为如下四种:

PERSISTENT,持久性ZNode。创建后,即使客户端与服务端断开连接也不会删除,只有客户端主动删除才会消失。
PERSISTENT_SEQUENTIAL,持久性顺序编号ZNode。和持久性节点一样不会因为断开连接后而删除,并且ZNode的编号会自动增加,比如创建/aa/bb,创建结果为/aa/bb00000001这种。
EPHEMERAL,临时性ZNode。客户端与服务端断开连接,该ZNode会被删除。
EPEMERAL_SEQUENTIAL,临时性顺序编号ZNode。和临时性节点一样,断开连接会被删除,并且ZNode的编号会自动增加,比如创建/aa/bb,创建结果为/aa/bb00000001这种。

2:操作

源码 。

//连接地址及端口号
private static final String SERVER_HOST = "127.0.0.1:5181";
//会话超时时间
private static final int SESSION_TIME_OUT = 2000;

2.1:Maven

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

2.2:create

private static void create() throws IOException, KeeperException, InterruptedException {
    //参数一:服务端地址及端口号
    //参数二:超时时间
    //参数三:监听器
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println("新增ZNode成功");
    zooKeeper.close();
}

2.3:query

private static void query() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    //数据的描述信息,包括版本号,ACL权限,子节点信息等等
    Stat stat = new Stat();
    //返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
    byte[] bytes = zooKeeper.getData("/java", false, stat);
    //打印结果
    System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
    System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
}

2.4:update

private static void update() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    //获取节点描述信息
    Stat stat = new Stat();
    zooKeeper.getData("/java", false, stat);
    System.out.println("更新ZNode数据...");
    //更新操作,传入路径,更新值,版本号三个参数,返回结果是新的描述信息
    Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
    System.out.println("更新后的版本号为:" + setData.getVersion());//更新后的版本号为:1
}

2.5:del

private static void del() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    Stat stat = new Stat();
    zooKeeper.getData("/java", false, stat);
//删除ZNode
    zooKeeper.delete("/java", stat.getVersion());
}

2.6:update watch

注意watch只会触发一次。

节点更新触发watch。

private static void testUpdateWatcher() throws Exception {
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    zooKeeper.exists("/java", new MyWatcher());
    //对ZNode进行更新数据的操作,触发监听器
    zooKeeper.setData("/java", "fly".getBytes(), -1);
}

2.7:delete watch

注意watch只会触发一次。

节点删除触发watch。

private static void testDeleteWatcher() throws Exception {
    ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    String path = "/java" + Math.random();
    System.out.println(zooKeeper.create(path, "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
    ZooKeeper zooKeeper1 = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //获取事件的状态
            Event.KeeperState state = watchedEvent.getState();
            //判断是否是连接事件
            if (Event.KeeperState.SyncConnected == state) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.None == type) {
                    System.out.println("zk客户端已连接...");
                }
            }
        }
    });
    // 其他客户端监听
    zooKeeper1.exists(path, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是exists()方法的监听器:" + event.getType().name());
        }
    });
    Stat stat = new Stat();
    zooKeeper.getData(path, false, stat);
//        zooKeeper.delete(path, stat.getVersion());
    // 连接关闭自动删除
    zooKeeper.close();
    Thread.sleep(Integer.MAX_VALUE);
}

3:实现分布式锁

3.1:原理

当有多个线程并发创建同一个节点时,zookeeper只允许一个线程创建成功,基于此我们就可以通过创建节点的方式来实现,这里的节点我们使用临时节点,这里使用临时节点的原因是其在会话结束后会自动删除,客户端就不需要关心节点删除的问题,也避免了基于Redis实现分布式锁 时客户端异常断开还需要等到超时key才会删除的问题。具体流程如下:

1:用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。
2:所有客户端争相创建此节点,但只有一个客户端创建成功。
3:创建成功代表获取锁成功,此客户端执行业务逻辑
4:未创建成功的客户端,监听/exlusive_lock变更
5:获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放
6:锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。

这种方式的优点是实现简单,缺点如下:

1:实现的是非公平锁,容易出现线程的饿死,系统要避免出现饿死,因为饿死可以认为是一定程度上的死锁了
2:锁释放时,zk要同时通知所有节点,所有节点又会同时并发调用zk创建节点,争抢执行资格,当节点较多时,会给zk带来比较大的压力

上述两个问题,我们可以通过基于临时顺序节点实现分布式锁的方案来解决,具体如下:

1:每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
2:客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。
3:如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
4:如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
5:当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

以上排队获取锁,实现了公平锁,解决了线程饿死的问题,每个节点只会有一个服务注册watcher,zk每次只需要通知一个节点,且每个服务创建节点的操作只需要执行一次(后续收到通知只需要判断自己创建过的节点是否排在首位就行了),也大大降低了zk的服务压力。

针对这两种方案我们接下来分别给出具体的编程实现,pom如下:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

3.2:基于临时节点

源码 。

核心代码如下,争抢创建节点失败的话,注册一个watcher,监听节点删除,再重复争抢过程,直到抢到:

try {
    zooKeeper.create(LOCK_KEY_NAME , "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.err.println(Thread.currentThread().getName() + " 争抢锁成功,开始执行临界区业务!");
    Thread.sleep((long) (Math.random() * 5000));
    System.out.println(Thread.currentThread().getName() + " 执行临界区业务结束!");
    // 业务执行完毕,可以关闭
    canClose = true;
    countDownLatch.countDown();
} catch (KeeperException.NodeExistsException e) {
    try {
        zooKeeper.exists(LOCK_KEY_NAME, watchedEvent -> {
            // 删除
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                System.out.println("锁被释放了,开始抢!");
                try {
                    tryAcquireLock(zooKeeper);
                } catch (Exception interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        });
        System.out.println(Thread.currentThread().getName() + " 争抢锁失败,注册监听下次再抢!!!");
    } catch (KeeperException keeperException) {
        keeperException.printStackTrace();
    } catch (InterruptedException interruptedException) {
        interruptedException.printStackTrace();
    }

运行如下:

在这里插入图片描述

3.3:基于临时顺序节点

源码 。

核心代码如下,创建节点后判断自己是否为第一个,是的话则争抢成功,否则监听前一个节点,之后收到watch通知后判断是否为第一个节点,重复这个过程,直到成功,理论上只要收到通知自己已经处于第一个节点了,但道理虽然如此,程序还是要严谨:

// realLockPath不为空则说明是第一次已经创建节点,其前节点删除触发了watch
private static void tryAcquireLock(ZooKeeper zooKeeper, String realLockPath) {
    boolean canClose = false;
    try {
        String lockPath = LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME;
        if (realLockPath == null) {
            realLockPath = zooKeeper.create(lockPath , "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        List<String> lockPaths = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
        Collections.sort(lockPaths);
        int index = lockPaths.indexOf(realLockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序号最小的节点,则获取锁
        if (index == 0) {
            System.err.println(Thread.currentThread().getName() + " 争抢锁成功,开始执行临界区业务!lockPath: " + realLockPath);
            Thread.sleep((long) (Math.random() * 5000));
            System.out.println(Thread.currentThread().getName() + " 执行临界区业务结束!");
            // 业务执行完毕,可以关闭
            canClose = true;
            countDownLatch.countDown();
        } else {
            // lockPath不是序号最小的节点,监听前一个节点
            String preLockPath = lockPaths.get(index - 1);
            String watchPath = LOCK_ROOT_PATH + "/" + preLockPath;
            System.out.println(Thread.currentThread().getName() + " 争抢锁失败,注册监听下次再抢!!! watchPath: " + watchPath + " lockPath: " + realLockPath);
            // 监听自己前一个节点的删除,理论上,watch执行一次肯定能拿到锁
            String finalRealLockPath = realLockPath;
            zooKeeper.exists(watchPath, watchedEvent -> {
                // 删除
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                    System.out.println("前节点锁被释放了,开始抢!");
                    try {
                        tryAcquireLock(zooKeeper, finalRealLockPath);
                    } catch (Exception interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            });
        }
    } catch (KeeperException.NodeExistsException e) {
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (canClose) zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行如下:

在这里插入图片描述

对应zk的节点信息变化如下图:

在这里插入图片描述

写在后面

参考文章列表:

ZooKeeper入门,看这篇就够了! 。

ZooKeeper分布式锁实现java例子,附完整可运行源代码 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/125631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3568平台开发系列讲解(设备驱动篇)中断下文之tasklet的使用

🚀返回专栏总目录 文章目录 一、tasklet 的概念二、tasklet 参考步骤沉淀、分享、成长,让自己和他人都能有所收获!😄 📢我们一般将中断分为上下两个部分,分为上半部,下半部。上半部完成有严格时限的工作(必须),例如回复硬件等,这些工作都是在禁止其他中断情况下进…

汽车相关概念记录

目录 一、汽车电路相关概念 1.1、两个电源 1.2、单线制 1.3、低压直流供电 1.4、常电与ACC 1.4.1、大众ACC供电开关 1.4.2、奥迪ACC开关 二、电子电路 2.1、三极管 2.1.2、截止状态 2.1.3、放大区 2.1.4、饱和区 2.1.4、实例分析 一、汽车电路相关概念 1.1、两个电…

CIO40— 2022 行平常心,做自由人 (3年之约已满)

今天的天空依然很蓝。认识还是在那个不戴口罩的夏天。 感谢IT行业给了我们帮助。 IT将交流植根于微信群&#xff0c;微信群既是信息的集散地&#xff0c;也是良好实践的方案池。在工作中碰到的问题&#xff0c;只要在IT微信群中求助&#xff0c;大家都是知无不言&#xff0c…

C# 异步编程

一 异步编程 1 异步 asynchronize 2 主要解决的事情是 ① 等待一些耗时的任务&#xff08;特别是文件&#xff0c;网络操作&#xff09;而不阻塞当前任务&#xff1b; ② 异步编程提高响应能力&#xff08;特别是UI&#xff09; 开始一个任务后&#xff0c;让任务在离感应线…

机器学习:图文详细总结马尔科夫链及其性质(附例题分析)

目录0 写在前面1 从一个实例出发2 马尔科夫链3 马氏链的基本性质4 C-K方程5 平稳状态分布6 遍历性与例题分析0 写在前面 机器学习强基计划聚焦深度和广度&#xff0c;加深对机器学习模型的理解与应用。“深”在详细推导算法模型背后的数学原理&#xff1b;“广”在分析多个机器…

canopen4.0-canfestiva移植以及同步帧发送

1.canfestival移植入 工程包: 一、canfestival系列教程之程序移植 1.1、首先准备一个hal工程 ,cubmx --------------RCC配置 -----------SYS配置 ----------时钟配置 -----canopen定时器配置(开启中断) --------------can配置波特率,接收中断

CSS3知识点精学

CSS3 被拆分为"模块"。旧规范已拆分成小块&#xff0c;还增加了新的。 一些最重要 CSS3 模块如下&#xff1a; 选择器盒模型背景和边框文字特效2D/3D转换动画多列布局用户界面css引入方式 内嵌式&#xff1a;CSS写在style标签中&#xff0c;style标签虽然可以写在…

PE格式的base reloc分区

https://0xrick.github.io/win-internals/pe7/ 程序雕塑被编译之后&#xff0c;编译器假设可执行文件将会在特定1的v z基地址被加载&#xff0c;这个地址被保存在image_optional_header的imagebase成员中&#xff0c;一些地址会被计算出来然后硬编码到可执行文件中 出于各种原…

malmquist指数案例分析

传统的DEA模型可以反应静态的投入产出效率情况&#xff0c;但如果是面板数据&#xff0c;则需要使用malmquist指数进行研究。malmquist指数可以分析从t期到t1期的效率变化情况。Malmquist指数可分解为技术效率&#xff08;EC&#xff09;和技术进步&#xff08;TC&#xff09;&…

Java高手速成│Java程序怎样和数据库对话

从上一篇 Java高手速成│编写你第一个数据库程序 的例子中可以看出&#xff0c;Java和数据库的连接和对话离不开JDK库类&#xff0c;如java.sql包中支持数据库编程的各种API类、数据库软件DBMS、JDBC驱动软件或Java Connector以及你编写的数据库编程代码。 并且&#xff0c;在…

基础不牢,地动山摇系列 ------ 软硬通吃 unity常用API

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏 unity 实战系列 ⭐相关文章⭐ ——————————————————— -关于游戏剧情模式中用到的基础简单A…

第01讲:Git安装及基本操作

一、什么是Git 版本控制系统&#xff08;VCS&#xff09;是将『什么时候、谁、对什么文件做了怎样的修改』这样的信息以版本的形式保存并进行管理的系统。 简单来说&#xff0c;版本控制系统会去记录它所管理的文件的『历史版本』。 版本控制系统 “不是网盘&#xff0c;而胜…

linux常用命令(一)-路径切换及查看

切换路径-cd 我们可以使用cd&#xff08;change directory&#xff0c;切换目录&#xff09;的命令来进行目录切换 常规 其命令格式为 cd [相对路径或绝对路径] 例如&#xff1a; // 使用相对路径&#xff0c;切换到postfix目录下 cd ../postfix// 使用绝对路径切换到/var/…

Linux操作系统实验1——地址转换

实验要求&#xff1a; 1.在内核中先申请一个页面&#xff0c;使用内核提供的函数&#xff0c;按照寻页的步骤一步步的找到物理地址。这些步骤就相当于我们手动的模拟了mmu的寻页过程。(paging_lowmem.c) 2.通过mmap将物理内存的数据映射到一个设备文件中。 通过访问该设备就可以…

美图商业化2.0:探寻多元增长曲线

【潮汐商业评论/原创】 数字化智能化浪潮正席卷而来。与此前的工业革命、信息技术革命一样&#xff0c;这场箭在弦上的“数智化革命”核心也在于技术的突破与应用。 今年以来&#xff0c;AIGC作为AI技术在内容生产领域的应用&#xff0c;迎来了全球大厂的争相布局&#xff0c…

tensorflow2.x多层感知机模型参数量和计算量的统计

当创建了一个多层感知机模型后&#xff0c;如何调用接口获取该模型的参数量和计算量&#xff1f;首先&#xff0c;打印出模型结构&#xff0c;可通过graphviz模块实现 # 加载模型 model keras.models.load_model(modelPath) tf.keras.utils.plot_model(model, to_filemodel.p…

linux ubuntu 如何自动定时备份数据库到服务器 mysql mysqldump cron

linux ubuntu 如何自动定时备份数据库到服务器 mysql mysqldump cron 一、需求描述 我有一个小日记应用&#xff0c;从 2019 年到 2022 年已经出现了两次比较严重的数据丢失的情况&#xff0c;一次是服务器错误&#xff0c;一次是人为。 所以我急切需要它能自己自动备份数据库…

基于servlet+jsp+mysql实现的java web校园车辆管理系统

一、项目简介 本项目是一套基于servletjspmysql实现的java web校园车辆管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严…

WebDAV之葫芦儿·派盘+Orgzly

Orgzly 支持WebDAV方式连接葫芦儿派盘。 给大家推荐一款Android 上的笔记与 todo 待办事项相融合的应用,它通过树形结构来记录笔记,并且只需要对笔记进行 TODO 标记就能变身任务管理,十分方便。 Orgzly是一款多功能的应用,更确切地说,它是一款 org 文件编辑器。Org 和 …

ArcGIS基础实验操作100例--实验12以线要素分割面要素(二)

本实验专栏来自于汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 基础编辑篇--实验12 以线要素分割面要素&#xff08;二&#xff09; 目录 一、实验背景 二、实验数据 …