【源码分析】从源码层面深度剖析Redisson实现分布式锁的原理

news2024/11/17 5:35:43

快速入门

  1. 引入redisson依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.0</version>
</dependency>
  1. 编写测试代码
public class RedissonTest {

    private static RedissonClient redissonClient;

    static {
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        redissonClient= Redisson.create(config);
    }

    public static void main(String[] args) throws InterruptedException {
        RLock rLock=redissonClient.getLock("updateData");
        //最多等待100秒、上锁10s以后自动解锁
        if(rLock.tryLock(100,10, TimeUnit.SECONDS)){
            System.out.println("获取锁成功");
        }
        Thread.sleep(2000);
        rLock.unlock();

        redissonClient.shutdown();
    }
}

lock方法会一直重试,tryLock方法会尝试,在一定时间内放弃重试。加了leaseTime,没有看门狗机制

private void redissonDoc() throws InterruptedException {
    //1. 普通的可重入锁
    RLock lock = redissonClient.getLock("generalLock");

    // 拿锁失败时会不停的重试
    // 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
    lock.lock();

    // 尝试拿锁10s后停止重试,返回false
    // 具有Watch Dog 自动延期机制 默认续30s
    boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);

    // 拿锁失败时会不停的重试
    // 没有Watch Dog ,10s后自动释放
    lock.lock(10, TimeUnit.SECONDS);

    // 尝试拿锁100s后停止重试,返回false
    // 没有Watch Dog ,10s后自动释放
    boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);

    //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
    RLock fairLock = redissonClient.getFairLock("fairLock");

    //3. 读写锁 没错与JDK中ReentrantLock的读写锁效果一样
    RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
    readWriteLock.readLock().lock();
    readWriteLock.writeLock().lock();
}

实现原理

RedissonLock#lock(),无参数默认leaseTime是-1

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }
        //...
    }

RedissonLock#tryAcquire

    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

RedissonLock#tryAcquireAsync,如果租约时间是-1,获取internalLockLeaseTime;如果不等于-1,获取该时间。租约默认时间是在Config的构造函数中设置的,lockWatchdogTimeout。如果续约时间设置为-1,且获取锁成功,开启看门狗。

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

RedissonBaseLock#scheduleExpirationRenewal,判断是否存在EXPIRATION_RENEWAL_MAP,不存在则执行 renewExpiration();,存在则直接返回。考虑到了可重入锁。

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }

RedissonBaseLock#renewExpiration,开启定时任务。每次间隔租期的1/3时间执行

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

使用lua进行续约

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

RedissonLock#tryLockInnerAsync,使用lua脚本进行加锁。

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

RedissonLock#unlockInnerAsync,使用lua脚本进行解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

RedissonLock处理锁竞争。subscribeFuture.await(time, TimeUnit.MILLISECONDS)在指定时间内获取锁失败,返回false。

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //...
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
         try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
    }

在这里插入图片描述

标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符
标题复制10行,并且每行大于10个字符

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/497713.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SubMain GhostDoc Enterprise v2022 Crack

GhostDoc 是一个 Visual Studio 扩展&#xff0c;适用于需要使用可自定义模板从源代码生成 XML 注释、维护干净和最新文档、生成多种格式的帮助文档、在 Visual Studio 中使用智能源代码拼写检查器等的开发人员. GhostDoc 还有助于自动生成干净、有用的干净代码文档&#xff0c…

scratch统计距离学校远近 中国电子学会图形化编程 少儿编程 scratch编程等级考试四级真题和答案解析2023年3月

目录 scratch统计距离学校远近 一、题目要求 1、准备工作 2、功能实现 二、案例分析

Elasticsearch:结合两全其美:Elasticsearch 与 BM25 和 HNSW 的混合搜索

就搜索算法而言&#xff0c;没有万能的解决方案。 不同的算法在不同的场景下效果更好&#xff0c;有时需要算法的组合才能达到最好的效果。 在 Elasticsearch 中&#xff0c;一种流行的组合搜索算法的方法是使用混合搜索&#xff0c;将用于文本搜索的 BM25 算法与用于最近邻搜索…

数据结构与算法十 并查集

一 并查集 并查集是一种树型的数据结构 &#xff0c;并查集可以高效地进行如下操作&#xff1a; 查询元素p和元素q是否属于同一组合并元素p和元素q所在的组 1.1 并查集结构 并查集也是一种树型结构&#xff0c;但这棵树跟我们之前讲的二叉树、红黑树、B树等都不一样&#xf…

【Stable Diffusion】基本概念之hypernetwork

1.基本概念 hypernetwork&#xff0c;中文名为超网络&#xff0c;是一种神经网络架构,它允许动态生成神经网络的参数(权重)。简而言之,hypernetwork可以生成其他神经网络。 在Stable Diffusion中,hypernetwork被用于动态生成分类器的参数&#xff0c;为Stable Diffusion模型添加…

牛客刷SQL题Day5

SQL69 返回产品并且按照价格排序 select prod_name , prod_price from Products where prod_price between 3 and 6 select prod_name , prod_price from Products where 6>prod_price and prod_price >3 踩坑1&#xff1a; between......and.......包括边界。 踩坑2&am…

ES6之箭头函数

文章目录 前言一、定义二、简化1.当函数参数只有一个时2.当函数体只有一条return语句时 三、注意1.箭头函数的this2.不能作为构造函数实例化对象3.不能使用argument变量 总结 前言 简单的讲&#xff0c;箭头函数是将原function关键字和函数名删掉的一种简写函数形式。 一、定义…

二维体光子晶体的平面波展开法代码

%书上的代码&#xff0c;和FEM符合的更好 %在这个代码里试着把单位原胞的相对介电常数分布画出来 %这个代码的单位原胞的中心就是(0,0)点&#xff0c;也就是坐标原点 %The program for the computation of the PhC photonic %band structure for 2D PhC. %Parameters of the st…

Jvm --java虚拟机(上)

为什么学习jvm 如果你这辈子只甘心做一个平庸的Java码农&#xff0c;那么你可以利用阅读本文的时间去学习其他新的技术知识&#xff0c;但是如果你想成为一个更更更更优秀的中高级程序员&#xff01;那么请继续阅读本文&#xff0c;希望这篇文章会对你有所帮助&#xff0c;那么…

国考省考结构化面试:综合分析题,名言哲理(警句观点启示)、漫画反驳题等

国考省考结构化面试&#xff1a;综合分析题&#xff0c;名言哲理&#xff08;警句观点启示&#xff09;、漫画反驳题等 2022找工作是学历、能力和运气的超强结合体! 公务员特招重点就是专业技能&#xff0c;附带行测和申论&#xff0c;而常规国考省考最重要的还是申论和行测&a…

从面向过程到面向对象

目录 1、抽象 2、UML类图 3、类定义 4、类成员函数 &#xff08;1&#xff09;构造函数&#xff08;constructor&#xff09; &#xff08;2&#xff09;析构函数&#xff08;destructor&#xff09; 5、对象实现 6、封装 7、getter、setter方法 为什么要从面向过程转…

C++实现ini配置文件解析——API设计

什么是配置文件 INI文件&#xff08;Initialization File&#xff09;是一种文本文件格式&#xff0c;通常用于存储配置数据。INI文件最初由Microsoft在Windows系统中引入&#xff0c;用于存储应用程序的配置信息。 INI文件的结构相对简单&#xff0c;由一系列的节&#xff0…

国外15家值得关注的AI创业公司

文 | 小戏、iven 星星之火&#xff0c;可以燎原。 在大模型横空出世的这个疯狂的春天&#xff0c;一场关于 AI 产品的革命也正在席卷全球。这边是大公司一个接一个模型搞军备竞赛&#xff0c;那边是各路豪强纷纷下场创业招兵买马。那么&#xff0c;除了咱们耳熟能详的 OpenAI 以…

数字化转型导师坚鹏:企业数字化营销

企业数字化营销 ————助力零售业务向批量化开发转变&#xff0c;对公业务向智慧化转变 课程背景&#xff1a; 很多企业存在以下问题&#xff1a; 不清楚数字化营销对企业发展有什么影响&#xff1f; 不知道如何提升企业数字化营销能力&#xff1f; 不知道企业如何开…

面试官:一千万的数据,你是怎么查询的

面试官&#xff1a;一千万的数据&#xff0c;你是怎么查询的&#xff1f; 前言 面试官&#xff1a;来说说&#xff0c;一千万的数据&#xff0c;你是怎么查询的&#xff1f;B哥&#xff1a;直接分页查询&#xff0c;使用limit分页。面试官&#xff1a;有实操过吗&#xff1f;B…

word@通配符@高级搜索查找@替换@中英文标点符号替换

文章目录 高级搜索通配符批量选中引用序号上标调整搜索替换作用范围设置&#x1f388;通过样式选择作用区域通过鼠标选择作用区域 高级替换操作顺序 标点符号替换&#x1f388;将英文逗号替换为中文逗号使用普通查找和替换&#xff1a;使用通配符替换 将英文句点替换为中文句号…

【Stable Diffusion】ControlNet基本教程(二)

接上篇【Stable Diffusion】ControlNet基本教程&#xff08;一&#xff09;&#xff0c;本篇介绍两个ControlNet常见的基本用法&#xff0c;更多用法欢迎关注博主&#xff0c;博主还会更新更多有趣的内容。 3.ControlNet基本用法 3.1漫画线稿上色 &#xff08;1&#xff09;上传…

Mysql索引(3):索引分类

1 索引分类 在MySQL数据库&#xff0c;将索引的具体类型主要分为以下几类&#xff1a;主键索引、唯一索引、常规索引、全文索引。 分类含义特点关键字主键索引针对于表中主键创建的索引 默认自动创建, 只能有一个 PRIMARY 唯一索引 避免同一个表中某数据列中的值重复可以有多…

Graph Embeddings—随机游走基本概念

Random Walk Approaches for Node Embeddings 一、随机游走基本概念 想象一个醉汉在图中随机的行走&#xff0c;其中走过的节点路径就是一个随机游走序列。 随机行走可以采取不同的策略&#xff0c;如行走的方向、每次行走的长度等。 二、图机器学习与NLP的关系 从图与NLP的…

【计算机网络】总结篇

【C语言部分】总结篇 【操作系统】总结篇 【数据库&#xff08;SQL&#xff09;】总结篇 本文目录 1. 简述网络七层参考模型及每一层的作用2. 简述静态路由和动态路由3. 说说有哪些路由协议&#xff0c;都是如何更新的4. 简述域名解析过程&#xff0c;本机如何干预域名解析5. 简…