Zookeeper:实现“通知协调”的 Demo

news2025/1/17 5:48:54

应用配置集中到节点上,应用启动时主动获取,并在节点上注册一个 watcher,每次配置更新都会通知到应用。数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

本篇内容包括:Demo 概述、代码实现、测试结果


文章目录

    • 一、Demo 概述
        • 1、关于 zookeeper “通知协调”
        • 2、Demo 设计
        • 3、Demo 前提
    • 二、代码实现
        • 1、引用 Maven 依赖
        • 2、ConnectionWatcher 类创建 Zookeeper 连接
        • 3、ActiveKeyValueStore 类读写 Zookeeper 数据
        • 4、ConfigUpdater 类发布数据信息
        • 5、ConfigWatcher 类订阅数据信息
    • 三、测试结果
        • 1、ConfigUpdater 打印内容
        • 2、ConfigWatcher 打印内容


一、Demo 概述

1、关于 zookeeper “通知协调”

应用配置集中到节点上,应用启动时主动获取,并在节点上注册一个 watcher,每次配置更新都会通知到应用。

数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

2、Demo 设计

采用发布/订阅模式将配置信息发布到 Zookeeper 节点上,供订阅者动态获取数据:

Zookeeper订阅发布Demo
  1. 首先需要启动 Zookeeper 服务,规划集群配置信息存放的节点 /config;
  2. 然后通过 ConfigWatcher 类更新 /config 节点注册监视器 watcher,监控集群配置信息变化;
  3. 最后通过 ConfigUpdater 类不断更新 /config 节点配置信息,从而模拟实现集群配置信息订阅发布效果。

3、Demo 前提

参考:Mac通过Docker安装Zookeeper集群


二、代码实现

1、引用 Maven 依赖

        <!--    选择对应的Zookeeper版本    -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

2、ConnectionWatcher 类创建 Zookeeper 连接

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConnectionWatcher implements Watcher {

    private final CountDownLatch connectedSignal = new CountDownLatch(1);
    private static final int SESSION_TIMEOUT = 5000;
    protected ZooKeeper zk;


    public void connect(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        connectedSignal.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }

    public void close() throws InterruptedException {
        zk.close();
    }

}

3、ActiveKeyValueStore 类读写 Zookeeper 数据

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class ActiveKeyValueStore extends ConnectionWatcher {

    private static final Charset CHARSET = StandardCharsets.UTF_8;

    /**
     * 读取节点数据
     *
     * @param path  节点地址
     * @param value 数据值
     * @throws InterruptedException 中断异常
     * @throws KeeperException ZooKeeper异常
     */
    public void write(String path, String value) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            if (value == null) {
                zk.create(path, null,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                zk.create(path, value.getBytes(CHARSET),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } else {
            if (value == null) {
                zk.setData(path, null, -1);
            } else {
                zk.setData(path, value.getBytes(CHARSET), -1);
            }

        }
    }

    /**
     * 读取节点数据
     *
     * @param path 节点地址
     * @param watcher watcher
     * @return 数据值
     * @throws InterruptedException 中断异常
     * @throws KeeperException ZooKeeper异常
     */
    public String read(String path, Watcher watcher) throws InterruptedException, KeeperException {
        /* stat */
        byte[] data = zk.getData(path, watcher, null);
        return new String(data, CHARSET);
    }
}

4、ConfigUpdater 类发布数据信息

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.KeeperException;

public class ConfigUpdater {
    public static final String PATH = "/configuration";
    private final ActiveKeyValueStore store;
    private final Random random = new Random();

    public ConfigUpdater(String hosts) throws IOException, InterruptedException {
        //定义一个类
        store = new ActiveKeyValueStore();
        //连接Zookeeper
        store.connect(hosts);
    }

    public void run() throws InterruptedException, KeeperException {
        // noinspection InfiniteLoopStatement
        while (true) {
            String value = random.nextInt(100) + "";
            //向 ZNode 写数据(也可以将xml文件写进去)
            store.write(PATH, value);
            System.out.printf("Set %s to %s\n", PATH, value);
            TimeUnit.SECONDS.sleep(random.nextInt(10));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hosts = "localhost:2181";
        ConfigUpdater updater = new ConfigUpdater(hosts);
        updater.run();
    }
}

5、ConfigWatcher 类订阅数据信息

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class ConfigWatcher implements Watcher {
    private final ActiveKeyValueStore store;

    public ConfigWatcher(String hosts) throws InterruptedException, IOException {
        store = new ActiveKeyValueStore();
        //连接Zookeeper
        store.connect(hosts);
    }

    public void displayConfig() throws InterruptedException, KeeperException {
        String value = store.read(ConfigUpdater.PATH, this);
        System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);

    }

    @Override
    public void process(WatchedEvent event) {
        System.out.printf("Process incoming event: %s\n", event.toString());
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                displayConfig();
            } catch (InterruptedException e) {
                System.err.println("Interrupted. Exiting");
                Thread.currentThread().interrupt();
            } catch (KeeperException e) {
                System.err.printf("KeeperException: %s. Exiting.\n", e);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hosts = "localhost:2181";
        //创建 watcher
        ConfigWatcher watcher = new ConfigWatcher(hosts);
        //调用 display 方法
        watcher.displayConfig();
        //然后一直处于监控状态
        Thread.sleep(Long.MAX_VALUE);
    }
}

三、测试结果

1、ConfigUpdater 打印内容

Set /configuration to 76
Set /configuration to 55
Set /configuration to 13
...

2、ConfigWatcher 打印内容

Read /configuration as 76
Read /configuration as 55
Read /configuration as 13
...

通过 ConfigUpdater 发布的信息以及 ConfigWatcher 监控得到的信息可以看出,已经成功模拟实现集群配置信息的订阅发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23875.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[机器翻译]——pivot-based zero-shot translation based on fairseq

文章目录前言翻译到en生成"伪"的、到英语的数据文件把每一个zs语言对翻译到en从fairseq-generate生成的文件中&#xff0c;抽取纯en文件把en数据和所有zs语言对的tgt数据形成平行语料&#xff0c;然后做预处理形成en到tgt的平行语料预处理在en到tgt语言的"伪&qu…

IMC附录A

目录 A.1 恒等式与不等式 THEOREM A.1 (Binomial expansion theorem) PROPOSITION A.2 PROPOSITION A.3 PROPOSITION A.4 A.2 渐进符号 DEFINITION A.5 A.3 概率论基础 PROPOSITION A.7 (Union Bound) THEOREM A.8 (Bayes’ Theorem) PROPOSITION A.9 PROPOSITI…

图扑 Web SCADA 零代码组态水泥生产工艺流程 HMI

水泥是建筑工业三大基本材料之一&#xff0c;素有“建筑工业的粮食”之称。2022 年 1-9 月水泥产量为 15.63 亿吨&#xff0c;生产方法包括新型干法、立窑、湿窑、干法中空窑和立波尔窑等。 水泥生产线链条长、关键环节多的特性要求执行严密的流程监控。图扑软件大屏组态、UI 组…

国内家具行业数据浅析

大家好&#xff0c;这里是小安说网控。 家具是国民消费必需品之一&#xff0c;受疫情影响&#xff0c;近期销量数据不佳。2022年上半年&#xff0c;规模以上家具制造业企业营业收入3604亿元&#xff0c;同比下降4%&#xff1b;实现利润总额174.8亿元&#xff0c;同比增长2.6%。…

数据结构排序算法之冒泡排序

一 相关概念 稳定排序&#xff1a;如果原数据中a在b之前&#xff0c;而且ab&#xff0c;排序后a任然在b之前 不稳定排序&#xff1a;如果原数据中a在b之前&#xff0c;而且ab&#xff0c;排序后a在b之后 时间复杂度&#xff1a;对排序数据的总的操作次数&#xff0c;反映当n变…

安装Ruby和安装Rails详细步骤详解

rbenv安装Ruby rbenv可以管理多个版本的ruby。可以分为3种范围(或者说不同生效作用域)的版本&#xff1a; local版&#xff1a;本地&#xff0c;针对各项目范围(只在某个目录下有效) global版&#xff1a;全局&#xff0c;没有shell和local版时使用global版 shell版&#xf…

[MySQL]-删库后恢复

[MySQL]-删库后恢复 sen格 | 2022年11月 本文旨在记录个人在数据库的删库恢复演练过程中的一些总结&#xff0c;如有不足&#xff0c;欢迎指正。 一、恢复场景 1&#xff09;假设生产实例MySQL端口为&#xff1a;3306 2&#xff09;本地实例MySQL端口为&#xff1a;3307 在这…

pytorch快速入门

文章目录一、Tensorstensors的初始化(四种):tensors的属性和numpy的联系二、数据集的数据加载器加载数据集标号和可视化自己创建数据集用DataLoaders准备数据用于训练Transforms三、神经网络准备训练设备定义网络的类模型的layersnn.Flattennn.Linearnn.ReLUnn.Sequentialnn.So…

Python之基本扩展模块

一、datetime模块 1.1 主要的模块 datetime.date() #处理日期&#xff08;年、月、日&#xff09; datetime.time() #处理时间&#xff08;时、分、秒和毫秒&#xff09; datetime.datetime() #处理日期时间 datetime.timedelta() #处理时段&#xff08;时间间隔…

基于Java+springboot+SSM的医疗报销系统的设计与实现

项目开发工具: IDEA, MYSQL, JDK1.8 项目使用技术: SpringBoot, SSM, H-UI, JSP, JQUERY, HTML 医疗报销系统【功能列表】 【前台用户】登录,注册,首页新闻轮播图,首页新闻按分类展示列表,栏目分类模块,报销流程模块,修改密码,个人信息展示,新增家庭成员, 家庭成品列表展示,…

Spring框架教程

Spring框架教程Spring框架教程1. 前言2. Spring框架概述2.1 什么是spring?2.2 Spring有哪些优点&#xff1f;2.3 Spring 有两个核心部分&#xff1a;IoC 和AOP2.4 Spring 特点2.5 Spring架构图&#xff0c;Spring由哪些模块组成&#xff1f;3. IOC容器3.1 IOC底层原理3.2 什么…

three.js初时基础

第一步&#xff1a;找到Three.js – JavaScript 3D Library (threejs.org) 第二步 第三步: 第四步&#xff1a; 安装依赖 第五步&#xff1a;新建一个项目文件&#xff0c;在文件中npm init 进行初始化出现一个package.json 第六步&#xff1a;配置安装&#x1f680; 快速开…

QtAV环境配置

本文章主要是使用MSVC编译器&#xff0c;因为QtAV是依赖FFmpeg的&#xff0c;所以需要下载QtAV源码和QtAV-depends-windows-x86x64&#xff1b; 官网地址&#xff1a;http://www.qtav.org/ Github 地址&#xff1a;https://github.com/wang-bin/QtAV 1&#xff0c;解压 将文件…

产品生命周期(PLM)发展历程及技术核心分析指导

产品生命周期管理(Product Lifecycle Management&#xff0c;简称PLM)&#xff0c;是一种为企业产品全生命周期提供服务的软件解决方案&#xff0c;可以应用于在单一地点或分散在多个地点的企业内部&#xff0c;以及在产品研发领域&#xff0c;具有协作关系的企业之间&#xff…

windows 锁屏时执行某个程序

目录 前言 1 打开锁屏事件 2 创建任务计划程序 3 测试 前言 以windows10为例&#xff0c;这个功能的核心是使用windows自带的“任务计划程序”&#xff0c;可以帮助您实现触发器操作。 1 打开锁屏事件 默认情况下&#xff0c;锁屏事件并不会被系统记录&#xff0c;需要手动打…

vue3项目的创建、入口文件、全局方法、生命周期函数、setup中的生命周期函数使用、data的函数方式

文章目录1. 创建vue3项目1.1 基于webpack的工程创建1.2 通过vite来创建vue3项目vue3插件推荐1.3 通过npm init vue3创建项目2. vue3入口文件3. vue3中的全局方法修改4. vue3中封装全局方法5. vue3生命周期函数6. setup中生命周期使用7. data函数方式1. 创建vue3项目 1.1 基于w…

Spring Boot中消息是什么?同步异步消息是啥/都包含那些技术?Activate MQ消息怎么整合

写在前面&#xff1a; 继续记录自己的SpringBoot学习之旅&#xff0c;这次是SpringBoot应用相关知识学习记录。若看不懂则建议先看前几篇博客&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 3.5.4 消息 3.5.4.1 简述 消息发送方&#xff1a;…

力扣(LeetCode)29. 两数相除(C++)

快速乘 题解只使用了 intintint 。 万恶的 INT_MININT\_MININT_MIN&#xff0c;怎么处理&#xff1f;打不过就加入——被除数和除数转为负数计算 。 xxx 除以 yyy &#xff0c;等于从 xxx 中拿出若干个 yyy 。 从 xxx 中拿出 yyy 的数量&#xff0c;就是 xyx\div yxy 的商。 y…

Allegro差分自动添加回流地孔操作指导

Allegro差分自动添加回流地孔操作指导 Allegro自带给差分添加回流地孔的功能,具体操作如下 点击connect命令,任意拉一对差分 鼠标右击,选择Return Path。。。,选择settings 会弹出设置的对话框,Assign net name给孔分配一个网络,一般是GND, Return Path via 选择添加…

2019年1+X 证书 Web 前端开发中级理论考试题目原题+答案——第五套

&#x1f4da;文章目录 &#x1f3af;关于1X标准 &#x1f3af;关于中级考点 ⏩&#x1f4bb;答案速查 理论题(满分100分)&#xff0c;包括单选题、多选题、判断题。 &#x1f4d1;一、单选题&#xff08;每小题2分&#xff0c;共30小题&#xff0c;共60分&#xff09; &…