ZooKeeper【实际案例】

news2024/11/25 21:11:57

服务器动态上下线监听

需求

        在我们的分布式系统中,有多台服务器节点,我们希望任意一台客户端都能实时收到服务器节点的上下线。

实现

服务器节点上线以后自动去zookeeper目录注册自己的节点信息(创建Znode临时节点),这就需要我们创建一个永久目录节点 servers 来供服务器集群在这之下创建临时节点。

客户端监听zookeeper目录下节点的变化。

ZooKeeper可以监听到七种类型变化:

  1. None:连接建立事件
  2. NodeCreated:节点创建
  3. NodeDeleted:节点删除
  4. NodeDataChanged:节点数据变化
  5. NodeChildrenChanged:子节点列表变化
  6. DataWatchRemoved:节点监听被移除
  7. ChildWatchRemoved:子节点监听被移除

 DistributeClient 

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectionString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeOut = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws InterruptedException, KeeperException, IOException {

        DistributeClient client = new DistributeClient();
        //1.获取zookeeper连接
        client.getConnection();

        //2.监听/servers 下面子节点的变化
        client.getServerList();

        //3.业务逻辑
        client.business();


    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        //设置一直监听 它会自动走初始化ZooKeeper时指定的监听器方法process()
        List<String> children = zk.getChildren("/servers", true);

        //存储主机名称
        ArrayList<String> servers = new ArrayList<>();

        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        //直接打印List集合
        System.out.println(servers);
    }

    private void getConnection() throws IOException {
        zk = new ZooKeeper(connectionString , sessionTimeOut, new Watcher() {

            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    //一直监听
                    getServerList();
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                }
            }

        });
    }

}

DistributeServer

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Id;

import java.io.IOException;

public class DistributeServer {

    private String connectionString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeOut = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();
        //1.连接zookeeper集群
        server.getConnection();

        //2.注册Znode到zookeeper目录
        server.register("hadoop102");

        //3.启动业务逻辑
        server.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void register(String hostname) throws InterruptedException, KeeperException {
        String s = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname+" is online !");
    }

    private void getConnection() throws IOException {

            zk = new ZooKeeper(connectionString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });

    }

}

测试

        测试时,zookeeper集群必须开启,因为我们需要通过API来连接我们的zookeeper集群才能实现节点的创建。此外,我们的服务器地址映射需要再windows端进行配置后才能使用,或者直接使用服务器的 ip 。

命令行测试

开启客户端集群,再通过命令行来模拟服务器集群测试客户端的监听效果。

运行DistributeClient ,观察控制台输出:

模拟服务器测试

运行DistributeServer来模拟服务器hadoop102上线,观察DistributeServer控制台的输出:

停止运行DistributeServer,意味着服务器下线,临时节点也就自动删除了。

 分布式锁案例

        单机情况下也就是只有一个进程的情况下使用Synchronized是可以保证线程安全的。但是分布式情况下是多个不同的进程,而不是一个进程里面不同的线程,所以Synchronized是无法保证多个进程安全的。

定义


        互斥是我们分布式系统同步化问题中的一个重要部分,ZooKeeper帮我们解决了这一问题。 分布式系统的基础是多进程之间并发和协作,不同进程将需要同时访问相同的资源,为了保证这种并发访问不会崩溃资源或使其不一致,需要保证进程的互斥访问,当一个进程使用某个共享资源,其他进程不允许对这个资源操作。

过程

  1. 多个客户端对某一共享资源进行访问请求
  2. zookeeper收到请求之后,在zookeeper的 /locks 目录下创建多个临时带序列编号节点,代表每个客户端的请求。
  3. 编号最小的请求节点优先获得锁,进行资源的访问,此时其它请求节点不允许访问该资源。
  4. 其它请求节点会监听自己前一个编号小的请求,监听前一个请求节点是否已经释放掉锁。
  5. 如果前一个请求节点完成之后释放掉锁自己就立即拿到锁,重复第3部

实现

分布式锁对象 DistributeLock

        Hadoop集群中不同的节点之间需要协作完成各种任务。这就需要多个节点在同一时间对某一个资源进行访问,并防止并发冲突。Zookeeper提供了分布式锁机制,可以提供多个节点之间的同步和互斥,避免数据不一致等问题。

        在zookeeper中,我们每个需要互斥访问的请求任务都会有一把锁(每个任务一把锁),对应到这个案例是每个 /locks 下的任务节点都有一把自己的锁,每把锁都有两个功能(上锁和解锁),当自己拿到资源的时候就上锁,避免别的进程来访问,当自己使用完之后,就解锁,供其它任务节点按照顺序使用。

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

    //配置多个zookeeper服务器
    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    //设置客户端连接的最大时间 ms
    private final int sessionTimeOut = 2000;
    private final ZooKeeper zk;

    private String lastPath;

    private CountDownLatch waitLatch = new CountDownLatch(1);

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private String currentNode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {

        //获取连接 zk
        zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //connectLatch 如果连接上zk 就释放它 不然它会一直阻塞
                if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }
                //如果前一个请求节点释放了锁(前一个节点在释放锁之后会被自动删除 这样我们就可以监听到)
                if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(lastPath)){
                    waitLatch.countDown();
                }
            }
        });

        //等待zk正常连接后才能继续往下走
        connectLatch.await();

        //判断根节点/locks是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null){
            //创建一个永久节点
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }



    }

    //对资源上锁
    public void zkLock() throws InterruptedException, KeeperException {

        //创建临时带序号节点
        currentNode = zk.create("/locks/" + "tmp-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        //判断当前任务节点是不是最小的节点
        List<String> children = zk.getChildren("/locks", false);
        //如果节点/locks目录下只有一个节点
        if (children.size()==1){
            return;
        }else {
            Collections.sort(children);

            //获取节点名称 tmp-000000
            String thisNode = currentNode.substring("/locks/".length());
            //通过节点名称来获取到集合中的位置
            int index = children.indexOf(thisNode);

            //
            if (index == -1){
                System.err.println("数据异常");
            }else if(index == 0){//说明自己就是第一个节点 拿到锁
                return;
            }else {
                //前一个节点 = 当前节点序号 - 1
                lastPath = "/locks/"+children.get(index - 1);
                //需要监听前一个节点锁的情况 这里设置监听 需要再初始化ZooKeeper中实现process的逻辑代码
                zk.getData(lastPath, true, null);

                //等待监听
                waitLatch.await();

                return;
            }
        }


    }

    //解锁
    public void unLock() throws InterruptedException, KeeperException {

        //删除节点
        zk.delete(currentNode,-1);

    }
}

模拟任务节点使用分布式锁的过程

        开启两个线程来模拟两个请求任务同时被记录到 /locks 目录节点下,当其中一个线程占用了资源(上了锁),另一个线程就只能等待其释放锁。

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeLock lock1 = new DistributeLock();
        DistributeLock lock2 = new DistributeLock();

        //开启两个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动 获取到锁");
                    Thread.sleep(5000);

                    lock1.unLock();
                    System.out.println("线程1 释放锁");
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动 获取到锁");
                    Thread.sleep(5000);

                    lock1.unLock();
                    System.out.println("线程2 释放锁");
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/647293.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无人机侦察区域覆盖

irvingvasquez/ocpp: Optimal coverage path planning (github.com) matlab2020可运行

MySQL数据表进阶操作

MySQL数据表高级操作 一、克隆表二、清空表三、创建临时表四、创建外键约束五、MySQL中6种常见的约束&#xff1a;六、数据库用户管理 一、克隆表 将数据表的数据记录生成到新的表中 被克隆的对象&#xff1a; 方法一&#xff1a; 第一步&#xff1a;create table 新表名 li…

2023年简历石沉大海,别投了,软件测试岗位饱和了....

各大互联网公司的接连裁员&#xff0c;政策限制的行业接连消失&#xff0c;让今年的求职雪上加霜&#xff0c;想躺平却没有资本&#xff0c;还有人说软件测试岗位饱和了&#xff0c;对此很多求职者深信不疑&#xff0c;因为投出去的简历回复的越来越少了。 另一面企业招人真的…

机器学习_预测概率校准

我们在建模时通常根据准确性或准确性来评估其预测模型&#xff0c;但几乎不会问自己&#xff1a;“我的模型能够预测实际概率吗&#xff1f;” 但是&#xff0c;从商业的角度来看&#xff0c;准确的概率估计是非常有价值的&#xff08;准确的概率估计有时甚至比好的精度更有价值…

Dao层、Service层、Entity层、Servlet层、Utils层

这几天在复习高数&#xff0c;还有刷题。 B&#xff1a; 第五周任务 [Cloned] - Virtual Judge (vjudge.net) http://t.csdn.cn/S3imr G&#xff1a; 第五周任务 [Cloned] - Virtual Judge (vjudge.net) http://t.csdn.cn/UVgfK Dao层是数据访问层Service层是业务逻辑层…

深度学习HashMap之手撕HashMap

认识哈希表 HashMap其实是数据结构中的哈希表在Java里的实现。 哈希表本质 哈希表也叫散列表&#xff0c;我们先来看看哈希表的定义&#xff1a; 哈希表是根据关键码的值而直接进行访问的数据结构。 简单说来说&#xff0c;哈希表由两个要素构成&#xff1a;桶数组和散列函数…

汽车电子Autosar之车载以太网

前言 近些年来&#xff0c;随着为了让汽车更加安全、智能、环保等&#xff0c;一系列的高级辅助驾驶功能喷涌而出。未来满足这些需求&#xff0c;就对传统的电子电器架构带来了严峻的考验&#xff0c;需要越来越多的电子部件参与信息交互&#xff0c;导致对网络传输速率&#x…

NR及LTE中的IQ数据与信息、比特率、码元、波特率之间的关系

信息与比特率 信息&#xff1a;对信源进行数字编码后的数据&#xff0c;基本单位是bit。 比特率&#xff1a;信息的速率称为比特率(bit/s、bps)&#xff0c;通常用Rb表示。 码元与波特率 码元 固定时长的信号波形(数字脉冲)&#xff0c;也称为一个符号&#xff0c;symb。 (…

LeetCode36. 有效的数独

请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。 数字 1-9 在每一列只能出现一次。 数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例图&#xff09; …

【极海APM32F4xx Tiny】学习笔记05-移植 RTT NANO工程

5.移植 RTT NANO工程 移植步骤&#xff1a; 1. mdk添加rtt nano 包文件 2. 添加源码 3. 屏蔽2个中断处理函数 4. 修改board.c文件 5. 添加控制台 6. 添加finsh组件 7. 编写测试工程 1. mdk添加rtt nano 包文件 也可以下载后手动安装 下载链接https://www.rt-thread.org/downl…

【openeuler】Yocto embedded sig联合例会 (2022-11-03)

Yocto &embedded sig联合例会 (2022-11-03)_哔哩哔哩_bilibili

从浏览器输入url到页面加载(六)前端必须了解的路由器和光纤小知识

前言 上一章我们说到了数据包在网线中的故事&#xff0c;说到了双绞线&#xff0c;还说到了麻花。这一章继续沿着这条线路往下走&#xff0c;说一些和cdn以及路由器相关&#xff0c;运营商以及光纤相关的小知识&#xff0c;前端同学应该了解一下的 目录 前言 1. CDN和路由器…

STM32-I2C通信在AT24C02的应用

AT24C02是一种失去电源供给后依旧能保持数据的储存器&#xff0c;常用来储存一些配置信息&#xff0c;在系统重新上电之后也可以加载。它的容量是2k bit的EEPROM存储器&#xff0c;采用I2C通信方式。 AT24C02支持两种写操作&#xff1a;字节写操作和页写操作。本实验中我们采用…

三十八、动态规划——背包问题( 01 背包 + 完全背包 + 多重背包 + 分组背包 + 优化)

动态规划-背包问题算法主要内容 一、基本思路1、背包问题概述2、动态规划&#xff08;DP&#xff09;问题分析 二、背包问题1、0 1 背包问题2、完全背包问题3、多重背包问题4、分组背包问题 三、例题题解 一、基本思路 1、背包问题概述 0 1 背包问题&#xff1a; 条件&#x…

前端解决按钮重复提交数据问题(节流和防抖)

&#x1f37f;*★,*:.☆(&#xffe3;▽&#xffe3;)/$:*.★* &#x1f37f; &#x1f35f;欢迎来到前端初见的博文&#xff0c;本文主要讲解在工作解决按钮重复提交数据问题&#xff08;节流和防抖&#xff09; &#x1f468;‍&#x1f527; 个人主页 : 前端初见 &#x1f9…

【redis】数据类型,持久化、事务和锁机制、Java和redis交互、使用redis缓存、三大缓存问题

文章目录 Redis数据库NoSQL概论Redis安装和部署基本操作数据操作 数据类型介绍HashListSet和SortedSet 持久化RDBAOF 事务和锁机制锁 使用Java与Redis交互基本操作SpringBoot整合Redis 使用Redis做缓存Mybatis二级缓存Token持久化存储 三大缓存问题缓存穿透缓存击穿缓存雪崩 Re…

kotlin协程Job、CoroutineScope作用域,Android

kotlin协程Job、CoroutineScope作用域&#xff0c;Android import androidx.appcompat.app.AppCompatActivity import android.os.Bundle import android.util.Log import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines…

(横向刷题)【算法1-6】二分查找与二分答案【算法2-1】前缀和、差分与离散化(上),总结

【算法1-6】二分查找与二分答案 P1024[NOIP2001 提高组] 一元三次方程求解 思路&#xff1a;题目说明根与根之差的绝对值>1,且x1<x2&&f(x1)*f(x2)<0则其中存在解&#xff0c;于是联想到枚举&#xff0c;再用二分答案法控制精度 总结&#xff1a;二分对于精度…

【RabbitMQ教程】第四章 —— RabbitMQ - 交换机

&#x1f4a7; 【 R a b b i t M Q 教程】第四章—— R a b b i t M Q − 交换机 \color{#FF1493}{【RabbitMQ教程】第四章 —— RabbitMQ - 交换机} 【RabbitMQ教程】第四章——RabbitMQ−交换机&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &…

《阿里大数据之路》读书笔记:第一章 总述

阿里巴巴大数据系统体系架构图 阿里数据体系主要分为数据采集、数据计算、数据服务和数据应用四大层次。 一、数据采集层 阿里巴巴建立了一套标准的数据采集体系方案&#xff0c;致力全面、高性能、规范地完成海量数据的采集&#xff0c;并将其传输到大数据平台。 数据来源主…