Zookeeper的监听机制及原理解析

news2024/9/30 0:04:39

系列文章目录

手把手教你安装Zookeeper 及可视化插件ZooInspector、ZKUI
Zookeeper入门篇,了解ZK存储特点

使用Zookeeper的监听及原理解析

  • 系列文章目录
  • 前言
  • 一、监听机制的基本概念
  • 二、Zookeeper监听原理
    • 1. 事件类型
    • 2. 监听模式与监听器类型
      • (1)监听模式
      • (2)监听器类型
    • 3. 监听原理
      • (1)基础概念
      • (2)监听触发处理
  • 三、Zookeeper监听的使用Demo


前言

在这里插入图片描述

ZK在现在之所以能非常好用,它便捷的监听功能是很重要的,本次我们就以监听为题,分析一下ZK的监听是怎么设计和管理的,并在文末写了个demo验证我们的所学

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 Zookeeper 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙 mysql Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


一、监听机制的基本概念

其实对于监听,我们并不陌生,我们曽在 Spring专栏 中提到过 《Spring监听器用法与原理详解》,其主要是基于观察者模式,如下图就是一个经典的观察者模型

在这里插入图片描述

Zookeeper的监听机制其实也是基于观察者模式,这种模式允许客户端在数据节点发生变化时得到通知。

而且最通俗的解释就是,当我想监听某个主题的变动时,就会向该主题登记一个观察者。最后当主题真的触发时,就遍历观察者列表,向每个观察者通知该事件。

二、Zookeeper监听原理

1. 事件类型

不管什么监听器,肯定都有自己想监听的内容,也即监听事件。只有当我想看的事件被触发时,才会让我的监听器有所反应。而ZK则提供了以下几种事件类型

  • NodeCreated:节点创建
  • NodeDeleted:节点被删除
  • NodeDataChanged:节点数据变更
  • NodeChildrenChanged:子节点变更
  • DataWatchRemoved:数据监听器被移除
  • ChildWatchRemoved:子节点监听器被移除
  • PersistentWatchRemoved 永久化监听器被移除

需要注意的是,事件操作并不是一回事。比如我们新增一个节点。它其实会触发当前节点的节点创建 和其父节点的子节点变更 两个事件。

同样,我们也不难发现,前4个事件是针对节点进行变更的事件,也是我们最常用的。而后面3种其实是监听器移除事件

2. 监听模式与监听器类型

(1)监听模式

明白了事件,我们再来看一下针对这些事件,我们能用怎样的方式来监听,也即监听模式

  • STANDARD 标准监听
  • PERSISTENT 永久监听
  • PERSISTENT_RECURSIVE 永久递归监听

所谓标准监听,其实就是某个节点的监听器一旦被触发了,这个监听器就会被删除,也就是所谓“一次性”监听。
永久监听就是永久存在,不会被删,可以一直触发。而永久递归监听则代表这个监听器不仅可以监听这个节点的事件,还能监听到该节点的所有子节点的事件,而且可以永久存在。

需要注意的是,监听模式可以叠加出不同的监听状态,比如说一个永久递归的监听器,可以再给他加一个标准监听,此时如果再删除永久递归监听器,那么还能够剩下一个标准监听器在工作,具体原理在源码的 WatchStats 部分

public final class WatchStats {
    private static final WatchStats[] WATCH_STATS = new WatchStats[] {
            new WatchStats(0), // NONE
            new WatchStats(1), // STANDARD
            new WatchStats(2), // PERSISTENT
            new WatchStats(3), // STANDARD + PERSISTENT
            new WatchStats(4), // PERSISTENT_RECURSIVE
            new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
            new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
            new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
    };

    /**
     * Stats that have no watchers attached.
     *
     * <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.
     */
    public static final WatchStats NONE = WATCH_STATS[0];

    private final int flags;

    private WatchStats(int flags) {
        this.flags = flags;
    }

    private static int modeToFlag(WatcherMode mode) {
        return 1 << mode.ordinal();
    }

    /**
     * Compute stats after given mode attached to node.
     *
     * @param mode watcher mode
     * @return a new stats if given mode is not attached to this node before, otherwise old stats
     */
    public WatchStats addMode(WatcherMode mode) {
        int flags = this.flags | modeToFlag(mode);
        return WATCH_STATS[flags];
    }

    public WatchStats removeMode(WatcherMode mode) {
        int mask = ~modeToFlag(mode);
        int flags = this.flags & mask;
        if (flags == 0) {
            return NONE;
        }
        return WATCH_STATS[flags];
    }

    /**
     * Check whether given mode is attached to this node.
     *
     * @param mode watcher mode
     * @return true if given mode is attached to this node.
     */
    public boolean hasMode(WatcherMode mode) {
        int flags = modeToFlag(mode);
        return (this.flags & flags) != 0;
    }
}

(2)监听器类型

需要注意的是,知道了所有的事件类型,以及能选择的监听的模式。其实监听器怎么弄,完全取决于你,理论上能做出 m * n 种类型,但ZK在源码中其实做了归纳,只提供了五种类型

  • Children: 子节点监听器
  • Data: 数据监听器
  • Persistent: 永久监听器
  • PersistentRecursive:永久递归监听器
  • Any 所有监听器

其中 ChildrenDataAny 是最开始提供的,也是非常容易理解,因为对事件我们也能归纳为 子节点事件数据事件,所以监听器归纳成 子节点监听器数据监听器 很合理。而PersistentPersistentRecursive 监听器则是在在后续加上的。主要是因为只归纳成这几类的话,如果想要单独删除永久化的监听器就没法做了。加入了这样的枚举后,就能指定更具体的监听器类型进行删除了。

在这里插入图片描述

3. 监听原理

(1)基础概念

知晓了事件类型 与 监听器类型 后,我们再来讲讲监听原理,其实监听整理起来主要就是两个结构和三个步骤。

因为节点和监听器是多对多的关系,一个节点能被多个监听器监听,一个监听器也能监听多个节点。所以两个结构就分别从节点、监听器的角度来对监听关系进行归纳,在源码中就是两个 HashMap:watchTablewatch2Paths

// key 为某个节点的具体路径, value 为该节点的所有监听器集合
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

// key 为某个监听器,value 值为该监听器在不同路径下的监听状态
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

而三个步骤其实也很简单:

  1. 客户端注册监听器: 客户端通过创建一个监听器(Watcher)并将其注册到Zookeeper服务器上的指定节点上。
    在这里插入图片描述

  2. 节点变更通知: 当节点发生变化时(如节点数据被修改、节点被创建或删除等),Zookeeper服务器会将变更通知发送给所有对该节点注册了监听器的客户端。同时处理该监听器,如下图,标准监听器watch 1被触发后会在该节点上被删除,而永久监听器watch还能继续留存。

在这里插入图片描述

  1. 客户端处理节点变更: 客户端在收到节点变更通知后,会调用注册的监听器进行处理。客户端可以根据具体的业务需求,对节点变更进行相应的处理逻辑,如重新读取节点数据、重新注册监听器等。Watch 接口如下:

在这里插入图片描述

不难看出,Zookeeper监听机制的核心是Watcher接口通知机制。从整个流程来说,我们可以细分为3个步骤:

Watcher接口:Watcher接口是Zookeeper提供的一个回调接口,在客户端注册监听器时需要实现该接口。该接口中只有一个process方法,当节点发生变化时,Zookeeper会调用该方法通知客户端。

通知机制:Zookeeper的通知机制是基于事件触发的。当注册了Watcher的节点发生变化时,Zookeeper会生成一个事件,并将该事件放入事件队列中。客户端线程会从事件队列中获取事件并进行处理。

(2)监听触发处理

我们看一下,当某个节点发生变动后,它是怎么找到该节点的监听器,并触发它的。我们直接看源码并配上注释

    // WatchManager.java
/**
 * 触发watch事件
 *
 * @param path    节点路径
 * @param type    事件类型
 * @param zxid    事务ID
 * @param acl     节点的ACL列表
 * @param supress 指定不触发的Watcher
 * @return 返回触发事件的Watcher或者BitSet
 */
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet supress) {
    // 创建WatchedEvent对象
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
    
    // 创建Watcher集合
    Set<Watcher> watchers = new HashSet<>();
    
    synchronized (this) {
        // 遍历节点路径的父路径,因为父路径上可能有递归的监听器,也需要监听到此事件
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        for (String localPath : pathParentIterator.asIterable()) {
            // 获取对应路径的Watcher集合
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            
            // 如果Watcher集合为空,直接跳过
            if (thisWatchers == null || thisWatchers.isEmpty()) {
                continue;
            }
            
            // 遍历Watcher集合
            Iterator<Watcher> iterator = thisWatchers.iterator();
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                
                // 获取Watcher对应的路径映射
                Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                
                // 获取Watcher对应路径的状态
                WatchStats stats = paths.get(localPath);
                
                // 如果状态为空,输出警告日志并跳过
                if (stats == null) {
                    LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
                    continue;
                }
                
                // 如果不是在父路径上,则添加Watcher
                if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher);
                    
                    // 【【【重要】】】:移除STANDARD模式的状态,而不是触发就删除,可见 2(1)的监听模式
                    WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                    
                    // 如果新的状态为空,则移除Watcher和路径映射
                    if (newStats == WatchStats.NONE) {
                        iterator.remove();
                        paths.remove(localPath);
                    } else if (newStats != stats) {
                        paths.put(localPath, newStats);
                    }
                } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                    // 父路径当前只会有永久递归监听器能响应子节点事件,且响应完不会删除监听器,所以直接把该监听器触发,不用做其他操作
                    watchers.add(watcher);
                }
            }
            
            // 如果Watcher集合为空,从watchTable中移除该路径
            if (thisWatchers.isEmpty()) {
                watchTable.remove(localPath);
            }
        }
    }
    
    // 如果Watcher集合为空,返回null
    if (watchers.isEmpty()) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
        }
        return null;
    }
    
    // 触发Watcher的事件处理方法
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        if (w instanceof ServerWatcher) {
            ((ServerWatcher) w).process(e, acl);
        } else {
            w.process(e);
        }
    }
    
    // 根据事件类型更新服务器指标
    switch (type) {
        case NodeCreated:
            ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
            break;
        case NodeDeleted:
            ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
            break;
        case NodeDataChanged:
            ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
            break;
        case NodeChildrenChanged:
            ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
            break;
        default:
            // Other types not logged.
            break;
    }
    
    // 返回Watcher或BitSet
    return new WatcherOrBitSet(watchers);
}

概括一下上述代码的逻辑:

    1. 首先,根据提供的节点路径,遍历该节点的所有父路径。
    1. 对于本路径及其每个父路径,获取与之关联的Watcher集合。如果Watcher集合为空,直接跳过。
    1. 逐个遍历Watcher集合中的Watcher对象,并根据其关联的路径状态来判断是否应该触发监听事件。
      – 如果当前不在父路径上,则添加Watcher到触发集合中,并更新该路径的状态。
      – 如果是父路径上的递归Persistent_Watcher模式,则添加Watcher到触发集合中。
    1. 如果触发集合中的Watcher为空,则结束。
    1. 逐个触发Watcher集合中的Watcher对象的事件处理方法,传入相应的参数。
    1. 根据事件的类型,更新服务器指标(例如,节点被创建的监听器数量)。
    1. 返回触发事件的Watcher集合或者BitSet对象

这里还有一点细节是比较有趣的,就是监听器虽然依附于节点,但并不意味着如果我们删除了节点,该节点的监听器就一定会消失。比如你在某节点设置了一个永久监听器,即使这个结点被删除了,它在 watchTablewatch2Paths 中也不会被删除,即节点与监听器的关联还在,所以当节点后续又被创建的时候,这个监听器仍然可以使用

三、Zookeeper监听的使用Demo

现在我们使用一个Demo来使用一下ZK的监听功能,我们打算使用一个永久监听器监听多个节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

public class ZooKeeperListenerDemo implements Watcher {
    private static final String ZK_SERVER = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private static final String[] PATHS = {"/zhanfu", "/zhanfu2"}; // 需要监听的路径

    private ZooKeeper zooKeeper;

    public static void main(String[] args) {
        ZooKeeperListenerDemo listenerDemo = new ZooKeeperListenerDemo();
        listenerDemo.connectZooKeeper();
        listenerDemo.registerWatchers();

        // 测试监听器,程序会一直运行,直到被中断
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        listenerDemo.close();
    }

    // 连接ZooKeeper服务器
    public void connectZooKeeper() {
        try {
            zooKeeper = new ZooKeeper(ZK_SERVER, SESSION_TIMEOUT, this);
            System.out.println("Connected to ZooKeeper server: " + ZK_SERVER);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 注册监听器
    public void registerWatchers() {
        for (String path : PATHS) {
            try {
                // AddWatchMode.PERSISTENT 即为永久监听器
                zooKeeper.addWatch(path, this, AddWatchMode.PERSISTENT);
                System.out.println("Registered watcher for path: " + path);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 处理ZooKeeper事件
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            if (event.getType() == Event.EventType.None && event.getPath() == null) {
                System.out.println("Connected to ZooKeeper server");
            } else {
                // 一般监听器只会处理部分事件,这里就不做限制,只打印日志
                System.out.println("Event received: " + event.getType() + ", path: " + event.getPath());
                // 在此处理收到的事件
            }
        }
    }

    // 关闭ZooKeeper连接
    public void close() {
        try {
            zooKeeper.close();
            System.out.println("ZooKeeper connection closed.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最后我们运行一下,并修改节点内容、删除再重新添加节点,可以看到该监听器为永久的,一直在生效

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1991526.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解计算机系统 CSAPP lab:bomb

实验资源下载地址&#xff1a;csapp.cs.cmu.edu/3e/labs.html 请先查看writeup 解压后 当我们运行bomb时,发现该程序要求我们输入行,如果输入错误,程序就会返回BOOM!!!提示我们失败了. 所以我们的目标是输入正确的行.以解开bomb程序. 实验前先详细阅读bomb.c //bomb.c /*****…

6.1 模块的导入与使用:Python的秘密武器

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; 工&#x1f497;重&#x1f497;hao&#x1f497;&#xff1a;野老杂谈 ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题.…

Java语言程序设计基础篇_编程练习题*16.12(演示TextArea的属性)

目录 题目&#xff1a;*16.12&#xff08;演示TextArea的属性&#xff09; 习题思路&#xff1a; 代码示例 结果展示 题目&#xff1a;*16.12&#xff08;演示TextArea的属性&#xff09; 编写一个程序&#xff0c;演示文本域的属性。程序使用复选框表明文本是否换行&#xf…

Java面试题--JVM大厂篇之JVM监控与GC日志分析:优化Parallel GC性能的重要工具

目录 引言&#xff1a; 正文&#xff1a; 1. 理解GC日志的重要性 2. 启用GC日志 3. GC日志解析 4. JVM监控工具 5. 调优Parallel GC的实战技巧 痛点一&#xff1a;长时间停顿 痛点二&#xff1a;频繁的GC 痛点三&#xff1a;内存溢出 6. 实战案例分享 结束语&#…

linux进程----匿名管道和命名管道

linux进程----匿名管道和命名管道 在Linux中&#xff0c;管道是用于进程间通信的一种机制&#xff0c;可以分为两种类型&#xff1a;匿名管道&#xff08;也称为匿名fifo&#xff09;和命名管道&#xff08;也称为命名fifo或named pipe&#xff09;。 匿名管道&#xff08;An…

如何解决整数溢出问题?

1、问题解析 当以整数数据类型&#xff08;包括字节、短、长和其他类型&#xff09;存储的值过大&#xff08;大于变量可容纳的最大值&#xff09;的值时&#xff0c;将发生整数溢出&#xff08;或环 绕&#xff09;。整数的最高有效位丢失&#xff0c;而其余值则相对于最小值…

Centos安装OpenSearch

Centos安装OpenSearch 下载并安装OpenSearch下载OpenSearch RPM包导入公共GNU Privacy Guard&#xff08;GPG&#xff09;密钥。此密钥验证您的OpenSearch实例是否已签名安装RPM包安装完设置开机自启动OpenSearch启动OpenSearch验证OpenSearch是否正确启动 测试OpenSearch向服务…

【lvs】超干货,包含理论+实验(详细步骤)

lvs理论部分 LVS&#xff08;Linux Virtual Server&#xff09;是Linux虚拟服务器的简称&#xff0c;是一个基于Linux操作系统的虚拟服务器集群系统。LVS主要用于实现负载均衡和高可用性&#xff0c;通过将客户端的请求分发到多台后端服务器上&#xff0c;从而提高整体服务的处…

JSON + AJAX + ThreadLocal

JSON数据交换 规则 JSON对象和字符串转换 <script type"text/javascript">var jsonPerson {"name": "jack","age": 20}console.log(jsonPerson);var strPerson JSON.stringify(jsonPerson);//对jsonPerson没有影响console.lo…

文件系统 --- 软硬链接

序言 经过上一篇的学习 在磁盘中的文件&#xff0c;我们大致了解了磁盘的物理结构&#xff0c;逻辑抽象结构以及文件在磁盘上的存储方式。在这篇文章中&#xff0c;我们会基于上一篇文章的部分知识点来介绍软链接&#xff0c;硬链接。 1. 软链接 1.1 软链接的概念 软链接是一个…

vue3+vite+ts 颜色选择器组件支持颜色吸取,透明度

ciw-color-picker-vue&#xff1a; vue3 vite ts 颜色选择器,支持颜色吸取,透明度&#xff0c;与浏览器原生颜色选择器相似,与饿了么颜色选择器相似 使用了ciw-color-picker-vue npm i ciw-color-picker-vue 安装 npm i ciw-color-picker-vue 全局引入方式 main.ts 或 m…

光伏检测室外气象站的重要性

在光伏产业的快速发展中&#xff0c;光伏检测室外气象站的重要性日益凸显。它不仅是光伏电站运维管理的“眼睛”&#xff0c;更是确保电站高效、稳定运行的关键因素。 首先&#xff0c;光伏检测室外气象站能够实时、准确地监测多种关键气象要素&#xff0c;如太阳辐射、风速、风…

【中间件】Redis从入门到精通-黑马点评综合实战

文章目录 一&#xff1a;Redis基础1.Redis是什么2.初识Redis3.Redis的数据结构A.通用命令B.String类型C.Key的层级格式D.Hash类型E.List类型F.Set类型G.SortedSet类型 二&#xff1a;Redis的Java客户端1.JedisA.引入依赖B.建立连接C.测试JedisD.释放资源 2.Jedis连接池3.Spring…

MM 13 -采购- 退货

思维导图 说明 情形1 直接冲销物料凭证 102 情形2 返货部分交货 情形3 退货PR贷项凭证 情形1 适用于已收货未开票&#xff0c;或者发票还可冲销的当月&#xff0c;冲销物料凭证所有数量 情形2 适用于跟情形1 一样&#xff0c;只是可以修改退货数量 情形3 如果已经跨越…

html+css+js网页设计字节跳动11个页面带js 效果很多

htmlcssjs网页设计字节跳动11个页面带js 效果很多 ui还原度100% 网页作品代码简单&#xff0c;可使用任意HTML编辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改编辑等操作&#xff0…

笔试练习day3

目录 BC149 简写单词题目解析代码 dd爱框框题目解析解析代码方法一暴力解法方法二同向双指针(滑动窗口) 除2!题目解析解法模拟贪心堆 感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接 &#x1f412;&#x1f412;&#x1f412; 个人主页 &#x1f978;&#x1…

【网络安全】玲珑安全第四期

鉴于玲珑安全漏洞挖掘前三期课程取得的优异成绩和获得的强烈反响,我们决定启动玲珑安全第四期漏洞挖掘培训计划。 文章目录 往期学员收获基础学员报喜(部分)课程反馈第四期课程课程内容免费课程往期学员收获 第一期课程总结及学员收获:->点我查看第一期学员收获<- …

leetcode 图论专题——(dfs+bfs+并查集 回顾)

DFS、BFS 回顾&#xff08;C语言代码&#xff09; map[i][j]里记录的是i点和j点的连接关系 基本DFS&#xff1a; int vis[101],n,map[101][101]; void dfs(int t) {int i;vis[t]1;for(i0;i<n;i)//找对t点所有有关联的点——“找路”{if(vis[i]!0&&map[t][i]1)//有…

怎么将jar注册为windows系统服务详细操作

将spring boot项目编译成jar,注册为windows系统服务 在网上了解到,winsw这个开源项目,去github看了下,作者常年维护更新,文档齐全,拥有不少,自己写了个小demo体验了下还不错,然后又运行了一个晚上,没啥问题,遂决定采用它 开源地址 源库地址 https://github.com/winsw/winsw R…

string类简单的底层实现,了解string底层以及string的补充知识

string类的简单实现 头文件 #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include<iostream> #include<assert.h> using namespace std; namespace exprience {class string {public:typedef char* iterator;iterator begin(){return _str;}iterator end()…