zk之数据的发布与订阅

news2024/9/23 11:25:23

数据的发布和订阅:

(1)数据的发布与订阅是一个一对多的关系。多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使它们能够自动的更新自己的状态。发布和订阅可以让发布方和订阅放独立封装。
(2)当一个对象改变的时候,需要同时改变其它的对象,而且不知道多少个对象需要改变时,那么就可以使用发布和订阅模式。数据的发布和订阅在分布式中的应用主要有配置管理,和服务发现。
(3)配置管理是指如果集群中的机器拥有某些相同的配置时,并且这些配置信息需要动态的改变,我们就可以使用数据的发布和订阅模式把配置做统一的管理。让这些机器各自订阅配置信息的改变。当配置发生改变的时候,这些机器就可以得到通知。并且更新为最新的配置。
(4)服务发现是指,对集群中的服务上下线做统一的管理,每台服务器都可以作为数据的发布方,向集群注册自己的基本信息。而让某些监控服务器作为订阅工作服务器的基本信息。
(5)当工作服务器的基本信息发生改变的时候,比如说上下线,服务器角色改变,服务范围的变更,那么监控服务器可以得到通知,并且响应这些变化。

基本模型如下

​​​​​​​​​​​​在这里插入图片描述
左侧浅紫色的区域代表的是zk集群,右侧的方块代表的是工作服务器集群。其中,前3个方块代表的是工作服务器。绿色的方块代表的是管理服务器。最下面的方块代表的是控制服务器。Zk中有三类的节点,首先是config节点,它用于我们的配置管理,manageServer可以通过config来下发配置信息。workServer可以通过订阅config来改变更新自己的配置信息。

Servers节点用于服务发现,每个workServer在启动的时候,都会在Servers下创建一个临时节点,manager节点充当的monitor,监控servers节点下的子节点的改变,来更新工作服务器的列表信息。最后我们可以通过control Server,由command节点作为中介向manageServer发送控制指令。controlServer向command节点写入控制信息,manageServer订阅command节点的数据改变,来监听并且执行命令。

代码基本流程图

1.manage server 程序主体工作流程

在这里插入图片描述

2.work server 程序主体流程

在这里插入图片描述

3.系统核心类基本模型

在这里插入图片描述
serverConfig 用来记录workServer的配置信息
serverData 用来记录workServer的基本信息
subscribeZkClient 作为整个类的入口,用来启动workServer和manageServer.
demo 如下:

/**
 * 下面demo就是一个典型的发布订阅系统:
 * 集群中每台机器在启动阶段,都会到该节点上获取数据库的配置信息,同时客户端还需要在在节
 * 点注册一个数据变更的watcher监听,一旦该数据节点发生变更,就会受到通知信息。
 */
public class ConfigTest {
    /**
     * 配置中心父节点
     */
    private static final String PATH = "/server/database_config";
    private static final String zkAddress = "127.0.0.1:2181";
    private static final int timeout = 1000;

    private static CuratorFramework client = null;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 客户端的连接状态监听
     */
    static ConnectionStateListener clientListener = new ConnectionStateListener() {

        public void stateChanged(CuratorFramework client,
                                 ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                System.out.println("connected established");
                countDownLatch.countDown();
            } else if (newState == ConnectionState.LOST) {
                System.out.println("connection lost,waiting for reconection");
                try {
                    System.out.println("reinit---");
                    reinit();
                    System.out.println("inited---");
                } catch (Exception e) {
                    System.err.println("re-inited failed");
                }
            } else if (newState == ConnectionState.SUSPENDED) {
                System.out.println("suspended");
            } else {
                System.out.println(newState);
            }

        }
    };

    public static void main(String[] args) throws Exception {
        //1、初始化curator
        init();
        //2、判断父节点是否存在,不在的话创建该节点(这个节点本身应在服务器手动添加的)
        Stat stat = client.checkExists().forPath(PATH);
        if (stat == null) {
            client.create().creatingParentsIfNeeded().forPath(PATH);
        }
        //3、对path的变更进行监听
        watcherPath(PATH, pathWatcher);
        //4、模拟阻塞场景,可以客户端改变数据测试
        Thread.sleep(Integer.MAX_VALUE);
    }


    public static void init() throws Exception {
        client = CuratorFrameworkFactory.builder().connectString(zkAddress)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new RetryNTimes(5, 5000)).build();
        // 客户端注册连接状态监听器,进行连接配置(客户端连接的状态会被相应 的监听器监听)
        client.getConnectionStateListenable().addListener(clientListener);
        client.start();
        // 连接成功后,才进行下一步的操作(连接成功会触发监听器中的countDownLatch.await())
        countDownLatch.await();
    }

    public static void reinit() {
        try {
            unregister();
            init();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    public static void unregister() {
        try {
            if (client != null) {
                client.close();
                client = null;
            }
        } catch (Exception e) {
            System.out.println("unregister failed");
        }
    }

    /**
     * 对path进行监听配置
     *
     * @param path
     * @param watcher
     * @return
     * @throws Exception
     */
    public static String watcherPath(String path, CuratorWatcher watcher)
            throws Exception {
        //只是改变数据时候会反应(或者是使用getChildren、exist。或者是在创建客户端构造函数进行watcher监听)
        byte[] buffer = client.getData().usingWatcher(watcher).forPath(path);

        System.out.println("获取节点的信息:" + new String(buffer));
        return new String(buffer);
    }

    /**
     * 读取path数据
     *
     * @param path
     * @return
     * @throws Exception
     */
    public static String readPath(String path) throws Exception {
        byte[] buffer = client.getData().forPath(path);
        return new String(buffer);

    }

    /**
     *
     * 对path进行改变监听的watcher
     */
    private static CuratorWatcher pathWatcher = new CuratorWatcher() {
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            // 当数据变化后,重新获取数据信息
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                //获取更改后的数据,进行相应的业务处理
                String value = readPath(event.getPath());
                System.out.println(value);
            }

        }
    };

}```

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498965.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot项目修改application.yml,application-prod.yml配置文件中的端口,数据库链接等信息后,项目突然不能运行

SpringBoot项目修改application.yml,application-prod.yml配置文件中的端口,数据库链接等信息后,项目突然不能运行 问题记录 ,SpringBoot项目修改application.yml,application-prod.yml配置文件中的端口,数…

跟姥爷深度学习5 浅用卷积网络做mnist数字识别

一、前言 前面用TensorFlow浅做了一个温度预测,使用的是全连接网络,同时我们还对网上的示例做了调试和修改,使得预测结果还能看。本篇我们更进一步使用CNN(卷积)网络,不过再预测温度就有点大材小用&#x…

Stable Diffusion Webui 本地部署【踩坑记录】

1、安装python Python Release Python 3.10.6 | Python.org 2、安装git git是一个代码管理工具,通过它可以将开源项目仓库克隆到本地 下载地址:Git - Downloading Package 3、下载stable-diffusion-webui 可以新建一个目录,在文件夹内单…

代数余子式怎么求

代数余子式是矩阵中每个元素的代数余数,可以通过以下步骤求得: 1. 找到该元素所在的行和列,将其删除,得到一个新的矩阵。 2. 计算新矩阵的行列式,乘以(-1)^(行号列号),即为该元素的代数余子式。 例如 对…

vulnhub靶场之nasef1

1.信息收集 探测存活主机,发现192.168.239.176存活 对目标主机192.168.239.176进行端口扫描,发现存活22、80端口 浏览器访问http://192.168.239.176/,发现为apache2的页面,查看源码,未发现异常。 对http://192.16…

-笔记 tps qps

页面请求异步处理 将请求 扔进 kafka, Mq等 MQ单机抗几万并发也是ok的 底层批量处理 sql 处理 尽量批量处理,减少耗时 分库分表, 可能到了最后数据库层面还是免不了抗高并发的要求,好吧,那么就将一个数据库拆分为多个库&#xf…

Java ---System类

System 类位于 java.lang 包,代表当前 Java 程序的运行平台,系统级的很多属性和控制方法都放置在该类的内部。由于该类的构造方法是 private 的,所以无法创建该类的对象,也就是无法实例化该类。 System 类提供了一些类变量和类方…

PBDB Data Service:Thumbnail images of lifeforms(生命形式的缩略图)

Thumbnail images of lifeforms(生命形式的缩略图) 描述用法参数方法响应值格式术语表 描述 此操作返回表示指定分类的图像,或关于图像的信息。如果后缀是 .png,则返回图像内容数据。否则,将以指定的格式返回一个描述…

2023年全国硕士研究生入学统一考试英语(二)试题

2023年全国硕士研究生入学统一考试英语(二)试题 Section I Use of English Directions: Read the following text. Choose the best word ( s) for each numbered blank and mark A, B , C or D on the ANSWER SHEET. ( 10 points) Here’s a common …

OSS文件打包下载

前言 OSS 存放了很多项目(项目是 TMagic 低代码平台编辑生成,自动上传 OSS),现在需要在管理后台将项目打包ZIP下载,并不在本地生成文件。 OSS 要下载项目文件: 一、思路实现 创建 OSSClient 实例获取 Bu…

K8s基础6——应用配置管理方案、调度策略、污点和污点容忍

文章目录 一、应用配置管理方案1.1 ConfigMap1.1.1 注入变量1.1.2 挂载数据卷 1.2 Secret 二、调度策略2.1 nodeSelector定向调度2.1.1 正例2.1.2 反例 2.2 nodeAffinity亲和力调度2.2.1 In硬策略2.2.2 NotIn硬策略2.2.3 软策略 2.3 PodAffinity亲和力调度2.3.1 pod共存2.3.2 p…

【机器学习】信息量、香农熵、信息增益

这节可以搭配 【机器学习】Logistic回归(重新整理)信息量(信息)信息量公式的推理过程 香农熵信息增益 【机器学习】Logistic回归(重新整理) B站视频:“交叉熵”如何做损失函数?打包…

RabbitMQ、RabbitMQ发布/订阅模式

1.RabbiMQ RabbitMQ是一个消息中间件 MQ的基本结构 1.1RabitMQ安装 参考:Docker安装 Docker中部署RabbitMQ 2.入门案例 2.1.publisher实现 package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; im…

链表中倒数第k个结点

描述 输入一个链表,输出该链表中倒数第k个结点。 示例1 输入: 1,{1,2,3,4,5} 复制返回值: {5}看代码 struct ListNode* FindKthToTail(struct ListNode* pListHead, int k ) {struct ListNode* fast pListHead, *slow pListHead;whi…

Redis实现分布式锁详细解读

文章目录 什么是分布式锁?如何用Redis实现分布式锁?分布式锁的改进锁过期处理集群环境下Redis宕机问题RedLock的引入RedLock的实现步骤RedLock带来的弊端 什么是分布式锁? 我们在学多线程的时候遇到过ReetrantLock,这种锁主要应用…

node install编译失败原因

关键报错信息 npm ERR! gyp verb check python checking for Python executable "python2.7" in the PATH npm ERR! gyp verb which failed Error: not found: python2.7 或者 npm ERR! code ERESOLVE npm ERR! ERESOLVE could not resolve npm ERR! npm ERR! Whi…

车载以太网时间同步之EthTsync

车载以太网时间同步之EthTsync 前言 首先,请问大家几个小小问题,你清楚: 你知道EthTsync模块的主要作用是什么吗?EthTsync模块与其他AUTOSAR基础软件模块交互关系;Eth Tsync模块使用的时间同步协议是什么&#xff1f…

Java—JDK8新特性—函数式接口

目录 函数式接口 3.1 什么是函数式接口 3.2 functionalinterface注解 源码分析 3.3 Lambda表达式和函数式接口关系 3.4 使用函数式接口 函数式接口 3.1 什么是函数式接口 如果一个接口中只包含一个抽象方法,这个接口称为函数式接口 如果一个接口包含&#xff0…

mac php8 安装xdebug模块失败

安装 xdebug 模块,官网有详细介绍Xdebug: Documentation Installation 本机是mac php使用brew安装,想着可以直接使用以下方式安装,还是美滋滋的 但是安装途中发生了错误 PHP Warning: mkdir(): File exists in /usr/local/Cellar/php/8.0.10/share/php/pear/System.php on…

解决报错ERROR: No matching distribution found for torchvision==0.11.2+cu111

目录 一、猜测 二、验证 三、解决方案 四、检验 该报错是在按官网方法用指令: pip install torch1.9.1cu111 torchvision0.10.1cu111 torchaudio0.9.1 -f https://download.pytorch.org/whl/torch_stable.html 安装pytorch时出现的,以下是分析&#…