Zookeeper学习---3、服务器动态上下线监听案例、ZooKeeper 分布式锁案例、企业面试真题

news2025/1/11 8:08:49

1、服务器动态上下线监听案例

1、需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
2、需求分析
在这里插入图片描述
3、具体实现
(1)先在集群上创建/servers 节点
在这里插入图片描述
(2)在IDEA中创建包名:org.example.zkcase1
(3)服务端向Zookeeper注册代码

package org.example.zkcase1;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @ClassName DistributeServer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/25 9:12
 * @Version 1.0
 */
public class DistributeServer {

    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout=2000;

    ZooKeeper zk=null;
    private String parentNode="/servers";

    //创建到zk的客户端连接
    public void getConnect() throws IOException {

        zk=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    //注册服务器
    public void registServer(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create(parentNode + "/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+"is online"+create);
    }

    //业务功能
    public void business(String hostname) throws InterruptedException {
        System.out.println(hostname+" is working...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //1、获取zk连接
        DistributeServer server=new DistributeServer();
        server.getConnect();

        //2、利用zk连接注册服务器信息
        server.registServer(args[0]);

        //3、启动业务功能
        server.business(args[0]);
    }
}


(4)客户端代码

package org.example.zkcase1;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName DistributeClient
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/25 9:23
 * @Version 1.0
 */
public class DistributeClient {

    private String connectionString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout=2000;
    private ZooKeeper zk;

    private String parentNode="/servers";

    //创建到zk的客户端连接
    public void getConnection() throws IOException {
        zk=new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

                //再次启动监听
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (KeeperException e) {
                    throw new RuntimeException(e);
                }

            }
        });
    }

    //获取服务器列表信息
    public void getServerList() throws InterruptedException, KeeperException {
        //1、获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zk.getChildren(parentNode, true);

        //2、储存服务器信息列表
        ArrayList<String> servers=new ArrayList<>();

        //3、遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zk.getData(parentNode + "/" + child, false, null);
            servers.add(new String(data));
        }

        //4、打印服务器列表信息
        System.out.println(servers);
    }

    //业务功能
    public void business() throws InterruptedException {
        System.out.println("client is working...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //1、获取zk连接
        DistributeClient client=new DistributeClient();
        client.getConnection();

        //2、获取servers的子节点信息,从中获取服务器信息列表
        client.getServerList();

        //3、业务进程启动
        client.business();
    }
}


4、测试
(1)在Linux命令行上操作增加减少服务器
(a)启动 DistributeClient 客户端
(b)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
在这里插入图片描述
(c)观察IDEA控制台变化
在这里插入图片描述
(d)执行删除操作
在这里插入图片描述
(e)观察IDEA控制台变化
在这里插入图片描述
(2)在IDEA上操作增加和减少服务器
(a)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(b)启动 DistributeServer 服务
①点击 Edit Configurations…
在这里插入图片描述
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
在这里插入图片描述
③回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run“DistributeServer.main()
在这里插入图片描述
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线

2、ZooKeeper 分布式锁案例

什么加做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

2.1 原生 Zookeeper 实现分布式锁案例

1、分布式锁实现

package org.example.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @ClassName DistributedLock
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/25 14:51
 * @Version 1.0
 */
public class DistributedLock {
    //Zookeeper server列表
    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";

    //超时时间
    private int sessionTimeout=2000;

    private ZooKeeper zk;

    private String rootNode="locks";
    private String subNode="seq-";

    //当前client等待的子节点
    private String waitPath;

    //Zookeeper连接
    private CountDownLatch connectLatch=new CountDownLatch(1);

    //Zookeeper节点等待
    private CountDownLatch waitLatch=new CountDownLatch(1);

    //当前client创建的子节点
    private String currentNode;

    //和zk服务建立连接,并创建根节点
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        zk=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //连接建立时,打开latch,唤醒wait在该latch上的线程
                if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
                    try {
                        connectLatch.countDown();
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }

                //发生了waitPath的删除事件
                if (watchedEvent.getType()==Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });

        //等待连接建立
        connectLatch.await();

        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);

        if (stat==null){
            System.out.println("根节点不存在");
            zk.create("/"+rootNode,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    }

    //加锁方法
    public void zkLock(){
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode=zk.create("/"+rootNode+"/"+subNode,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

            //wait一会儿,让结果更加清晰一些
            Thread.sleep(10);

            //注意,没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);

            //列表中只有一个子节点,那肯定就是currentNode,说明client获得锁
            if (childrenNodes.size()==1){
                return;
            }
            else {
                //对根节点下的所有临时顺序节点从小到大排序
                Collections.sort(childrenNodes);

                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                int index = childrenNodes.indexOf(thisNode);

                if (index==-1){
                    System.out.println("数据异常");
                } else if (index==0) {
                    //index==0,说明thisNode在列表中最小,当前client获得锁
                    return;
                }else {
                    //获得排名比current前一位的节点
                    this.waitPath="/"+rootNode+"/"+childrenNodes.get(index-1);

                    //在waitPath上注册监听器,当waitPath被删除时,Zookeeper会回调监听器的process方法
                    zk.getData(waitPath,true,new Stat());

                    //进入等待锁状态
                    waitLatch.await();

                    return;
                }
            }

        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    //解锁方法
    public void zkUnlock(){
        try {
            zk.delete(this.currentNode,-1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}



2、分布式锁测试

package org.example.lock;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * @ClassName DistributedLockTest
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/25 15:27
 * @Version 1.0
 */
public class DistributedLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //创建分布式锁
        final DistributedLock lock1=new DistributedLock();
        final DistributedLock lock2=new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象

                try {
                    lock1.zkLock();
                    System.out.println("线程1获取锁");
                    Thread.sleep(5*1000);


                    lock1.zkUnlock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象

                try {
                    lock2.zkLock();
                    System.out.println("线程2获取锁");
                    Thread.sleep(5*1000);


                    lock2.zkUnlock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}


(2)观察控制台变化
在这里插入图片描述

2.2 Curator框架实现分布式锁案例

1、原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2、Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
3、Curator 案例实操
(1)添加依赖

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-client</artifactId>
 <version>4.3.0</version>
</dependency>

(2)代码实现

package org.example.zkCase2;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @ClassName CuratorLockTest
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/25 15:52
 * @Version 1.0
 */
public class CuratorLockTest {
    private String rootNode="/locks";

    //zookeeper server列表
    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";

    //connection超时时间
    private  int connectionTimeOut=2000;

    //session超时时间
    private int sessionTimeOut=2000;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    //测试
    private  void test(){
        //创建分布式锁1
        final InterProcessLock lock1=new InterProcessMutex(getCuratorFramework(),rootNode);
        //创建分布式锁2
        final InterProcessLock lock2=new InterProcessMutex(getCuratorFramework(),rootNode);

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock1.acquire();
                    System.out.println("线程1获取锁");

                    //测试锁重入
                    lock1.acquire();
                    System.out.println("线程1再次获取锁");

                    lock1.release();
                    System.out.println("线程1释放锁");

                    lock1.release();
                    System.out.println("线程1再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }



            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("线程2获取锁");

                    //测试锁重入
                    lock2.acquire();
                    System.out.println("线程2再次获取锁");

                    lock2.release();
                    System.out.println("线程2释放锁");

                    lock2.release();
                    System.out.println("线程2再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }



            }
        }).start();
    }

    //分布式锁初始化
    private CuratorFramework getCuratorFramework() {

        //重试策略,初始时间3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        //通过工厂创建Curator
        CuratorFramework client= CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeOut)
                .sessionTimeoutMs(sessionTimeOut)
                .retryPolicy(policy)
                .build();

        //开启连接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}


(3)观察控制台变化
在这里插入图片描述

3、企业面试真题

3.1 选举机制

半数机制·,超过半数的投票通过,即通过。
1、第一次启动选举机制:
投票过半数时,服务器id大的胜出
2、第二次启动选举规则:
(1)EPOCH大的直接胜出
(2)EPOCH相同,事务id大的胜出
(3)事务id相同,服务器id大的胜出

3.2 生产集群安装多少zk合适

安装奇数台。
生产经验:

  • 10台服务器:3台zk
  • 20台服务器:5台zk
  • 100台服务器:11台zk
  • 200台服务器:11台zk
    服务器台数多:
    (1)好处:提高可靠性
    (2)坏处:提高通信延时

3.3 常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/582416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软考A计划-试题模拟含答案解析-卷八

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&am…

2023年上半年系统集成项目管理工程师下午真题及答案解析

试题一(18分) A公司跨国收购了B公司的主营业务&#xff0c;保留了B公司原有的人员组织结构和内部办公系统。为了解决B公司内部办公系统与A公司原有系统不兼容的问题&#xff0c;财务、人力和行政部门联合向公司高层申请尽快启动系统和业务的整合。 A公司领导指定HR总监王工担…

云容灾部署前的准备指南

据ITIC的研究表明&#xff0c;98%的千人规模企业每年都会遭遇停机危机&#xff0c;每停机一小时就会损失约700,000人民币。当灾难发生时&#xff0c;使用云容灾的企业可以通过云平台提供的资源和服务&#xff0c;快速帮助企业恢复业务。 HyperBDR云容灾&#xff0c;深度对接全…

Kibana:使用 Docker 安装 Kibana - 8.x

Kibana 的 Docker 镜像可从 Elastic Docker 注册中心获得。 基本映像是 ubuntu:20.04。www.docker.elastic.co 上提供了所有已发布的 Docker 图像和标签的列表。 源代码在 GitHub 中。 这些镜像包含免费和订阅功能。 开始 30 天试用以试用所有功能。 如果你还没有安装好自己的…

一文了解什么是ChatGPT

ChatGPT 是一种自然语言人工智能聊天机器人。在最基本的层面上&#xff0c;这意味着你可以问它任何问题&#xff0c;它会生成一个答案。 一、如何使用聊天 GPT 首先&#xff0c;转到chat.openai.com。如果这是您的第一次&#xff0c;您需要在开始之前使用 OpenAI 设置一个免费…

C919中有哪些项目是华为之作?

#C919# C919和华为都是我们国人的骄傲。那你知道在C919中有哪些项目是华为之作吗&#xff1f;C919与华为的合作主要涉及航空电子领域&#xff1a; 1.飞机高清视频传输系统&#xff1a;该系统使用华为的数字视频传输技术&#xff0c;可以将高清视频信号快速地传输到地面监控中心…

Gradio的web界面演示与交互机器学习模型,高级接口特征《6》

大多数模型都是黑盒&#xff0c;其内部逻辑对最终用户是隐藏的。为了鼓励透明度&#xff0c;我们通过简单地将Interface类中的interpretation关键字设置为default&#xff0c;使得向模型添加解释变得非常容易。这允许您的用户了解输入的哪些部分负责输出。 1、Interpret解释 …

NetApp E 系列混合闪存阵列——专为需要高带宽的专用应用程序而构建(如数据分析、视频监控、HPC、基于磁盘的备份)

E 系列混合闪存阵列&#xff1a;专为交付而构建 为什么选择 NetApp E 系列阵列&#xff1f; 超过 100 万次的安装和计数 凭借其提供的精简性和可靠性&#xff0c;我们的 E 系列阵列成为了众多企业的首选系统。从推动数据密集型应用程序&#xff08;如分析、视频监控和基于磁盘…

PLC/DCS系统常见的干扰现象及判断方法

一般来说&#xff0c;常见的干扰现象有以下几种&#xff1a; 1.系统发指令时&#xff0c;电机无规则地转动&#xff1b; 2.信号等于零时&#xff0c;数字显示表数值乱跳; 3。传感器工作时&#xff0c;DCS/PLC 采集过来的信号与实际参数所对应的信号值不吻合&#xff0c;且误…

微信小程序报错:“该小程序提供的服务出现故障,请稍后再试”(IOS报错,Android则正常)

记录对接微信小程序时遇到的问题&#xff0c;问题表现为&#xff1a; 1、发送消息后出现报错&#xff1a;该小程序提供的服务出现故障&#xff0c;请稍后再试 2、只有IOS会报错&#xff0c;Android则是正常的 3、IOS报错的微信号&#xff0c;即使在电脑端登录&#xff0c;使…

HKPCA Show携手电巢直播开启“云”观展!掀起一场电子人的顶级狂欢!

近日&#xff0c;国际电子电路&#xff08;深圳&#xff09;展览会&#xff08;HKPCA Show&#xff09;已于深圳国际会展中心圆满举办&#xff01;本次展览划分七大主题专区&#xff0c;面积超50,000平方米&#xff0c;展位超2500个&#xff0c;汇聚众多行业知名、有影响力的参…

腾讯云3年轻量应用服务器和5年CVM云服务器限制说明

腾讯云轻量服务器2核2G4M带宽三年388元、2核4G5M带宽三年599元、CVM云服务器2核2G配置5年1728元、2核4G配置5年3550元、4核8G配置5年6437元&#xff0c;从性价比角度来看&#xff0c;还是轻量应用服务器比较划算&#xff0c;腾讯云百科分享阿里云3年轻量应用服务器和5年云服务器…

华为手机怎么录屏?分享2个好用的手机录屏方法!

案例&#xff1a;华为手机怎么录制屏幕&#xff1f; 【有些内容通过文字和图片&#xff0c;不能很好地表达。我想把内容录制下来&#xff0c;发给别人&#xff0c;方便他们理解。有人知道华为手机怎么录屏吗&#xff1f;】 华为是一款知名的智能手机品牌&#xff0c;其强大的…

PUSH消息推送的实现原理

PUSH消息推送的实现原理_腾讯新闻 编辑导语&#xff1a;如今&#xff0c;push已经成为了我们手机信息流的一种推广方式&#xff0c;那么push消息推送是如何实现的呢&#xff1f;作者总结了几种消息推送的类型以及实现原理&#xff0c;一起来看看。 一、消息推送的类型 1. 短信…

使用 Elastic Learned Sparse Encoder 和混合评分的卓越相关性

作者&#xff1a;The Elastic Platform team 2023 年 5 月 25 今天&#xff0c;我们很高兴地宣布 Elasticsearch 8.8 正式发布。 此版本为矢量搜索带来了多项关键增强功能&#xff0c;让开发人员无需付出通常的努力和专业知识即可在搜索应用程序中利用一流的 AI 驱动技术。 使…

06- AOP(实现案例:记录日志操作)

目录 1. 通知类型 2. 通知顺序 3. 切入点表达式 execution() annotation() 4. 连接点&#xff08;JoinPoint&#xff09; 5. 案例&#xff1a;将CRUD接口的相关操作记录到数据库中 AOP: Aspect Oriented Programming (面向切面编程、面向方面编程)&#xff0c;其实就是…

Zookeeper学习---2、客户端API操作、客户端向服务端写数据流程

1、客户端API操作 1.1 IDEA 环境搭建 前提&#xff1a;保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。 1、创建一个工程&#xff1a;Zookeeper 2、添加pom文件 <?xml version"1.0" encoding"UTF-8"?> <project …

JavaEE进阶5/24(Spring入门)

1.IOC 控制反转 把对象的生命周期的控制权由程序员反转给其他人。 控制反转减少了代码的耦合性。 哪里发生了反转&#xff1f;f 1.对象生命周期的控制权由程序员转交给Spring 2.对象创建的顺序反转了&#xff0c;原本程序员通过new来创建的是从外层到内层的&#xff0c;控制反转…

原型设计工具Quant-UX

什么是 Quant-UX &#xff1f; Quant UX 是一种研究、可用性和原型设计工具&#xff0c;可快速测试您的设计并获得数据驱动的洞察力。Quant-UX 使验证您的想法变得简单。使用 Quant UX 的可视化编辑器可在几分钟内创建一个交互式原型&#xff0c;感觉就像真正的应用程序一样。 …

使用Windbg静态分析dump文件的一般步骤详解

目录 1、概述 2、静态分析dump文件的一般步骤 2.1、查看异常类型 2.2、使用.ecxr命令切换到发生异常的线程上下文&#xff0c;查看发生异常的那条汇编指令 2.3、使用kn/kv/kp命令查看异常发生时的函数调用堆栈 2.4、使用lm命令查看模块的时间戳&#xff0c;找到对应的pdb…