Zookeeper客户端命令、JAVA API、监听原理、写数据原理以及案例

news2024/11/19 14:38:40

1. Zookeeper节点信息

指定服务端,启动客户端命令:

bin/zkCli.sh -server 服务端主机名:端口号

1)ls /  查看根节点下面的子节点

     ls -s /  查看根节点下面的子节点以及根节点详细信息

其中,cZxid是创建节点的事务id,每次修改Zookeeper的状态都会产生一个事务id;

ctime是节点被创建的毫秒数(从1970年开始),这里是zookeeper自带的默认节点,其ctime就是0;mZxid是节点最后被更新的事务id;

mtime是节点最后修改的毫秒数;pZxid是最后更新的子节点的事务id;ephemeralOwner如果是临时节点则表示拥有这个节点的s

ession id,如果不是临时节点则为0;

dataLength是该节点的数据长度;

numChildren是该节点的子节点数量。

2. Zookeeper节点类型

持久节点:客户端和服务端断开连接后,创建的节点不删除

短暂/临时节点:客户端和服务端断开连接后,创建的节点删除

上面两种节点还可以继续分为带序号和不带序号的,如果带序号,节点名称后面会接一个数值,顺序递增,由父节点维护。

创建永久节点:create path "val"

(注意,ls命令后面的路径不能以/结尾,这里跟Linux不一样)

查询节点的值:get -s path

创建带序号的永久节点:create -s path "val"

以相同的路径再次创建同名节点,带序号的节点会自动序号加1,不带序号的节点创建报错

以上是永久节点,退出客户端之后这些节点依然存在

创建临时节点:create -e path "val"

创建带序号的临时节点只需加上-s即可

因为已经有shuguo,weiguo,wuguo,所以新创建的带序号的临时节点的序号为3。

断开客户端之后,上面创建的临时节点wuguo会被删除。

修改节点的值:set path "newVal"

3. 监听原理

监听主要是通过getChildren和getData来实现,表面上是获取节点子节点或者节点数据的方法,但是第二个参数表示是否监听,一般为true(第二个参数也可以传一个自定义的监听器),所以实现了监听,当子节点发生增减或者节点数据发生变化时,就会通知客户端,触发process方法。getChildren和getData是Java API监听方式,稍后介绍,这里先介绍命令行监听。

命令行开启监听节点数据:get -w path

在另一个会话端修改sanguo的节点值,在本端会产生事件通知:

再次在另一个会话端修改sanguo的节点值,在本端不会产生事件通知,因为监听只生效一次,要想再次监听,需要再次注册,即执行get -w path

监控子节点变化:ls -w path

再另一个会话端创建一个子节点,在本端会产生一个事件通知

节点删除:delete path

删除节点及其下面的子节点:deleteall path

查看节点状态:stat path

4. Java API

添加pom依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
</dependency>

建立Zookeeper连接:

Zookeeper zk = new Zookeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
    
    }        
})

其中connectString是主机地址,如果有多个主机,用逗号隔开,中间不能有空格。sessionTimeout是超时时间,单位是微秒。第三个参数是监听器,里面一般是根据事件类型以及事件路径来做相应的处理,也可在里面继续调用getChildren或者getData方法实现持续监听。

一旦连接上Zookeeper之后就会调用到process方法,里面一般会根据事件类型来对某个countDownLatch变量进行减1操作,在主线程中会等待这个变量为0,即等待Zookeeper连接上。

创建节点:

String node = zk.create(path, data, ZooDfs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

监控子节点的增删(注册监听):

List<String> children = zk.getChildren(path, true)

for (String child : children) {
    System.out.println(child);
}

要想验证对子节点增删的监听,首先在java主线程中添加一个睡眠的函数,使其持续运行不至于很快结束,然后在process回调中添加相应的打印代码(比如继续getChildren,打印子节点信息),这样手动去添加节点,会执行到process函数中的打印信息。

判断节点是否存在:

Stat stat = ck.exists(path, false);
System.out.println(stat == null ? "not exist" : "exist");

5. 写数据原理

1)写请求直接发给Leader

其中,只要有半数节点写完,就可以发送ack给客户端,其他没写的服务端稍后再写。

2)  写请求发给Follower

这里也是半数节点写完就发送ack给客户端,所不同的是由接受写请求的Follower发送给客户端,而不是Leader,因为客户端最开始建立连接的是Follower。

6. 服务器动态上下线监听案例

分析:客户端监听服务器的上下线,本质是监听子节点的增删,服务器启动时会去Zookeeper集群注册(临时)子节点,使用的是create操作,而客户端监听则是get操作。注意这里的服务器和客户端对于Zookeeper集群来说都是客户端。

于是,代码主要分两部分,服务器创建子节点和客户端监听子节点。

// 服务器
private Zookeeper zk;

public static void main(String[] args) throws Exception {
    // 创建本类(服务器类)的对象
    DistributeServer server = new DistributeServer();

    // 建立Zookeeper连接
    server.getConnect();

    // 注册,即创建子节点
    server.register(args[0]);

    // 服务端业务逻辑(睡觉)
    server.business();
}

private void business() throws Exception {
    Thread.sleep(Long.MAX_VALUE);
}

private void register(String hostName) throws Exception {
    String create = zk.create("/servers/" + hostName, hostName.getBytes(), ZooDef.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMRAL_SEQUENTIAL);
    System.out.println(hostName + "is online");
}

private void getConnect() throws Exception {
    zk = new Zookeeper("xxx", 2000, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent){
            
        }
    });
}
// 客户端
private Zookeeper zk;

public static void main(String[] args) throws Exception {
    // 创建本类(服务器类)的对象
    DistributeClient client = new DistributeClient();

    // 建立Zookeeper连接
    client.getConnect();

    // 注册,即创建子节点
    client.getServerList();

    // 客户端业务逻辑(睡觉)
    client.business();
}

private void business() throws Exception {
    Thread.sleep(Long.MAX_VALUE);
}

private void getServerList() throws Exception {
    List<String> children = zk.getChildren("/servers", true);
    
    List<String> servers = new ArrayList<>;
    for (String child : children) {
        byte[] data = zk.getData("/servers/" + child, false, null);
        servers.add(new String(data));
    }

    System.out.println(servers);
}

private void getConnect() throws Exception {
    zk = new Zookeeper("xxx", 2000, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent){
               getServerList();
        }
    });
}

验证时,可以先验证客户端功能,服务端可以先用create -e -s 来代替,如果客户端功能ok,再继续验证服务端功能。

7. 分布式锁案例

分析:进程用客户端表示,每个客户端进程会去Zookeeper中创建一个临时带序号的子节点,如果子节点序号最小,则表示获取到锁,否则监听前一个序号更小的节点,持有锁执行完业务之后,会删除节点,表示释放锁,后面的节点/进场即可获取到锁。

private Zookeeper zk;
private String waitPath;
private String currentNode;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);

public DistributeClient() throws Exception {
    // 创建本类(服务器类)的对象
    DistributeClient client = new DistributeClient();

    // 建立Zookeeper连接
    getConnect();

    connectLatch.await();
}


// 加锁,创建临时节点,并判断是否是序号最小的节点,如果是则获取到锁,处理业务,如果不是,则监听前一个序号较小的节点
public void lock() throws Exception {
    // currentNode是全路径名
    String currentNode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAGE, CreateMode.EPHEMETAL_SEQUENTIAL);

    List<String> children = zk.getChildren("locks", false);
    if (children.size() == 1) {
        return;
    } else {
        Collections.sort(children);
        
        String thisNode = currentNode.substring("/locks/".length());
        int index = children.indexOf(thisNode);
        if (index == -1) {
            System.out.println("数据异常");
        } else if (index == 0) {
            return;
        } else {
            // 监听前一个节点
            waitPath = "/locks/" + children.get(index -1);
            zk.getData(waitPath, true, null);

            waitLatch.await();
        }
    }
    
}

public void unlock()  throws Exception {
    zk.delete(currentNode, -1);
}

private void getConnect() throws Exception {
    zk = new Zookeeper("xxx", 2000, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();       
                }

                if (watchedEvent.getType() == Event.EventType.NodeDeleted && Event.getPath.equals(waitPath)) {
                    waitLatch.countDown();
                }
        }
    });
}

测试步骤:建立两个线程,每个线程起一个客户端去加锁解锁(加日志打印),加锁解锁之间有睡眠,运行这两个线程,可以看到最终只有1个客户端去持有锁

8. Curator框架

上述分布式锁的案例中,有如下缺点:

1)会话印布链接,需要自己使用CountDownLatch处理

2)监听需要重复注册

3)代码较复杂

4)不支持多节点删除与创建

因此引入Curator框架,添加pom依赖
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
public static void main(String[] args) {
    // 获取分布式锁1
    InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework, "/locks");

    // 获取分布式锁2
    InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework, "/locks");

    // 启动两个线程,分别加锁释放锁,该锁可重入
    // 加锁: lock1.acquire(); 释放锁: lock1.release();
}

private static CuratorFramework getCuratorFramework() {
    ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 3);
    
    CuratorFramework client =  CuratorFrameworkFactory.builder().connectString("xxx").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(retry).build();

    client.start();
    System.out.println("客户端启动成功");

    return client;
}

9. 面试题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1469591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QWidget: Must construct a QApplication before a QWidget 13:25:48: 程序异常结束。

QWidget: Must construct a QApplication before a QWidget 13:25:48: 程序异常结束。 你的插件是release&#xff0c;而你用了debug模式、

Yolov8有效涨点:YOLOv8-AM,采用多种注意力模块提高检测精度,含代码,超详细

前言 2023 年&#xff0c;Ultralytics 推出了最新版本的 YOLO 模型。注意力机制是提高模型性能最热门的方法之一。 本次介绍的是YOLOv8-AM&#xff0c;它将注意力机制融入到原始的YOLOv8架构中。具体来说&#xff0c;我们分别采用四个注意力模块&#xff1a;卷积块注意力模块…

max_element和min_element使用

头文件 #include<alorithm> 作用 用于返回数组或容器中最值元素(最小值、最大值)&#xff0c;值和下标。 使用举例 #include<iostream> #include<vector> #include<algorithm> using namespace std; int main() {/*数组初始化*/vector<int>…

Django入门指南:从环境搭建到模型管理系统的完整教程

环境安装&#xff1a; ​ 由于我的C的Anaconda 是安装在C盘的&#xff0c;但是没内存了&#xff0c;所有我将环境转在e盘&#xff0c;下面的命令是创建环境到指定目录中. conda create --prefixE:\envs\dj42 python3.9进入环境中&#xff1a; conda activate E:\envs\dj42…

unity学习(41)——创建(create)角色脚本(panel)——UserHandler(收)+CreateClick(发)——发包!

1.客户端的程序结构被我精简过&#xff0c;现在去MessageManager.cs中增加一个UserHandler函数&#xff0c;根据收到的包做对应的GameInfo赋值。 2.在Model文件夹下新增一个协议文件UserProtocol&#xff0c;内容很简单。 using System;public class UserProtocol {public co…

Unity与Android交互通信系列(5)

在前述文章中&#xff0c;已经使用了AndroidJavaProxy代理接口&#xff0c;本节我们将详细的介绍AndroidJavaProxy代理的用法。正如其名&#xff0c;AndroidJavaProxy是一个代理&#xff0c;它在Android端代码与Unity端代码交互中起一个桥接作用。其一般用法为在Java代码中定义…

【C++】类和对象之拷贝构造函数篇

个人主页 &#xff1a; zxctscl 文章封面来自&#xff1a;艺术家–贤海林 如有转载请先通知 文章目录 1. 前言2. 传值传参和传引用传参3. 概念4. 特征 1. 前言 在前面学习了6个默认成员函数中的构造函数和析构函数 【C】构造函数和析构函数详解&#xff0c;接下来继续往后看拷…

S-35390A计时芯片介绍及开发方案

计时芯片 S-35390A芯片是计时芯片&#xff0c;一般用来计算时间。低功耗&#xff0c;宽电压&#xff0c;受温度影响小&#xff0c;适用于很多电路。它有一个问题&#xff0c;不阻止用户设置不存在的时间&#xff0c;设置进去之后计时或者闹钟定时会出错。 规格书阅读 首先我…

如何移除禁用WordPress默认小工具(附WordPress默认小工具名称)

WordPress 自带的小工具非常多&#xff0c;但是我们用到的也就那么几种&#xff0c;甚至一种都不会用到&#xff0c;所以很有必要注销&#xff08;去除&#xff09;掉一些不用的小工具。实现的方法也很简单&#xff0c;只需将以下代码&#xff0c;根据自己的情况删除需要用的小…

精品基于SpringBoot的体育馆场地预约赛事管理系统的设计与实现-选座

《[含文档PPT源码等]精品基于SpringBoot的体育馆管理系统的设计与实现[包运行成功]》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功&#xff01; 软件开发环境及开发工具&#xff1a; Java——涉及技术&#xff1a; 前端使用技术&#…

掌握3个Mock工具,轻松玩转单元测试

公司要求提升单元测试的质量&#xff0c;提高代码的分支覆盖率和行覆盖率&#xff0c;安排我研究单元测试&#xff0c;指定方案分享并在开发部普及开。 单元测试中的Mock的目的 Mock的主要目的是让单元测试Write Once, Run Everywhere. 即编写一次后&#xff0c;可以在任意时…

程序媛的mac修炼手册-- 2024如何彻底卸载Python

啊&#xff0c;前段时间因为想尝试chatgpt的API&#xff0c;需要先创建一个python虚拟环境来安装OpenAI Python library. 结果&#xff0c;不出意外的出意外了&#xff0c;安装好OpenAI Python library后&#xff0c;因为身份认证问题&#xff0c;根本就没有获取API key的权限…

【YOLO系列算法人员摔倒检测】

YOLO系列算法人员摔倒检测 模型和数据集下载YOLO系列算法的人员摔倒检测数据集可视化数据集图像示例&#xff1a; 模型和数据集下载 yolo行人跌倒检测一&#xff1a; 1、训练好的行人跌倒检测权重以及PR曲线&#xff0c;loss曲线等等&#xff0c;map达90%多&#xff0c;在行人跌…

GEE必须会教程—曾“几何”时(Geometry类型)

几何图形组成了世界万物&#xff0c;在数学史具有重要地位&#xff0c;将几何图形迁移到地理空间信息的处理上&#xff0c;我们我们得到就是研究区域的边界范围&#xff0c;因此&#xff0c;在学习矢量数据和栅格数据之前&#xff0c;我们有必要了解几何图形在GEE上的编辑。 1…

019 Spring Boot+Vue 电影院会员管理系统(源代码+数据库+文档)

部分代码地址&#xff1a; https://github.com/XinChennn/xc019-cinema 一、系统介绍 cinema项目是一套电影院会员管理系统&#xff0c;使用前后端分离架构开发包含管理员、会员管理、会员卡管理、电影票、消费记录、数据统计等模块 二、所用技术 后端技术栈&#xff1a; …

【爬虫逆向实战篇】定位加密参数、断点调试与JS代码分析

文章目录 1. 写在前面2. 确认加密参数3. 加密参数定位4. XHR断点调试 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【作者推荐】&#xff1a;对JS逆向…

openssl3.2 - 编译 - zlib.dll不要使用绝对路径

文章目录 openssl3.2 - 编译 - 编译时的动态库zlib.dll不要使用绝对路径概述测试zlib特性在安装好的目录中是否正常笔记70-test_tls13certcomp.t80-test_cms.t对测试环境的猜测从头再编译测试安装一次测试一下随便改变位置的openssl用到zlib时是否好使测试一下随便改变位置的op…

11-ATF初始化Normal cpu context

ATF (Arm Trusted Firmware) 初始化 Normal CPU Context 的步骤主要涉及设置正常世界(Normal World)CPU的环境,确保在从安全世界(Secure World)切换到正常世界时,CPU能够正确执行正常世界的代码。以下是一般步骤的概述: CPU启动与初始化: 在系统启动时,CPU首先在安全世…

ChatGPT Plus遇到订阅被拒原因与解决方案

ChatGPT Plus被广泛认为相比普通版本更快、更强&#xff0c;并且能最先体验新功能。 很多小伙伴再订阅时遇到图片中的问题 错误提示包括这些&#xff1a; Your credit card was declined.Try paying with a debit card instead.您的信用卡被拒绝了。请尝试用借记卡支付。你的…

Vue+SpringBoot打造高校实验室管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…