ZooKeeper快速入门学习+在springboot中的应用+监听机制的业务使用

news2024/11/18 0:50:05

目录

前言

基础知识

一、什么是ZooKeeper

二、为什么使用ZooKeeper

三、数据结构

四、监听通知机制

五、选举机制

使用

1 下载zookeeper

2 修改

3 排错

在SpringBoot中的使用

安装可视化插件

依赖 配置

安装httpclient方便测试

增删查改

新建控制器

创建节点

查询节点

更新节点

删除节点

使用监听

新建监听器

更改控制器中的方法。

使用httpclient请求,结果如下 

注意事项

业务使用


前言

在很多时候,我们都可以在各种框架应用中看到ZooKeeper的身影,比如Kafka中间件,Dubbo框架,Hadoop等等。为什么到处都看到ZooKeeper?

基础知识

一、什么是ZooKeeper

        ZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特点,可以实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。

        Zookeeper 的核心实现是一个分布式的数据存储系统,其内部采用 ZAB 协议(Zookeeper Atomic Broadcast)进行主从复制,确保了数据的一致性和可靠性。在 Zookeeper 中,数据存储采用了一种称为“Znode”的数据模型,类似于 Unix 文件系统。

Znode 是 Zookeeper 中最基本的数据单元,是一个有层级的树形结构,每个节点都有一个路径,其中根节点为“/”,子节点路径会在父节点路径的基础上加上相对路径,最终形成一棵完整的树形结构。

除了 Znode 外,Zookeeper 还支持节点的监听和事件机制,监听可以让客户端对 Znode 的变化做出及时响应,增强了应用的实时性。

二、为什么使用ZooKeeper

ZooKeeper能保证:

  • 更新请求顺序进行。来自同一个client的更新请求按其发送顺序依次执行
  • 数据更新原子性。一次数据更新要么成功,要么失败
  • 全局唯一数据视图。client无论连接到哪个server,数据视图都是一致的
  • 实时性。在一定时间范围内,client读到的数据是最新的

三、数据结构

ZooKeeper的数据结构和Unix文件系统很类似,总体上可以看做是一棵树,每一个节点称之为一个ZNode,每一个ZNode默认能存储1M的数据。每一个ZNode可通过唯一的路径标识。如下图所示:

创建ZNode时,可以指定以下四种类型,包括:

  • PERSISTENT,持久性ZNode。创建后,即使客户端与服务端断开连接也不会删除,只有客户端主动删除才会消失。
  • PERSISTENT_SEQUENTIAL,持久性顺序编号ZNode。和持久性节点一样不会因为断开连接后而删除,并且ZNode的编号会自动增加。
  • EPHEMERAL,临时性ZNode。客户端与服务端断开连接,该ZNode会被删除。
  • EPEMERAL_SEQUENTIAL,临时性顺序编号ZNode。和临时性节点一样,断开连接会被删除,并且ZNode的编号会自动增加。

四、监听通知机制

Watcher是基于观察者模式实现的一种机制。如果我们需要实现当某个ZNode节点发生变化时收到通知,就可以使用Watcher监听器。

客户端通过设置监视点(watcher)向 ZooKeeper 注册需要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息

这种通知机制是一次性的。一旦watcher被触发,ZooKeeper就会从相应的存储中删除。如果需要不断监听ZNode的变化,可以在收到通知后再设置新的watcher注册到ZooKeeper。

监视点的类型有很多,如监控ZNode数据变化、监控ZNode子节点变化、监控ZNode 创建或删除

五、选举机制

ZooKeeper是一个高可用的应用框架,因为ZooKeeper是支持集群的。ZooKeeper在集群状态下,配置文件是不会指定Master和Slave,而是在ZooKeeper服务器初始化时就在内部进行选举,产生一台做为Leader,多台做为Follower,并且遵守半数可用原则。

由于遵守半数可用原则,所以5台服务器和6台服务器,实际上最大允许宕机数量都是3台,所以为了节约成本,集群的服务器数量一般设置为奇数

如果在运行时,如果长时间无法和Leader保持连接的话,则会再次进行选举,产生新的Leader,以保证服务的可用

使用

1 下载zookeeper

https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz

2 修改

将下面的这个配置文件复制一份然后将新复制的改名为zoo.cfg

然后更改文件内容为

tickTime=2000
dataDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\data
dataLogDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\logs
clientPort=2181

创建设置的目录

双击启动。。

3 排错

wc,闪卡了!!

别急我们可能是JAVA_HOME没有配置好,我们进入环境变量配置JAVA_HOME

注意这里是jdk的主路径,不带bin奥!! 

ok,再次启动试试,好了,这次没有问题了

在SpringBoot中的使用

安装可视化插件

 侧边

新建连接

127.0.0.1:2181

 然后点击connect

 

依赖 配置

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
zookeeper: 
  server: 127.0.0.1:2181
  timeout: 3000

配置类

 

package com.scm.springbootzookper.config;

import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class ZookeeperConfig {
    @Value("${zookeeper.server}")
    private String server;

    @Value("${zookeeper.timeout}")
    private Integer timeout;

    @Bean
    public ZooKeeper zkClient() throws IOException {
        return new ZooKeeper(server, timeout, watchedEvent -> {});
    }
}

安装httpclient方便测试

可参照以下

https://blog.csdn.net/qq_53679247/article/details/130841001

增删查改

新建控制器

package com.scm.springbootzookper.controller;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class ZookController {

    @Autowired
    ZooKeeper zkClient;

    @GetMapping("/zookeeper")
    public String getData() throws KeeperException, InterruptedException {
        String path = "/zookeeper";
        boolean watch = true;
        byte[] data = zkClient.getData(path, watch, null);
        return new String(data);
    }

}

使用http client测试

 

创建节点

API

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  • path ZNode路径
  • data ZNode存储的数据
  • acl ACL权限控制
  • createMode ZNode类型

 

    @GetMapping("/addNode/{nodename}/{data}")
    public String addNode(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
        // 创建节点的路径
        String path = "/"+nodename;
        // 节点数据
        String data =data1;
        // 权限控制
        List<ACL> aclList = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        // 创建节点的类型
        CreateMode createMode = CreateMode.PERSISTENT;

        String result = null;
        try {
            result = zkClient.create(path, data.getBytes(), aclList, createMode);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return result;
    }

httpclent 环境数据

{
  "dev": {
    "name": "value",
    "test": "test",
    "nodename": "node1",
    "data": "我是测试数据1"
  }
}

 

查询节点

    @GetMapping("/getData/{nodename}")
    public String getData(@PathVariable("nodename") String nodename){
        //数据的描述信息,包括版本号,ACL权限,子节点信息等等
        Stat stat = new Stat();
        //返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
        byte[] bytes;
        String path="/"+nodename;
        try {
            bytes = zkClient.getData(path, false, stat);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        //打印结果
        System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
        System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
        return new String(bytes);
    }

更新节点

删除和更新操作,必须获取到版本号才能进行修改

    @GetMapping("/setData/{nodename}/{data}")
    public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
        String path = "/"+nodename;
        String data = data1;
        int version = 0;

        Stat stat = null;
        try {
            stat = zkClient.setData(path, data.getBytes(), version);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return stat.toString();
    }

删除节点

    
    @GetMapping("/deleteNode/{nodename}")
    public String deleteNode(@PathVariable("nodename")String nodename){
        String path = "/"+nodename;
        int version = 0;
        try {
             zkClient.delete(path, version);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return "OK!";
    }

使用监听

新建监听器

package com.scm.springbootzookper.watch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.springframework.stereotype.Component;

@Component
public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        Event.KeeperState state = watchedEvent.getState();
        Event.EventType type = watchedEvent.getType();
        System.out.println("检测到节点发生变化.....");
        System.out.println("节点名称:"+state.name());
        System.out.println("事件类型:"+type.name());
        System.out.println("节点路径"+watchedEvent.getPath());

    }
}

更改控制器中的方法。

    @GetMapping("/setData/{nodename}/{data}")
    public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1) throws InterruptedException, KeeperException {
        String path = "/"+nodename;
        zkClient.exists(path, new MyWatcher());
        String data = data1;
        // 这里必须先拿到版本号才能更新
        int version =5;

        Stat stat = null;
        try {
            stat = zkClient.setData(path, data.getBytes(), version);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return stat.toString();
    }

使用httpclient请求,结果如下 

注意事项

需要注意的是,注册一次监听器只能使用一次,使用完就失效了。 

串行执行。客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。

业务使用

判断通知时节点的更改类型,进行其他操作。

package com.scm.springbootzookper.watch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.springframework.stereotype.Component;

@Component
public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        Event.KeeperState state = watchedEvent.getState();
        Event.EventType type = watchedEvent.getType();
        if (Event.EventType.NodeDataChanged.getIntValue()==type.getIntValue()) {
            System.out.println("节点被修改了!");
        }
        if (Event.EventType.NodeDeleted.getIntValue()==type.getIntValue()) {
            System.out.println("节点被删除了!");
        }
        System.out.println("检测到节点发生变化.....");
        System.out.println("节点名称:"+state.name());
        System.out.println("事件类型:"+type.name());
        System.out.println("节点路径"+watchedEvent.getPath());

    }
}

可以进行一些业务操作。

以下是Watch接口的源码,我们可以注意到其中的枚举类型,

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.zookeeper;

public interface Watcher {
    void process(WatchedEvent var1);

    public interface Event {
        public static enum EventType {
            None(-1),
            NodeCreated(1),
            NodeDeleted(2),
            NodeDataChanged(3),
            NodeChildrenChanged(4);

            private final int intValue;

            private EventType(int intValue) {
                this.intValue = intValue;
            }

            public int getIntValue() {
                return this.intValue;
            }

            public static EventType fromInt(int intValue) {
                switch (intValue) {
                    case -1:
                        return None;
                    case 0:
                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                    case 1:
                        return NodeCreated;
                    case 2:
                        return NodeDeleted;
                    case 3:
                        return NodeDataChanged;
                    case 4:
                        return NodeChildrenChanged;
                }
            }
        }

        public static enum KeeperState {
            /** @deprecated */
            @Deprecated
            Unknown(-1),
            Disconnected(0),
            /** @deprecated */
            @Deprecated
            NoSyncConnected(1),
            SyncConnected(3),
            AuthFailed(4),
            ConnectedReadOnly(5),
            SaslAuthenticated(6),
            Expired(-112);

            private final int intValue;

            private KeeperState(int intValue) {
                this.intValue = intValue;
            }

            public int getIntValue() {
                return this.intValue;
            }

            public static KeeperState fromInt(int intValue) {
                switch (intValue) {
                    case -112:
                        return Expired;
                    case -1:
                        return Unknown;
                    case 0:
                        return Disconnected;
                    case 1:
                        return NoSyncConnected;
                    case 3:
                        return SyncConnected;
                    case 4:
                        return AuthFailed;
                    case 5:
                        return ConnectedReadOnly;
                    case 6:
                        return SaslAuthenticated;
                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }
    }
}

END.........

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/563493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

k8s中部署nginx-ingress实现外部访问k8s集群内部服务

k8s通过nginx-ingress实现集群外网访问功能 一&#xff1a;ingress概述 1.1 ingress 工作原理 step1&#xff1a;ingress contronler通过与k8s的api进行交互&#xff0c;动态的去感知k8s集群中ingress服务规则的变化&#xff0c;然后读取它&#xff0c;并按照定义的ingress规…

jsp手机回收软件系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 jsp手机回收软件系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助 &#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql&#xff0c;使…

Moonbeam社区治理|参与委托投票问卷,瓜分2000U奖励

社区治理升级意味着公链正走向可持续和透明化发展&#xff0c;让每位GLMR所有者都参与治理&#xff0c;是Moonbeam成为真正去中心化公链的重要一环。 Moonbeam治理 OpenGov为Moonbeam生态带来了多角色委托功能&#xff0c;使Token持有者能够根据track委托Token进行投票。委托…

零基础如何入门渗透测试

作为一名多年的渗透测试工程师&#xff0c;了解到很多零基础的初学者都面临着学习渗透测试的困难。在这里&#xff0c;我会提供一些指导性的建议和方法&#xff0c;帮助初学者快速入门&#xff0c;开启学习之旅。 一、什么是渗透测试 在学习渗透测试之前&#xff0c;建议先了解…

虹科技术 | 虹科EtherCAT增量编码器输入模块数据采集实操测试

1. 背景介绍 编码器是将信号或数据进行编制、转换为可用以通讯、传输和存储的信号形式的设备。编码器把角位移或直线位移转换成电信号&#xff0c;前者称为码盘&#xff0c;后者称为码尺。按照读出方式编码器可以分为接触式和非接触式两种&#xff1b;按照工作原理编码器可分为…

Android | Android 系统架构

参考&#xff1a; Android Developers(https://developer.android.google.cn/) 平台架构 Android 是基于 Linux 的开源软件栈&#xff0c;下图为官网给出的 Android 平台主要组件。 Android 平台从上&#xff08;直接与用户交互&#xff09;到下&#xff08;直接与硬件交互&a…

Mastodon 长毛象多租户:自定义域名、自定义账号别名

概念 自定义域名后缀 假设&#xff0c;Mastodon 主节点域名 domain1.com&#xff0c;我在该域名下拥有一个用户 user1domain1.com。 配置自定义域名后缀支持后&#xff0c;也可以通过 user1domain2.com 搜索到。该配置需要在主节点中设置 ALTERNATE_DOMAINS。 自定义账号别…

DOS的常用指令:

DOS的常用指令&#xff1a; DOS【介绍】&#xff1a;磁盘操作系统 cmd是操作DOS的媒介&#xff0c;dos可以操作Windows的目录结构&#xff0c; 基本操作指令&#xff1a; cmd【控制台】->发给dos【解析】->win的目录结构 常用操作指令&#xff1a; 《一》目录操作 &a…

QT学习笔记-QT5.15.2使用qtopcua5.15.2实现与PLC通讯(上)

QT学习笔记-QT5.15.2使用qtopcua5.15.2实现与PLC通讯&#xff08;上&#xff09; 环境说明背景思路perl依赖安装qtopcua插件编译解决编译报错问题解决安装mingw32-make install报错问题 环境说明 操作系统&#xff1a;Windows10 专业版 64位 开发工具&#xff1a;Qt 5.15.2 OP…

Python提取PDF文字的10个方法,OCR识别扫描版pdf,图片pdf格式的10种ocr汉字识别方法

Python 读取扫描版 PDF、图片 PDF 并进行 OCR 识别的方法&#xff1a; pytesseract&#xff1a;一种基于 Python 的 OCR 库&#xff0c;可用于识别扫描版 PDF 和图片 PDF 中的文本。 它可以使用 Google 的 OCR 引擎进行识别&#xff0c;也可以使用本地的 OCR 引擎进行识别。使…

阿里云免费ssl证书申请与部署

一、证书申请 1、找到 ssl 证书 2、点击选择SSL 证书 进入其管理控台 3、如果你还没有免费证书&#xff0c;选择购买即可&#xff0c;一个自然年内每个账号可以领取一次数量为20的免费单域名试用证书额度&#xff0c;我的已经购买过来&#xff0c;今年的&#xff0c;所以无法…

网络安全各类WAF绕过技巧

一、WAF绕过 1、脏数据绕过 即传入一段长数据使waf失效&#xff0c;从而实现绕过waf。某些waf处理POST的数据时&#xff0c;只会检测开头的8K&#xff0c;后面选择全部放过。 例如&#xff0c;当发现某网站存在一个反序列化漏洞时&#xff0c;但是无回显&#xff0c;被waf拦…

MQTT中间件Eclipse Mosquitto安装和使用(.asc文件)MQTT监控命令mosquitto_sub(mosquitto C++库源码编译)

昨天弄的&#xff0c;今天忘了不少。。。 文章目录 参考链接安装MQTT服务中间件安装启动与查询卸载与清理 MQTT C支持库安装&#xff08;使C能使用相关库函数&#xff09;离线安装&#xff08;通过源码&#xff09;ubuntu官网下载软件包编译mosquitto客户端库 mosquitto Docker…

后端SpringBoot应用向云原生K8S平台迁移

目录 一、引言二、方式1&#xff1a;在K8S上部署Spring Cloud Alibaba三、方式2&#xff1a;在K8S上部署Spring Cloud K8S3.1 第1次优化&#xff1a;移除Spring Cloud K8S DiscoveryClient 四、方式3&#xff1a;在K8S上部署SpringBoot应用4.1 第2次优化&#xff1a;移除Spring…

acwing提高--DFS之剪枝与优化

剪枝与优化的方法 1.优化搜索顺序 大部分情况下&#xff0c;我们应该优先搜索分支较少的节点 2.排除等效冗余 3.可行性剪枝 4.最优性剪枝 5.记忆化搜索&#xff08;DP&#xff09; 1.小猫爬山 题目https://www.acwing.com/problem/content/description/167/ 1.优化搜索顺…

《操作系统》期末最全复习题及解析

文章目录 选择题填空题简答题程序题综合题1.银行家算法2.页面置换算法3.进程调度算法4.磁盘调度算法5.求物理/逻辑地址6.分页存储管理7.可变分区分配算法 选择题 若信号量S的初值为2&#xff0c;且有3个进程共享此信号量&#xff0c;则S的取值范围是&#xff08;B &#xff09;…

单词长度统计,统计数据放入列表

输入一段英文计算每个单词长度&#xff0c;统计不含非英文字符&#xff0c;列表输出。 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是基础那么简单…… 地址…

AI与税务管理:新技术带来的新机遇和新挑战

本文作者&#xff1a;王伊琳 人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09;是指由计算机系统或机器人模拟人类智能的过程和结果&#xff0c;包括感知、理解、学习、推理、决策等能力。近年来&#xff0c;随着计算机技术、互联网平台、大数据分析等的…

AI工具 ChatGPT-4 vs Google Bard , PostgreSQL 开发者会pick谁?

在人工智能 (AI) 进步的快节奏世界中&#xff0c;开发人员正在寻找最高效和突破性的解决方案来加快和提高他们的工作质量。对于 PostgreSQL 开发人员来说&#xff0c;选择理想的 AI 支持的工具以最专业的方式解决他们的查询至关重要。 近年来&#xff0c;人工智能工具的普及率…

Redis如何做到内存高效利用?过期key删除术解析!

大家好&#xff0c;我是小米&#xff0c;一个热衷于分享技术的小伙伴。今天我要和大家探讨一个关于 Redis 的话题&#xff1a;删除过期key。在使用 Redis 进行数据存储和缓存时&#xff0c;我们经常会遇到过期数据的处理问题。接下来&#xff0c;我将为大家介绍为什么要删除过期…