Redis实战(4)——Redisson分布式锁

news2024/11/24 20:37:31

1 基于互斥命令实现分布式锁的弊端

根据上篇文章基于redis互斥命令实现的分布式锁任然存在一定的弊端

  • 1无法重入: 同一个线程无法重新获得同一把锁
  • 2超时删除 :会因为超时、任务阻塞而自动释放锁,出现其他线程抢占锁出现并行导致线程不安全的问题
  • 3 不可重试: 基于setnx 互斥指令实现的非阻塞式分布式锁在获取不到锁时将会立即返回,没有重试机制
  • 4主从一致性: 如果Redis提供了主从同步,主从同步时出现了延迟时,会出现无法判定当前线程锁的状态,出现线程不安全的问题。因为一般写指令【setnx】向主Redis操作,读指令【get key】向从Redis操作,即主从分离的情形。

2 Redisson

为了解决上述基于互斥命令实现的锁出现的问题,可以选择使用Redisson分布式锁方案。
Redisson不仅仅是一个Redis客户端,它还实现了很多具有分布式特性的常用工具类。
引入依赖

        <!--redisson 分布式锁-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.5.6</version>
        </dependency>

创建Redisson的客户端

@Configuration
public class RedissonClientConfig {
    
    @Bean
    public RedissonClient redissonClient(){
        // 配置类
        Config config = new Config();
        // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setDatabase(1);
        // 创建客户端
        return Redisson.create(config);
    }
}

3 Redisson 分布式锁实现原理

3.1 Redisson实现可重入锁原理

相较于redis使用互斥指令setnex 创建的分布式锁,存储的数据结构为 S t r i n g \textcolor{red}{String} String,Redisson存储的数据结构为 H a s h \textcolor{red}{Hash} Hash,线程在尝试获得锁的时候,除了存储当前线程标识之外,还会存储锁的重入次数。
同一个线程获得锁时,重入次数加一。释放锁时,重入次数减一,直至减至0时redis数据库删除该key的信息。为了基于原子性操作,Redisson获得锁和释放锁的逻辑都是基于Lua脚本实现的

在这里插入图片描述
执行获得锁的底层代码执行 L u a 脚本 \textcolor{red}{执行获得锁的底层代码执行Lua脚本} 执行获得锁的底层代码执行Lua脚本

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //判定是否存在锁
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      //不存在,初次设置锁标识,重入次数为1
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      //设置锁的过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //判定是否为当前线程id 持有锁
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      // 是,则锁的重入次数加一
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      //更新锁持有的有效时长
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //未获得锁,返回当前锁的剩余的有效期时间 【单位ms】
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

KEYS[1] 锁的key
ARGV[1] 有效时长
ARGV[2] 锁的线程标识

释放锁的代码执行 L u a 脚本 \textcolor{red}{释放锁的代码执行Lua脚本} 释放锁的代码执行Lua脚本

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                 //锁不存在时,发布锁释放的消息
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                //不是当前线程标识,无法释放锁
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                //判定为当前线程标识持有锁,锁重入次数减一
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    //重入次数未减至0 时,更新锁的持有时间
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    //重入次数减至0时,redis删除锁,锁成功释放
                    "redis.call('del', KEYS[1]); " +
                    //发布锁成功释放的消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

3.2 锁重试与锁的超时释放问题

在内部Redisson获得锁时,会执行以下方法。当成功获得锁时,返回null,获得锁失败时,获得锁剩余的持有时间。利用redis的订阅和信号量机制,在设定的等待时间内尝试重试获得锁。做到了锁重试,且不是无休止的盲目等待去获得锁的信息。
至于锁的超时释放问题,redisson 提供了watchdog机制,当不设定锁的超时时间,即默认设置为-1 时,利用watchdog机制,每隔一段时间 (internalLockLeaseTime 3),重置锁的有效时长

在这里插入图片描述

锁在最大等待时间内进行锁重试 \textcolor{red}{锁在最大等待时间内进行锁重试} 锁在最大等待时间内进行锁重试

    //超时释放时间,时间单位,线程标识id
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        //超时时间转化为ms,存入内部成员变量中
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //锁不存在的时候,设置锁
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //获得锁失败,获得锁的剩余持有时间
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //设定的锁等待时间,转为ms 单位
        long time = unit.toMillis(waitTime);
        //当前时间
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        //获得锁的剩余有效时间。根据上述方法值返回
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // ttl=null,标识锁获得成功,返回true
        if (ttl == null) {
            return true;
        }
        
        //更新锁等待时长,减去初次获得锁的时间
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            //等待时长<0,即已超出设定的超时等待时长。获得锁失败
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅锁释放的信息
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //在剩余等待时长中等待看是否有其它线程释放锁的信息,没有收到释放锁的信息
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            //超时,取消锁释放的订阅
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            //判定没有收到锁释方的消息,获得锁失败
            acquireFailed(threadId);
            return false;
        }

        try {
            //再次更新锁的等待时间
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                //<0,即超时,获得锁失败
                acquireFailed(threadId);
                return false;
            }
        
            //进入重新获得锁逻辑
            while (true) {
                long currentTime = System.currentTimeMillis();
                //重新获得锁的剩余时间
                ttl = tryAcquire(leaseTime, unit, threadId);
                //null,成功获得锁
                if (ttl == null) {
                    return true;
                }

                //没有获得锁成功,更新锁的等待时间
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                     //等待时间<0,即超时,获得锁失败
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

锁超时—— w a t c h d o g 机制 \textcolor{red}{锁超时——watchdog机制} 锁超时——watchdog机制

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        //当设定超时时间为-1时
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    //更新锁的有效时长 。
                    //scheduleExpirationRenewal是一个定时任务。任务的间隔时间是 internalLockLeaseTime / 3 。internalLockLeaseTime 设定的是30s,即锁的watchdog时间。直到用户显示的执行unlock()方法,取消该定时任务,锁成功释放。watchdog机制保证了锁的超时释放问题。
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

3.3 Redisson主从一致性解决方案

在redis 集群模式下,由于主从节点 读写分离 \textcolor{red}{读写分离} 读写分离,出现的因为主从同步出现延迟或主从同步数据失败会导致多个线程获得锁出现线程不安全问题。在Redisson中可以采用RedissonMultiLock【联锁,把多个锁联合成一把锁来看待】来解决。
即每一个redis节点都当成Master节点来看待,在获得锁时,必须每一个Redis节点都获得锁成功才算成功,释放锁时需要每一个Redis节点都释放锁成功才算成功。

M u l t i L o c k 的使用 \textcolor{red}{MultiLock的使用} MultiLock的使用

// 初始化三个锁 ,指向不同的redis节点
RLock lock1 = redissonClient.getLock("lockName1");
RLock lock2 = redissonClient.getLock("lockName2");
RLock lock3 = redissonClient.getLock("lockName3");

// 初始化三个锁的合并锁
RLock multiLock = redissonClient.getMultiLock(lock1, lock2, lock3);

// 获取锁
try{
multiLock.lock();
//do something
}finally{
 multiLock.unlock();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/825519.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML基础介绍1

HTML是什么 1.HTML&#xff08;HyperText Mark-up Language&#xff09;即超文本标签语言&#xff08;可以展示的内容类型很多&#xff09; 2.HTML文本是由HTML标签组成的文本&#xff0c;可以包括文字、图形、动画、声音、表格、连接等 3.HTML的结构包括头部&#xff08;He…

AI绘画:当艺术遇见智能

&#x1f482; 个人网站:【工具大全】【游戏大全】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 寻找学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 前言 随着人工智能技术…

API接口给开发程序提供帮助,API接口应用价值

API可以用于开发使用相同数据的其他应用程序&#xff0c;比如公司&#xff0c;他们可以创建一个API &#xff0c;允许其他开发人员使用他们的数据并用其做其他事情&#xff0c;可以是 业务相关的 网站也可以是移动应用程序。 公司作为 信息的所有者&#xff0c; 便可以免费或收…

一次有趣的Webshell分析经历

一次有趣的Webshell分析经历 1.拉取源代码2.解密后门代码3.分析webshell逻辑4.分析404的原因5.附&#xff1a;格式化后的php代码 1.拉取源代码 在对某目标做敏感目录收集时发现对方网站备份源代码在根目录下的 backup.tar.gz&#xff0c;遂下载&#xff0c;先使用D盾分析有没有…

JS逆向-小红薯X-S环境分析

目录 前言一、分析二、验证借鉴 前言 听说这是个抓的比较严格的网址&#xff0c;这里就不作太深的分析&#xff0c;仅是将一些环境点给分析出来。 详细的可以看这位大佬的&#xff08;玩的就是一个风险转移&#xff09; 小红书x-s新版分析(2023-05-30失效) 一、分析 看波封面…

Python高阶技巧 网络编程

Socket ocket (简称 套接字) 是进程之间通信一个工具&#xff0c;好比现实生活中的插座&#xff0c;所有的家用电器要想工作都是基于插座进行&#xff0c;进程之间想要进行网络通信需要socket。 Socket负责进程之间的网络数据传输&#xff0c;好比数据的搬运工。 客户端和服务…

Element快速入门

文章目录 Element简介快速入门常见组件表格组件Pagination分页Dialog对话框Form表单 案例基本页面布局页面组件实现axios异步加载数据 vue路由打包部署 本人主攻后端&#xff0c;前端的文章基本就用来记一下的 写的文章基本没什么内容&#xff0c;还望看的多包含 Element 简介…

如何在保健品行业运用IPD?

保健品是指能调节机体功能&#xff0c;不以治疗为目的&#xff0c;并且对人体不产生任何急性、亚急性或者慢性危害的产品。保健品是食品的一个种类&#xff0c;具有一般食品的共性&#xff0c;其含有一定量的功效成分&#xff0c;能调节人体的机能&#xff0c;具有特定的功效&a…

使用Goland导出UML类图

1.安装依赖&#xff1a;goplantuml go get github.com/jfeliu007/goplantuml/parser go install github.com/jfeliu007/goplantuml/cmd/goplantumllatest 验证是否安装成功&#xff1a; 在$GOPATH的bin目录下生成.exe可执行文件&#xff1a; 2.在Goland的External Tools中添…

Java课题笔记~ MyBatis的工作流程和核心对象

一、工作流程 MyBatis的工作流程是MyBatis中重要的知识点&#xff0c;整个MyBatis工作流程分为5个步骤。 编写配置文件与映射文件&#xff1a;配置文件设置数据库连接&#xff1b;映射文件设置与SQL文件相关的操作。 MyBatis通过配置文件和映射文件生成SqlSessionFactory对象…

【第四版】 信息系统项目管理高级(高项)--第五章 信息系统工程 知识点逻辑思维导图

第五章 信息系统工程 Part1 软件工程 一、架构设计 1.软件架构目的&#xff1a;解决好软件的复用、质量、维护问题2.软件架构风格 数据流风格&#xff1a;批处理序列、管道/过滤器调用/返回风格&#xff1a;主程序/子程序独立构建风格&#xff1a;通信工程、事件驱动虚拟机风格…

年龄大了转嵌入式有机会吗?

首先&#xff0c;说下结论&#xff1a;年龄并不是限制转行嵌入式软件开发的因素&#xff0c;只要具备一定的编程和电子基础知识&#xff0c;认真学习和实践&#xff0c;是可以成为优秀的嵌入式软件开发工程师的。 1、转行建议 在转行的初期阶段&#xff0c;需要耐心学习嵌入式…

测试|LoadRunner安装及介绍

测试|LoadRunner安装及介绍 文章目录 测试|LoadRunner安装及介绍1.什么是LoadRunner2.LoadRunner特点3.LoadRunner基本概念4.LoadRunner三大组件之间关系LoadRunner安装1.安装包2.安装loadrunner 1.什么是LoadRunner LoadRunner是用来模拟用户负载完成性能测试的工具。 它适用…

Ubuntu安装Anaconda并配置Python虚拟环境

目录 1、Anaconda 1.1、下载Anaconda安装包 1.2、安装Anaconda 2、Python虚拟环境 1、Anaconda 1.1、下载Anaconda安装包 这是清华的下载镜像列表&#xff1a; Index of /https://repo.anaconda.com/archive/我们下载的是Anaconda3-2023.07-1-Linux-x86_64.sh版本。 ht…

【深度学习】Transformer,Self-Attention,Multi-Head Attention

必读文章&#xff1a; https://blog.csdn.net/qq_37541097/article/details/117691873 论文名&#xff1a;Attention Is All You Need 文章目录 1、Self-Attention 自注意力机制2、Multi-Head Attention 1、Self-Attention 自注意力机制 Query&#xff08;Q&#xff09;表示当…

【秋招】算法岗的八股文之机器学习

目录 机器学习特征工程常见的计算模型总览线性回归模型与逻辑回归模型线性回归模型逻辑回归模型区别 朴素贝叶斯分类器模型 (Naive Bayes)决策树模型随机森林模型支持向量机模型 (Support Vector Machine)K近邻模型神经网络模型卷积神经网络&#xff08;CNN&#xff09;循环神经…

MPLS虚拟专用网跨域--OptionB方案

OptionB方案 跨域VPN-OptionB中,两个ASBR通过MP-EBGP交换它们从各自AS的PE设备接收的标签VPN-IPv4路由。图中,VPN LSP表示私网隧道,LSP表示公网隧道。 跨域VPN-OptionB方案中,ASBR接收本域内和域外传过来的所有跨域VPN-IPv4路由,再把VPN-IPv4路由发布出去。但MPLS VPN的…

item_get-小红薯-商品详情

一、接口参数说明&#xff1a; item_get-获得小红薯商品详情&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 名称类型必须描述keyString是调用key&#xff08;http://o0b.cn/iimiya&#xff09;secretString是调用密钥api_nameS…

JVM之内存结构

1.程序计数器 定义&#xff1a;程序计数器&#xff08;Program Counter Register&#xff09;是JVM中一块较小的内存空间。解释器在解释JVM指令为机器码以供CPU执行时&#xff0c;会去程序计数器当中找到jvm指令的执行地址。 作用&#xff1a;记住下一条jvm指令的执行地址 特…

机器学习-特征选择:如何使用Lassco回归精确选择最佳特征?

一、引言 特征选择在机器学习领域中扮演着至关重要的角色&#xff0c;它能够从原始数据中选择最具信息量的特征&#xff0c;提高模型性能、减少过拟合&#xff0c;并加快模型训练和预测的速度。在大规模数据集和高维数据中&#xff0c;特征选择尤为重要&#xff0c;因为不必要的…