Zookeeper 分布式锁

news2024/11/16 18:03:35

优质博文:IT-BLOG-CN

一、简介

随着公司业务的发展,单机应用已经无法支撑现有的用户量,之前采用synchronizedLock锁已经无法满足分布式系统的要求。我们应用程序目前都会运行120台,节假日会扩容至240台,属于多JVM环境。所以需要搭建一套独立的zk集群或者Redisson集群提供分布式锁的功能。

二、ZK分布式锁实现

ZK的特点是:文件系统 + 通知机制

分布式的思想:客户端获取锁时会创建一个临时文件,使用完锁之后删除该临时文件。

客户端向ZK尝试获取锁,ZK会在/lock节点下创建一个临时顺序节点,假设有三个客户端向ZK获取锁,会同时在ZK/lock节点下根据请求的顺序创建3个临时顺序节点:/lock/xxx1/lock/xxx2/lock/xxx3,这个顺序节点由ZK内部自行维护一个节点序号。

案例

【1】第一个客户端向客户端向ZK尝试获取锁,ZK内部会创建一个顺序节点,我们通过Curator框架会得到如下一个临时文件:最后的数字1就是当前客户端请求的顺序。随后,客户端A会通过getChildern()方法查找lock下的所有节点,这个时候会拿到一个顺序集合,当客户端A发现自己创建的顺序节点排在第一位,返回true表示加锁成功,开始业务处理。


【2】第二个客户端B向ZK尝试获取锁,ZK内部同样会创建一个顺序节点xxx2,随后,客户端B通过getChildern()方法查找lock下的所有节点,这个时候会拿到一个顺序集合。

[
    "_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000001",
    "_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000002"
]

客户端B判断自己创建的临时顺序节点是不是最小序号节点,发现不是加锁失败。加锁失败后,客户端B就会通过watcher监控上一个顺序节点。


【3】客户端A处理完逻辑后释放锁或者session超时后会删除自己创建的临时节点ZK监听器会收到通知客户端B该节点已删除,也就是说客户端A已经释放了锁。客户端B会重新尝试获取锁,重新获取临时节点集合,并检查自己的临时文件是否为最小的序号。是则加锁成功,执行业务逻辑代码。

如果服务器宕机,ZK会自动删除对应的顺序节点

三、Curator框架

通过Curator框架获取和释放zk分布式锁的代码如下:

// 定义锁节点名称
InterProcessMutex lock = InterProcessMutex(client, "/lock")

// 加锁
lock.acquire();

// 释放锁
lock.release();

源码分析

Curator中,尝试获取锁的具体实现在LockInternals.attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone ) {
        isDone = true;

        try {
            // 创建临时顺序节点,并返回创建节点的路径
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 获取所有临时节点,并判断自己创建的临时节点是否为最小序号
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch ( KeeperException.NoNodeException e ) {
            // 在找不到锁节点时由StandardLockInternalsDriver引发 
            // 可能发生在会话过期等情况下。因此,如果重试允许,只需重试 
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }
 
    if ( hasTheLock ) {
        return ourPath;
    }
 
    return null;
}

进入LockInternals.internalLockLoop方法,通过getChildren()获取到所有顺序节点,判断当前创建的节点是否是最小序号节点,如果是则获取锁成功,否则获取锁失败;如果获取锁失败,则通过watcher监听前一个顺序节点的节点变化,如果收到watcher监听回调,则再次进入循环,通过 getChildren()重新判断是否能够获取到锁:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        ...
 
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
            // 得到排序好的临时顺序节点列表
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
 
            // 判断是否能够成功获取锁,在获取锁失败的情况下,会同时返回需要watch的前一个顺序节点路径
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() ) {
                haveTheLock = true;
            } else {
                // 获取锁失败,开始监听前一个顺序节点的节点变化,并等待超时或者watcher监听回调
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
 
                synchronized(this) {
                    try {
                        // 使用getData()而不是exists()可以避免留下不必要的观察程序,这是一种资源泄漏
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null ) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 ) {
                                doDelete = true;    // 超时-删除我们的节点
                                break;
                            }
 
                            // 等待超时或者收到watcher回调,如果收到回调,则会再次进入循环判断是否能够获取到锁
                            wait(millisToWait);
                        } else {
                            // 没有传递超时时间的情况下,会一直等待直到watcher回调或者触发异常
                            wait();
                        }
                    } catch ( KeeperException.NoNodeException e ) {
                        //  它已被删除(即锁定已释放)。尝试再次获取 
                    }
                }
            }
        }
    } catch ( Exception e ) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        if ( doDelete ) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

StandardLockInternalsDriver.getsTheLock()方法中判断是否能够获取到锁:

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // 获取到当前客户端创建的节点在所有顺序节点中的index
    int ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
 
    // 可重入锁的场景下,maxLeases固定为1,所以只有当ourIndex==0时能够获取到锁(当前节点是第一个顺序节点)
    boolean getsTheLock = ourIndex < maxLeases;
    // 判断是否能获取到锁,如果获取不到,则取到前一个顺序节点的路径
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
 
    return new PredicateResults(pathToWatch, getsTheLock);
}

watcher收到监听回调时,通过LockInternals.notifyFromWatcher方法唤醒正在wait的线程:

private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        notifyFromWatcher();
    }
};
...
private synchronized void notifyFromWatcher() {
    notifyAll();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

领域事件驱动(二)聚合与聚合根的了解

上一章对值对象以及实体进行了一些简单的讲解&#xff1a; 聚合 聚合&#xff1a;我们把一些关联性极强、生命周期一致的实体、值对象放到一个聚合里。 聚合有一个聚合根和上下文边界&#xff0c;这个边界根据业务单一职责和高内聚原则&#xff0c;定义了聚合内部应该包含哪…

U-Boot移植 - 2_环境搭建和u-boot烧录启动

文章目录 1. 编译环境搭建1.1 交叉编译器下载1.2 交叉编译器安装 2. 编译原厂uboot3. 烧录开发板3.1 烧录到SD卡3.2 启动开发板 1. 编译环境搭建 1.1 交叉编译器下载 嵌入式Linux开发&#xff0c;程序编译通常在电脑端的Linux&#xff08;如虚拟机中的Ubuntu)下进行编译&…

阿里云ECS部署

nginx 安装nginx # 查看dnf版本 dnf --version# 查找是否是否安装 dnf search nginx# 安装nginx dnf install nginx# 启动nginx systemctl start nginx# 查看nginx运行状态 systemctl status nginx# 相当于开机自启&#xff08;重启服务器&#xff0c;nginx自动启动&#xff…

GoLand下载、安装

一、Goland下载 官方最新版本下载地址&#xff1a; Download GoLand: A Go IDE with extended support for JavaScript, TypeScript, and databases 其他版本下载&#xff1a; Other Versions - GoLand 二、安装过程 1.下载好goland-2021.1.1安装包后&#xff0c;双击运行安装包…

【ARIMA-WOA-CNN-LSTM】合差分自回归移动平均方法-鲸鱼优化-卷积神经网络-长短期记忆神经网络研究(Python代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Redis主从复制模式4

哨兵模式 有个哨兵一直在巡逻&#xff0c;突然发现。老大挂了&#xff0c;小弟们会自动投票&#xff0c;从众小弟中选出新的老大。即自动版的谋权篡位。我们把这个过程称为哨兵模式 设置哨兵模式语法格式&#xff1a; sentinel monitor 被监控主机名/IP Redis服务端口 票数 关闭…

AutoCV第十一课:DL基础

目录 DL基础前言1. BP训练mnist2. 权重初始化理论分析总结 DL基础 前言 手写AI推出的全新保姆级从零手写自动驾驶CV课程&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考。 本次课程我们来了解下 BP 反向传播和学习权重初始化相关知识 课程大纲可看下面的思维导…

java新特性stream

stream Java 8 是一个非常成功的版本&#xff0c;这个版本新增的Stream&#xff0c;配合同版本出现的 Lambda &#xff0c;给我们操作集合&#xff08;Collection&#xff09;提供了极大的便利。 Stream将要处理的元素集合看作一种流&#xff0c;在流的过程中&#xff0c;借助…

PMP常用英文术语缩写总结(文字版+表格版+图片版)

PMP常用英文术语缩写总结&#xff08;文字版表格版图片版&#xff09; 文字版 PMBOK Project Management Body of Knowledge 项目管理知识体系 PMI Project Management Institute 项目管理协会 PMO Project Management Office 项目管理办公室 PMIS Project Management Inf…

Spring Boot 中的服务注册是什么,原理,如何使用

Spring Boot 中的服务注册是什么&#xff0c;原理&#xff0c;如何使用 Spring Boot 是一个非常流行的 Java 后端框架&#xff0c;它提供了许多便捷的功能和工具&#xff0c;使得开发者可以更加高效地开发微服务应用。其中&#xff0c;服务注册是 Spring Boot 微服务架构中非常…

代码源 线段树模板

线段树1 思路&#xff1a; 我们需要维护的东西是序列的最小值和最小值个数 这道题没有修改操作&#xff0c;因此不考虑修改 然后考虑Pushup 最小值很简单&#xff0c;直接取min 最小值个数怎么维护呢&#xff1f;考虑这个区间需要维护的值如何从左右两个区间获得 如果左右…

四.流程控制语句

1、条件语句 Go 编程语言中 if 条件语句的语法如下&#xff1a; 1、基本形式 if 布尔表达式 { /* 在布尔表达式为 true 时执行 */ } If 在布尔表达式为 true 时&#xff0c;其后紧跟的语句块执行&#xff0c;如果false 则不执行。 package main import "fmt" fu…

Spring MVC是什么?详解它的组件、请求流程及注解

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 前言 本文将讲解Spring MVC是什么&#xff0c;它的优缺点与九大组件&#xff0c;以及它的请求流程与常用的注解。 目录 一、Spring MVC是什…

计算机网络 - http协议 与 https协议(2)

前言 本篇介绍了构造http请求的的五种方式&#xff0c;简单的使用postman构造http请求&#xff0c;进一步了解https, 学习https的加密过程&#xff0c;了解对称密钥与非对称密钥对于加密是如何进行的&#xff0c;如有错误&#xff0c;请在评论区指正&#xff0c;让我们一起交流…

【数据科学和可视化】反思十年数据科学和可视化工具的未来

数据科学在过去十年中呈爆炸式增长&#xff0c;改变了我们开展业务的方式&#xff0c;并让下一代年轻人为未来的工作做好准备。但是这种快速增长伴随着对数据科学工作的不断发展的理解&#xff0c;这导致我们在如何使用数据科学从我们的大量数据中获得可操作的见解方面存在很多…

vue3实现一个简单的数字滚动效果

一、实现数字按步长递增的效果 1.实现思路 将这个组件封装起来&#xff0c;需要外部引用的文件传递两个值&#xff1a;指定数值 num 和 滚动持续时长 duration。首先设置一个增量 step&#xff0c;让数字按照这个增量来进行递增。然后设置一个定时器 setInterval&#xff0c;…

Flink集群部署总结

集群部署方式 Flink有两种部署方式&#xff0c;Standalone和Flink on Yarn集群部署方式。 Flink集群架构 Flink分布式架构是常见的主从结构&#xff0c;由JobManager和TaskManager组成。JobManager是大脑&#xff0c;负责接收、协调、分发Task到各个TaskManager&#xff0c;也…

靶场搭建——搭建pikachu靶场

搭建pikachu靶场 搭建pikachu靶场1、win11本机搭建步骤2、虚拟机win2012搭建步骤 我所碰见的问题以及解决方式&#xff1a; 搭建pikachu靶场 这里我所运用到的材料有&#xff1a;首先我最终是在虚拟机中环境为win2012和主机都搭建完成。 &#xff08;一个即可&#xff09; Ph…

在各数据库中使用 MERGE 实现插入避重 SQL

MERGE实现插入避重操作 前言 MERGE是一种在数据库管理系统中用于合并&#xff08;插入、更新或删除&#xff09;数据的SQL语句。它允许根据指定的条件将数据从一个表合并到另一个表中&#xff0c;同时避免重复插入或更新数据。 MERGE语句通常由以下几个关键字和子句组成&…