Zookeeper Java 开发,自定义分布式锁示例

news2024/11/28 2:47:39

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、创建锁的过程
      • 3.1 通过 create 创建节点信息
      • 3.2 AsyncCallback.StringCallback 回调函数
      • 3.3 AsyncCallback.Children2Callback 的回调函数
      • 3.4 Watcher 的回调函数
    • 四、完整示例
      • 4.1 完整分布式锁代码
      • 4.2 测试类

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。

一、概述

  • 情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。

  • 原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:

    • 客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)

    • 获取所有锁节点,判断自己是否为最小节点

      • 如果自己是最小序列节点,则立即获得锁
      • 否则不能获得锁,但要监控前一个序列节点的状态
    • 获得锁的客户端开始执行任务。

    • 执行完任务后释放锁。

      • 由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。

      • 在这个回调中再获取所有锁节点,判断自己是否为最小节点。

      • 以此类推,直到全部结束。

  • 流程如下

在这里插入图片描述

  • 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency>
       <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.2</version>
    </dependency>
    

三、创建锁的过程

3.1 通过 create 创建节点信息

  • 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
    public void lock() throws InterruptedException, LockException {
        zooKeeper.create("/lock", "xxx".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
        countDownLatch.await();

        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("创建锁失败");
        }

        System.out.println(this.appName + " 获得锁");
    }

3.2 AsyncCallback.StringCallback 回调函数

  • 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
    // AsyncCallback.StringCallback
    @Override
    public void processResult(int i, String s, Object o, String s1) {
        if(StringUtils.isEmpty(s1)){
            // 这里是创建锁失败的情况。
            this.countDownLatch.countDown();
            return;
        }
        System.out.println(this.appName + " create lock node="+s1);

        this.lockNodePath = s1;
        // 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
        zooKeeper.getChildren("/", false, this, context);
    }

3.3 AsyncCallback.Children2Callback 的回调函数

  • 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
    // AsyncCallback.Children2Callback
    @Override
    public void processResult(int i, String s, Object o, List<String> list, Stat stat) {

        Collections.sort(list);

//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }

        int index = list.indexOf(lockNodePath.substring(1));
        if(0 == index){
            // 如果我现在是第一个节点,则获得锁
            try {
                zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.countDownLatch.countDown();
        }
        else {
            // 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
            String watchNodePath = "/" + list.get(index - 1);
            System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
            zooKeeper.exists(watchNodePath, this, new StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {

                }
            }, context);
        }
    }

3.4 Watcher 的回调函数

  • 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
    // Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zooKeeper.getChildren("/", false, this, context);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

四、完整示例

4.1 完整分布式锁代码

package top.yiqifu.study.p131;


import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,
        AsyncCallback.Children2Callback {

    private String appName;
    private ZooKeeper zooKeeper;

    private Object context;
    private String lockNodePath;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public ZookeeperLock(String name, ZooKeeper zk){
        this.appName = name;
        this.zooKeeper = zk;
        this.context = this;
    }


    public void lock() throws InterruptedException, LockException {
        zooKeeper.create("/lock", "xxx".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);
        countDownLatch.await();

        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("创建锁失败");
        }

        System.out.println(this.appName + " 获得锁");
    }

    public void unlock() throws KeeperException, InterruptedException, LockException {
        if(StringUtils.isEmpty(this.lockNodePath)){
            throw new LockException("没有获得锁,无法释放锁");
        }
        zooKeeper.delete(lockNodePath, -1);

        System.out.println(this.appName + " 释放锁");
    }


    // AsyncCallback.StringCallback
    @Override
    public void processResult(int i, String s, Object o, String s1) {
        if(StringUtils.isEmpty(s1)){
            // 这里是创建锁失败的情况。
            this.countDownLatch.countDown();
            return;
        }
        System.out.println(this.appName + " create lock node="+s1);

        this.lockNodePath = s1;
        // 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。
        zooKeeper.getChildren("/", false, this, context);
    }

    // AsyncCallback.Children2Callback
    @Override
    public void processResult(int i, String s, Object o, List<String> list, Stat stat) {

        Collections.sort(list);

//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }

        int index = list.indexOf(lockNodePath.substring(1));
        if(0 == index){
            // 如果我现在是第一个节点,则获得锁
            try {
                zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.countDownLatch.countDown();
        }
        else {
            // 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)
            String watchNodePath = "/" + list.get(index - 1);
            System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);
            zooKeeper.exists(watchNodePath, this, new StatCallback() {
                @Override
                public void processResult(int i, String s, Object o, Stat stat) {

                }
            }, context);
        }
    }

    // Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zooKeeper.getChildren("/", false, this, context);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }



    public class LockException extends  Exception
    {
        public LockException(String message){
            super(message);
        }
    }
}

4.2 测试类

package top.yiqifu.study.p131;


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Test06_ZookeeperLock {

    public static void main(String[] args) {
        try {
            // 创建 ZooKeeper 对象
            final ZooKeeper zooKeeper = testCreateZookeeper();

            int clientCount = 10;
            final CountDownLatch countDownLatch = new CountDownLatch(clientCount);
            for (int i = 0; i < clientCount; i++) {
                new Thread(new Runnable(){
                    @Override
                    public void run() {
                        TestLock(zooKeeper);
                        countDownLatch.countDown();
                    }
                }).start();
            }

            countDownLatch.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private static void  TestLock(ZooKeeper zooKeeper){
        try {
            String appName = Thread.currentThread().getName();
            ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);
            
            // 加锁(获得分布式锁)
            zookeeperLock.lock();
            System.out.println(appName + " 执行任务");
            Thread.sleep(1000);
		   // 释放锁
            zookeeperLock.unlock();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (ZookeeperLock.LockException e) {
            e.printStackTrace();
        }
    }

    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        //String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1216365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Shopee选品工具软件——知虾,助您轻松把握市场趋势

在如今竞争激烈的电商市场中&#xff0c;了解市场趋势和数据分析是成功的关键。对于Shopee虾皮平台上的商家来说&#xff0c;知虾是一款强大的选品工具软件&#xff0c;它提供了全面的数据分析服务&#xff0c;帮助商家快速了解大盘走势&#xff0c;并挖掘潜力行业类目。本文将…

[msg_msg] corCTF2021 -- fire_of_salvation

前言 msg_msg 是 kernel pwn 中经常用作堆喷的结构体. 其包含一个 0x30 大小的 header. 但 msg_msg 的威力远不如此, 利用 msg_msg 配合其他堆漏洞可以实现任意地址读写的功能. 程序分析 本题给了源码, 可以直接对着源码看. 并且题目给了编译配置文件, 所以可以直接编译一个…

C 语言指针和数组

C 语言指针和数组 在本教程中&#xff0c;您将了解C语言编程中数组与指针之间的关系。您还将学习使用指针访问数组元素。 在了解数组与指针之间的关系之前&#xff0c;请确保检查以下两个主体&#xff1a; [C 数组](C 语言数组-CSDN博客)[C 指针](C 语言指针-CSDN博客) 数组…

2023上海国际电力电工展盛大举行 规模创新高 与行业「升级、转型、融合」

由中国电力企业联合会、国家电网主办及雅式展览服务有限公司承办的「第三十一届上海国际电力设备及技术展览会 (EP Shanghai 2023)」从11月15日起至17日一连三天于上海新国际博览中心盛大举行&#xff0c;并首度增设专题子展「上海国际储能技术应用展览会」。本届展会以“升级、…

2023_“数维杯”问题B:棉秸秆热解的催化反应-详细解析含代码

题目翻译&#xff1a; 随着全球对可再生能源需求的不断增加&#xff0c;生物质能作为一种成熟的可再生能源得到了广泛的关注。棉花秸秆作为一种农业废弃物&#xff0c;因其丰富的纤维素、木质素等生物质成分而被视为重要的生物质资源。虽然棉花秸秆的热解可以产生各种形式的可…

jffs2文件系统(二)

本篇文章讲解一下如何制作jffs2文件系统&#xff0c;以及如何在linux下把jffs2作为根文件系统使用。 文件系统制作 制作工具&#xff1a;mtd_utils&#xff0c;可以自己安装 mkfs.jffs2 -o root-uclibc-jffs2 -r root-uclibc -e 0x10000 -s 0x1000 -n -l -X zlib --pad0x10000…

基于卷积神经网络的猫种类的识别

1.介绍 图像分类是计算机视觉中的一个关键任务&#xff0c;而猫种类识别作为一个有趣且实用的应用场景&#xff0c;通过卷积神经网络&#xff08;CNN&#xff09;的模型能够识别猫的不同品种。在这篇博客中&#xff0c;将详细介绍如何利用深度学习技术构建模型&#xff0c;从而…

gd32 USB HOST 接口

接口 CPU引脚 复用 DM PB14 USBHS_DM AF12 DP PB15 USBHS_DP AF12

互联网上门预约洗衣洗鞋店小程序;

拽牛科技干洗店洗鞋店软件&#xff0c;方便快捷&#xff0c;让你轻松洗衣。只需在线预约洗衣洗鞋服务&#xff0c;附近的门店立即上门取送&#xff0c;省心省力。轻松了解品牌线下门店&#xff0c;通过列表形式展示周围门店信息&#xff0c;自动选择最近门店为你服务。简单填写…

【Linux专题】SFTP 用户配置 ChrootDirectory

【赠送】IT技术视频教程&#xff0c;白拿不谢&#xff01;思科、华为、红帽、数据库、云计算等等https://xmws-it.blog.csdn.net/article/details/117297837?spm1001.2014.3001.5502 红帽认证 认证课程介绍&#xff1a;红帽RHCE9.0学什么内容&#xff0c;新版有什么变化-CSDN…

【带头学C++】----- 七、链表 ---- 7.1 链表的概述

目录 七、链表 7.1 链表的是什么&#xff1f; 7.2数组和链表的优点和缺点 7.3 链表概述 ​编辑 7.4 设计静态链表 7.4.1 定义一个结点&#xff08;结构体&#xff09; 7.4.2 使用头结点构建一个单向链表 七、链表 7.1 链表的是什么&#xff1f; C链表是一种数据结构&a…

如何构建风险矩阵?3大注意事项

风险矩阵法&#xff08;RMA&#xff09;是确定威胁优先级别的最有效工具之一&#xff0c;可以帮助项目团队识别和评估项目中的风险&#xff0c;帮助项目团队对风险进行排序&#xff0c;清晰地展示风险的可能性和严重性&#xff0c;为项目团队制定风险管理策略提供依据。 如果没…

SecureCRT\\FX:打造安全可靠的终端模拟器和FTP客户端

在现代的工作环境中&#xff0c;远程连接和文件传输是不可或缺的任务。而SecureCRT\\FX作为一款安全可靠的终端模拟器和FTP客户端&#xff0c;将帮助您高效管理远程连接和文件传输。 SecureCRT\\FX提供了强大的终端模拟功能&#xff0c;支持SSH、Telnet、RDP等多种协议&#x…

92.Linux的僵死进程以及处理方法

目录 1.什么是僵死进程&#xff1f; 2.代码演示僵死进程 3.解决办法 1.什么是僵死进程&#xff1f; 僵死进程是指一个子进程在父进程之前结束&#xff0c;但父进程没有正确地等待&#xff08;使用 wait 或 waitpid 等系统调用&#xff09;来获取子进程的退出状态。当一个进…

流程图怎么画,用什么软件做?一文弄懂流程图:从流程图的定义、流程图各种图形的含义到流程图制作,一步到位!

流程图&#xff0c;也被称为过程流程图或流程图&#xff0c;是一种表达工作或过程中步骤之间逻辑关系的可视化工具。它主要由不同形状和符号的框以及指向这些框的箭头组成。每个形状或符号都有特定的含义&#xff0c;它们代表了工作流程中的一种特定类型的步骤或动作。 使用流…

视频集中存储/云存储平台EasyCVR级联下级平台的详细步骤

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

JVM bash:jmap:未找到命令 解决

如果我们在使用JVM的jmap命令时遇到了"bash: jmap: 未找到命令"的错误&#xff0c;这可能是因为jmap命令没有在系统的可执行路径中。 要解决这个问题&#xff0c;可以尝试以下几种方法&#xff1a; 1. 检查Java安装&#xff1a;确保您已正确安装了Java Development …

【Android】导入三方jar包/系统的framework.jar

1.Android.mk导包 1).jar包位置 与res和src同一级的libs中(没有就新建) 2).Android.mk文件 LOCAL_STATIC_ANDROID_LIBRARIES&#xff1a;android静态库&#xff0c;经常用于一些support的导包 LOCAL_JAVA_LIBRARIES&#xff1a;依赖的java库&#xff0c;一般为系统的jar…

[修改Linux下ssh端口号]解决无法修改sshd_config无法修改

前言&#xff1a;写本文的前因是本人的阿里云服务器经常被黑客暴力破解ssh的22端口号。再网络上搜索解决都是说使用root权限进行修改&#xff0c;但本人在root下也无法成功进行修改sshd_config文件。所以在大量搜索下终于找到了解决方案&#xff0c;现在分享出来给有需要的人使…