Zookeeper-JavaApI操作

news2025/1/13 19:54:07

JavaApI操作

  • JavaApI操作
    • 1) Curator 介绍
    • 2) Curator API 常用操作
      • a) 建立连接与CRUD基本操作
      • b) Watch事件监听
      • c) 分布式锁
        • c.1) 介绍
        • c.2) Zookeeper分布式锁原理
        • c.3) 案例:模拟12306售票

JavaApI操作

1) Curator 介绍

Curator 是 Apache ZooKeeper 的Java客户端库。

常见的ZooKeeper Java API :

  • 原生Java API
  • ZkClient
  • Curator

Curator 项目的目标是简化 ZooKeeper 客户端的使用。

Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。

官网:http://curator.apache.org/

2) Curator API 常用操作

a) 建立连接与CRUD基本操作

Curator API 常用操作:

  • 建立连接
    • 第一种方法:CuratorFrameworkFactory.newClient
    • 第二种方法:CuratorFrameworkFactory.builder() 推荐
/**
* 建立连接
*/
@Test
public void testConnect() {
    /**
     * connectString – 连接字符串 zk server 地址和端口 "192.168.200.130:2181"
     * sessionTimeoutMs – 会话超时时间 单位ms
     * connectionTimeoutMs – 连接超时时间 单位ms
     * retryPolicy – 重试策略
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    // 1.第一种方式
    CuratorFramework client1 = CuratorFrameworkFactory.newClient("192.168.200.130:2181", 60 * 1000, 15 * 1000, retryPolicy);
    client1.start();

    // 2.第二种方式
    CuratorFramework client2 = CuratorFrameworkFactory.builder()
        .connectString("192.168.200.130:2181")
        .sessionTimeoutMs(60 * 1000)
        .connectionTimeoutMs(15 * 1000)
        .retryPolicy(retryPolicy)
        .namespace("dcy") // 名称空间
        .build();
    client2.start();
}
  • 添加节点
    • 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    • 默认类型:持久化
    • creatingParentsIfNeeded 如果父节点不存在,则创建父节点
/**
* 创建节点: create 持久 临时 顺序 数据
*/
@Test
public void testCreate1() throws Exception {
    // 1.基本创建
    // 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    String rs = client2.create().forPath("/app1");
    System.out.println("rs = " + rs);
}

@Test
public void testCreate2() throws Exception {
    // 2.创建节点 带有数据
    String rs = client2.create().forPath("/app2", "hehe".getBytes());
    System.out.println("rs = " + rs);
}

@Test
public void testCreate3() throws Exception {
    // 3.设置节点的类型
    // 默认类型:持久化
    String rs = client2.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    System.out.println("rs = " + rs);
}

@Test
public void testCreate4() throws Exception {
    // 4.创建多级节点
    // creatingParentsIfNeeded 如果父节点不存在,则创建父节点
    String rs = client2.create().creatingParentsIfNeeded().forPath("/app4/p1");
    System.out.println("rs = " + rs);
}
  • 删除节点
    • 1.删除单个节点 delete().forPath()
    • 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath()
    • 3.必须成功的删除(为了防止网络抖动,本质是重试) delete().guaranteed().forPath()
    • 4.回调 inBackground
/**
* 删除节点: delete, deleteall
*/
@Test
public void testDelete1() throws Exception {
    // 1.删除单个节点
    client2.delete().forPath("/app1");
}

@Test
public void testDelete2() throws Exception {
    // 2.删除带有子节点的节点
    client2.delete().deletingChildrenIfNeeded().forPath("/app4");
}

@Test
public void testDelete3() throws Exception {
    // 3.必须成功的删除 (可能会因为网络抖动等原因,操作超时)
    client2.delete().guaranteed().forPath("/app2");
}

@Test
public void testDelete4() throws Exception {
    // 4.回调
    client2.delete().guaranteed().inBackground(new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            System.out.println("我被删除了~");
            System.out.println(event);
        }
    }).forPath("/app1");
}
  • 修改节点
    • 1.修改数据 setData().forPath()
    • 2.根据版本修改 setData().withVersion().forPath()
      • version 是通过查询出来的,目的是为了让其他客户端和线程不干扰我
/**
* 修改数据
*/
@Test
public void testSet() throws Exception {
    client2.setData().forPath("/app1", "dcy".getBytes());
}

@Test
public void testSetForVersion() throws Exception {
    Stat stat = new Stat();
    client2.getData().storingStatIn(stat).forPath("/app1");

    int version = stat.getVersion(); // 查询出来
    client2.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}
  • 查询节点
    • 1.查询数据:get: getData().forPath()
    • 2.查询子节点:ls: getChildren().forPath()
    • 3.查询节点状态信息:ls -s: getData().storingStatIn(状态对象).forPath()
/**
* 查询节点:
*  1.查询数据:get: getData().forPath()
*  2.查询子节点:ls: getChildren().forPath()
*  3.查询节点状态信息:ls -s: getData().storingStatIn(状态对象).forPath()
*/
@Test
public void testGet1() throws Exception {
    // 1.查询数据:get
    byte[] bytes = client2.getData().forPath("/app1");
    System.out.println(new String(bytes));
}

@Test
public void testGet2() throws Exception {
    // 2.查询子节点:ls
    List<String> path = client2.getChildren().forPath("/");
    System.out.println(path);
}

@Test
public void testGet3() throws Exception {
    // 3.查询节点状态信息:ls -s
    Stat status = new Stat();
    client2.getData().storingStatIn(status).forPath("/app1");
    System.out.println("status = " + status);
}

b) Watch事件监听

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便 需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个ZNode的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

1.NodeCache : 监听一个特定的节点

/**
* 演示NodeCache
*/
@Test
public void testNodeCache() throws Exception {
    // 1.创建NodeCache监听对象
    final NodeCache nodeCache = new NodeCache(client, "/app1");
    // 2.注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("节点变化~~~");

            // 获取修改节点后的数据
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
        }
    });
    // 3.开启监听,如果设置true,则开启监听是,加载缓存数据
    nodeCache.start(true);

    while (true) {
    }
}

2.PathChildrenCache:监听某个节点的所有子节点

/**
* 演示PathChildrenCache:监听某个节点的所有子节点
*/
@Test
public void testPathChildrenCache () throws Exception {
    // 1.创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);

    // 2.绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            System.out.println("子节点变化~~~");
            System.out.println("event = " + event);
            // 监听子节点的数据变更,并且拿到变更后的数据
            // 1.获取类型
            PathChildrenCacheEvent.Type type = event.getType();
            // 2.判断类型是否是update
            if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                byte[] data = event.getData().getData();
                System.out.println(new String(data));
            }
        }
    });

    // 3.开启监听
    pathChildrenCache.start();

    while (true) {
    }
}

3.TreeCache:监听某个节点自己和所有的子节点

/**
* 演示TreeCache:监听某个节点自己和所有的子节点
*/
@Test
public void testTreeCache () throws Exception {
    // 1.创建监听器
    TreeCache treeCache = new TreeCache(client, "/app2");

    // 2.注册监听
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            System.out.println("节点变化了");
            System.out.println("event = " + event);
        }
    });

    // 3.开启监听
    treeCache.start();

    while (true) {
    }
}

c) 分布式锁

c.1) 介绍

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。

在这里插入图片描述

那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

在这里插入图片描述

c.2) Zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除节点

  • 1.客户端获取锁时,在lock节点下创建临时顺序节点
  • 2.然后获取lock下面的所有子节点,客户端获取到所有的子节点后。如果发现自己创建的节点顺序最小,那就认为该客户端获取到了锁。使用完锁后,将该节点删除
  • 3.如果发现自己创建的节点并发lock所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
  • 4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点 是否是lock子节点中序号最小的。如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点,并注册监听。
c.3) 案例:模拟12306售票

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

在这里插入图片描述

// 测试类
public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();
        // 创建客户端
        Thread t1 = new Thread(ticket12306, "携程");
        Thread t2 = new Thread(ticket12306, "飞猪");

        t1.start();
        t2.start();
    }
}

// Ticket12306
public class Ticket12306 implements Runnable{

    private int tickets = 10; // 数据库的票数

    private InterProcessMutex lock; // 创建锁

    public Ticket12306() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.200.130:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        this.lock = new InterProcessMutex(client, "/lock");
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 获取锁
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    System.out.println(Thread.currentThread().getName() + ":" + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1036879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

树结构的讲解与二叉树的基本运用

目录&#xff1a; 一&#xff0c;树的基本知识 二&#xff0c;树的类型 三&#xff0c;树的存储 四&#xff0c;树的基本运算 五&#xff0c;二叉树堆的基本运用 一&#xff0c;树的基本知识 树是一种非线性的数据结构&#xff0c;它是由n个有限结点组合而成为一个具有层次…

【1++的Linux】之进程(三)

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的Linux】 文章目录 一&#xff0c;什么是进程地址空间&#xff1f;二&#xff0c;进程地址空间是怎么设计的&#xff1f;三&#xff0c;为什么要有进程地址空间&#xff1f; 一&#xff0c;什…

【C++杂货铺】一颗具有搜索功能的二叉树

文章目录 一、二叉搜索树概念二、二叉搜索树的操作2.1 二叉搜索树的查找2.2 二叉搜索树的插入2.3 二叉搜索树的删除 三、二叉搜索树的实现3.1 BinarySearchTreeNode&#xff08;结点类&#xff09;3.2 BinarySearchTree&#xff08;二叉搜索树类&#xff09;3.2.1 框架3.2.2 in…

【力扣485】最大连续 1 的个数

&#x1f451;专栏内容&#xff1a;力扣刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、题目描述二、题目分析1、最值模拟2、双指针 一、题目描述 题目链接&#xff1a;最大连续 1 的个数 给定一个二进制数…

辨析常见的医学数据分析(相关性分析回归分析)

目录 1 常见的三种分类结果&#xff1f; 2 什么是相关性分析&#xff1f; 相关性分析的结果怎么看&#xff1f; 3 什么是回归分析&#xff1f; 1&#xff09;前提 2&#xff09;常见的回归模型 4 对于存在对照组实验的医学病例如何分析&#xff1f; 1&#xff09;卡方检验…

万字解析30张图带你领略glibc内存管理精髓

最近在逛知乎的时候&#xff0c;看到篇帖子&#xff0c;如下&#xff1a; 看了下面所有的回答&#xff0c;要么是没有回答到点上&#xff0c;要么是回答不够深入&#xff0c;所以&#xff0c;借助本文&#xff0c;深入讲解C/C内存管理。 1 写在前面 源码分析本身就很枯燥乏味…

服务注册发现_解读Eureka注册中心UI界面

参数&#xff1a; Environment: 环境&#xff0c;默认为test&#xff0c;该参数在实际使用过程中&#xff0c;可以不用更改Data center&#xff1a; 数据中心&#xff0c;使用的是默认的是 “MyOwn”Current time&#xff1a;当前的系统时间Uptime&#xff1a;已经运行了多少时…

JavaScript系列从入门到精通系列第六篇:JavaScrip当中的运算符,主要涉及JavaScript当中的六大数据类型的四则运算

文章目录 前言 一&#xff1a;算数运算符 1&#xff1a;Number类型的四则运算 2&#xff1a;其他数据类型的四则运算 (一)&#xff1a;加法运算 (二)&#xff1a;减法运算 3&#xff1a;乘法运算 4&#xff1a;除法运算 5&#xff1a;取模运算 前言 运算符也叫操作符。…

极大似然函数和似然函数的区别

极大似然函数和似然函数 "极大似然函数"和"似然函数"是统计学和机器学习中常见的两个概念&#xff0c;它们之间的区别在于它们在不同上下文中的使用方式&#xff1a; 似然函数&#xff08;Likelihood Function&#xff09;&#xff1a; 似然函数通常表示为…

[pai-diffusion]pai的easynlp的diffusion模型训练

PAI-Diffusion模型来了&#xff01;阿里云机器学习团队带您徜徉中文艺术海洋 - 知乎作者&#xff1a;汪诚愚、段忠杰、朱祥茹、黄俊导读近年来&#xff0c;随着海量多模态数据在互联网的爆炸性增长和训练深度学习大模型的算力大幅提升&#xff0c;AI生成内容&#xff08;AI Gen…

基于微信小程序快递取件上门预约服务系统设计与实现(开题报告+任务书+源码+lw+ppt +部署文档+讲解)

文章目录 前言运行环境说明用户的主要功能有&#xff1a;管理员的主要功能有&#xff1a;具体实现截图详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考论文参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌…

电子电子架构——AUTOSAR信息安全机制有哪些(下)

电子电子架构——AUTOSAR信息安全机制有哪些&#xff08;下&#xff09; 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 人们会在生活中不断攻击你。他们的主要…

使用FastChat部署Baichuan2

1. 引言 近来&#xff0c;大型语言模型的市场需求呈现出蓬勃发展的态势。然而&#xff0c;仅仅掌握模型的数据准备和训练是不够的&#xff0c;模型的部署方法也变得至关重要。在这篇文章中&#xff0c;我们将以Baichuan2为例&#xff0c;利用FastChat进行模型部署的实战操作。…

IDEA 中 Maven 报错 Cannot resolve xxx【终于解决了】

问题 pom中已经添加相关依赖&#xff0c;maven刷新也没有用&#xff0c;依旧是疯狂报错 解决办法 不断的查询资料&#xff0c;总结一下试过的办法。 解决办法一&#xff1a;清除缓存 File -> Invalidate Caches/Restart -> Invalidate And Restart 试了之后也就报错 …

C# EPPlus 访问 Excel表格

EPPlus是什么&#xff1f; 一个访问Excel表格的库&#xff0c;调用相当简单 怎么访问&#xff1f; 表格可以简单理解成一个二维数组我希望访问表格像二维数组一样简单我希望消耗不算太大 封装一个类 下载DLL以及这个文件&#xff1a;《下载传送门->》 注意需要导入EP…

uniapp iOS离线打包——上传到App Store

uniapp iOS离线打包&#xff0c;如何打包上传到App Store&#xff1f; 文章目录 uniapp iOS离线打包&#xff0c;如何打包上传到App Store&#xff1f;打包上传 App Store App iOS 离线打包 上一篇分享部分工程配置 打包上传 App Store 选中项目工程&#xff1a;点击 工具栏 P…

虚幻4学习笔记(14)界面切换、局域网联机

虚幻4学习笔记 创建游戏加入游戏搜索服务器加入服务器刷新服务器 B站UP谌嘉诚课程&#xff1a;https://www.bilibili.com/video/BV164411Y732 创建游戏 新建三个UI界面 FindServer、JoinServer、MainMenu 打开MainMenu 打开FindServer 添加Scroll Box滚动框 添加Circular T…

【计算机网络】——应用层

// 图片取自王道 仅做交流学习 一、基本概念 应用层概述 协议是 网络层次模型 中多台主机之间 同层之间进行通信的规则。是一个水平概念 垂直空间上&#xff0c;向下屏蔽下层细节&#xff0c;向上提供服务接入&#xff0c;多台主机之间同层之间形成一条逻辑信道。 应用层的…

关于Pandas数据分析

pandas的数据加载与预处理 数据清洗&#xff1a;洗掉脏数据 整理分析&#xff1a;字不如表 数据展现&#xff1a;表不如图 环境搭建 pythonjupyter anaconda Jupyter Notebook Jupyter Notebook可以在网页页面中直接编写代码和运行代码, 代码的运行结果也会直接在代码块下显示…

zabbix学习2--zabbix6.x高可用

文章目录 1. server高可用-默认HA2. 访问高可用 1. server高可用-默认HA 1.部署zabbix单节点后&#xff0c;配置添加HANodeName和NodeAddress即为HA架构 2.zabbix1故障后切换zabbix2使用 3.浏览器访问主机1&#xff0c;使用主机1php前端连接mysql后zabbix2提供后台服务--------…