Redisson分布式锁-源码分析

news2024/11/17 9:41:13

Redisson分布式锁整体流程图

1677219798772.jpg

Redisson分布式锁源码流程图

Redisson分布式锁源码解析

获取分布式锁lock

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //获取当前线程ID
    long threadId = Thread.currentThread().getId();
    /*
    *
    * 尝试获取分布式锁
    *   a.如果获取到锁:返回null
    *   b.如果没有获取到锁:返回当前分布式锁的剩余的过期时间
    */
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    //ttl不为null说明锁被其他线程占用,没有获取到锁。订阅解锁消息
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    pubSub.timeout(future);
    RedissonLockEntry entry;
    //订阅解锁消息:如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            //当接收到分布式锁的pub消息,当前线程被唤醒继续执行,继续尝试获取锁
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            //获取成功跳出循环
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    //阻塞ttl毫秒后继续执行
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    //线程被打断,直接跳出循环
                    if (interruptibly) {
                        throw e;
                    }
                    //线程被打断,但是没有设置打断跳出循环,则继续阻塞ttl毫秒后继续执行
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
    //        get(lockAsync(leaseTime, unit));
}

详细步骤如下:

  1. 获取当前获取锁的线程ID。
  2. 调用tryAcquire方法尝试获取锁。
    1. tryAcquire方法返回null,说明获取到了锁。
    2. tryAcquire方法返回不是null值,说明没有获取到了锁,返回的Long值指的是其他线程占用该分布式锁的过期时间,单位为毫秒。
  3. 未获取到锁的线程订阅(sub)解锁(pub)消息,如果分布式锁未进行解锁(pub解锁消息),当前线程进入阻塞状态。当接收到分布式锁的pub消息,当前线程被唤醒继续执行,进入while循环调用tryAcquire方法继续争抢分布式锁。
  4. while中:
    1. 抢到了分布式锁,则跳出循环,并执行finally语句块的取消订阅解锁消息
    2. 如果没有抢到分布式锁,则执行 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);方法阻塞分布式锁剩余时间ttl毫秒后,继续while循环争取分布式锁,直到抢到分布式锁。

尝试获取分布式锁tryAcquire

尝试获取分布式锁,获取到分布式锁后,开启一个开门狗,为分布式锁续期。为获取到分布式锁,返回分布式锁剩余的过期时间,毫秒为单位。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //获取分布式锁
    //返回null则说明获取到了分布式锁
    //返回不为null说明没有获取到分布式锁,返回的是分布式锁的剩余失效时间
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    //解锁的
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
    //如果获取到分布式锁,则使用看门狗进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

详细步骤如下:

  1. 调用tryLockInnerAsync方法执行Lua脚本获取锁,获取失败返回锁的过期时间。获取成功返回null
  2. 获取分布式锁成功开启一个开门狗,进行锁的续期操作。默认过期时间是30秒,看门狗续期的时间间隔是过期时间的三分之一。
  3. 未获取到锁则返回锁的过期时间,单位毫秒。

获取分布式锁的lua脚本

/**
 * 执行Lua脚本获取锁
 * 1.key是否存在
 * 2.重入锁+1
 * 3.重置锁的过期时间(默认30秒)
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

详细步骤如下

  1. key是否存在
  2. 重入锁+1
  3. 重置锁的过期时间(默认30秒)

分布式锁续期

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //执行分布式锁续期的lua脚本
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    //自动续期
                    renewExpiration();
                } else {
                    //取消分布式锁续期
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

分布式锁续期lua脚本

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/687736.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

参与 2023 第二季度官方 Flutter 开发者调查

Flutter 3.10 已经正式发布&#xff0c;每个季度一次的 Flutter 开发者调查也来啦&#xff01;邀请社区的各位成员们填写&#xff1a; 调研旨在了解你对 Flutter 的满意程度以及对其各个子系统的反馈。你的意见将对我们改进 Flutter 的功能和性能产生重要影响。 在这次调研中&a…

Linux——软硬链接的理解

目录 那什么是链接&#xff1f; 链接命令的生成&#xff1a; 实验案例&#xff1a; 硬链接概念&#xff1a; 软链接概念&#xff1a; 情况1&#xff1a;删除myfile.txt&#xff1a; 情况2&#xff1a;重新创建一个新的myfile.txt文件&#xff1a; 软链接作用&#xff1…

推特引流:社交引流的技巧与方法

推特是一个广泛使用的社交媒体平台&#xff0c;可以用于引流和推广您的品牌、产品或服务。以下是一些社交引流的技巧和方法&#xff0c;可以帮助您在推特上获得更多的关注和流量&#xff1a; 优化个人资料&#xff1a;确保您的推特个人资料完整并具有吸引力。包括一个清晰的头…

yolov8-02 训练自己的数据集

1. 准备数据集 数据集格式跟yolov5一样&#xff0c;关于如何准备数据集可见之前的文章。 2. 创建 mydata.yaml 格式参考coco128.yaml&#xff0c;主要是 train / validate文件的存放路径&#xff0c;可以是同一个。 在ultralytics-main/ultralytics/datasets中&#xff0c;…

【Linux】ubuntu20.04安装ansys2023r1教程--超详细

一、安装包及其和谐文件 双击挂载 二、在ubuntu上安装依赖项 执行命令sudo apt install build-essential xterm libmotif-dev libxtst-dev libxt-dev libzip-dev libxmu-dev tcl tk lsb csh xfonts-75dpi xfonts-100dpi wine 弹出一个提示&#xff0c;需要去下载一个171MB的压…

Simulink 中基于 FPGA 的波束成形:算法设计(附源码)

一、前言 本示例显示了在 Simulink中开发适用于在硬件&#xff08;如现场可编程门阵列 &#xff08;FPGA&#xff09;&#xff09;上实现的波束成形器的工作流程的前半部分。它还演示如何将实现模型的结果与行为模型的结果进行比较。 示例 Simulink 中基于 FPGA 的波束成形&…

shell 数组 ${array[@]} ${array[*]}的使用及区别

数组定义 shell中用括号来表示数组&#xff0c;数组元素间使用空格隔开。 例如&#xff1a; a(1 2 3 4) 表示a数组且有元素为1,2,3,4 也可单个元素逐步来赋值 b[1]"a" b[2]"b" b[3]"c" echo ${b[]} # a b c 关联数组 定义关联数组&#xf…

论文翻译:Segment Anything

论文地址&#xff1a;https://arxiv.org/abs/2304.02643 代码地址&#xff1a;https://github.com/facebookresearch/segment-anything 数据集地址&#xff1a;https://ai.facebook.com/datasets/segment-anything/ “Segment Anything"项目旨在通过引入新的任务、数据集…

决定AI大模型胜负的关键:解读数据在未来竞争中的角色

随着人工智能的迅猛发展&#xff0c;高质量数据的重要性已愈发明显。以大型语言模型为例&#xff0c;近年来的飞跃式进展在很大程度上依赖于高质量和丰富的训练数据集。相比于GPT-2&#xff0c;GPT-3在模型架构上的改变微乎其微&#xff0c;更大的精力是投入到了收集更大、更高…

声卡设备无法正常工作或初始化的原因和解决方法

先来一个小科普&#xff0c;声卡设备是电脑中负责处理音频信号的硬件部件&#xff0c;它需要与相应的声卡驱动程序配合使用&#xff0c;才能让电脑发出或录制声音。 不过&#xff0c;自带声卡的设备或是自行匹配的声卡设备&#xff0c;也经常出现声卡设备无法正常工作或初始化…

通过Jenkins实现Unity多平台自动打包以及相关问题解决

简介 通过本文可以了解到如何在windows和mac上部署Jenkins。并且通过Jenkins实现Unity在IOS,安卓和PC等多平台自动打包的功能&#xff0c;并且可以将打包结果通过飞书机器人同步到飞书群内。优化工作流&#xff0c;提高团队的开发效率。文末记录了实际使用Jenkins时遇到的各种问…

Leetcode44 通配符匹配

给你一个输入字符串 (s) 和一个字符模式 (p) &#xff0c;请你实现一个支持 ? 和 * 匹配规则的通配符匹配&#xff1a; ? 可以匹配任何单个字符。 * 可以匹配任意字符序列&#xff08;包括空字符序列&#xff09;。 判定匹配成功的充要条件是&#xff1a;字符模式必须能够 完…

真心靠谱 Ubuntu18.04 换源 国内阿里云私服

本篇 blog 真心靠谱 1、备份原来的默认源 cp /etc/apt/sources.list{,.bak} 2、换阿里云的源&#xff08;需要稍作修改&#xff09; 直接使用阿里云的会报错 https://developer.aliyun.com/mirror/ubuntu 以上报错&#xff1a;是https证书问题&#xff0c;网上有人说安装证…

基于改进ISODATA算法的负荷场景曲线聚类MATLAB程序

参考文献&#xff1a; 基于机器学习的短期电力负荷预测和负荷曲线聚类研究_张辰睿&#xff08;硕士论文&#xff09; 参考其第三章 主要内容&#xff1a; 主要包含四种聚类算法&#xff0c;K-means聚类、ISODATA聚类、L-ISODATA聚类及K-L-ISODATA聚类&#xff0c;并且包含了…

JSch登录sftp时发现需要Kerberos身份验证

本问记录使用JSch登录sftps时遇到的Kerberos验证问题并记录了解决方法 项目场景&#xff1a; 项目开发中使用了SFTP&#xff0c;debug调试程序时发现了每次都需要手动输入 Kerberos的口令信息。这就很奇怪了难道每次连接SFTP时候都需要手动输入吗&#xff1f; 日志如下&#x…

RPC核心原理详解

什么是RPC&#xff1f; RPC的全称是Remote Procedure Call&#xff0c;即远程过程调用。简单解读字面上的意思&#xff0c;远程肯定是指要跨机器而非本机&#xff0c;所以需要用到网络编程才能实现&#xff0c;但是不是只要通过网络通信访问到另一台机器的应用程序&#xff0c…

基于Dubbo分布式学校信息管理系统设计与实现

一、引言 1.1 课题背景 随着时代的发展与进步,计算机网络也随之日益完善,渐渐覆盖了我们生活的各个方面。在信息化和数字化的时代背景下,使用计算机管理学校信息来提升教育工作的质量和效率,是大势所趋,所以近年来,随着网络技术的不断发展,使用信息管理系统的学校越来…

云原生时代数据治理的变革与创新

随着数字化进程的深入&#xff0c;企业对数据的依赖日益加深&#xff0c;数据资源的重要性愈发凸显。如何管好、用好数据&#xff0c;做好数据治理工作&#xff0c;发挥数据资源价值&#xff0c;成为企业提质增效过程中的重要议题。 在本次直播中&#xff0c;我们介绍了数据治…

leetcode:191. 位1的个数

难度&#xff1a;简单 编写一个函数&#xff0c;输入是一个无符号整数&#xff08;以二进制串的形式&#xff09;&#xff0c;返回其二进制表达式中数字位数为 1 的个数&#xff08;也被称为汉明重量&#xff09;。 提示&#xff1a; 请注意&#xff0c;在某些语言&#xff08;…

qt 最小文件系统 交叉编译qt源码

busybox qt源码下载后&#xff0c;需要交叉编译&#xff0c;在开发板上生成相应的库&#xff0c;才能在开发板上使用 我用qt制作了一个计时器&#xff0c;有相应的按钮功能。在windows上我大概知道鼠标点击按钮能够触发相应事件。把该程序移植到linux开发板上&#xff0c;开发…