ZooKeeper 客户端API操作

news2025/1/11 18:43:00

文章目录

  • 一、节点信息
    • 1、创建节点
    • 2、获取子节点并监听节点变化
    • 3、判断节点是否存在
    • 4、客户端向服务端写入数据
      • 写入请求直接发给 Leader 节点
      • 写入请求直接发给 follow 节点
  • 二、服务器动态上下线监听
    • 1、监听过程
    • 2、代码
  • 三、分布式锁
    • 1、什么是分布式锁?
    • 2、Curator 框架实现分布式锁

一、节点信息

前提:centos102、centos103、centos104 服务器都已经开启

pom.xml 依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

log4j.properties 配置

# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout

# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

1、创建节点

zkClient.java 代码

// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;

// 创建客户端
@Before
public void init() throws IOException {

    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {

        }
    });
}

// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {
    String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
}

运行创建子节点,看看是否创建了该节点

在这里插入图片描述

2、获取子节点并监听节点变化

@Test
public void getChildren() throws InterruptedException, KeeperException {
    List<String> children = zkClient.getChildren("/", true);

    for (String child : children) {
        System.out.println(child);
    }
}

那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。

3、判断节点是否存在

@Test
public void exit() throws InterruptedException, KeeperException {
    Stat stat = zkClient.exists("/frost", false);
    System.out.println(stat == null ? "not exits" : "exits");
}

4、客户端向服务端写入数据

写入请求直接发给 Leader 节点

  1. 客户端发送写入请求,leader节点执行写入操作
  2. leader通知follow1执行写入操作
  3. folllow1写入完毕给leader返回确认ack
  4. 现在半数以上服务器完成写入,leader给客户端发送确认ack
  5. leader通知follow2写入
  6. follow2写入完毕给leader发送确认ack
    在这里插入图片描述

写入请求直接发给 follow 节点

  1. 客户端发送写入请求,
  2. follow1 将写入请求发送给leader
  3. leader节点执行写入操作,然后leader通知follow1执行写入操作
  4. folllow1写入完毕给leader返回确认ack
  5. 现在半数以上服务器完成写入,leader给follow1发送确认ack
  6. follow1给客户端发送确认ack
  7. leader通知follow2写入
  8. follow2写入完毕给leader发送确认ack

在这里插入图片描述

二、服务器动态上下线监听

1、监听过程

在这里插入图片描述

以下红色字体写错,应该是下线则通知注册监听器的客户端
在这里插入图片描述

对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。

在这里插入图片描述

2、代码

服务器注册到zk集群

import org.apache.zookeeper.*;
import java.io.IOException;

public class DistributeServer {

    private String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private int sessionTimeout = 2000;
    ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();

        // 1. 获取zk连接
        server.getConnect();

        // 2. 注册服务器到 zk 集群
        server.regist(args[0]);

        // 3. 启动业务逻辑(睡觉)
        server.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点
        System.out.println(hostname + "is online");

    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
    }
}

客户端进行监听

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private int sessionTimeout = 2000;
    ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeClient client = new DistributeClient();

        // 1. 获取zk连接
        client.getConnect();

        // 2. 监听/servers下子节点的增加和删除
        client.getServerList();

        // 3. 启动业务逻辑(睡觉)
        client.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

启动客户端,然后在服务器上进行增加节点监听
在这里插入图片描述

删除节点监听
在这里插入图片描述

因为我们服务端的代码传参了,所以我们需要设置一下这个参数:
在这里插入图片描述

下图代表服务端启动的是hadoop102节点
在这里插入图片描述

先把客户端启动起来,发现有一个节点hadoop101:
在这里插入图片描述

在启动服务端,hadoop102上线:
在这里插入图片描述

然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]
在这里插入图片描述

此时我们修改一下再此启动服务端让 hadoop103 上线:
在这里插入图片描述

返回客户端查看发现 hadoop102 下线,hadoop103 上线
在这里插入图片描述

三、分布式锁

1、什么是分布式锁?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

在这里插入图片描述

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    private final String connectString = "centos102:2181,centos103:2181,centos104:2181";
    private final int sessionTimeout = 2000;
    ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    // 前一个节点
    private String waitPath;
    // 当前节点
    String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        // 1. 获取连接
         zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // connectLatch,如果连接上zk,可以释放
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch,需要释放
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

         // 等待zk正常连接后往下走
         connectLatch.await();

        // 2. 判断根节点/lock是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建根节点(永久节点)
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }

    // 对 zk 加锁
    public void zkLock() {
        // 创建对应的临时带序号的节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点
            List<String> children = zk.getChildren("/locks", false);

            // 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                return;
            }else {
                // 排序
                Collections.sort(children);

                // 获取节点名称seq00000001
                String thisNode = currentMode.substring("/locks/".length());
                // 通过seq00000001获取该节点在children当中的位置
                int index = children.indexOf(thisNode);

                if (index == -1) {
                    System.out.println("数据异常");
                }else if (index == 0) {
                    // 该节点为第一个,获取锁直接返回
                    return;
                }else {
                    // 不是第一个,监听前一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath, true, null);

                    // 等待监听结束
                    waitLatch.await();
                    return;
                }
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 解锁
    public void unZkLock() throws InterruptedException, KeeperException {
        // 删除节点
        zk.delete(currentMode, -1);
    }
}

测试

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        final DistributedLock lock1 = new DistributedLock();

        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");

                    Thread.sleep(5000);

                    lock1.unZkLock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");

                    Thread.sleep(5000);

                    lock2.unZkLock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

2、Curator 框架实现分布式锁

原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归

Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html

pom.xml 文件添加依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {
    public static void main(String[] args) {
        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1获取到锁");

                    lock1.acquire();
                    System.out.println("线程1获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1释放锁");

                    lock1.release();
                    System.out.println("线程1再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2释放锁");

                    lock2.release();
                    System.out.println("线程2再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();
        System.out.println("zookeeper 启动成功");

        return client;
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2227458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HTML】之基本标签的使用详解

HTML&#xff08;HyperText Markup Language&#xff0c;超文本标记语言&#xff09;是构建网页的基础。它不是一种编程语言&#xff0c;而是一种标记语言&#xff0c;用于描述网页的内容和结构。本文将带你了解HTML的基础知识&#xff0c;并通过详细的代码示例和中文注释进行讲…

【C++】哈希冲突的解决办法:闭散列 与 开散列

哈希冲突解决 上一篇博客提到了&#xff0c;哈希函数的优化可以减小哈希冲突发生的可能性&#xff0c;但无法完全避免。本文就来探讨一下解决哈希冲突的两种常见方法&#xff1a;闭散列和开散列 1.闭散列 闭散列也叫开放定址法&#xff0c;发生哈希冲突时&#xff0c;如果哈…

线程的理解及基本操作

目录 一、线程的理解 &#xff08;1&#xff09;什么是线程呢&#xff1f; &#xff08;2&#xff09;线程的优缺点及异常 二、线程的基本操作 &#xff08;1&#xff09;创建一个新的进程 &#xff08;2&#xff09;获取线程id &#xff08;3&#xff09;线程终止 &…

H3C OSPF配置

OSPF配置实验 实验拓扑图 实验需求 1.配置IP地址 2.分区域配置OSPF&#xff0c;实现全网互通 3.为了路由结构稳定&#xff0c;要求路由器使用环回口作为Router-id&#xff0c;ABR的环回口宣告进骨干区域 实验配置 1.配置IP地址 R1&#xff1a; <H3C>system-view …

apt的编译安装(古老通讯)

Ubuntu系统的防火墙关闭&#xff1a; ufw disable 第一步&#xff1a;Ubuntu 安装依赖环境 apt -y install libpcre3-dev zlib1g-dev libssl-dev build-essential 如果出现无法下载则在末尾处假如 --fix missing如下图所示 出现下图则为安装成功 第二步&#xff1a; useradd…

Vue.js(2) 入门指南:从基础知识到核心功能

我相信一万小时定律&#xff0c;不相信天上掉馅饼的灵感和坐等的成就。做一个自由而自律的人&#xff0c;势必靠决心认真地活着 文章目录 前言vue是什么?vue做什么?vue的核心功能安装vuevue初体验vue配置选项插值表达式指令vue阻止默认行为总结 前言 Vue.js 是一个用于构建用…

Spring 启动流程分析

Spring 的设计 Bean: Spring作为一个IoC容器&#xff0c;最重要的当然是Bean咯 BeanFactory: 生产与管理Bean的工厂 BeanDefinition: Bean的定义&#xff0c;也就是我们方案中的Class&#xff0c;Spring对它进行了封装 BeanDefinitionRegistry: 类似于Bean与BeanFactory的关…

智能名片小程序源码

智能名片小程序&#xff0c;是一款集在线介绍公司和个人名片、高效获取客户信息以及全面展示公司产品于一体的数字化工具。它通过数字化的方式&#xff0c;让名片信息的传递更加高效、便捷&#xff0c;极大地提升了商务交流的效率和效果。 在功能性方面&#xff0c;智能名片小…

LabVIEW汽车状态监测系统

LabVIEW汽车状态监测系统通过模拟车辆运行状态&#xff0c;有效地辅助工程师进行故障预测和维护计划优化&#xff0c;从而提高汽车的可靠性和安全性。 项目背景&#xff1a; 现代汽车工业面临着日益增长的安全要求和客户对于车辆性能的高期望。汽车状态监测系统旨在实时监控汽…

Golang的Web应用架构设计

# Golang的Web应用架构设计 介绍 是一种快速、高效、可靠的编程语言&#xff0c;它在Web应用开发中越来越受欢迎。Golang的Web应用架构设计通常包括前端、后端和数据库三个部分。在本篇文章中&#xff0c;我们将详细介绍Golang的Web应用架构设计及其组成部分。 前端 在Golang的…

SIP 业务举例之 三方通话:邀请第三方加入的信令流程

目录 1. 3-Way Conference - Third Party Is Added 简介 2. RFC5359 的 3-Way Conference - Third Party Is Added 信令流程 3. 3-Way Conference - Third Party Is Added 总结 博主wx:yuanlai45_csdn 博主qq:2777137742 想要 深入学习 5GC IMS 等通信知识(加入 51学通信)…

GNN+强化学习:双霸主强强联合,10种创新思路刷爆顶会!

图神经网络&#xff08;GNN&#xff09;强化学习&#xff08;RL&#xff09;&#xff0c;融合了GNN在图数据表示上的深度学习能力和RL在决策过程中的策略优化能力。这种结合为处理具有复杂图结构的数据问题提供了强大的工具。 GNN与强化学习的结合不仅推动了图机器学习的研究进…

R语言机器学习算法实战系列(十三)随机森林生存分析构建预后模型 (Random Survival Forest)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍教程加载R包案例数据数据预处理数据描述构建randomForestSRC模型评估模型C-indexBrier score特征重要性构建新的随机森林生存模型风险打分高低风险分组的生存分析时间依赖的ROC(Ti…

Http 状态码 301 Permanent Rediret 302 Temporary Redirect、 重定向 重写

HTTP状态码301和302是什么&#xff1f; 1、HTTP状态码301 HTTP状态码301表示永久性转移&#xff08;Permanent Redirect&#xff09;&#xff0c;这意味着请求的资源已经被分配了一个新的URI&#xff0c;以后的引用应该使用资源现在所指的URI。 HTTP 301状态码表示请求的资源…

Segugio:一款针对恶意软件的进程执行跟踪与安全分析工具

关于Segugio Segugio是一款功能强大的恶意软件安全分析工具&#xff0c;该工具允许我们轻松分析恶意软件执行的关键步骤&#xff0c;并对其进行跟踪分析和安全审计。 Segugio允许执行和跟踪恶意软件感染过程中的关键步骤&#xff0c;其中包括从点击第一阶段到提取恶意软件的最…

CSS.导入方式

1.内部样式 在head的style里面定义如 <style>p1{color: brown;}</style> 2.内联样式 直接在标签的里面定义如 <p2 style"color: blue;">这是用了内联样式&#xff0c;蓝色</p2><br> 3.外部样式表 在css文件夹里面构建一个css文件…

Java Lock CyclicBarrier 总结

前言 相关系列 《Java & Lock & 目录》&#xff08;持续更新&#xff09;《Java & Lock & CyclicBarrier & 源码》&#xff08;学习过程/多有漏误/仅作参考/不再更新&#xff09;《Java & Lock & CyclicBarrier & 总结》&#xff08;学习总结…

【现代C++】常量求值

现代C&#xff08;特别是C11及以后的版本&#xff09;增强了对编译时常量求值的支持&#xff0c;包括constexpr函数、constinit和consteval关键字。这些特性允许在编译时进行更多的计算&#xff0c;有助于优化运行时性能并确保编译时的数据不变性。 1. constexpr - 编译时常量…

[含文档+PPT+源码等]精品基于PHP实现的培训机构信息管理系统的设计与实现

基于PHP实现的培训机构信息管理系统的设计与实现背景&#xff0c;可以从以下几个方面进行阐述&#xff1a; 一、社会发展与教育需求 随着经济的不断发展和人口数量的增加&#xff0c;教育培训行业迎来了前所未有的发展机遇。家长对子女教育的重视程度日益提高&#xff0c;课外…

雷池社区版compose配置文件解析-mgt

在现代网络安全中&#xff0c;选择合适的 Web 应用防火墙至关重要。雷池&#xff08;SafeLine&#xff09;社区版免费切好用。为网站提供全面的保护&#xff0c;帮助网站抵御各种网络攻击。 compose.yml 文件是 Docker Compose 的核心文件&#xff0c;用于定义和管理多个 Dock…