zookeeper(2) 服务器动态上下线监听和分布式锁案例

news2024/12/26 22:22:33

案例一:服务器动态上下线监听

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线。

1.服务端代码

package com.atguigu.case1;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DistributeServer {
    String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    int sessionTimeout = 20000;
    private ZooKeeper zooKeeper;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeServer server = new DistributeServer();
        server.getConnect();
        server.regist(args[0]);
        server.business();
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws InterruptedException, KeeperException {
        String s = zooKeeper.create("/servers/", hostname.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + "isonline");
    }

    private void getConnect() throws IOException {
         zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

2.客户端代码

package com.atguigu.case1;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {
    String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    int sessionTimeout = 20000000;
    private ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();
        client.getConnect();
        client.getServerList();
        client.business();
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zooKeeper.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<>();
        for (int i = 0; i < children.size(); i++) {

            byte[] data = zooKeeper.getData("/servers/" + children.get(i), false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws InterruptedException, KeeperException {
        String s = zooKeeper.create("/servers/"+hostname, hostname.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + "isonline");
    }

    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

案例二:分布式锁案例原生方法

package com.atguigu.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {
    String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    int sessionTimeout = 2000;
    private final ZooKeeper zooKeeper;

    private CountDownLatch connectLatch  = new CountDownLatch(1);

    private CountDownLatch waitLatch  = new CountDownLatch(1);
    private String waitPath;
    private String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
         zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //监听到前一个节点已经下线
                //connectLatch 如果连上需要释放
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                //waitLatch 需要释放
                if(watchedEvent.getType() == Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });
         //等待zk正常连接后,才往下走
         connectLatch.await();
        //判断根节点/locks是否存在
        Stat stat = zooKeeper.exists("/locks", false);
        if(stat == null) {
            //创建根节点
            zooKeeper.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    }
    //加锁
    public void zklock(){
        //创建对应的临时的带序号的节点
        try {
            currentMode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //判断创建的节点是否是最小的节点,如果是获取到锁;不是,监听前一个序号最小的节点
            List<String> children = zooKeeper.getChildren("/locks", false);
            //如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断谁最小
            if(children.size() == 1) {
                return;
            } else {
                //排序
                Collections.sort(children);
                //获取当前节点名称
                String thisNode = currentMode.substring("/locks/".length());
                //通过thisNode获取在children集合中的位置
                int index = children.indexOf(thisNode);
                if(index == -1) {
                    System.out.println("数据异常");
                }else if(index == 0) {
                    //就一个节点数据,就可以获取锁了
                    return;
                }else {
                    //需要监听 前一个节点变化
                    waitPath = "/locks/"+children.get(index-1);
                    zooKeeper.getData(waitPath,true,null);
                    //等待监听
                    waitLatch.await();
                    //接收到之后
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    public void unZklock(){
        //删除节点
        try {
            zooKeeper.delete(currentMode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

测试

package com.atguigu.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();
        new Thread(new Runnable(){
            @Override
            public void run() {

                try {
                    lock1.zklock();;
                    System.out.println("线程1启动获取到锁");
                    Thread.sleep(5000);
                    lock1.unZklock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable(){
            @Override
            public void run() {

                try {
                    lock2.zklock();
                    System.out.println("线程2启动获取到锁");
                    Thread.sleep(5000);
                    lock2.unZklock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

 案例三:分布式锁案例框架方法

引入包

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>
package com.atguigu.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1获取到锁");

                    lock1.acquire();
                    System.out.println("线程1再次获取到锁");

                    lock1.release();
                    System.out.println("线程1释放锁");

                    lock1.release();
                    System.out.println("线程1再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    lock2.acquire();
                    System.out.println("线程2再次获取到锁");

                    lock2.release();
                    System.out.println("线程2释放锁");

                    lock2.release();
                    System.out.println("线程2再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop100:2181,hadoop101:2181,hadoop102:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();
        client.start();
        System.out.println("zookeeper启动成功");
        return client;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1424571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【大厂AI课学习笔记】1.3 人工智能产业发展(3)

1.3.1 供给侧 技术层面&#xff1a;从实验室走向大规模的商用。 数据层面&#xff1a;数据正式成为重要的生产要素。 市场&#xff1a;供需互促的正向市场环境建立。 资本&#xff1a;走出炒作泡沫&#xff0c;聚焦价值领域。 平台&#xff1a;大厂普遍开放生态。 MORE&am…

Java:搭建eladmin复习mvn、springboot、vue等

目录 1.源码平台后端&#xff1a; 2.源码平台前端&#xff1a; 3.操作系统&#xff1a;centos7.9 4.mysql:5.7.x 安装 5.redis:5.0.X 6.maven&#xff1a;3.8 7.java:1.8&#xff1a; 8.nodejs:16.x 9.通过mvn打包eladmin后端 10.npm打包前端项目进行部署 11.访问测试…

【GEE】基于GEE批量下载Landsat8 L2A数据(整幅)

之前发过一篇使用GEE下载Landsat8的文章&#xff0c;然后有很多小伙伴私信我各种问题&#xff0c;如L1C、L2数据代码怎么修改&#xff0c;如何镶嵌&#xff0c;如何去云、 如何裁剪等一系列问题。正好快过年了&#xff0c;手头的事也没有多少了&#xff0c;所以这两天整理了一下…

ElementUI Form:InputNumber 计数器

ElementUI安装与使用指南 InputNumber 计数器 点击下载learnelementuispringboot项目源码 效果图 el-radio.vue &#xff08;InputNumber 计数器&#xff09;页面效果图 项目里el-input-number.vue代码 <script> export default {name: el_input_number,data() {re…

Log4j2-29-log4j2 discard policy 极端情况下的丢弃策略 同步+异步配置的例子

Log4j2异步日志、同步日志和混合日志的配置详解 Log4j 2中记录日志的方式有同步日志和异步日志两种方式&#xff0c;其中异步日志又可分为使用AsyncAppender和使用AsyncLogger两种方式。 异步日志(性能最好&#xff0c;推荐使用) 异步日志情况下&#xff0c;增加 Disruptor …

如何用JAVA代码将视频号视频下载到本地

1、目前只掌握了&#xff0c;下载自己视频号后台的视频的方法&#xff1a; 登录视频号助手网页-点开视频-复制链接-&#xff08;I/O&#xff09;下载 代码如下&#xff1a; String videoUrl "xxx";String savePath "D:\\videoDownload\\video.mp4";tr…

实战Vue.js与MySQL:爱心商城项目开发指南

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

win10查看Nvidia显卡、cuda版本

通过cmd命令行查看 打开cmd命令行窗口&#xff0c;在命令行输入&#xff1a; nvidia-smi 即可看到相应的显卡信息&#xff0c;以及显卡支持的cuda版本。 如下图所示&#xff0c;可以看到显卡是"GeForce CTX 1650"&#xff0c;cuda版本是11.7

日本大带宽服务器速度和性能评测的关系

在互联网的快速发展中&#xff0c;大带宽服务器在提供高速、稳定的数据传输方面起着至关重要的作用。特别是在日本&#xff0c;由于其先进的网络基础设施和庞大的互联网用户群体&#xff0c;大带宽服务器的需求日益增长。那么&#xff0c;日本大带宽服务器的速度和性能评测有何…

山西电力市场日前价格预测【2024-02-01】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2024-02-01&#xff09;山西电力市场全天平均日前电价为455.34元/MWh。其中&#xff0c;最高日前电价为687.90元/MWh&#xff0c;预计出现在18:30。最低日前电价为364.84元/MWh&#xff0c;预计…

【Java程序设计】【C00200】基于(JavaWeb+SSM)的在线网课管理系统(论文+PPT)

基于&#xff08;JavaWebSSM&#xff09;的在线网课管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的在线网课管理系统 本系统分为管理员、教师、学生以及前台系统4个功能模块。 管理员&#xff1a;管理员进入主…

【SpringBoot系列】自动装配的魅力:Spring Boot vs 传统Spring

IT行业有哪些证书含金量高? 文章目录 IT行业有哪些证书含金量高?强烈推荐前言区别项目配置&#xff1a;依赖管理&#xff1a;内嵌服务器&#xff1a;开发体验&#xff1a; 实例Spring项目示例&#xff1a;Spring Boot项目示例&#xff1a; 总结强烈推荐专栏集锦写在最后 强烈…

头戴式耳机什么牌子性价比高?公认高性价比的头戴式耳机推荐

头戴式耳机作为现代音乐与声音体验的必备品&#xff0c;一直以来都备受消费者的关注&#xff0c;那么&#xff0c;在众多的品牌中&#xff0c;哪些头戴式耳机的性价比最高呢&#xff1f;本文将为你揭晓这个秘密&#xff0c;推荐一些公认的高性价比头戴式耳机&#xff0c;让你在…

【局部自动数据增强】YOCO:将图片一分为二,各自增强后拼合为一

【自动数据增强】YOCO&#xff1a;将图片一分为二&#xff0c;各自增强后拼合为一 核心思想好在哪里&#xff1f;切哪里、切几次&#xff1f;何时用&#xff1f; 总结 核心思想 论文&#xff1a;https://arxiv.org/pdf/2201.12078.pdf 代码&#xff1a;https://github.com/Ju…

Linux逻辑卷(LV)扩容

Linux逻辑卷&#xff08;LV&#xff09;扩容 1、准备物理磁盘&#xff08;分区和不分区都行&#xff09;&#xff0c;可以使用lsblk命令查看新增的磁盘&#xff0c;如下图sde就是我们新增磁盘&#xff0c;容量为600G。 2、将新磁盘变成物理卷&#xff08;PV&#xff09; pvcr…

windows平台使用tensorRT部署yolov5详细介绍,整个流程思路以及细节。

目录 Windows平台上使用tensorRT部署yolov5 前言&#xff1a; 环境&#xff1a; 1.为什么要部署&#xff1f; 2.那为什么部署可以解决这个问题&#xff1f;&#xff08;基于tensorRT&#xff09; 3.怎么部署&#xff08;只讨论tensorRT&#xff09; 3.0部署的流程 3.1怎…

Java动态修改用户Session实战-简单实例准备

锋哥原创的Java动态修改用户Session实战&#xff1a; Java动态修改用户Session实战课程_哔哩哔哩_bilibiliJava动态修改用户Session实战课程&#xff0c;管理员可以修改任意一个用户的session信息作者&#xff1a;java1234_小锋站点&#xff1a;www.java1234.vip喜欢的朋友点赞…

协程 Coroutine 到底是个啥?

看了很多博客&#xff0c;也看了些在线课堂的课程。大神们说的协程 Coroutine 的概念&#xff0c;一直含糊不清。今天自己动手做做实验理解下。 1. 代码如下 private const val i1 1000000000class MainViewModel : ViewModel() {companion object {private const val TAG …

MySQL数据库基础第一篇(SQL通用语法与分类)

文章目录 一、SQL通用语法二、SQL分类三、DDL语句四、DML语句1.案例代码2.读出结果 五、DQL语句1.DQL-基本查询2.DQL-条件查询3.DQL-聚合函数4.DQL-分组查询5.DQL-排序查询6.DQL-分页查询7.DQL语句-执行顺序1.案例代码2.读出结果 六、DCL语句1.DCL-管理用户2.DCL-权限控制1.案例…

C++ 数论相关题目:容斥原理。能被整除的数

给定一个整数 n 和 m 个不同的质数 p1,p2,…,pm 。 请你求出 1∼n 中能被 p1,p2,…,pm 中的至少一个数整除的整数有多少个。 输入格式 第一行包含整数 n 和 m 。 第二行包含 m 个质数。 输出格式 输出一个整数&#xff0c;表示满足条件的整数的个数。 数据范围 1≤m≤16 ,…