Redisson源码-多线程之首个获取锁的线程加解锁流程

news2025/1/10 20:34:01

Redisson源码-多线程之首个获取锁的线程加解锁流程

简介
当有多个线程同时去获取同一把锁时,第一个获取到锁的线程会进行加解锁,其他线程需订阅消息并等待锁释放。

以下源码分析基于redisson-3.17.6版本,不同版本源码会有些许不同需注意。

		 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.17.6</version>
        </dependency>

当我们调用Redisson.lock()并且不设置锁时间时,我们进入RedissonLock的lock方法。

    public void lock() {
        try {
        	// -1L为锁时间,表示不限时
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
		// 获取当前线程id
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        // 获取到锁直接返回
        if (ttl == null) {
            return;
        }
		// 订阅锁消息:当锁被释放的时候,会通过publish发布一条消息,通知其它等待这个锁的线程,锁已经释放。
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
        	// 不停尝试获取锁
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                // 如果锁的过期时间>=0
                if (ttl >= 0) {
                    try {
                    	// 等待超时时间过去
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                 // 锁过期时间<0,代表锁的超时时间未设置
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                    	// 无限等待直至获取锁
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

我们按照代码逻辑先看一下尝试获取锁的代码

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime > 0) {
        	// 指定了超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
        	// 未指定超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            // ttlRemaining为null, 本质就是加锁的LUA脚本中返回nil,表示获取锁成功
            if (ttlRemaining == null) {
                if (leaseTime > 0) {
                	// 如果设置了超时时间,则更新internalLockLeaseTime为指定的超时时间,并且不会启动看门狗
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                	// 自动续期实现,开启看门狗机制
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }

tryLockInnerAsync方法里的代码是加锁的核心代码之一:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

先介绍一下lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key),相当于下图的HASH
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id,相当于下图的key
nil:相当于null

初次获取锁:
在这里插入图片描述
锁重入:
在这里插入图片描述

接下来我们详细看一下lua脚本的逻辑:

 		 // 通过exists指令判断需要加锁的key是否存在,如果不存在,说明还没被加锁,可以直接进行加锁
		"if (redis.call('exists', KEYS[1]) == 0) then " +
        // 通过hincrby指令往redis中插入一个哈希结构的数据,key[1]=加锁key   ARGV[2]=uuid+当前线程ID  1=锁的重入次数
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间:ARGV[1]=锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功:nil=null
        "return nil; " +
        "end; " +

        // 如下是可重入锁的逻辑
        // 通过hexists指令判断当前的锁是不是自己的,只有是自己的锁时,才支持可重入
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        // 通过hincrby指令更新hash结构的数据(锁结构数据),将value对应的可重入次数加一
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功
        "return nil; " +
        "end; " +

        // 如果当前已经有人获取了锁,并且这个锁不是自己的,那么将会执行pttl指令,返回当前锁剩余的过期时间
        "return redis.call('pttl', KEYS[1]);"

从上面我们可以看出,redisson加锁的本质是通过执行lua脚本,返回nil(相当于null)或锁的剩余过期时间。如果返回并且未设置过期时间则开启看门狗机制。

接下来我们看下开启看门狗机制的代码

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 将当前锁的名称和 ExpirationEntry 对象放入 EXPIRATION_RENEWAL_MAP中,并返回之前关联的对象(如果存在)。
        // 本质就是检查当前锁是否已开启自动续期。
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        // 已开启
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        // 未开启
        } else {
            entry.addThreadId(threadId);
            try {
            	// 执行自动续期机制
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                	// 如果线程中断则解除自动续期机制避免死锁
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
private void renewExpiration() {
		// 获取当前锁名称的 ExpirationEntry 对象。
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 构建Timeout任务去执行锁续期,本质是调用了netty框架中的newTimeout方法,相当于一个延迟定时任务。
        // 相当于每隔 过期时间/3 (默认10秒)毫秒,递归调用renewExpiration方法去执行锁续期直至锁被释放。
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
            	// 二次获取当前锁名称的 ExpirationEntry 对象。
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                // 获取 ExpirationEntry 对象中的第一个线程 ID。
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                // 锁续期
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    // 续期成功
                    if (res) {
                        // reschedule itself
                        // 递归调用自身,不断续期
                        renewExpiration();
                    // 续期失败,表示锁被释放
                    } else {
                    	// 取消定时任务等操作
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

下面我们看一下锁续期的核心代码:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

该lua脚本的中的参数与上面相同:
KEYS[1]:锁住的对象(锁住的key)
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id

下面我们看一下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
	// 如果是的话通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
	"return 1; " +
	"end; " +
	// 否则返回0
	"return 0;",

整理上文可以看出看门狗机制简单来说就是每隔 过期时间/3 毫秒去执行lua脚本,若锁未被释放则刷新其过期时间,直至锁被释放为止。

接下来我们再看下解锁的逻辑:

@Override
    public void unlock() {
        try {
        	// 释放锁
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
public RFuture<Void> unlockAsync(long threadId) {
		// 释放锁的核心代码
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        CompletionStage<Void> f = future.handle((opStatus, e) -> {
        	// 取消看门狗机制(就是取消上文的定时任务等操作)
            cancelExpirationRenewal(threadId);

            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }

            return null;
        });

        return new CompletableFutureWrapper<>(f);
    }

下面我们看一下解锁的核心代码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

先说明一下该lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key)
KEYS[2]:监听该锁的频道
ARGV[1]:解锁消息
ARGV[2]:锁的过期时间
ARGV[3]:UUID+当前线程id

下面我们看下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
	// 不是自己的则返回nil(相当于null)
	"return nil;" +
	"end; " +
	// 则使用 hincrby 指令将字段锁的可重入次数减去 1,即减少持有锁的线程数。
	"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
	// 如果结果 > 0,代表解锁成功,但是锁仍然存在
	"if (counter > 0) then " +
	// 通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[2]); " +
	// 返回0
	"return 0; " +
	"else " +
	// 如果结果 < 0 ,代表持有锁的线程数为0,这时需要完全释放锁,通过del指令删除指定key的锁
	"redis.call('del', KEYS[1]); " +
	// 通过publish指令向订阅该锁的频道发送解锁消息
	"redis.call('publish', KEYS[2], ARGV[1]); " +
	// 返回1
	"return 1; " +
	"end; " +
	"return nil;",

以上我们解释说明了单个线程在不等待锁的情况下,直接获取锁,对锁进行续期和解锁的代码逻辑,可以看出加解锁本质上都是通过lua脚本去执行,当有多个线程同时去获取锁时,第一个获取到锁的线程会按照此逻辑执行,其他线程需订阅消息并等待锁释放。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690794.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构--单链表的插入删除

数据结构–单链表的插入&删除 目标 单链表的插入&#xff08;位插、前插、后插&#xff09; 单链表的删除 单链表的插入 按为序插入(带头结点) ListInsert(&L,i,e):插入操作。在表L中的第i个位置上插入指定元素e。 思路&#xff1a;找到第i-1个结点,将新结点插入其…

软件工程期末报告(登录注册部分)

云小智微校园工具系统的设计与实现成员1注册模块 第一章 绪论 系统的背景描述和概述&#xff1a;可以描述系统服务的对象是学生,满足他们查询课程安排和上课时间的需求。目前市场上确实存在这方面的需求,这款app可以方便学生管理课程。系统采用客户端-服务器架构,运行在安卓平…

自学黑客(网络安全),一般人我劝你还是算了吧(自学网络安全学习路线--第十章 公钥基础设施-PKI)【建议收藏】

文章目录 一、自学网络安全学习的误区和陷阱二、学习网络安全的一些前期准备三、自学网络安全学习路线一、PKI概述1、理论基础2、PKI提供的安全服务 二、数字证书1、数字证书的格式2、数字证书的生命周期3、用JAVA工具生成数字证书 三、PKI组成四、PKI功能五、信任模型六、相关…

基于 Redis 手写一个“秒杀”

博主介绍&#xff1a; ✌博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家✌ Java知识图谱点击链接&#xff1a;体系化学习Java&#xff08;Java面试专题&#xff09; &#x1f495;&#x1f495; 感兴趣的同学可以收…

Git分布式版本控制工具 —— 详细笔记

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

现代操作系统(中)

第三章 内存管理 概述 内存&#xff08;RAM&#xff09;是计算机中一种需要认真管理的重要资源。 经过多年探索&#xff0c;人们提出了分层存储器体系&#xff08;memory hierarchy&#xff09;的概念&#xff0c;即在这个体系中&#xff0c;计算机有若干兆&#xff08;MB&a…

统信UOS系统开发笔记(七):在统信UOS系统上使用linuxdeployqt发布qt程序

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/131411975 红胖子(红模仿)的博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软…

qt QSqlRelationalTableModel 详解

背景知识&#xff1a; Qt SQL的API分为不同层&#xff1a; 驱动层 驱动层 对于QT是基于C来实现的框架&#xff0c;该层主要包括QSqlDriver、QSqlDriverCreator、QSqlDriverCreatorbase、QSqlDriverPlugin and QSqlResult。这一层提供了特定数据库和SQL API层之间的底层桥梁…

Problem I Rank LED题解 - 2018年第一届GXCPC广西大学生程序设计大赛 正式赛

Problem I Rank LED题解 题目大意 ‘0’到‘9’的数字亮线依次为{6、2、5、5、4、5、6、3、7、6}。 Luras想修改每条光线的位置&#xff0c;使她的新等级尽可能小&#xff0c;同时新等级也是一个不带任何前导零的正整数。 另外&#xff0c;光线总数应与开始时相同。 官方题…

【AIGC】1、爆火的 AIGC 到底是什么 | 全面介绍

文章目录 一、AIGC 的简要介绍二、AIGC 的发展历程三、AIGC 的基石3.1 基本模型3.2 基于人类反馈的强化学习3.3 算力支持 四、生成式 AI&#xff08;Generative AI&#xff09;4.1 单模态4.1.1 生成式语言模型&#xff08;Generative Language Models&#xff0c;GLM&#xff0…

消息处理机制(AOSP4.4.2)

消息处理机制&#xff08;AOSP4.4.2&#xff09; Android 应用程序是通过消息来驱动的&#xff0c;系统为每一个应用程序维护一个消息队列&#xff0c;应用程序的主线程&#xff0c;不断地从这个消息队列中获取消息&#xff08;Looper&#xff09;&#xff0c;然后对消息进行处…

STM32单片机(三)第二节:GPIO输出练习2(LED流水灯)

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

3 分钟为英语学习神器 Anki 部署一个专属同步服务器

原文链接&#xff1a;https://icloudnative.io/posts/anki-sync-server/ Anki 介绍 Anki 是一个辅助记忆软件&#xff0c;其本质是一个卡片排序工具--即依据使用者对卡片上的自定义内容进行主动测试、自我评判后&#xff0c;其内部算法根据评判结果更改每张卡片下次测试时间的…

将win上的文件传输到Ubuntu虚拟机

首先获取Ubuntu系统的ip地址&#xff0c;在Ubuntu的Terminal中输入ifconfig&#xff0c;可以看到Ubuntu的ip地址 可以看到我电脑的ip地址是10.0.2.15。更改虚拟机的网络连接 这里以VirtualBox为例&#xff0c;打开VirtualBox设置&#xff0c;选择网络&#xff0c;将连接方式改…

React.JS实战项目(三):图书购物网站

React.JS实战项目(三):图书购物网站 1、菜单 首页图书新书购物车2、首页 首页视频预览 首页预览 首页主要展示了友情链接、图书分类、好书推荐、新书广场等等信息。 首页部分代码展示 <Row><Col

SpringSecurity整合ssm

SpringSecurity 1. SpringSecurity 框架简介 Spring 是非常流行和成功的 Java 应用开发框架&#xff0c;Spring Security 正是 Spring 家族中的成员。Spring Security 基于 Spring 框架&#xff0c;提供了一套 Web 应用安全性的完整解决方 案。 正如你可能知道的关于安全方面…

nexus 配置pypi代理

在研发环境中由于网络限制&#xff0c;无法访问外网&#xff0c;但经常使用npm、maven、pip等工具&#xff0c;这种场景中使用nexus 做代理是一个比较好的解决办法。 在配置pypi代理时&#xff0c;和配置npm、maven代理有所不同&#xff0c;在配置远程地址时&#xff0c;需要将…

我的IDEA插件

文章目录 前言一、.ignore二、Adapter for Eclipse Code Formatter三、Convert YAML and Properties File四、EasyCode五、Free MyBatis Tool六、Maven Helper七、Rainbow Brackets 前言 目前使用比较顺手的插件&#xff0c;具体使用方法自行查阅 一、.ignore git 忽略文件&…

【算法与数据结构】344、LeetCode反转字符串

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;关于变量交换有两种办法&#xff0c;一种是最常见的引入一个临时变量方法&#xff0c;另一种是使用位运…

【Java面试题】Java基础——集合

文章目录 集合的形式List和Set的区别ArrayList和LinkedList的区别ArrayList和数组的区别ArrayList的扩容机制是什么&#xff1f;ArrayList有哪些特点List和Map的区别如何让map存储有序数据如何创建Map?常用的Map有哪些?如何在HashMap中插入一个数据遍历一个 List 有哪些不同的…