zookeeper案例

news2025/1/12 3:56:33

目录

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

(2)注册服务器到zookeeper集群:

(3)业务逻辑(睡眠):

服务端代码如下:

客户端:

(1)获取zookeeper的连接:

(2)监听/servers下边的子节点的增减:

客户端代码如下:

案例二:ZooKeeper 分布式锁

分布式锁是什么?

锁的实现:

构造函数:

加锁函数:

解锁函数:

整体代码:

测试类代码 :

Curator 框架实现分布式锁案例:

实现步骤:

代码如下:


该案例主要也是客户端监听原理,客户端监听服务器的上下线情况

先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

        创建类对象

该类为我们创建的服务端类:

        DistributeServer server = new DistributeServer();

        获取zookeeper连接:

自己创建连接方法:

    private void getconnect() throws IOException {
       zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            }
        });

    }

 让后server对象在main函数中调用

(2)注册服务器到zookeeper集群:

注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建

private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        System.out.println("服务器"+hostname+"已注册连接");
    }

(3)业务逻辑(睡眠):

    private void business() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }

服务端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
 * @Date 2023/8/10 19:06
 * @Author 
 */
public class DistributeServer {
   private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout=2000;
    private ZooKeeper zk =null;
    private String parentNode = "/servers";
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //获取zk连接
        //创建
        DistributeServer server = new DistributeServer();
        server.getconnect();
        //注册服务器到zk集群
        //注册是需要在/servers节点下创建所开启的服务器的路径
        server.regestServer(args[0]);
        //业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)
        server.business();
    }
    private void business() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }
    private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        System.out.println("服务器"+hostname+"已注册连接");
    }
    private void getconnect() throws IOException {
       zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            }
        });
    }
}

客户端:

(1)获取zookeeper的连接:

        先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑

 private void getConnect() throws IOException {
       zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                getServerList();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    });
    }

(2)监听/servers下边的子节点的增减:

        构建方法client.getServerList()来进行监听:

代码逻辑就是通过getChildren()方法获取指定目录下的所有子目录并开启监听

再进行遍历,把遍历结果封装到一个集合中,最后进行输出

 private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        ArrayList<String> list = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new String(data));
        }
        System.out.println(list);
    }

(3)业务逻辑同服务端不在赘述。

客户端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * @Date 2023/8/10 21:27
 * @Author 
 * 客户端的监听功能
 */
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
   private int sessionTimeout=2000;
   private ZooKeeper zk=null;
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //获取zk连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
        //监听/servers下边的子节点的增减
        client.getServerList();
       //业务逻辑(睡眠)
       client.business();
    }
    private void business() throws InterruptedException {
   Thread.sleep(Long.MAX_VALUE);
   }
    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        ArrayList<String> list = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new String(data));
        }
        System.out.println(list);
    }
    private void getConnect() throws IOException {
       zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                getServerList();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    });
    }
}

案例二:ZooKeeper 分布式锁

分布式锁是什么?

日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

锁的实现:

构造函数:

在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放CountDownLatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。

加锁函数:

使用ZooKeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的List集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对List集合进行排序,再获取他的节点名称,通过indexOf函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。

解锁函数:

解锁就是直接删除节点即可

整体代码:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * @Date 2023/8/12 19:56
 * @Author 
 */
public class DistributedLock {
 final    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    final  private int sessionTimeout=2000;
    final    private   ZooKeeper zk;
    private String waitPath;
    private String currentModu;
    //为了程序的健壮性,创建该对象   等待操作
    final   private CountDownLatch waitLach=new CountDownLatch(1);
    final   private CountDownLatch countDownLatch=new CountDownLatch(1);
  public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
      zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            //  connectLatch  如果正常连接zk  可以释放
                if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
                    countDownLatch.countDown();
                }
                //检测到删除节点并且是前一个节点则释放waitlatch
                if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath))
                {
                waitLach.countDown();
                }
            }
        });
      //等待是否正常连接  正常(已)连接会释放  否则阻塞
      countDownLatch.await();
        // 判断是否存在lock锁
        Stat stat = zk.exists("/locks", false);
        if (stat==null)
        {
            //创建该节点
            String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
        }
    }
    //对zk加锁
    public void zkLock()  {
        //创建临时的带序号的节点
        try {
            currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> children = zk.getChildren("/locks", false);
             //如果只有一个节点   则直接获取
            if(children.size()==1)
            {
                return;
            }
            else {
                //排序
                Collections.sort(children);
                //直接从s后边开始   开始的下标就是length的长度
                String substring = currentModu.substring("/locks/".length());
                //通过substring来获取在List集合中的下标位置
                int index = children.indexOf(substring);
                if (index==-1)
                {
                    System.out.println("数据异常");
                }
                else if (index==0)
                {
                    return;
                }
                else {
                    //  需要监听上一个节点
                    waitPath="/locks/"+children.get(index-1);
                    zk.getData(waitPath,true,new Stat());
                    //等待监听
                    waitLach.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //判断创建的节点是否是最小序号的节点 如果是则获取锁  不是则监听他的前一个节点
    }
    //对zk解锁
    public void unzkLock()
    {
//删除节点
        try {
            //-1  是版本号
            zk.delete(this.currentModu,-1);
        } catch (InterruptedException  | KeeperException e) {
            e.printStackTrace();
        }
    }
}

测试类代码 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
 * @Date 2023/8/12 22:31
 * @Author 唐晓聪
 */
public class DistributedLockTest
{
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //创建两个客户端对象
        final    DistributedLock lock1 = new DistributedLock();
        final   DistributedLock lock2 = new DistributedLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {  lock1.zkLock();
                System.out.println("线程1启动获得锁");
                    Thread.sleep(5*1000);
                    lock1.unzkLock();
                    System.out.println("线程1释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                System.out.println("线程2启动获得锁");

                    Thread.sleep(5*1000);
                    lock2.unzkLock();
                    System.out.println("线程2释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Curator 框架实现分布式锁案例:

该案例是直接使用API进行实现分布式锁

实现步骤:

创建分布式锁对象,new InterProcessMutex(),参数1为所要连接的客户端,参数2为监听路径

参数1传入的为getCuratorFramework()自定义函数,

该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端

创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。

代码如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * @Date 2023/8/13 20:07
 * @Author 
 */
public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        //参数1   所连接的客户端 参数2 监听路径
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    //创建线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("thread 1 acquire lock");
               lock1.acquire();
                    System.out.println("thread 1 again acquire lock");
                Thread.sleep(5*1000);
                lock1.release();
                    System.out.println("thread 1 relax lock");
                    lock1.release();
                    System.out.println("thread 1 again relax lock");
                    System.out.println();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("thread 2 acquire lock");
                    lock2.acquire();
                    System.out.println("thread 2 again acquire lock");
                    Thread.sleep(5*1000);
                    lock2.release();
                    System.out.println("thread 2 relax lock");
                    lock2.release();
                    System.out.println("thread 2 again relax lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    private static CuratorFramework getCuratorFramework() {
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂类的方式进行建立连接
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy)//连接失败后  间隔多少秒下次间隔
                .build();
        client.start();
        System.out.println("zookeeper  success start  !!!!!");
        return client;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872601.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

提高生产力 | Apifox 数据结构验证最佳实践

目录 实践场景 定义返回响应 场景数据准备 校验响应数据 总结 在设计接口的过程中&#xff0c;响应数据需要和返回响应规范一一对应。这样能够确保接口的一致性和可靠性&#xff0c;并且方便接口的使用和维护&#xff0c;即使在后续迭代过程中出现问题&#xff0c;开发人员…

zabbix监控安装部署

目录 一、环境 二、配置 1.配置yum源&#xff0c;这里用的清华的 2.过滤一下安装包&#xff0c;查看依赖包 安装依赖包 3.配置数据库 开机自启 创建数据库 创建用户 授权 导入数据到数据库 查看zabbix数据库有没有表和数据 4.修改zabbix配置文件 1.修改zabbix配置…

【Java】常见面试题:多线程

文章目录 1. 谈谈进程和线程之间的区别【高频】2. java中有哪些方式来创建线程&#xff1f;3. run和start的区别【经典面试题】4. Java线程的状态5. 【线程不安全的原因】6. 就以count为例&#xff1a;一个线程加锁、一个线程不加锁&#xff0c;此时能否保证线程的安全呢&#…

client-go实战之十二:选主(leader-election)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)&#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇&#xff0c;又有一个精彩的知识点在本章呈现&#xff1a;选主(leader-election)在解释什么是选主之前&…

Keepalived源码安装

文章目录 Keepalived源码安装安装准备缺少OpenSSL解决方法 Keepalived 源码安装 安装准备 tar zxf keepalived-2.2.8.tar.gz /root/ ll drwxrwxr-x. 10 1000 1000 4096 Aug 9 18:29 keepalived-2.2.8 #进入目录执行以下命令查看帮助 ./configure --help #重要编译参数 -…

QT学习笔记-oracle oci数据库驱动交叉编译并移植到ARM开发板

QT学习笔记-oracle oci数据库驱动交叉编译并移植到RK3568ARM开发板 0、背景1、搭建交叉编译环境2、交叉编译过程3、把数据库驱动部署到目标系统中 0、背景 在上一文《QT学习笔记-QT安装oracle oci驱动》中介绍了在Windows环境下使用QT访问oracle数据库时遇到驱动无法加载问题的…

kingbase:数据库启动状态

1 启停KingbaseES数据库 Linux下通过系统服务&#xff1a; root用户执行&#xff1a; service kingbase8d stop/start/restart ——注册服务的情况下 Linux下通过安装用户&#xff1a; 安装用户执行&#xff1a; sys_ctl stop/start/restart -D data路径 2 查看数据库当…

UI自动化环境的搭建(python+pycharm+selenium+chrome)

最近在做一些UI自动化的项目&#xff0c;为此从环境搭建来从0到1&#xff0c;希望能够帮助到你&#xff0c;同时也是自我的梳理。将按照如下进行开展&#xff1a; 1、python的下载、安装&#xff0c;python环境变量的配置。 2、pycharm开发工具的下载安装。 3、selenium的安装。…

【Java】一只小菜坤的编程题之旅【3】

文章目录 1丶判定是否互为字符重排2、杨辉三角3丶某公司的1个面试题&#xff08;字符串包含问题&#xff09; 1丶判定是否互为字符重排 这个题我们用一个非常简单的思想就能实现&#xff0c;我们先将字符串转换为字符数组&#xff0c;然后对字符数组进行排序&#xff0c;然后再…

Codeforces Round 893 (Div. 2)ABC

Codeforces Round 892 (Div. 2) 目录 A. United We Stand题目大意思路代码 B. Olya and Game with Arrays题目大意思路代码 C. Another Permutation Problem题目大意思路代码 A. United We Stand 题目大意 给你一个数组&#xff0c;把这个数组分成两个数组a和b&#xff0c;使…

03_013内存分配api以及页表详解

前言 之前文章中物理ram中的最小单位一直用页来表示 这次又描述的详细了点 物理ram的最小单位 有的地方叫 块,框,页帧 在虚拟空间中最小单位也叫页 需要好好区分 不过后来想想管你虚拟页还是物理ram页 都存在物理ram上 都能想成一 一对应的关系 所以大家都叫页好像也行 内存分…

【Unity3D】Shader Graph节点

1 前言 Shader Graph 16.0.3 中有 208 个 Node&#xff08;节点&#xff09;&#xff0c;本文梳理了 Shader Graph 中大部分 Node 的释义&#xff0c;官方介绍详见→Node-Library。 Shader Graph 通过图像的形式表达了顶点变换和片元着色流程&#xff0c;其背后都是一些列的数学…

保持城市天际线(力扣)贪心 JAVA

给你一座由 n x n 个街区组成的城市&#xff0c;每个街区都包含一座立方体建筑。给你一个下标从 0 开始的 n x n 整数矩阵 grid &#xff0c;其中 grid[r][c] 表示坐落于 r 行 c 列的建筑物的 高度 。 城市的 天际线 是从远处观察城市时&#xff0c;所有建筑物形成的外部轮廓。…

【系统架构】分布式系统架构设计

1 分布式系统是什么 分布式系统是指由多个计算机节点组成的一个系统&#xff0c;这些节点通过网络互相连接&#xff0c;并协同工作完成某个任务。 与单个计算机相比&#xff0c;分布式系统具有更高的可扩展性、可靠性和性能等优势&#xff0c;因此广泛应用于大规模数据处理、高…

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明 如果Flink没有提供给我们可以直接使用的连接器&#xff0c;那我们如果想将数据存储到我们自己的存储设备中&#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…

使用日志来监控应用

根据提取规则运行的位置可以分为两类做法&#xff0c;一个是在中心端&#xff0c;一个是在日志端。 中心端就是把要处理的所有机器的日志都统一传到中心&#xff0c;比如通过 Kafka 传输&#xff0c;最终落到 Elasticsearch&#xff0c;指标提取规则可以作为流计算任务插到 Ka…

3.解构赋值

解构赋值是一种快速为变量赋值的简洁语法&#xff0c;本质上仍然是为变量赋值。 3.1数组解构 数组解构是 将数组的单元值快速批量赋值给一系列变量 的简洁语法 1.基本语法: &#xff08;1&#xff09;赋值运算符左侧的[ ]用于批量声明变量&#xff0c;右侧数组的单元值将被赋…

免费开源的多种人工智能项目,比如:训练一个模型,让人工智能玩王者荣耀

免费开源的多种人工智能项目&#xff0c;比如&#xff1a;训练一个模型&#xff0c;让人工智能玩王者荣耀。 全文大纲 PULSE - 该开源项目可以通过给图片增加像素点来实现去马赛克或高清化。 Depix - 给打了马赛克的文字去码。 TecoGAN - 给视频去马赛克或者进行超分辨率。 Sk…

python -- 函数闭包

1. LEGB规则 L: local 是局部作用域 E: Enclosed 是嵌套函数的外层函数作用域 G: Global 全局作用域 B:Build-In 内置作用域 变量的使用权重&#xff1a;局部变量 > 外层作用域变量 > 全局变量 > 内置变量 下面代码执行后&#xff0c;x变量的值分别为多少&#xff1…

【JavaEE基础学习打卡03】Java EE 平台有哪些内容?

目录 前言一、Java EE平台说明二、Java EE平台容器及组件1.平台容器2.平台组件 三、JavaEE平台API服务1.API服务概览2.平台API 总结 前言 &#x1f4dc; 本系列教程适用于Java Web初学者、爱好者&#xff0c;小白白。我们的天赋并不高&#xff0c;可贵在努力&#xff0c;坚持不…