redis分布式锁详解

news2025/1/9 2:27:57

一、基本分布式锁实现

1、案例(扣减库存)

@RequestMapping("reduceStock")
    public String reduceStock() {
        String lockKey = "lock:product_101";
        String clientId = UUID.randomUUID().toString();
        // 过期时间要和设置key成为一条命令 否则分布式环境下可能机器宕机导致其它节点无法获取锁 设置clientId是为了防止高并发场景下释放其它线程的锁
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS); //jedis.setnx(k,v)
        if (!result) {
            return "error_code";
        }
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }
        }
        return "end";
    }

2、存在问题

在高并发场景下,各节点执行时间会变慢,比如A线程执行过程中,key超时了,其它线程又可以加锁,比如B线程就可以加锁成功,B线程执行过程中A线程执行完了就会把key锁释放,此时B线程如果正在执行中,结果B的锁又被释放了

二、使用redission实现分布式锁

springboot下引入redisson相关依赖

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.6.5</version>
    </dependency>

使用

@RequestMapping("reduceStock")
    public String reduceStock() {
        String lockKey = "lock:product_001";
        //获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        //加分布式锁
        redissonLock.lock();  //  .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            //解锁
            redissonLock.unlock();
        }
        return "end";
    }

三、redisson源码分析

1、加锁

public void lockInterruptibly() throws InterruptedException {
        this.lockInterruptibly(-1L, (TimeUnit)null);
    }

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 线程id
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁,这部分是获取锁的关键方法
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        // 不等于null说明获取锁失败
        if (ttl != null) {
            // redis订阅redisson_lock__channel + ":" + id + ":" + threadId
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            this.commandExecutor.syncSubscription(future);

            try {
                while(true) {
                    // 再尝试获取一次锁
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    if (ttl == null) {
                        // 获取成功直接返回
                        return;
                    }

                    // 大于等于0说明还没释放锁,通过semaphore阻塞时长为key的剩余有效时间
                    if (ttl >= 0L) {
                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        // 说明锁时间已到期可以尝试获取锁
                        this.getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                this.unsubscribe(future, threadId);
            }
        }
    }
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
    }

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1L) {
            // 持有锁的时间不等于-1说明是重入
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            // 执行Lua脚本返回结果,如果加锁成功返回null不成功返回key剩余有效时间
            RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (future.isSuccess()) {
                        Boolean ttlRemaining = (Boolean)future.getNow();
                        if (ttlRemaining) {
                            // 递归调用scheduleExpirationRenewal()方法延时执行
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }

                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), 
    LongCodec.INSTANCE, command, 
    // 如果key不存在,证明锁被释放,加锁成功,设置过期时间,直接返回null
    "if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('hset', KEYS[1], ARGV[2], 1); 
        redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; end; 
    // key和field确定的加锁的就是当前线程,加锁成功,重新设置过期时间,重入次数加1,返回null
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
        redis.call('hincrby', KEYS[1], ARGV[2], 1); 
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil; end; 
    // pttl是返回剩余key的有效时间
    return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

 

2、加锁成功线程的子线程加锁延长持有锁时间

private void scheduleExpirationRenewal(final long threadId) {
        if (!expirationRenewalMap.containsKey(this.getEntryName())) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
                    // 如果key对应的field存在,也就是当前线程,会重新设置过期时间
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then                     
                        redis.call('pexpire', KEYS[1], ARGV[1]); 
                        return 1; end; 
                    return 0;", 
                    Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                task.cancel();
            }

        }
    }

3、加锁失败的线程订阅channel

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        // 订阅
        return PUBSUB.subscribe(this.getEntryName(), this.getChannelName(), this.commandExecutor.getConnectionManager().getSubscribeService());
    }

// 如果发布了订阅回来执行这个方法
protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            // semaphore释放资源 之前的tryAcquire()方法就会被唤醒
            value.getLatch().release();

            while(true) {
                Runnable runnableToExecute = null;
                synchronized(value) {
                    Runnable runnable = (Runnable)value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }

                if (runnableToExecute == null) {
                    return;
                }

                runnableToExecute.run();
            }
        }
    }

4、释放锁发布channel

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
            // 如果key已经失效,证明锁已经被释放,直接发布订阅消息
            "if (redis.call('exists', KEYS[1]) == 0) then 
                redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
            // 如果发现调用释放锁的线程不是当前线程返回null
            if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
                return nil;end; 
            // key对应field的value减1
            local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
            // 如果还是大于0 说明是重入锁,不用管
            if (counter > 0) then 
                redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
            else 
                // 真正的释放锁 删除key 发布订阅消息,就会触发之前的onMessage()方法,释放semaphore资源
                redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; 
            return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/994812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux下shell脚本实现wordpress搭建

wordpress_auto_install.sh #!/bin/bashuser$(whoami)function wordpress_auto_install () { if [ $user "root" ];thenecho "前提&#xff1a;调整系统配置&#xff0c;如关闭selinux、firewall等&#xff01;"sed -i s/SELINUXenforcing/SELINUXdis…

光线投射之伪3d

光线投射是一种在 2D 地图中创建 3D 透视的渲染技术。当计算机速度较慢时&#xff0c;不可能实时运行真正的 3D 引擎&#xff0c;光线投射是第一个解决方案。光线投射可以非常快&#xff0c;因为只需对屏幕的每条垂直线进行计算。 光线投射的基本思想如下&#xff1a;地图是一…

rtthread下基于spi device架构MCP25625驱动

1.CAN驱动架构 由于采用了RTT的spi device架构&#xff0c;不能再随心所遇的编写CAN驱动 了&#xff0c;之前内核虽然采用了RTT内核&#xff0c;但是驱动并没有严格严格按RTT推荐的架构来做&#xff0c;这次不同了&#xff0c;上次是因为4个MCP25625挂在了4路独立的SPI总线上&…

【图论】Floyd

算法提高课笔记&#xff09; 文章目录 例题牛的旅行题意思路代码 排序题意思路代码 观光之旅题意思路代码 例题 牛的旅行 原题链接 农民John的农场里有很多牧区&#xff0c;有的路径连接一些特定的牧区。 一片所有连通的牧区称为一个牧场。 但是就目前而言&#xff0c;你…

程序依赖相关知识点(PDG,SDG)

什么叫可达性 变量v的定义d&#xff1a;对变量v的赋值语句称为变量v的定义 变量v的使用&#xff1a;在某个表达式中引用变量v的值 当变量v被再次赋值时&#xff0c;上一次赋值对变量v的定义d就被kill掉了 如果定义d到点p之间存在一条路径&#xff0c;且在路径中定义d没有被…

Java 多线程系列Ⅵ(并发编程的五大组件)

JUC 组件 前言一、Callable二、ReentrantLock三、Atomic 原子类四、线程池五、Semaphore六、CountDownLatch 前言 JUC&#xff08;Java.util.concurrent&#xff09;是 Java 标准库中的一个包&#xff0c;它提供了一组并发编程工具&#xff0c;本篇文章就介绍几组常见的 JUC 组…

汇川PLC学习Day2:编写检测IO端口状态程序

汇川PLC学习Day2&#xff1a;编写检测IO端口状态程序 一、 新增IO和模拟量模块 IO组态界面 模块参数设置 程序编写 想法是将DA模块的通道0接到AD模块的通道0&#xff0c;将DA模块的通道1接到AD模块的通道1&#xff0c;PLC本身发模拟量给自己PLC收模拟量转换&#xff0c;…

MySQL 8.0.25版本下载、安装及配置(Windows 10/11 64位)详细教程【超详细,保姆级教程!!!】

本文介绍关于windows 11如何安装配置MySQL 8.0.25版本的详细步骤 MySQL下载地址&#xff08;官网&#xff09; 一、下载MySQL 8.0.25 1、进入官网&#xff0c;选择版本 8.0.25 2、下载MySQL压缩包 3、下载完成后将压缩包解压至方便自己查找的位置&#xff08;切记&#xf…

Tensor数据转换为稀疏矩阵

Tensor数据转换为稀疏矩阵 一、稀疏矩阵 原文链接 常用的稀疏矩阵存储格式有COO&#xff0c;CSR/CSC&#xff0c;LIL 1.COO COO(Coordinate format )是最为简单的格式&#xff0c;以三元组的形式存储稀疏矩阵。记录矩阵中非零元素的数值和所在的行序号和列序号。形式为&am…

工商业储能CE认证电表ADW300

安科瑞 华楠 ADW300 无线计量仪表主要用于计量低压网络的三相有功电能&#xff0c;具有体积小、精度高、功能丰富等优点&#xff0c;并且可选通讯方式多&#xff0c;可支持 RS485 通讯和 Lora、NB、4G、wifi 等无线通讯方式&#xff0c;增加了外置互感器的电流采样模式&#x…

【数据结构】线性表

线性表 顺序表链式存储单链表双链表 知识目录 顺序表 概念&#xff1a;用一组地址连续的存储单元依次存储线性表的数据元素&#xff0c;这种存储结构的线性表称为顺序表。 特点&#xff1a;逻辑上相邻的数据元素&#xff0c;物理次序也是相邻的。 只要确定好了存储线性表的…

基本数据类型和包装类型 使用规范

使用规范 1 概念1.1 基本数据类型1.2 包装类型1.3 对应关系1.4 自动装箱/拆箱 2 变量类型2.1 全局变量2.1.1 常量&#xff08;Constants&#xff09;2.1.2 类变量&#xff08;Class Variables&#xff09;2.1.3 实例变量&#xff08;Instance Variables&#xff09; 2.2 局部变…

快速实现抖音上下滑动,你不知道的ViewPager2用法,信息量巨大,建议收藏点赞。老tier~

万能ViewPager2适配器–SmartViewPager2Adapter 特点功能 完全脱离xml&#xff0c;所有效果只需要通过api调用 具体功能&#xff1a;1. 两句代码实现抖音列表效果2. 无感且丝滑&#xff0c;动态从头部或者底部加载数据3. 设置上下加载监听&#xff0c;再达到预加载limit的时…

用python实现基本数据结构【02/4】

*说明 如果需要用到这些知识却没有掌握&#xff0c;则会让人感到沮丧&#xff0c;也可能导致面试被拒。无论是花几天时间“突击”&#xff0c;还是利用零碎的时间持续学习&#xff0c;在数据结构上下点功夫都是值得的。那么Python 中有哪些数据结构呢&#xff1f;列表、字典、集…

STM32初学-外部RTC时钟芯片DS3231

RTC(Real_Time Clock)即实时时钟&#xff0c;它是电子产品中不可或缺的东西。其最直接的作用就是时钟功能。细心的朋友可以发现&#xff0c;当我们的电脑或者手机没联网时&#xff0c;仍然可以正常显示日期与时钟&#xff0c;这就是RTC的功劳。 RTC的运行无需网络连接&#xff…

python创建exe文件

1、搭建环境 pip install pyinstaller 2、准备测试代码 exe_test.py import timeprint("hello") print("hello") print("hello") print("hello")time.sleep(5) 注&#xff1a;添加sleep以便在执行exe文件的时候能看到结果 3、生…

在Windows操作系统上安装PostgreSQL数据库

在Windows操作系统上安装PostgreSQL数据库 一、在Windows操作系统上安装PostgreSQL数据库 一、在Windows操作系统上安装PostgreSQL数据库 点击 PostgreSQL可跳转至PostGreSQL的官方下载地址。 &#xff08;1&#xff09; &#xff08;2&#xff09;选择安装的目录&#xff…

入门人工智能 —— 使用 Python 进行文件读写,并完成日志记录功能(4)

入门人工智能 —— 使用 Python 进行文件读写&#xff08;4&#xff09; 入门人工智能 —— 使用 Python 进行文件读写打开文件读取文件内容读取整个文件逐行读取文件内容读取所有行并存储为列表 写入文件内容关闭文件 日志记录功能核心代码&#xff1a;完整代码&#xff1a;运…

小工具——筛选图像小工具

最近在公司手动筛图片&#xff0c;需要将某些含有检测目标的图像手动筛选出来用于做新模型的测试。我最开始是两个文件夹&#xff0c;来回复制粘贴&#xff0c;后来感觉这种效率太低了&#xff0c;就随手写了一个图像筛查小工具。代码如下&#xff1a; import sys from PyQt5.…

图论-图的深度优先遍历-Java

回顾力扣144/94//145/102/589/590/429&#xff0c;熟练掌握递归和非递归写法。 图论不强调非递归。 使用邻接表 1个连通分量 Graph.java package Chapt02_DFS; import java.io.File; import java.io.IOException; import java.util.TreeSet; import java.util.Scanner;///…