Redisson源码-单线程加解锁流程

news2024/11/15 20:01:54

Redisson源码-单线程加解锁流程

以下源码分析基于redisson-3.17.6版本,不同版本源码会有些许不同需注意。

		 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.17.6</version>
        </dependency>

当我们调用Redisson.lock()并且不设置锁时间时,我们进入RedissonLock的lock方法。

    public void lock() {
        try {
        	// -1L为锁时间,表示不限时
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
		// 获取当前线程id
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        // 获取到锁直接返回
        if (ttl == null) {
            return;
        }
		// 订阅锁消息:当锁被释放的时候,会通过publish发布一条消息,通知其它等待这个锁的线程,锁已经释放。
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
        	// 不停尝试获取锁
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                // 如果锁的过期时间>=0
                if (ttl >= 0) {
                    try {
                    	// 等待超时时间过去
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                 // 锁过期时间<0,代表锁的超时时间未设置
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                    	// 无限等待直至获取锁
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

我们按照代码逻辑先看一下尝试获取锁的代码

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime > 0) {
        	// 指定了超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
        	// 未指定超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            // ttlRemaining为null, 本质就是加锁的LUA脚本中返回nil,表示获取锁成功
            if (ttlRemaining == null) {
                if (leaseTime > 0) {
                	// 如果设置了超时时间,则更新internalLockLeaseTime为指定的超时时间,并且不会启动看门狗
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                	// 自动续期实现,开启看门狗机制
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }

tryLockInnerAsync方法里的代码是加锁的核心代码之一:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

先介绍一下lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key),相当于下图的HASH
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id,相当于下图的key
nil:相当于null

初次获取锁:
在这里插入图片描述
锁重入:
在这里插入图片描述

接下来我们详细看一下lua脚本的逻辑:

 		 // 通过exists指令判断需要加锁的key是否存在,如果不存在,说明还没被加锁,可以直接进行加锁
		"if (redis.call('exists', KEYS[1]) == 0) then " +
        // 通过hincrby指令往redis中插入一个哈希结构的数据,key[1]=加锁key   ARGV[2]=uuid+当前线程ID  1=锁的重入次数
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间:ARGV[1]=锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功:nil=null
        "return nil; " +
        "end; " +

        // 如下是可重入锁的逻辑
        // 通过hexists指令判断当前的锁是不是自己的,只有是自己的锁时,才支持可重入
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        // 通过hincrby指令更新hash结构的数据(锁结构数据),将value对应的可重入次数加一
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功
        "return nil; " +
        "end; " +

        // 如果当前已经有人获取了锁,并且这个锁不是自己的,那么将会执行pttl指令,返回当前锁剩余的过期时间
        "return redis.call('pttl', KEYS[1]);"

从上面我们可以看出,redisson加锁的本质是通过执行lua脚本,返回nil(相当于null)或锁的剩余过期时间。如果返回并且未设置过期时间则开启看门狗机制。

接下来我们看下开启看门狗机制的代码

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 将当前锁的名称和 ExpirationEntry 对象放入 EXPIRATION_RENEWAL_MAP中,并返回之前关联的对象(如果存在)。
        // 本质就是检查当前锁是否已开启自动续期。
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        // 已开启
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        // 未开启
        } else {
            entry.addThreadId(threadId);
            try {
            	// 执行自动续期机制
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                	// 如果线程中断则解除自动续期机制避免死锁
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
private void renewExpiration() {
		// 获取当前锁名称的 ExpirationEntry 对象。
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 构建Timeout任务去执行锁续期,本质是调用了netty框架中的newTimeout方法,相当于一个延迟定时任务。
        // 相当于每隔 过期时间/3 (默认10秒)毫秒,递归调用renewExpiration方法去执行锁续期直至锁被释放。
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
            	// 二次获取当前锁名称的 ExpirationEntry 对象。
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                // 获取 ExpirationEntry 对象中的第一个线程 ID。
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                // 锁续期
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    // 续期成功
                    if (res) {
                        // reschedule itself
                        // 递归调用自身,不断续期
                        renewExpiration();
                    // 续期失败,表示锁被释放
                    } else {
                    	// 取消定时任务等操作
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

下面我们看一下锁续期的核心代码:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

该lua脚本的中的参数与上面相同:
KEYS[1]:锁住的对象(锁住的key)
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id

下面我们看一下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
	// 如果是的话通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
	"return 1; " +
	"end; " +
	// 否则返回0
	"return 0;",

整理上文可以看出看门狗机制简单来说就是每隔 过期时间/3 毫秒去执行lua脚本,若锁未被释放则刷新其过期时间,直至锁被释放为止。

接下来我们再看下解锁的逻辑:

@Override
    public void unlock() {
        try {
        	// 释放锁
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
public RFuture<Void> unlockAsync(long threadId) {
		// 释放锁的核心代码
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        CompletionStage<Void> f = future.handle((opStatus, e) -> {
        	// 取消看门狗机制(就是取消上文的定时任务等操作)
            cancelExpirationRenewal(threadId);

            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }

            return null;
        });

        return new CompletableFutureWrapper<>(f);
    }

下面我们看一下解锁的核心代码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

先说明一下该lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key)
KEYS[2]:监听该锁的频道
ARGV[1]:解锁消息
ARGV[2]:锁的过期时间
ARGV[3]:UUID+当前线程id

下面我们看下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
	// 不是自己的则返回nil(相当于null)
	"return nil;" +
	"end; " +
	// 则使用 hincrby 指令将字段锁的可重入次数减去 1,即减少持有锁的线程数。
	"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
	// 如果结果 > 0,代表解锁成功,但是锁仍然存在
	"if (counter > 0) then " +
	// 通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[2]); " +
	// 返回0
	"return 0; " +
	"else " +
	// 如果结果 < 0 ,代表持有锁的线程数为0,这时需要完全释放锁,通过del指令删除指定key的锁
	"redis.call('del', KEYS[1]); " +
	// 通过publish指令向订阅该锁的频道发送解锁消息
	"redis.call('publish', KEYS[2], ARGV[1]); " +
	// 返回1
	"return 1; " +
	"end; " +
	"return nil;",

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/674657.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐5 款好用的 Linux 音乐播放器

目前 Linux 上有几十个音乐播放器&#xff0c;这使得找到一个最好用的变成很困难。之前我们已经回顾了其中的一些播放器&#xff0c;如 Cantata&#xff0c;Exaile&#xff0c;甚至不那么出名的 Clementine&#xff0c;Nightingale 和 Quod Libet。 在本篇文章中我将涵盖更多的…

python学习——pandas数据处理 时间序列案例 matplotlib绘图案例

目录 pandas数据处理1.合并数据1) 堆叠合并2) 主键合并3) 重叠合并 2.分组和聚合3.索引和符合索引4.去除重复值5.处理缺失值6.处理离群值7.标准化数据1) 离差标准化函数2) 标准差标准化函数3) 小数定标差标准化函数 8.转换数据--离散处理9.时间序列【案例】时间序列案例案例1&a…

C++测试

开始对C嘎嘎下手&#xff01; 1.有关char数组 定义长度为5&#xff0c;但是实际长度是定义长度减1 突然就想到计网安全中的栈溢出问题了&#xff0c;C语言是不检查你是否越界的&#xff0c;如果通过让实参溢出覆盖掉原程序的返回地址&#xff0c;通过精心控制是可以让计算机执…

高级数据结构——红黑树

目录 1. 红黑树的概念 2. 红黑树的性质 3. 红黑树 6. 红黑树的验证 7. 红黑树的删除 8. 红黑树与AVL数的比较 9. 红黑树的应用 10. 完整代码 10.1 RBTree.h 10.2 test.cpp 1. 红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存…

49天精通Java,第37天,可变参数列表

目录 一、可变参数列表二、可变参数列表的优缺点1、优点2、缺点 三、可变参数列表的适用场景1、函数重载2、命令行参数解析3、集合操作4、函数式编程 大家好&#xff0c;我是哪吒。 &#x1f3c6;本文收录于&#xff0c;49天精通Java从入门到就业。 全网最细Java零基础手把手…

SpringBoot 如何使用 @ResponseStatus 注解处理异常状态码

SpringBoot 如何使用 ResponseStatus 注解处理异常状态码 在 SpringBoot 应用程序中&#xff0c;异常处理是一个非常重要的话题。当应用程序出现异常时&#xff0c;我们需要对异常进行处理&#xff0c;以保证应用程序的稳定性和可靠性。除了使用异常处理器外&#xff0c;Sprin…

重新理解微服务之终究绕不过这4个坎之(一)

写在前头 大家曾经有没有遇过日常技术交流的时候&#xff0c;会讨论某某技术之间的关系是什么&#xff0c;某些技术是否应该用到微服务。我相信热爱技术交流的您&#xff0c;就算不是在微服务这里领域&#xff0c;或多或少都会跟其他同行会做一些争议话题的探讨&#xff0c;而…

华为OD机试真题B卷 JavaScript 实现【字符串分隔】,附详细解题思路

一、题目描述 输入一个字符串&#xff0c;请按长度为8拆分每个输入字符串并进行输出&#xff0c;长度不是8整数倍的字符串请在后面补数字0&#xff0c;空字符串不处理。 二、输入描述 连续输入字符串(每个字符串长度小于等于100)。 三、输出描述 依次输出所有分割后的长度…

k8s使用ceph存储

文章目录 初始化操作k8s使用ceph rbdvolumePV静态pv动态pv k8s使用cephfsvolume静态pv 初始化操作 ceph创建rbd存储池 ceph osd pool create k8s-data 32 32 replicated ceph osd pool application enable k8s-data rbd rbd pool init -p k8s-dataceph添加授权&#xff0c;需…

指针和数组--指针数组及其应用

目录 一、指针数组用于表示多个字符串 二、指针数组用于表示命令行参数 一、指针数组用于表示多个字符串 一维数组可存储一个字符串&#xff0c;二维数组可存储多个字符串。 二维数组的元素在内存中是连续存放的&#xff0c;存完第一行后&#xff0c;再存第二行&#xff0c;以…

多线程之JUC

写在前面 本文一起看下jdk并发包的相关内容。 1&#xff1a;JUC包提供了哪些功能 先通过包结构看下JUC提供的功能&#xff1a; 接下来分别看下。 1.1&#xff1a;锁 JUC中的锁机制提供了比synchronized&#xff0c;wait/notify更加灵活的同步控制&#xff0c;在java.util.…

大数据基础平台实施及运维进阶

1、完全分布式部署介绍 完全分部式是真正利用多台Linux主机来进行部署Hadoop&#xff0c;对Linux机器集群进行规划&#xff0c;使得Hadoop各个模块分别部署在不同的多台机器上。 2、nameNode HA完全分布式部署 2.1、nameNode切换方法 分别处于Active和Standby中 hadoop可以…

操作系统复习笔记4

1、queueType队列类型 队列中的数据也呈线性排列。虽然与栈有些相似&#xff0c;但队列中添加和删除数据的操作分别是在两端进行的。 线性表有顺序存储和链式存储&#xff0c;队列作为一种特殊的线性表&#xff0c;也同样存在这两种存储方式。 1.1 顺序队列 用数组存储队列…

C语言学习(二十五)---指针练习题(一)

在上一节内容中&#xff0c;我们学习了递归与冒泡排序法的有关内容&#xff0c;今天我们将继续往下学习&#xff0c;主要内容为指针练习题&#xff0c;好了&#xff0c;话不多说&#xff0c;开整&#xff01;&#xff01;&#xff01; 在之前的第18—22的内容中&#xff0c;我…

lnmp框架的应用

目录 应用一 nginx访问状态统计 1.先查看http_stub_status有没有安装 2.进入nginx的配置文件改配置 3.nginx-检查配置 重启服务 最后这个20就是显示的状态统计 应用二 给网站加密 1.首先安装http-tools软软件 2.把nginx设置锁也要有执行权限 3.进入nginx配置文件 4. 检查…

【Windows个性化设置篇】StartAllBack更改win11任务栏设置

【Windows个性化设置篇】StartAllBack更改win11任务栏设置 Windows11目前不支持更改任务栏位置固定的修改&#xff0c;因为想把任务栏固定到旁边&#xff0c;从而充分利用电脑屏幕位置。之前试过TranslucentTB可以把任务栏透明化&#xff0c;很漂亮&#xff0c;但在分屏操作时…

【Vue3】Vue3+Vite+TS使用npm包引入百度地图

文章目录 Vue3ViteTS引入百度地图一、注册二、安装依赖包三、参考文档四、全局注册五、局部导入六、断网地图的使用八、项目使用成功图片九、使用卫星图 Vue3ViteTS引入高德地图npm包查找地图依赖包 Vue3ViteTS引入百度地图 一、注册 官网&#x1f449;百度地图开放平台 注册…

python---案例分析(1)

标准库 python自带的 第三方库 其他人做出来的 例1: 实现一个日期计算器 EG: 计算2012年2月14日和2016年2月3日之间的差值 使用datetime 1.根据日期构造出datetime类型的变量 2.把两个变量进行相减,得到的结果即为所求 1) 2) 3) 例2: 实现单词逆序 翻转单词顺序 i am a s…

MySQL数据库表的操作

创建表 语法&#xff1a; CREATE TABLE table_name (field1 datatype,field2 datatype,field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; 说明&#xff1a; field 表示列名。 datatype 表示列的类型。 character set 字符集&#xff0c;如果没有指…

hutool包下的BeanUtil工具使用、SQL中的and和OR的优先级

SQL中的and和OR的优先级 首先and的优先级大于or&#xff0c;通俗理解其实or查询其实会把条件分为左右两边来查。 如select * from user where id 1 and status 2 or status 3,本来想查询user表中id为1的状态为2或者3的数据&#xff0c;其实只会这样执行&#xff0c;and比or…