大数据Zookeeper--案例

news2025/4/7 17:50:06

文章目录

  • 服务器动态上下线监听案例
    • 需求
    • 需求分析
    • 具体实现
    • 测试
  • Zookeeper分布式锁案例
    • 原生Zookeeper实现分布式锁
    • Curator框架实现分布式锁
  • Zookeeper面试重点
    • 选举机制
    • 生产集群安装多少zk合适
    • zk常用命令

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTime = 100000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();

        // 1、获取zk连接
        server.getConnect();

        // 2、注册服务器到zk集群
        server.regist(args[0]);

        // 3、启动 业务逻辑(睡觉)
        server.business();
    }

	// 创建到 zk 的客户端连接
    private void getConnect() throws IOException {

        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

	// 注册到服务器
    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname + " " + "is online");
    }
	
	// 业务功能
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}

4)客户端代码

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTime = 100000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();

        // 1、获取zk连接
        client.getConnect();

        // 2、监听/servers下面子节点的增加和删除
        client.getServersList();

        // 3、业务逻辑(睡觉)
        client.business();
    }

	// 创建到 zk 的客户端连接
    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            	// 再次启动监听
                try {
                    getServersList();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (KeeperException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

	// 获取服务器列表信息
    private void getServersList() throws InterruptedException, KeeperException {
        // 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zk.getChildren("/servers", true);
		
		// 存储服务器信息列表 
        ArrayList<String> servers = new ArrayList<>();
        
		// 遍历所有节点,获取节点中的主机名称信息 
        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        // 打印
        System.out.println(servers);
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…
    在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
    在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessionTime = 100000;
    private final ZooKeeper zk;
    // 当前client等待的子节点
    private String waitPath;
    // zookeeper节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // zookeeper连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 当前client创建的子节点
    private String currentMode;
	
	// 和 zk 服务建立连接,并创建根节点
    public DistributeLock() throws IOException, InterruptedException, KeeperException {

        // 1、获取连接
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk 可以释放
                // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 需要释放
                // 发生了waitPath的删除事件
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待 zookeeper正常连接后,往下走程序
        connectLatch.await();

        // 2、判断根节点/locks是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建一下根节点
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zkLock() {
        // 创建对应的临时带序号节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点
            List<String> children = zk.getChildren("/locks", false);

            // 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小
            if (children.size() == 1) {
                return;
            } else {
                // 对children集合内的节点进行排序
                Collections.sort(children);

                // 获取节点名称 seq-
                String thisNode = currentMode.substring("/locks/".length());

                // 通过seq- 获取到该节点在children集合中的位置
                int index = children.indexOf(thisNode);

                // 判断
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // 就一个节点,可以获取锁了
                    return;
                }else {
                    // 需要监听前一个节点
                    waitPath = "/locks/" + children.get(index-1);
                    zk.getData(waitPath,true,null);

                    // 等待监听
                    waitLatch.await();

                    return;
                }
            }

        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // 对zk解锁
    public void unzkLock() {
        // 删除节点
        try {
            zk.delete(currentMode,-1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

2)分布式锁测试

(1)创建两个线程

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
	
		// 创建分布式锁1
        final DistributeLock lock1 = new DistributeLock();
		// 创建分布式锁2
        final DistributeLock lock2 = new DistributeLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
            	// 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程1 启动,获取到锁");
                    Thread.sleep(5 * 1000);

                    lock1.unzkLock();
                    System.out.println("线程1 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
            	// 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程2 启动,获取到锁");
                    Thread.sleep(5 * 1000);

                    lock2.unzkLock();
                    System.out.println("线程2 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

    }
}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-framework</artifactId> 
	<version>4.3.0</version> 
</dependency> 

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-recipes</artifactId> 
	<version>4.3.0</version> 
</dependency> 

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-client</artifactId> 
	<version>4.3.0</version> 
</dependency> 

(2)代码实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1 再次释放锁");

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

	// 分布式锁初始化
    private static CuratorFramework getCuratorFramework() {
		
		// 重试策略,初始时间3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
                .connectionTimeoutMs(100000)
                .sessionTimeoutMs(100000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功!");
        return client;
    }
}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

发现本地Elasticsearch版本 不兼容第三方管理工具 带大家在官网中寻找并下载指定版本的Elasticsearch

我们在 springboot 整合Elasticsearch时可能会出现版本不兼容异常 首先 大家要搞清楚 目前 Elasticsearch(ES)与springboot 市场版本互相的兼容情况 可以参考我的文章 springboot与Elasticsearch版本兼容对比 这里 我们想下载 其他版本 还是访问官网 https://www.elastic.co/…

探索LLM的意图识别能力

不可否认的是&#xff0c;LLM&#xff08;例如 OpenAI 的 GPT 系列&#xff09;将在不断发展的对话式 AI 领域发挥重要作用。 关于使用 ChatGPT 执行各种任务的帖子和文章不计其数。 GPT 有几个关键功能值得进一步探索&#xff0c;例如其摘要、分类和生成文本的能力。 其中&…

U盘误删的文件怎么找回?5个宝藏方法分享!

“我想问问大家&#xff0c;在使用u盘时如果不小心把很重要的文件删除了应该怎么办呀&#xff1f;有什么简单又快速的U盘文件恢复方法可以推荐吗&#xff1f;” U盘作为一种便携式的存储设备&#xff0c;在日常工作中扮演着重要的角色。然而&#xff0c;如果不小心误删了U盘中的…

SpringBoot:自动配置报告

自动配置报告demo&#xff1a;点击查看 LearnSpringBoot03AutoConfig 点击查看更多的SpringBoot教程 一、application.properties代码 #开启springboot debug模式 #自动配置报告 #Positive matches: 自动配置类启用了&#xff08;自动配置类匹配上了&#xff09;&#xff0c…

hexo和github.io博客的搭建

简要&#xff1a; 最近在牛客网上看到有很多应届毕业生大佬的求职简历上都写上了自己的博客地址&#xff0c;并且在acwing上看到图图佬&#xff0c;铅笔佬也有自己的博客地址&#xff0c;大部分都采用了自己搭建博客的方式&#xff0c;而不是用脏乱差的csdn来写博客。所以我也采…

电脑服务器离线安装.net framework 3.5解决方案(错误:0x8024402c )(如何确定当前系统是否安装NET Framework 3.5)

问题环境&#xff1a; 日常服务的搭建或多或少都会有需要到NET Framework 3.5的微软程序运行框架&#xff0c;本次介绍几种不同的安装方式主要解决运行在Windows 2012 以上的操作系统的服务。 NET Framework 3.5 是什么&#xff1f; .NET Framework是微软公司推出的程序运行框架…

利用LLM大模型生成sql的深入应用探究

Chat2DB 是一款有开源免费的多数据库客户端工具,和传统的数据库客户端软件Navicat、DBeaver 相比 Chat2DB 集成了 AIGC 的能力&#xff0c;能够将自然语言转换为 SQL&#xff0c;也可以将 SQL 转换为自然语言&#xff0c;可以给出研发人员 SQL 的优化建议&#xff0c;极大地提升…

YOLO-World——超级轻量级开放词汇目标检测方法

前言 目标检测一直是计算机视觉领域中不可忽视的基础挑战&#xff0c;对图像理解、机器人技术和自主驾驶等领域具有广泛应用。随着深度神经网络的发展&#xff0c;目标检测方面的研究取得了显著进展。尽管这些方法取得了成功&#xff0c;但它们存在一些限制&#xff0c;主要体…

Python __pycache__文件

pycharm配置位置 几个基本概念 源代码&#xff08;source code&#xff09; 我们每天编写的Python、Java、C等代码通常指的就是源代码&#xff0c;源代码的特点是人类可读。但是CPU只能读懂二进制&#xff0c;看不懂我们写的源代码&#xff0c;因此还需要进行编译&#xff08…

vue2 自定义指令 v-highlight 文本高亮显示分享

简单分享一个文本高亮显示的自定义指令&#xff0c;主要分两部分&#xff1a; 1、代码实现&#xff1a;在 main.js 文件中添加一个自定义指令&#xff0c;实现搜索时文本高亮显示&#xff0c;代码如下&#xff1a; const highlightText (el, searchText) > {const textCo…

安装PyInstaller的保姆级教程

一、安装PyInstaller之前首先要安装Python&#xff0c;小编这里安装的是Python3.9&#xff0c;目前&#xff08;2024/2/6&#xff09;匹配到的最高版本的PyInstaller的版本为6.3.0。需要安装Python的小伙伴可以去这里安装python详细步骤&#xff08;超详细&#xff0c;保姆级&a…

黑马头条 Kafka

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 知…

SpringBoot-基础篇03

之前搭建了整个开发环境实现了登录注册&#xff0c;springBoot整合mybatis完成增删改查&#xff0c;今天完成分页查询&#xff0c;使用阿里云oss存储照片等资源&#xff0c;后期会尝试自己搭建分布式文件系统来实现。 一&#xff0c;SpringBootMybatis完成分页查询 1&#xff…

选择大语言模型:2024 年开源 LLM 入门指南

作者&#xff1a;来自 Elastic Aditya Tripathi 如果说人工智能在 2023 年起飞&#xff0c;这绝对是轻描淡写的说法。数千种新的人工智能工具被推出&#xff0c;人工智能功能被添加到现有的应用程序中&#xff0c;好莱坞因对这项技术的担忧而戛然而止。 甚至还有一个人工智能工…

LLMs之miqu-1-70b:miqu-1-70b的简介、安装和使用方法、案例应用之详细攻略

LLMs之miqu-1-70b&#xff1a;miqu-1-70b的简介、安装和使用方法、案例应用之详细攻略 目录 miqu-1-70b的简介 miqu-1-70b的安装和使用方法 1、安装 2、使用方法 miqu-1-70b的案例应用 miqu-1-70b的简介 2024年1月28日&#xff0c;发布了miqu 70b&#xff0c;潜在系列中的…

Java面向对象 创建类 创建对象

目录 创建类类的属性类的方法实例分析 创建对象创建Test类测试分析 创建类 类的属性 属性用于定义该类或该类对象包含的数据或者说静态特征。属性作用范围是整个类体。 属性定义格式&#xff1a; [修饰符] 属性类型 属性名 [默认值] ;类的方法 方法用于定义该类或该类实例…

【ARM 嵌入式 编译系列 2.7 -- GCC 编译优化参数详细介绍】

请阅读【嵌入式开发学习必备专栏 】 文章目录 GCC 编译优化概述常用优化等级-O1 打开的优化选项-O2 打开的优化选项-O3 打开的优化选项-Os 打开的优化选项优化技术使用优化选项的注意事项GCC 编译优化概述 GCC(GNU Compiler Collection)包含了用于C、C++、Objective-C、Fort…

大模型综述

1.概念 大模型是指人工智能预训练大模型&#xff0c;具有海量参数和复杂架构&#xff0c;用于深度学习任务的模型&#xff0c;拥有强大的处理能力和表征能力&#xff0c;以数据算力为支撑&#xff0c;借助数据管理、模型训练、评估优化、服务平台、插件等辅助工具&#xff0c;…

Windows显示空的可移动磁盘的解决方案

123  大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式…

macbookpro和macbookair的区别?cleanmymac 怎么清理mac空间

苹果mac air和pro区别有&#xff1a;1、air采用了轻薄的设计&#xff0c;重量相对较轻&#xff0c;便于携带&#xff0c;而pro更加注重性能&#xff0c;所以比较重&#xff1b;2、air通常搭载较低功耗的处理器内存和存储容量相对较小&#xff0c;而pro配备了更强大的处理器、更…