Zookeeper节点操作

news2025/1/22 19:43:35

ZooKeeper的节点操作

ZooKeeper的节点类型

ZooKeeper其实也是一个分布式集群,其中维护了一个目录树结构,在这个目录树中,组成的部分是一个个的节点。ZooKeeper的节点可以大致分为两种类型: 短暂类型持久类型

  • 短暂类型ephemeral: 客户端和服务器断开后,创建的节点自己删除。
  • 持久类型persistent: 客户端和服务器断开后,创建的节点不删除(默认情况)。
节点类型描述信息
EPHEMERAL临时节点,在会话结束后自动被删除。
EPHEMERAL_SEQUENTIAL临时顺序节点,在会话结束后会自动被删除。会在给定的path节点名称后添加一个序列号。
PERSISTENT永久节点,在会话结束后不会被自动删除。
PERSISTENT_SEQUENTIAL永久顺序节点,在会话结束后不会被自动删除。会在给定的path节点名称后添加一个序列号。

ZooKeeper的Shell操作

打开Shell客户端

  1. 连接到当前节点的Server服务

    [root@qianfeng01 ~]# zkCli.sh
    复制代码
  2. 连接到其他节点的Server服务

    [root@qianfeng01 ~]# zkCli.sh -server qianfeng02:2181
    复制代码

Shell操作

ls

作用: 查看某个节点下的子节点
选项: 
    -s 查看具体信息,包括time、version等信息
注意事项: 需要使用绝对路径查看
示例:
    ls /
    ls /zookeeper
    ls -s /zookeeper/config
复制代码

create

作用: 创建一个节点,可以设置节点的初始内容
选项:
    -e: 设置短暂类型节点
    -s: 设置顺序节点
示例:
    create /test
    create /test2 "content message"
    create -e /test3 "content message"
    create -e -s /test "content message"
复制代码

get

作用: 获取节点存储的值
选项:
    -s: 同时获取版本描述信息,例如: time、version等
示例:
    get /zookeeper/config
    get -s /zookeeper/config
复制代码

hehe 节点数据信息
cZxid = 0x800000002 节点创建时的zxid
ctime = Thu May 09 03:41:15 CST 2019 节点创建的时间
mZxid = 0x800000002 对应节点最近一次修改的时间,与子节点无关
mtime = Thu May 09 03:41:15 CST 2019 节点最近一次更新的时间
pZxid = 0x800000002 对应节点与子节点(或者子节点)的修改的时间,与孙子节点无关
cversion = 0 子节点数据更新次数
dataVersion = 0 本节点数据更新次数
aclVersion = 0 节点授权信息(ACL)的更新次数
ephemeralOwner = 0x0 如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0
dataLength = 4 节点的数据长度
numChildren = 0 子节点的个数

set

作用: 设置节点存储的值
示例:
    set /test "content message"
复制代码

delete

作用: 删除节点,只能删除空节点,即没有子节点的节点
示例:
    delete /test
复制代码

deleteAll

作用: 删除节点,可以递归删除所有的子节点
示例:
    deleteAll /test
复制代码

addWatch

作用: 监听节点,当这个节点发生变化(内容、创建、删除)会得到通知
示例:
    addWatch /test
复制代码

removewatches

作用: 移除对节点的监听
示例:
    removewatches /test
复制代码

quit

作用: 退出客户端
复制代码

IDEA操作ZooKeeper的API操作

pom依赖

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>
复制代码

初始化ZooKeeper客户端对象

package com.qianfeng.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

/**
 * ZooKeeper的API操作
 *
 * @author 千锋大数据教研院
 */
public class ZkAPI {
    // Zookeeper客户端对象
    private ZooKeeper zkCli;

    @Before
    public void init() throws IOException, InterruptedException {
        // 连接到ZooKeeper的Server端
        String connectString = "qianfeng01:2181,qianfeng02:2181,qianfeng03:2181";
        // 连接超时时间
        int sessionTimeout = 5000;

        // 初始化一个ZooKeeper客户端实例,需要参数: 服务端、连接超时时间、观察者做回调
        zkCli = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 暂不做任何处理
            }
        });
    }
}
复制代码

创建节点(同步)

注意: 以下的API操作,需要使用到zkCli对象。在上述的初始化部分已经完成了对zkCli对象的初始化。后面的所有操作,只需要将方法粘贴到ZkAPI类中即可。需要导入的包,也在上方的初始化部分导入完成了。

// ACL权限类型:
//     OPEN_ACL_UNSAFE: 完全开放的ACL,任何连接的客户端都可以操作该节点
//     CREATOR_ALL_ACL: 只有创建者才有ACL权限
//     READ_ACL_UNSAFE: 只能读取ACL
//
// CreateMode:
//     EPHEMERAL: 临时型
//     EPHEMERAL_SEQUENTIAL: 临时顺序型
//     PERSISTENT: 永久型
//     PERSISTENT_SEQUENTIAL: 永久顺序型
//
// 同步创建节点,遇到不正常的情况直接抛出异常。
@Test
public void createNode() throws InterruptedException, KeeperException {
    zkCli.create("/ApiNode1", "ApiContent1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
复制代码

文末扫码有福利!

创建节点(异步)

@Test
public void createNodeAsync() throws InterruptedException {
    zkCli.create("/ApiNode2", "ApiContent2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CreateNodeCallBack(), "");
    // 因为是异步操作,如果不等待一下,操作直接就结束了,来不及等到回调事件的触发
    Thread.sleep(Integer.MAX_VALUE);
}

private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
    /**
     * 异步创建节点回调方法
     * @param i 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
     * @param s 创建的节点路径
     * @param o 创建节点时传递的ctx
     * @param s1 在ZK上实际创建的节点名(针对顺序节点)
     */
    @Override
    public void processResult(int i, String s, Object o, String s1) {
        System.out.println("i = " + i + ", s = " + s + ", o = " + o + ", s1 = " + s1);
        switch (i) {
            case 0:
                System.out.println("节点创建成功");
                break;
            case -4:
                System.out.println("客户端与服务端连接已断开");
                break;
            case -110:
                System.out.println("指定节点已存在");
                break;
            case -112:
                System.out.println("会话已过期");
                break;
        }
    }
}
复制代码

删除节点(同步)

// 同步删除,遇到无法正常删除的情况,直接异常
@Test
public void deleteNode() throws Exception {
    zkCli.delete("/ApiNode1", -1);
}
复制代码

删除节点(异步)

@Test
public void deleteNodeAsync() throws Exception {
    zkCli.delete("/ApiNode1", -1, new DeleteNodeCallBack(), "");
    Thread.sleep(Integer.MAX_VALUE);
}

/**
 * 删除节点回调
 */
private static class DeleteNodeCallBack implements AsyncCallback.VoidCallback {
    /**
     * 异步删除节点回调方法
     * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
     * @param path 删除的节点路径
     * @param ctx 删除节点时传递的ctx
     */
    @Override
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("删除结果:rc=" + rc + ",path=" + path + ",ctx=" + ctx);
        switch (rc) {
            case 0:
                System.out.println("节点删除成功");
                break;
            case -4:
                System.out.println("客户端与服务端连接已断开");
                break;
            case -112:
                System.out.println("会话已过期");
                break;
            default:
                System.out.println("服务端响应码" + rc + "未知");
                break;
        }
    }
}
复制代码

修改节点内容(同步)

@Test
public void setNode() throws InterruptedException, KeeperException {
    zkCli.setData("/ApiNode1", "hello".getBytes(), -1);
}
复制代码

修改节点内容(异步)

@Test
public void setNodeAsync() throws Exception {
    zkCli.setData("/ApiNode2", "hello".getBytes(), -1, new SetNodeCallBack(), "");
    Thread.sleep(Integer.MAX_VALUE);
}

private static class SetNodeCallBack implements AsyncCallback.StatCallback {
    /**
     * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
     * @param path 修改的节点路径
     * @param ctx  修改节点时传递的ctx
     * @param stat 节点状态,由服务器端响应的新stat替换
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        switch (rc) {
            case 0:
                System.out.println("节点数据设置成功");
                break;
            case -4:
                System.out.println("客户端与服务端连接已断开");
                break;
            case -112:
                System.out.println("会话已过期");
                break;
            default:
                System.out.println("服务端响应码" + rc + "未知");
                break;
        }
    }
}
复制代码

获取节点内容(同步)

@Test
public void getNode() throws InterruptedException, KeeperException {
    // 实例化对象,用于记录节点的状态信息
    Stat stat = new Stat();
    // 获取数据
    byte[] data = zkCli.getData("/ApiNode1", true, stat);
    // 打印数据
    System.out.println(new String(data));
    // 打印节点状态信息
    System.out.println(stat);
    System.out.println(stat.getVersion());
    System.out.println(stat.getCtime());
}
复制代码

获取节点内容(异步)

@Test
public void getNodeAsync() throws InterruptedException {
    zkCli.getData("/ApiNode1", true, new GetNodeCallBack(), "");
    Thread.sleep(Integer.MAX_VALUE);
}

private static class GetNodeCallBack implements AsyncCallback.DataCallback {
    /**
     * 获取数据的回调
     * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
     * @param path 获取数据的节点路径
     * @param ctx 获取数据时传递的ctx
     * @param data 获取到的节点数据
     * @param stat 获取到的节点状态
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        switch (rc) {
            case 0:
                System.out.println("节点数据获取成功: " + new String(data));
                break;
            case -4:
                System.out.println("客户端与服务端连接已断开");
                break;
            case -112:
                System.out.println("会话已过期");
                break;
            default:
                System.out.println("服务端响应码" + rc + "未知");
                break;
        }
    }
}
复制代码

获取所有子节点(同步)

@Test
public void getChildren() throws InterruptedException, KeeperException {
    List<String> children = zkCli.getChildren("/", true);
    System.out.println(children);
}
复制代码

获取所有子节点(异步)

@Test
public void getChildrenAsync() throws InterruptedException {
    zkCli.getChildren("/", true, new GetChildrenCallBack(), "");
    Thread.sleep(Integer.MAX_VALUE);
}

private static class GetChildrenCallBack implements AsyncCallback.ChildrenCallback {

    /**
     * 获取到所有的子节点的回调
     * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
     * @param path 获取子节点的节点路径
     * @param ctx 调用方法传递的ctx
     * @param children 所有的子节点
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        switch (rc) {
            case 0:
                System.out.println("子节点获取成功: " + children);
                break;
            case -4:
                System.out.println("客户端与服务端连接已断开");
                break;
            case -112:
                System.out.println("会话已过期");
                break;
            default:
                System.out.println("服务端响应码" + rc + "未知");
                break;
        }
    }
}

 更多大数据精彩内容欢迎B站搜索“千锋教育”或者扫码领取大数据学习全套资料!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/624897.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Creating Add-in Hooks (C#)

本文介绍如何使一个文件在添加、检入、检出到库时&#xff0c;让add-in 程序在SOLIDWORKS PDM Professional 中通知到你。 注意&#xff1a; 因为 SOLIDWORKS PDM Professional 无法强制重新加载Add-in程序 &#xff0c;必须重新启动所有客户端计算机&#xff0c;以确保使用最…

电力综合自动化系统在煤矿领域的设计与应用

安科瑞虞佳豪 持续的高温&#xff0c;给能源保供带来严峻的考验。针对南方部分地区电力供应紧张的局面&#xff0c;煤炭资源大省山西&#xff0c;在确保安全生产的基础上&#xff0c;积极协调增产保供。 这几天&#xff0c;南方多地持续高温&#xff0c;用电量达到高峰。在山西…

深入理解深度学习——注意力机制(Attention Mechanism):Bahdanau注意力

分类目录&#xff1a;《深入理解深度学习》总目录 之前我们探讨了机器翻译问题&#xff1a; 通过设计一个基于两个循环神经网络的编码器—解码器架构&#xff0c; 用于序列到序列学习。 具体来说&#xff0c;循环神经网络编码器将长度可变的序列转换为固定形状的上下文变量&…

抖音seo矩阵系统源码搭建步骤分享

目录 账号矩阵系统源码搭建包括以下步骤&#xff1a; 二、代码实现 三、 代码展示 四、 服务交付 故障级别定义 服务响应时间 账号矩阵系统源码搭建包括以下步骤&#xff1a; 1. 准备服务器和域名 准备一台服务器&#xff0c;例如阿里云、腾讯云等。并在网站上购买一个域…

C++:类型转换

目录 一. C语言的类型转换 二. C类型转换 2.1 static_cast 2.2 reinterpret_cast 2.3 const_cast 2.4 dynamic_cast 三. 运行时类型识别 -- RTTI 四. 总结 一. C语言的类型转换 C语言的类型转换分为隐式类型转换和强制类型转换&#xff0c;隐式类型转换发生在相近的类…

WEB测试环境搭建和测试方法大全

一、WEB测试环境搭建 WEB测试时搭建测试环境所需的软硬件包括&#xff1a;电脑一台、JDK1.6、Tomcat7.0、mysql、IE浏览器、Firefox浏览器、Chrome浏览器、SVN客户端 通过SVN客户端导出最新的Web工程部署到Tomcat7.0下的webapps中&#xff0c;另外重要的一点就是修改数据库连…

31、js - Promise

一、Promise要点 -> js中&#xff0c;只有Promise对象才可以使用.then().catch()方法。 -> axios可以使用.then().catch()&#xff0c;完全是因为调用axios()&#xff0c;返回的是一个Promise对象。 -> new Promise() 里面的代码是同步代码&#xff0c;一旦调用promis…

这个API Hub太厉害了,太适合接口测试了,收录了钉钉企业微信等开放Api的利器

目录 前言&#xff1a; 01API Hub的项目 02API Hub 03调试 04 API 调试 05 API mock 06 针对开放项目功提供者 08 下载 前言&#xff1a; API Hub 的优势在于它提供了完整的 API 管理解决方案&#xff0c;包括API的设计、接口调试、测试和文档管理等。通过集中管理API…

火热报名中 | KCD 北京精彩抢“鲜”看

​ 仲夏已至&#xff0c;风云再起&#xff0c;Kubernetes Community Days 北京站英雄帖一经发出&#xff0c;云原生的各路英雄豪杰纷纷响应。经典招式的升级亮相&#xff0c;最新技巧的惊喜面世&#xff0c;且看各路门派京城聚首&#xff0c;掀起一场云原生的武林论道。各大议…

深入解析Cloudflare五秒盾与爬虫绕过技巧

最近一个朋友发现一个比较有趣的网站&#xff0c;他说正常构造一个HTTP请求居然拿不到网站页面的信息&#xff0c;网站页面如下&#xff1a; 别看它只是一个普普通通的小说网站。随后我在本地环境验证了一下&#xff0c;果不其然得到了以下信息&#xff1a; 从上面反馈的信息…

Yakit: 集成化单兵安全能力平台使用教程·进阶篇

Yakit: 集成化单兵安全能力平台使用教程进阶篇 1.数据处理数据对比Codec2.插件仓库1.数据处理 数据对比 该功能主要提供一个可视化的差异比对工具,用于分析两次数据之间的区别。使用场景可能包括:枚举用户名时比较登录成功和失败时服务器端反馈结果的差异、使用 Web Fuzzer…

【css3实现华为充电】那些你没想到的CSS效果之华为充电效果(附源码下载)

【写在前面】今天是高考的第二天&#xff0c;在这里我也祝各位学子能够旗开得胜&#xff0c;进入自己理想的大学&#xff0c;借着今天这个吉日我就和大家介绍一下如何用css实现华为充电效果。 涉及知识点&#xff1a;CSS3特效&#xff0c;华为充电特效实现&#xff0c;CSS属性f…

部署DR模式 LVS负载均衡群集

部署DR模式 LVS负载均衡群集 一、LVS-DR数据包流向分析二、DR模型的特点三、DR模式 LVS负载均衡群集部署 一、LVS-DR数据包流向分析 &#xff08;1&#xff09;客户端发送请求到 Director Server&#xff08;负载均衡器&#xff09;&#xff0c;请求的数据报文&#xff08;源 …

SYSU程设c++(第十五周)

vector容器 1.要开vector库 2.vector<T> 是动态的连续数组&#xff0c;可以列表初始化 vector<int> ivec(10, 2); //创建10个值为2的元素 3.可以靠[ ]、at(int)、front、back、迭代器访问其中元素&#xff0c;其中at会自动检查下标越界&#xff0c;抛出异常 4.迭…

【资料分享】ESD防护设计-常见ESD保护电路图

ESD防护设计 ESD防护设计的目的是&#xff0c;当集成电路任意两个输入/输出引脚之间发生ESD事件时&#xff0c;集成电路内部的ESD防护系统能及时开启来泄放掉大量的瞬时电流/电压,使内部电路免遭破坏。此外&#xff0c;在集成电路正常工作时&#xff0c;即未发生ESD事件时&…

Jmeter Suite安装中influx一直处于pending状态

目录 【前言】 【背景说明】 【问题表现】 【排查思路】 简单重试 深入分析 直面本质 【小结】 【写在最后】 完整版文档下载方式&#xff1a; 【前言】 今天要和大家聊聊一个关于Jmeter Suite安装的问题——“influx一直处于pending状态”。 作为一名老测试&#…

文心一言 VS 讯飞星火 VS chatgpt (35)-- 算法导论5.3 5题

五、证明:在过程 PERMUTE-BY-SORTING的数组 P中&#xff0c;所有元素都唯一的概率至少是1-1/n。 文心一言&#xff1a; 证明&#xff1a; 在过程PERMUTE-BY-SORTING中&#xff0c;对于输入数组P中的每个元素&#xff0c;其出现次数为1&#xff0c;因此对于每个元素&#xff…

配置Linux服务器时间同步

这里说一下为什么要配置时间同步这个问题&#xff0c;为了避免各虚拟机的时间出现不一致的情况而引发集群故障。这里还要了解一下什么是系统时间&#xff0c;什么是硬件时间。   这里配置时间同步我选择的是Chrony。Chrony是网络时间协议&#xff08;NTP&#xff09;的通用实…

这11款协同软件大大提升你的工作效率

无论是产品经理还是开发工程师&#xff0c;他们都是各自领域的专家&#xff0c;所以他们可以成为设计过程的一部分。 协作设计软件为设计过程提供了沟通方式和工具&#xff0c;以便团队成员甚至客户能够交流、分享他们的意见和想法。 本文将盘点11个协作设计软件&#xff01;…

Jenkins DingTalk 钉钉通知插件

目录 前言 一、相关文档 二、组件版本 三、钉钉配置 四、Jenkins配置 1、安装钉钉插件DingTalk 2、在Jenkins用户管理中填写钉钉手机号 3、在Jenkins中配置钉钉 5、在流水线任务中编写pipeline 写在最后 完整版文档下载方式&#xff1a; 前言 今天要和大家聊聊一个…