Zookeeper Java SDK 开发入门

news2025/1/14 13:42:31

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、与 Zookeeper 建立连接
    • 四、判断 ZooKeeper 节点是否存在
    • 四、创建 ZooKeeper 节点数据
    • 五、获取 ZooKeeper 节点数据
    • 六、修改 ZooKeeper 节点数据
    • 七、异步获取 ZooKeeper 节点数据
    • 八、完整示例

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明。

一、概述

  • ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。

  • 本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。

  • 源码地址:https://github.com/apache/zookeeper

    在这里插入图片描述

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency>
       <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.2</version>
    </dependency>
    
    

三、与 Zookeeper 建立连接

  • 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
    • connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
    • sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
    • watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 由于建立连接是异步的,这里先阻塞等待连接结果
        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }

四、判断 ZooKeeper 节点是否存在

  • 创建节点数据使用 exists 方法,传递四个参数
    • path , 表示节点目录名称
    • watch, 表示监听器(只对该路径有效)
    • stat, 判断结果回调函数
    • context, 自定义上下文对象
    private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 判断 ZooKeeper 节点是否存在
        Object context = new Object();
        zooKeeper.exists("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        }, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int i, String s, Object o, Stat stat) {
                if(null != stat){
                    System.out.println("ZooKeeper /yiqifu 节点存在");
                }
                else {
                    System.out.println("ZooKeeper /yiqifu 节点不存在");
                }
            }
        }, context);
    }

四、创建 ZooKeeper 节点数据

  • 创建节点数据使用 create 方法,传递四个参数
    • path , 表示节点目录名称
    • data , 表示节点数据
   private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException 	{

        // 在 ZooKeeper 中创建节点
        String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("ZooKeeper 创建节点成功:" + nodeName);
    }

五、获取 ZooKeeper 节点数据

  • 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
    • path , 表示节点目录名称
    • watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
    private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
        final Stat stat = new Stat();
        byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    System.out.println("Path watch state=" + state);
                    System.out.println("Path watch type=" + type);
                    System.out.println("Path watch path=" + path);

                    //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                    zooKeeper.getData("/yiqifu", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
    }

六、修改 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
    • path , 表示节点目录名称。
    • data, 表示新数据。
    • version, 表示数据版本。
    private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
        zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
        zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
    }

七、异步获取 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数

    • path , 表示节点目录名称。

    • watch, 表示是否触发监听器。

    • dataCallback, 表示异步获取数据的回调函数。

 private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据(使用异步回调方式)
        Object context = new Object();
        zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
            }
        }, context);
    }

八、完整示例

package top.yiqifu.study.p131;


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class Test01_Zookeeper {

    public static void main(String[] args) {
        try {
            // 创建 ZooKeeper 对象
            ZooKeeper zooKeeper = testCreateZookeeper();
            // 在 ZooKeeper 创建数据节点
            testCreateNode(zooKeeper);
            // 在 ZooKeeper 中同步获取节点数据
            testGetdata(zooKeeper);
            // 在 ZooKeeper 中更新节点数据
            testSetdata(zooKeeper);
            // 在 ZooKeeper 异步获取节点数据
            testAsyncGetdata(zooKeeper);

            Thread.sleep(3000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }

    private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 在 ZooKeeper 中创建节点
        String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("ZooKeeper 创建节点成功:" + nodeName);
    }

    private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
        final Stat stat = new Stat();
        byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    System.out.println("Path watch state=" + state);
                    System.out.println("Path watch type=" + type);
                    System.out.println("Path watch path=" + path);

                    //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                    zooKeeper.getData("/yiqifu", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
    }

    private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
        zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
        zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
    }

    private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据(使用异步回调方式)
        Object context = new Object();
        zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
            }
        }, context);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1213010.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

竞赛选题 深度学习花卉识别 - python 机器视觉 opencv

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &a…

消息通讯——MQTT WebHookSpringBoot案例

目录 EMQX WebHook介绍EMQX WebHook是什么EMQX WebHook配置项如何使用EMQX WebHook配置WebHook配置事件推送参数详解 SpringBoot集成Webhook实现客户端断连监控1. 实现前提2. 代码实现接口3. 监听结果 总结 EMQX WebHook介绍 EMQX WebHook是什么 EMQX WebHook 是由 emqx_web_…

Control的Invoke和BeginInvoke

近日&#xff0c;被Control的Invoke和BeginInvoke搞的头大&#xff0c;就查了些相关的资料&#xff0c;整理如下。感谢这篇文章对我的理解Invoke和BeginInvoke的真正含义 。 (一&#xff09;Control的Invoke和BeginInvoke 我们要基于以下认识&#xff1a; &#xff08;1&#x…

企业微信H5开发遇到的坑

企业微信官方推荐wx.agentConfig引用<script src"https://open.work.weixin.qq.com/wwopen/js/jwxwork-1.0.0.js"></script>是没有效果的 必须引用以下代码才有效果&#xff0c;这也是我看了社区的回答才有所收获&#xff0c;是一个坑 且VUE引用在线的…

C语言——分割单向链表

本文的内容是使用C语言分割单向链表&#xff0c;给出一个链表和一个值&#xff0c;要求链表中小于给定值的节点全都位于大于或等于给定值的节点之前&#xff0c;打印原始链表的所有元素和经此操作之后链表的所有元素。 分析&#xff1a;本题只是单向链表的分割&#xff0c;不涉…

【数据结构初阶】链表OJ

链表OJ 题目一&#xff1a;移除链表元素题目二&#xff1a;反转链表题目三&#xff1a;链表的中间节点题目四&#xff1a;链表中倒数第k个结点题目五&#xff1a;合并两个有序链表题目六&#xff1a;链表分割题目七&#xff1a;链表的回文结构题目八&#xff1a;相交链表题目九…

2023年【道路运输企业安全生产管理人员】证考试及道路运输企业安全生产管理人员模拟考试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年道路运输企业安全生产管理人员证考试为正在备考道路运输企业安全生产管理人员操作证的学员准备的理论考试专题&#xff0c;每个月更新的道路运输企业安全生产管理人员模拟考试题祝您顺利通过道路运输企业安全生…

独家推荐:7个极品APP设计模板,助你塑造高级审美

用户的视觉体验在APP开发中也至关重要&#xff0c;市场上广受好评的APP离不开精致的APP界面设计。高质量的APP界面设计模板可以为高质量的APP界面设计做出贡献。本文结合即时设计资源社区10个高质量的APP界面设计模板进行分析介绍&#xff0c;让您的界面设计更加抢眼。看完这些…

用户画像与用户分层

用户画像是重要的数据产品和运营抓手&#xff0c;指能够描述和刻画用户信息和的数据指标。通过用户画像&#xff0c;业务经营团队可以充分、深入、准确地了解用户在不同生命周期的特征&#xff0c;来制定高效的用户经营策略。用户画像&#xff0c;不论 Persona 还是 Profile &a…

[已解决]使用order by 排序后的是10 6 7 8 9 而不是 6 7 8 9 10?

问题 sql order by 排序后的为什么 是10 6 7 8 9 而不是 6 7 8 9 10? 思路 在 SQL 中&#xff0c;ORDER BY 默认的排序方式是升序&#xff08;从小到大&#xff09;。所以&#xff0c;如果您简单地使用 ORDER BY 对某个列进行排序&#xff0c;它会将数字按照升序排列&#…

前端element的el-tooltip鼠标经过显示文字,没有文字显示空黑框问题

场景&#xff1a; 有时候在使用element的el-tooltip时会使用三元表达式&#xff0c;满足某个条件后才显示提示文字&#xff0c;否则不展示文字&#xff0c;但是却出现在在没有文字时展示了黑框&#xff0c;如下图&#xff1a; 解决方案&#xff1a; 加一个disabled便可&#…

colormap与colorbar应用

一&#xff0c;colormap 常见色度枚举值如下 应用如下 img cv2.applyColorMap(img, cv2.COLORMAP_JET) cv2.imshow(img,img) cv2.waitKey(0) cv2.destroyAllWindows() 常用的COLORMAP_JET效果如下&#xff0c;该模式常用于生成热力图 二&#xff0c;colorbar colorbar所有…

Python自动化测试:web自动化测试——Selenium框架

web自动化测试1 Selenium介绍web自动化实现原理环境准备1&#xff09;Seleniumpython环境搭建安装步骤环境变量的配置 2&#xff09;浏览器驱动驱动下载驱动环境配置 3&#xff09;版本检查4&#xff09;其他异常情况排查版本不一致未激活卸载、降低/升级setuptools版本 web自动…

day2324_jdbc

今日内容 零、 复习昨日 一、作业 二、SQL注入 三、PreparedStatement 四、事务 五、DBUtil 零、 复习昨日 一、引言 1.1 如何操作数据库 使用客户端工具访问数据库&#xff0c;需要手工建立连接&#xff0c;输入用户名和密码登录&#xff0c;编写 SQL 语句&#xff0c;点击执行…

沉浸式航天vr科普馆VR太空主题馆展示

科普教育从小做起&#xff0c;现在我们的很多地方小孩子游乐体验不单单只有草坪玩耍体验&#xff0c;还有很多科普知识的体验馆和游玩馆。虽然现在我们还不能真实的上太空或者潜入海底&#xff0c;但是这些现在已经可以逼真的展示在我们面前。通过一种虚拟现实技术手段。人们带…

深入解析JavaScript中的变量作用域与声明提升

JS中的变量作用域 背景&#xff1a; ​ 之前做js逆向的时候&#xff0c;有一个网站很有意思&#xff0c;就是先出现对其赋值&#xff0c;但是后来的变量赋值没有对其发生修改&#xff0c;决定说一下js中的作用域问题. 全局作用域&#xff1a; ​ 全局作用域的变量可以在任何…

QT使用Socket与安卓Socket互发消息

背景:安卓设备通过usb网络共享给Linux,此时安卓设备与linux处于同一网络环境,符合使用socket的条件,linux做客户端,安卓做服务端 1.QT使用Socket (1).在工程文件中加入 QT network (2).导包以及写一些槽函数用做数据传输与状态接收 #ifndef MAINWINDOW_H #define MAINWINDOW…

pycharm安装库失败

项目场景 pycharm安装第三方库 问题描述 python 安装第三方库总是安装失败 原因分析&#xff1a; 提示&#xff1a;这里填写问题的分析&#xff1a; 1.网络 2.网墙 解决方案&#xff1a; 加个镜像 –trusted-host mirrors.aliyun.com

JTS: 24 MinimumDiameter 最小矩形

文章目录 版本代码 版本 org.locationtech.jts:jts-core:1.19.0 链接: github 代码 package pers.stu.algorithm;import org.locationtech.jts.algorithm.MinimumDiameter; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.Geometry; import…