Zookeeper的Java API操作

news2025/1/24 8:39:14

Zookeeper的Java API操作

    • 一、先启动Zookeeper集群
    • 二、IDEA 环境搭建
    • 三、创建子节点
    • 四、获取子节点并监听节点变化
    • 五、判断 Znode 是否存在
    • 六、Watcher工作流程

一、先启动Zookeeper集群

二、IDEA 环境搭建

1.创建一个Maven工程:ZookeeperProject
2.在pom.xml文件添加如下内容:

<dependencies>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.apache.logging.log4j</groupId>
		<artifactId>log4j-core</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
		<version>3.5.7</version>
	</dependency>
</dependencies>

3.拷贝log4j.properties文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

三、创建子节点

package com.hyj.zk;

import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class CreateZnode {
    //注意:逗号前后不能有空格  指定Zookeeper服务器列表
    private static String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    /* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。
     在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,
     一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/
    private static int sessionTimeout=100000;
    private ZooKeeper zkClient=null;
    @Before
    public void init() throws IOException {
        //创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听
        zkClient=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override  //收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent watchedEvent) {
                
            }
        });
    }
    @Test    //创建子节点
    public void create() throws InterruptedException, KeeperException {
        /* 参数 1:要创建的节点的路径; 参数 2:节点数据(一个字节数组) ;
         参数 3:节点权限 ;ZooDefs.Ids.OPEN_ACL_UNSAFE表示以后对这个节点的任何操作都不受权限控制
         参数 4:节点的类型   持久无序号节点PERSISTENT    持久带序号节点 PERSISTENT_SEQUENTIAL (persistent_sequential)
                           短暂无序号节点EPHEMERAL     短暂带序号节点 EPHEMERAL_SEQUENTIAL (ephemeral_sequential)
         */
        String s = zkClient.create("/sanguo/xiyouji", "sunwu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    @After
    public void close() throws InterruptedException {
        zkClient.close();
    }
}

从ZookKeeper系列:watch机制截的一张图在这里插入图片描述

监听的事件类型有:

  • None 客户端连接状态发生改变的时候,会收到None事件通知(如连接成功,连接失败,session会话过期等)
  • NodeCreated 节点被创建
  • NodeDeleted 节点被删除
  • NodeDataChanged 节点数据被修改
  • NodeChildrenChanged 子节点被创建或删除

四、获取子节点并监听节点变化

package com.hyj.zk;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class GetChildren {
    //注意:逗号前后不能有空格  指定Zookeeper服务器列表
    private static String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    /* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。
     在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,
     一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/
    private static int sessionTimeout=100000;
    private ZooKeeper zkClient=null;
    @Before
    public void init() throws IOException {
        //创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听
        zkClient=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override  //收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent watchedEvent) {
                if(watchedEvent.getType() == Event.EventType.None){
                    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        System.out.println("Zookeeper连接成功!!!");
                    }else if(watchedEvent.getState() == Event.KeeperState.Disconnected){
                        System.out.println("客户端和服务器的连接断开!!!");
                    }else if (watchedEvent.getState() == Event.KeeperState.Expired){
                        System.out.println("session会话过期!!!");
                    }
                }else{
                    System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
                    try {
                        //再次监听(注册一次,监听一次)
                        List<String> children = zkClient.getChildren("/", true); //false表示不监听,true表示使用默认的watcher
                        for (String child : children) {
                            System.out.println(child);
                        }
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                }
        });
    }
    @Test   //获取子节点并监听节点路径变化
    public void getChildren() throws InterruptedException, KeeperException {
        // 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听
        List<String> children = zkClient.getChildren("/", true); //使用默认的watcher
        for (String child : children) {
            System.out.println(child);
        }
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }
    @Test   //获取子节点不监听节点路径变化
    public void getChildren2() throws InterruptedException, KeeperException {
        // 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听
        List<String> children = zkClient.getChildren("/", false);  //不注册watcher
        for (String child : children) {
            System.out.println(child);
        }
    }
    @Test   //获取子节点并监听节点路径变化
    public void getChildren3() throws InterruptedException, KeeperException {
        // 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听
        List<String> children = zkClient.getChildren("/", new Watcher() {  //注册新的watcher
            @Override //收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getType() + "------" + watchedEvent.getPath());
                try {
                    List<String> children = zkClient.getChildren("/", false); //这里若是true它还是会使用默认的watcher
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        for (String child : children) {
            System.out.println(child);
        }
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }
    @After
    public void close() throws InterruptedException {
        zkClient.close();
    }
}


五、判断 Znode 是否存在

package com.hyj.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class IsExistNode {
    //注意:逗号前后不能有空格  指定Zookeeper服务器列表
    private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    /* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。
     在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,
     一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/
    private static int sessionTimeout = 100000;
    private ZooKeeper zkClient = null;

    @Before
    public void init() throws IOException {
        //创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override //收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() != Event.EventType.None) {
                    System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
                    try {
                        List<String> children = zkClient.getChildren("/", false); //false表示不监听,true表示使用默认的watcher
                        for (String child : children) {
                            System.out.println(child);
                        }
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    @Test
    public void exist() throws InterruptedException, KeeperException {
        // 参数1: 表示要判断的节点     参数2: true表示监听 ,false表示不监听
        Stat stat = zkClient.exists("/sanguo", false); //不注册watcher
        System.out.println(stat == null ? "not exist" : "exist");
    }

    @Test
    public void exist2() throws InterruptedException, KeeperException {
        // 参数1: 表示要判断的节点     参数2: true表示监听此节点变化 ,false表示不监听
        Stat stat = zkClient.exists("/sanguo", true);  //使用默认的watcher
        System.out.println(stat == null ? "not exist" : "exist");
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    @Test
    public void exist3() throws InterruptedException, KeeperException {
        // 参数1: 表示要判断的节点     参数2: true表示监听此节点变化 ,false表示不监听
        Stat stat = zkClient.exists("/sanguo", new Watcher() {
            @Override  //收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent watchedEvent) {  //注册新的watcher
                System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
                try {
                    List<String> children = zkClient.getChildren("/", false); //false表示不监听,true表示使用默认的watcher
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println(stat == null ? "not exist" : "exist");
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    @After
    public void close() throws InterruptedException {
        zkClient.close();
    }
}

六、Watcher工作流程

Client 向 Zookeeper 服务端注册一个 Watcher ,同时将Watcher对象存储在客户端的 WatcherManager 中。当Zookeeper 服务端的一些指定事件触发了 Watcher 事件时,就会向客户端发送事件通知,客户端就会从WatcherManager 中取出对应的 Watcher 进行回调。

Watcher工作机制分为三个过程:

  1. 客户端注册Watcher

  2. 服务端处理Watcher

  3. 客户端回调Watcher

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ARM uboot 的移植4 -从 uboot 官方标准uboot开始移植

一、添加DDR初始化1 1、分析下一步的移植路线 (1) cpu_init_crit 函数成功初始化串口、时钟后&#xff0c;转入 _main 函数&#xff0c;函数在 arch/arm/lib/crt0.S 文件中。 (2) 在 crt0.S 中首先设置栈&#xff0c;将 sp 指向 DDR 中的栈地址&#xff1b; #if defined(CONF…

CNCF x Alibaba云原生技术公开课 【重要】第九章 应用存储和持久化数据卷:核心知识

1、Pod Volumes 场景 同一个pod中的某个容器异常退出&#xff0c;kubelet重新拉起来&#xff0c;保证容器之前产生数据没丢同一个pod的多个容器共享数据 常见类型 本地存储&#xff0c;常用的有 emptydir/hostpath&#xff1b;网络存储&#xff1a;网络存储当前的实现方式有两…

2021年我国半导体分立器件市场规模已达3037亿元,国内功率半导体需求持续快速增长

半导体分立器件是由单个半导体晶体管构成的具有独立、完整功能的器件。例如&#xff1a;二极管、三极管、双极型功率晶体管(GTR)、晶闸管(可控硅)、场效应晶体管(结型场效应晶体管、MOSFET)、IGBT、IGCT、发光二极管、敏感器件等。半导体分立器件制造&#xff0c;指单个的半导体…

proteus I2C Debugger 查看 AT24C02写入读取

I2C Debugger仪器&#xff0c;在仿真调试期中&#xff0c;该仪器可以显示I2C数据传送时间、S&#xff08;START状态&#xff09;、Sr(ReStart状态&#xff09;、A&#xff08;Ask响应&#xff09;、N &#xff08;No ask状态&#xff09;、P&#xff08;Stop状态&#xff09;、…

中值滤波+Matlab仿真+频域响应分析

中值滤波 文章目录中值滤波理解中值滤波的过程Matlab 实现实际应用频域分析中值滤波是一种滤波算法&#xff0c;其目的是去除信号中的噪声&#xff0c;而不会对信号本身造成太大的影响。它的原理非常简单&#xff1a;对于一个给定的窗口大小&#xff0c;将窗口内的数值排序&…

【C++进阶】四、红黑树(三)

目录 一、红黑树的概念 二、红黑树的性质 三、红黑树节点的定义 四、红黑树的插入 五、红黑树的验证 六、红黑树与AVL树的比较 七、完整代码 一、红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可…

嵌入式安防监控项目——html框架分析和环境信息刷新到网页

目录 一、html控制LED 二、模拟数据上传到html 一、html控制LED 简单来说就是html给boa服务器发了一个控制指令信息&#xff0c;然后boa转发给cgi进程&#xff0c;cgi通过消息队列和主进程通信。主进程再去启动LED子线程。 这是老师给的工程。 以前学32都有这工具那工具来管…

导航技术调研(CSDN_0023_20221217)

文章编号&#xff1a;CSDN_0023_20221217 目录 1. 惯性导航 2. 组合导航技术 3. 卡尔曼滤波 1. 惯性导航 惯性导航系统(INS-Inertial Navigation System)是上个世纪初发展起来的。惯性导航是一种先进的导航方法&#xff0c;但实现导航定位的原理却非常简单&#xff0c;它是…

RHCSA-用户和组管理和文件系统权限(3.11)

目录 用户&#xff08;UID&#xff09; 用户类别&#xff08;UID&#xff09;&#xff1a; 用户的增删改查&#xff1a; 修改用户密码&#xff1a; 查看用户是否存在&#xff1a; 组&#xff08;GID&#xff09; 组的增删改查&#xff1a; 设置组密码&#xff1a; 用户…

idea集成GitHub

设置 GitHub 账号绑定账号有两种方式&#xff1a;1. 通过授权登录2.如果上述登录不成功&#xff0c;用Token口令的方式登录&#xff0c;口令在github账号哪里生成&#xff0c;点击settings --->Developer settings --->pwrsonal access tokens ----> 复制口令到idea 口…

设置cpp-httplib 服务器模式模式下的线程池大小 以及如何增加默认处理函数 以便能够实现http请求转发

先说说默认的创建的线程池数量 原因是某天调试在gdb调试下 一启动程序发现 开启了好多线程 如下图 因为我们程序 没几个线程 数了下 居然有60多个线程 不需要那么多 所以看下 httplib的源码 构造函数的时候 设置了最大线程池数量 看下这个宏 然后打印了下 发现 居然那么大 …

FusionCompute安装和配置步骤

1. 先去华为官网下载FusionCompute的镜像 下载地址&#xff1a;https://support.huawei.com/enterprise/zh/distributed-storage/fusioncompute-pid-8576912/software/251713663?idAbsPathfixnode01%7C22658044%7C7919788%7C9856606%7C21462752%7C8576912 下载后放在D盘中&am…

【rabbitmq 实现延迟消息-插件版本安装(docker环境)】

一&#xff1a;插件简介 在rabbitmq 3.5.7及以上的版本提供了一个插件&#xff08;rabbitmq-delayed-message-exchange&#xff09;来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。 二&#xff1a;插件安装 1&#xff1a;选择适合自己安装mq 版本的插件&#xff1…

设计模式---抽象工厂模式

目录 1 介绍 2 优缺点 3 实现 1 介绍 抽象工厂模式(Abstract Factory Pattern) 是围绕一个超级工厂创建其他工厂。该超级工厂又称为其他工厂的工厂。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式。 在抽象工厂模式中&#xff0c;接口是负…

ROC和AUC

目录 ROC AUC ROC ROC曲线是Receiver Operating Characteristic Curve的简称&#xff0c;中文名为"受试者工作特征曲线"。ROC曲线的横坐标为假阳性率(False Postive Rate, FPR)&#xff1b;纵坐标为真阳性率(True Positive Rate, TPR).FPR和TPR的计算方法分别为 F…

Spring——案例-业务层接口执行效率和AOP通知获取数据+AOP总结

执行时间获取:记录开始时间和结束时间&#xff0c;取差值。 这里使用环绕通知来实现。 环境准备: 项目文件结构: 业务层接口和实现类: 数据层: 采用mybatis注解开发&#xff0c;这里没有实现类&#xff0c;直接在接口方法里面实现映射。 domain层: 实现了数据库里面每一个…

Altium designer--软件简介及安装教程(Altium designer16)

一、软件介绍&#xff08;完整安装包资源见文末链接&#xff0c;含破解license&#xff09; Altium Designer 是一款简单易用、原生3D设计增强的一体化设计环境&#xff0c;结合了原理图、ECAD库、规则和限制条件、BoM、供应链管理、ECO流程和世界一流的PCB设计工具。通过原理…

Baumer工业相机中曝光与增益两种功能的优点和作用以及使用方法

项目场景 Baumer工业相机堡盟相机是一种高性能、高质量的工业相机&#xff0c;可用于各种应用场景&#xff0c;如物体检测、计数和识别、运动分析和图像处理。 Baumer的万兆网相机拥有出色的图像处理性能&#xff0c;可以实时传输高分辨率图像。此外&#xff0c;该相机还具有…

[NOIP2009 提高组] 最优贸易(C++,tarjan,topo,DP)

题目描述 $C 国有国有国有 n 个大城市和个大城市和个大城市和 m$ 条道路&#xff0c;每条道路连接这 nnn个城市中的某两个城市。任意两个城市之间最多只有一条道路直接相连。这 mmm 条道路中有一部分为单向通行的道路&#xff0c;一部分为双向通行的道路&#xff0c;双向通行的…

OpenHarmony通过MQTT连接 “改版后的华为IoT平台”

一、前言 本篇文章我们使用的是BearPi-HM_Nano开发板:小熊派的主板+E53_IA1扩展板 源码用的是D6_iot_cloud_oc,点击下载BearPi-HM_Nano全量源码 那么为什么要写这篇呢? 前段时间看到OpenHarmony群里,经常有小伙伴问接入华为IoT平台的问题,他们无法正常连接到华为IoT平台等…