ZooKeeper 实战(四) Curator Watch事件监听

news2024/9/24 23:30:40

文章目录

  • ZooKeeper 实战(四) Curator Watch事件监听
    • 0.前言
    • 1.Watch 事件监听概念
    • 2.NodeCache
      • 2.1.全参构造器参数
      • 2.2.代码DEMO
      • 2.3.日志输出
    • 3.PathChildrenCache
      • 3.1.全参构造器参数
      • 3.2.子节点监听时间类型
      • 3.2.代码DEMO
    • 4.TreeCache
      • 4.1.构造器参数
      • 4.2.代码DEMO
      • 4.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 */
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client,path);
        // 添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null){
                    String s = new String(currentData.getData(),StandardCharsets.UTF_8);
                    log.info("监听{}节点发生变化,数据内容:{}",path,s);
                }else {
                    log.info("监听{}节点被删除了",path);
                }
            }
        });
      	// 开启监听
        nodeCache.start();

        TimeUnit.SECONDS.sleep(2);
        // 创建节点
        client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 更新节点
        client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 删除节点
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 */
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{
  	// 子节点添加
    CHILD_ADDED,
  	// 子节点的数据变更
    CHILD_UPDATED,
		// 子节点被删除
    CHILD_REMOVED,
 
  	// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。
  	// 当连接状态处于ConnectionState.SUSPENDED。
    CONNECTION_SUSPENDED,
  	// 当连接状态处于ConnectionState.RECONNECTED
    CONNECTION_RECONNECTED,
  	// 当连接状态处于ConnectionState.LOST
    CONNECTION_LOST,
  	
  	// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成
  This event signals that the initial cache has been populated.
    INITIALIZED
}

3.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建PathChildrenCache对象
        // 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,
        // 那么后续pathChildrenCache.getCurrentData()得到的数据都为null
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);
        // 添加监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){
                    log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 创建子节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path+"/c1");
        client.create().creatingParentsIfNeeded().forPath(path+"/c2");
        client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");
    }

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 */
public TreeCache(CuratorFramework client, String path)
  
/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 * @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)
 * @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache
 */
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher/tree";
        TimeUnit.SECONDS.sleep(3);

        // 创建TreeCache对象,也可通过TreeCache.newBuilder()创建
        TreeCache treeCache = new TreeCache(client,path);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.INITIALIZED){
                    log.info("TreeCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        treeCache.start();

        // 创建节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path);
        client.create().creatingParentsIfNeeded().forPath(path +"/t1");
        client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");
    }

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380401.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flink standalone集群部署配置

文章目录 简介软件依赖部署方案二、安装1.下载并解压2.ssh免密登录3.修改配置文件3.启动集群4.访问 Web UI 简介 Flink独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件, 都只是操作系统上运行…

别再为创业失败找借口了!否则你永远无法创业成功!2024适合上班族的创业,2024个人创业做什么

每当聊起创业,很多人嘴上都很积极,行动都很低迷,事后就开始找各种理由开始否定创业这个路,要么就是大环境不好,要么就是行业太差,还有就是竞争太多,反正不会是自己的能力太差。 其实创业没有你想…

Postgres 中文周报:Postgres Weekly 537 期

本周报由 Cloudberry Database 社区编译自英文版《Postgres Weekly》,译文较原文有所调整。 推荐博文 🏆 PostgreSQL: The DBMS of the Year 2023 PostgreSQL 荣获 DB-Engines 网站 2023 年度 DBMS 冠军。DB-Engines 收集了 480 款数据库系统信息并跟踪…

各版本 操作系统 对 .NET Framework 与 .NET Core 支持

有两种类型的受支持版本:长期支持 (LTS) 版本和标准期限支持 (STS) 版本。 所有版本的质量都是一样的。 唯一的区别是支持的时间长短。 LTS 版本可获得为期三年的免费支持和补丁。 STS 版本可获得 18 个月的免费支持和修补程序。 有关详细信息,请参阅 .N…

2024年美国大学生数学建模思路 - 复盘:校园消费行为分析

文章目录 0 赛题思路1 赛题背景2 分析目标3 数据说明4 数据预处理5 数据分析5.1 食堂就餐行为分析5.2 学生消费行为分析 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 赛题背景 校园一卡通是集…

mac 上 ssh: connect to host localhost port 22: Connection refused

1。 问题 在搭建hadoop环境的时候 发现ssh localhost 在报错 2. 解决 打开系统设置 -> 共享 -> -> 在左边服务中选择 远程登录 注意红框这些选项慎重选择!!! 修改后,在终端再次 ssh localhost 发现登录成功了 如果…

SkipList 的索引过程,能否越两级搜索

“SkipList 的索引过程,能否越两级搜索?” 昨天,一个工作 7 年的粉丝,去某外包公司面试,被问到这个问题不知道该怎么回答。 今天正好有空,给大家分享一下这个问题的回答思路。 对了,这个问题…

【保姆级教程】【YOLOv8替换主干网络】【1】使用efficientViT替换YOLOV8主干网络结构

《博主简介》 小伙伴们好,我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源,可关注公-仲-hao:【阿旭算法与机器学习】,共同学习交流~ 👍感谢小伙伴们点赞、关注! 《------往期经典推…

08-微服务链路追踪案例

4.4.1:环境说明 dubbo provider: 192.168.58.153 dubbo consumer: 192.168.58.154 zookeeper: 192.168.58.1554.4.2: zookeeper 部署 ~$ apt install openjdk-11-jdk -y ~$ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.…

【计算机组成原理】指令流水线的三种冒险情况(Hazards)

冒险 在计算机架构中,流水线冒险是指在指令流水线的执行过程中由于数据相关性或控制相关性而导致的一种性能问题。指令流水线是将指令执行过程划分为多个阶段,这样可以同时处理多条指令,从而提高指令执行的效率。然而,流水线执行…

新版云进销存ERP销售库存仓库员工管理系统源码

新版云进销存ERP销售库存仓库员工管理系统源码 系统介绍:2022版本,带合同报价单打印,修复子账号不显示新加客户的BUG,还有其他方面的优化。 简单方便。 功能强大,系统采用phpMYSQL开发,B/S架构,方便随地使用…

怎么找微信服务器的IP地址

首先,让微信客户端在PC端运行,在任务管理器->详细信息中,找到WeChat.exe的进程,找到PID 就是微信进程的ID号,如下图所示: 打开一个命令行窗口,cmd或者powershell窗口都可以,输入…

互联网医院系统|北京线上问诊|线上问诊系统功能解析

随着科技的不断发展,线上问诊系统作为一种快速、便捷的医疗服务方式在近年来越来越受欢迎。本文将重点介绍线上问诊系统的开发功能及其优势,帮助读者更好地了解这一医疗服务方式的价值和好处。 一、线上问诊系统的开发功能: 1、患者注册与登…

【2023年度回顾】让我们在新的一年继续努力前行

每当我们在努力的时候都会想:为什么我要努力?躺着不舒服吗? 大家好!我是命运之光,一名普普通通的计算机科学与技术专业的大三学生。 📕回顾一下整个2023年 因为我有每天发朋友圈的习惯,所以这一…

二分搜索边界问题的简单结论

引言 二分搜索是一个说简单也很简单(代码很固定,也没几行),说难也很难(边界问题可能会让人想不太清楚)。 事实上,边界问题也是是算法题中普遍存在的难点。 这篇文章讲两个简单的结论&#xff0…

Head First Design Patterns -工厂模式

什么是工厂模式 工厂方法模式定义了一个创建对象的接口,但由子类来决定要实例化那个类。工厂方法让类把实例化推迟到了子类。 为什么要有工厂模式 书中以pizza店制作pizza为例子,假设不用工厂模式,在制作pizza阶段我们需要这样去实例化类&am…

Python--装饰器

在 Python 中,装饰器是一种特殊类型的函数,它们用于修改或增强其他函数或方法的行为。装饰器本质上是一个函数,它接受一个函数作为参数,并返回一个新的函数。使用装饰器可以在不修改原函数代码的前提下,给函数添加新的…

visio实现背景透明图片的最简单方法

visio实现背景透明图片的最简单方法 导出中选择PNG格式 保存之后会弹出如下的对话框: 主要的修改包括三处:1.数据格式-逐行扫描 2.背景色(与你的visio中使用的颜色不重合的颜色)3.选中透明度颜色 透明度颜色选择与背景色相同的颜…

C#MQTT编程01--MQTT介绍

1、前言 近年来物联网的发展如火如荼已经渗透到我们生活的方方面面。从智能家居到工业自动化从智慧城市到智慧农业物联网,正在以前所未有的速度改变着我们的生活。 大家现在可能已经习惯了通过手机控制家里的灯光、空调和电视,这就是物联网在智能家居领域…

C#编程-了解进程的通信

了解进程的通信 逻辑上一个应用程序内的所有线程都包含在进程内。这是应用程序运行的操作系统单元。进程是程序的一个运行实例。运行时在同一计算机内或通过网络的进程间通信被称为进程内通信。要允许进程间通信,需要使用特殊的技术和机制。 考虑一个您打文档的场景。您使用…