Redis分布式锁

news2025/1/11 2:59:54

1. 什么是分布式锁

分布式锁指的是,所有服务中的所有线程都去获得同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,等到获得锁的线程释放掉锁之后获得了锁才能进行操作
Redis官网中,set key value有个带有NX参数的命令,这是一个原子性加锁的命令,指的是此key没有被lock时,当前线程才能加锁,如果已经被其他线程占用,就不能加锁。

2. Redisson实现Redis分布式锁的底层原理

在这里插入图片描述

2.1 添加依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.12.4</version>
</dependency>

2.2 测试用例

库存数量100,调用一次减1,小于等于0的时候返回false,表示下单失败。

@Component
public class RedissonLock {
    private static Integer inventory = 100;
    /**
     * 测试
     *
     * @return true:下单成功 false:下单失败
     */
    public Boolean redisLockTest(){
        // 获取锁实例
        RLock inventoryLock = RedissonService.getRLock("inventory-number");
        try {
            // 加锁
            inventoryLock.lock();
            if (inventory <= 0){
                return false;
            }
            inventory--;
            System.out.println("线程名称:" + Thread.currentThread().getName() + "剩余数量:" + RedissonLock.inventory);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 释放锁
            inventoryLock.unlock();
        }
        return true;
    }
}

2.3 获取锁的实例

RLock inventoryLock = RedissonService.getRLock("inventory-number");

这段就是获取锁的实例,inventory-number为指定锁名称,进去getLock(String name)方法之后就能看到获取锁的实例就是在RedissonLock构造方法中,初始化一些属性。

public RLock getLock(String name) {
	return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

RedissonLock的构造函数:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //命令执行器
        this.commandExecutor = commandExecutor;
        //UUID字符串(MasterSlaveConnectionManager类的构造函数 传入UUID)
        this.id = commandExecutor.getConnectionManager().getId();
        //内部锁过期时间(防止死锁,默认时间为30s)
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        //uuid+传进来的锁名称
        this.entryName = this.id + ":" + name;
        //redis消息体
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

2.4 加锁

inventoryLock.lock();
这段代码表示加锁,一步一步进去源码里面看看,进来首先看到如下lock()方法:

public void lock() {
    try {
        this.lock(-1L, (TimeUnit)null, false);
    } catch (InterruptedException var2) {
        throw new IllegalStateException();
    }
}

可以看到这里设置了一些默认值,然后继续调用了带参lock()方法,也是在这里,完成了加锁的逻辑,源码如下:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 线程ID
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        // 如果过期时间等于null,则表示获取到锁,直接返回,不等于null继续往下执行
        if (ttl != null) {
            // 如果获取锁失败,则订阅到对应这个锁的channel
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {
                // 可中断订阅
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                // 不可中断订阅
                this.commandExecutor.syncSubscription(future);
            }
            try {
                // 不断循环
                while(true) {
                    // 再次尝试获取锁
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    // ttl(过期时间)为空,说明成功获取锁,返回
                    if (ttl == null) {
                        return;
                    }
                    // ttl(过期时间)大于0 则等待ttl时间后继续尝试获取
                    if (ttl >= 0L) {
                        try {
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                    } else {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                // 取消对channel的订阅
                this.unsubscribe(future, threadId);
            }
        }
    }

再来看下获取锁的tryAcquire方法:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

进去看下tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // 有设置过期时间
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 没有设置过期时间
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining == null) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

tryLockInnerAsync方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
        // 如果锁不存在,则通过hset设置它的值,并设置过期时间
        "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
        // 如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1(这里显示了redis分布式锁的可重入性)
        if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
        // 如果锁已存在,但并非本线程,则返回过期时间ttl
        return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
KEYS[1]代表的是你加锁的那个key,比如说:RLock inventoryLock = RedissonService.getRLock("inventory-number");这里你自己设置了加锁的那个锁key就是"inventory-number"。
ARGV[1]代表的就是锁key的默认生存时间,上面也截图看了,默认时间为30秒。
ARGV[2]代表的是加锁的客户端的ID,类似于后面这样: 8743c9c0-0795-4907-87fd-6c719a6b4586:1

上面这段LUA代码看起来也不是很复杂,其中有三个判断:

  • 通过exists判断锁存不存在,如果锁不存在,则设置值和过期时间,加锁成功。
  • 通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功,ARGV[2]的value+1,原来是1,现在变为2,当然,释放的时候也要释放两次。
  • 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败

2.5 解锁

inventoryLock.unlock();
这段代码表示解锁,跟刚才一样,一步一步进去源码里面看看,进来首先看到如下unlock()方法:

public void unlock() {
        try {
            this.get(this.unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException var2) {
            if (var2.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)var2.getCause();
            } else {
                throw var2;
            }
        }
    }

进去unlockAsync()查看,这是解锁的方法:

public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise();
        // 释放锁的方法
        RFuture<Boolean> future = this.unlockInnerAsync(threadId);
        // 添加监听器 解锁opStatus:返回值
        future.onComplete((opStatus, e) -> {
            this.cancelExpirationRenewal(threadId);
            if (e != null) {
                result.tryFailure(e);
            //如果返回null,则证明解锁的线程和当前锁不是同一个线程,抛出异常
            } else if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
                result.tryFailure(cause);
            } else {
                // 解锁成功
                result.trySuccess((Object)null);
            }
        });
        return result;
    }

再进去看下释放锁的方法:unlockInnerAsync():

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
        // 如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; 
        // 如果是同一个线程,就通过hincrby减1的方式,释放一次锁
        local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
        // 若剩余次数大于0 ,则刷新过期时间
        if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
        // 其他就证明锁已经释放,删除key并发布锁释放的消息
        else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; 
        return nil;", 
        Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
    }

上述代码是释放锁的逻辑。同样的,它也是有三个判断:

  • 如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常。
  • 如果解锁的线程和当前锁的线程是同一个,就通过hincrby减1的方式,释放一次锁。若剩余次数还大于0,则证明是重入锁,再次刷新过期时间。
  • 锁已不存在,通过publish发布锁释放的消息,解锁成功

3. 其他方式实现分布式锁

redis 分布式锁有什么缺陷?

搬运
redis分布式锁的缺点(zk分布式锁与redis分布式锁优缺点)
面试必问:如何实现Redis分布式锁
redis实现分布式锁的原理
Redis分布式锁的实现以及原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/21539.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中加石墨再冲刺港交所上市:2022年初至今收入为零,陈东尧为CEO

11月18日&#xff0c;中加石墨控股股份有限公司&#xff08;下称“中加石墨”&#xff09;在港交所递交招股书&#xff0c;准备在港交所主板&#xff0c;宏信为独家保荐人。据贝多财经了解&#xff0c;这是中加石墨第二次递表&#xff0c;此前曾于2022年2月28日递交上市申请材料…

通过inode结构体取到次设备号,实现LED灯的亮灭

对Linux来说&#xff0c;设备驱动也是文件。驱动控制硬件的过程&#xff0c;实际上是对驱动文件的读写操作。 对内核来说&#xff0c;如何获取唯一的文件标识呢&#xff1f;当然是通过file结构体中的&#xff0c;inode结构体识别应用层打开的到底是哪一个设备文件。 实验操作及…

数据结构题目收录(二十)

1、含有n个非叶结点的m阶B树中至少包含&#xff08;&#xff09;个关键字。 A&#xff1a;n(m1)B&#xff1a;nC&#xff1a;n(┌\ulcorner┌m/2┐\urcorner┐-1)D&#xff1a;(n-1)(┌\ulcorner┌m/2┐\urcorner┐-1)1 解析 除根结点外&#xff0c;m阶B树中的每个非叶结点至…

mongodump工具安装及使用详解

MongoDB导入导出和备份的命令工具从4.4版本开始不再自动跟随数据库一起安装&#xff0c;而是需要自己手动安装。 官方网站下载链接&#xff1a;Download MongoDB Command Line Database Tools | MongoDB 将下载的压缩包通过工具上传到服务器或者虚拟机中某个路径下并解压&…

ZYNQ图像处理项目——线性神经网络识别mnist

一、线性神经网络识别mnist 线性神经网络其实也可以叫做线性分类器&#xff0c;其实就是没有激活函数的神经网络来对目标进行识别&#xff0c;像支持向量机、多元回归等都是线性的。这边我采用了线性神经网络来识别mnist数字。 我这边是看了一本讲神经网络的书籍&#xff0c;然…

分析高数值孔径物镜的聚焦特性

摘要 高数值孔径的物镜广泛用于光刻、显微等方面。 因此&#xff0c;在仿真聚焦时考虑光的矢量性质是至关重要的。VirtualLab可以支持此类透镜的光线和场追迹分析。通过场追迹分析&#xff0c;可以清楚地显示出由于矢量效应引起的非对称焦点。相机探测器和电磁场探测器可以方便…

【MySQL】Spring Boot项目基于Sharding-JDBC和MySQL主从复制实现读写分离(8千字详细教程)

目录前言一、 介绍二、 主从复制1. 原理2. 克隆从机3. 克隆从机大坑4. 远程登陆5. 主机配置6. 从机配置7. 主机&#xff1a;建立账户并授权8. 从机&#xff1a;配置需要复制的主机9. 测试10. 停止主从同步三、 读写分离1. Sharding-JDBC介绍2. 一主一从3. 一主一从读写分离3.1 …

安服-windowslinux日志分析

目录 windows日志分析 windows事件日志 日志分析工具 Linux日志分析 windows日志分析 windows事件日志 日志分析工具 Linux日志分析 rsyrslog.conf中记录了&#xff0c;这些日志文件存储的位置以及存储的内容是关于什么的日志 其中lastlog比较重要&#xff0c;记录了用户登录…

FRP之入门篇

目录 一、前言 1、概述 2、原理 3、支持功能 4、适用场景 二、环境准备 三、使用 1、安装包下载 2、服务端部署 2.1、上传安装包 2.3、启动服务端 3、客户端部署 3.1、代理服务准备 3.2、上传安装包 3.3、客户端配置 3.4、启动客户端 4、功能验证 一、前言 1、…

Redis在Windows和Linux下的安装方法(超级详细)

Redis的两种不同系统安装1. redis在Windows下的安装2. redis在Linux下的安装1. redis在Windows下的安装 下载安装包(https://github.com/MicrosoftArchive/redis/releases) 下载完后得到安装包找一个自己熟悉的路径就可以进行解压了,我放的是D盘 解压后的文件: 进入到文件夹中…

Java集合(二):Map集合与Collections工具类

目录 Map接口 Map接口的常用方法 删除方法 判断方法 查找方法 增加方法 Map常用方法&遍历操作 HashTable 字典-Dictionary,v> HashMap、HashTable和LinkedHashMap TreeMap 【2】TreeMap-存储自定义数据类型-【内部比较器】 HashMap底层源码 jdk8-源码…

央视春晚临近,主持人李思思被爆离职,知情人火速做出回应

每年的这个时候&#xff0c;中央电视台的春晚&#xff0c;都成为人们热议的话题&#xff0c;不过今年的话题却比较火爆。大众们所关注的央视春晚&#xff0c;第一是参加春晚的明星嘉宾&#xff0c;其次就是参加春晚的节目主持人。 说起央视春晚的主持人&#xff0c;最早要追溯到…

【笔试题】【day26】

文章目录第一题&#xff08;就绪队列中的进程数&#xff09;第二题&#xff08;磁盘缓冲区存在的意义&#xff09;第三题&#xff08;进程从执行态变成就绪态的原因&#xff09;第四题&#xff08;管道&#xff09;第五题&#xff08;文件打开&#xff0c;操作系统会做什么&…

python 图像处理(一阶梯度图像和角度图像)

在整个图像处理的学习过程中可以看到&#xff0c;在很多应用中图像强度的变化情况是非常重要的信息。强度的变化可以用灰度图像I&#xff08;对于彩色图像&#xff0c;通常对每个颜色通道分别计算导数&#xff09;的x和y的方向导数和进行描述。 图像的梯度向量为&#xff1a; …

[datawhale202211]跨模态神经搜索实践:前端简介 Streamlit

结论速递 VCED项目使用一个十分简单好用的 Web 应用程序框架Streamlit。 本次任务简单学习Streamlit的基本使用。并逐行阅读学习VCED项目的前端代码&#xff0c;学习数据的传递&#xff0c;中间存储方式&#xff0c;以及功能的实现。 前情回顾 环境配置Jina生态跨模态模型 …

[python]basemap后安装后hello world代码

import matplotlib.pyplot as plt import numpy as np from mpl_toolkits.basemap import Basemap m Basemap() # 实例化一个map m.drawcoastlines() # 画海岸线 m.drawmapboundary(fill_colorwhite) m.fillcontinents(colorwhite,lake_colorwhite) # 画大洲&#x…

点云配准(四) Sparse Point Registration 算法浅析

Sparse Point Registration (SPR)是一篇2017年的点云配准算法,该算法的主要目的是对稀疏点云进行配准,并且取得了不错的成果和突破。本文一方面是对SPR配准算法模型进行了简单的原理解析以及附加代码实现,另一方面是对之前工作的总结,也算水篇博文,接下来的工作主要就是…

正统类加载器Tomcat(tomcat二探)

主流的Java Web服务器&#xff0c;如Tomcat、Jetty、WebLogic、WebSphere或其他笔者没有列举的服务器&#xff0c; 都实现了自己定义的类加载器&#xff0c;而且一般还都不止一个。因为一个功能健全的Web服务器&#xff0c;都要解决 如下的这些问题&#xff1a; 部署在同一个服…

C++设计模式之桥模式

桥模式也是设计模式中单一组件模式的一种。什么是单一组件模式呢&#xff1f; 单一组件模式&#xff1a; 在软件组件设计中&#xff0c;如果责任划分的不清晰&#xff0c;使用继承得到的结果往往是随着需求的变化而变化&#xff0c;子类急剧膨胀&#xff0c;同时充斥着重复代…

SpringBoot-Dubbo中的Customer怎么获取了注册中心的服务呢?

1.Dubbo中的Customer怎么获取了注册中心的服务呢&#xff1f; &#xff08;1&#xff09;要在pom文件中导入接口依赖 &#xff08;2&#xff09;在配置文件中指定服务中心的ip地址 &#xff08;3&#xff09;使用的dubbo自己的注解向服务中心中获取服务&#xff0c;并且将获取…