Redis分布式锁 - 基础实现及优化

news2024/11/26 14:57:09

应用场景

  • 互联网秒杀
  • 抢优惠卷
  • 接口幂等性校验

代码示例

案例1 - StringRedisTemplate基础实现

package com.wangcp.redisson;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 模拟下单减库存的场景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
        return "end";
    }
}

我们分析下在高并发场景下会存在什么问题, 假设在redis中库存(stock)初始值是100。5个客户端同时请求该接口,可能就会存在同时执行

int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

这行代码,获取到的值都为100,紧跟着判断大于0后都进行-1操作,最后设置到redis 中的值都为99。但正常执行完成后redis中的值应为 95。

案例2 - 使用synchronized 实现单机锁

在遇到案例1的问题后,大部分人的第一反应都会想到加锁来控制事务的原子性,如下代码所示:

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    synchronized (this){
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }
    return "end";
}

现在当有多个请求访问该接口时,同一时刻只有一个请求可进入方法体中进行库存的扣减,其余请求等候。

但我们都知道,synchronized 锁是属于JVM级别的,也就是俗称的“单机锁”。现在基本大部分企业使用的都是集群部署,以上代码在集群部署的情况下还能保证库存数据一致性吗?
在这里插入图片描述
答案是不能,如图所示,请求经Nginx分发后,可能存在多个服务同时从Redis中获取库存数据,此时只加synchronized是无效的,并发越高,出现问题的几率就越大。

案例3 - 使用SETNX实现分布式锁

setnx:将 key 的值设为 value,当且仅当 key 不存在。 ,若给定 key 已经存在,则 setnx 不做任何动作。

/**
 * 模拟下单减库存的场景
 * @return
 */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    // 使用 setnx 添加分布式锁
    // 返回 true 代表之前redis中没有key为 lockKey 的值,并已进行成功设置
    // 返回 false 代表之前redis中已经存在 lockKey 这个key了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
    if(!result){
        // 代表已经加锁了
        return "error_code";
    }

    // 从redis 中拿当前库存的值
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock",realStock + "");
        System.out.println("扣减成功,剩余库存:" + realStock);
    }else{
        System.out.println("扣减失败,库存不足");
    }

    // 释放锁
    stringRedisTemplate.delete(lockKey);
    return "end";
}

我们知道 Redis 是单线程执行,现在再看案例2中的流程图时,哪怕高并发场景下多个请求都执行到了setnx的代码,redis会根据请求的先后顺序进行排列,只有排列在队头的请求才能设置成功。其它请求只能返回“error_code”。

当setnx设置成功后,可执行业务代码对库存扣减,执行完成后对锁进行释放。

那么以上代码已经完美实现分布式锁了吗?能够支撑高并发场景?答案是否定的,还是存在很多问题的,离真正的分布式锁还差的很远。

存在的问题:
死锁:假如第一个请求在setnx加锁完成后,执行业务代码时出现了异常,那么释放锁的代码就无法执行,后面所有的请求也都无法进行操作。

针对死锁的问题,对代码再次进行优化,添加 try-finally,在 finally 中添加释放锁代码,这样无论如何都会执行释放锁代码,如下所示:

   /**
     * 模拟下单减库存的场景
     * @return
     */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";

    try{
        // 使用 setnx 添加分布式锁
        // 返回 true 代表之前redis中没有key为 lockKey 的值,并已进行成功设置
        // 返回 false 代表之前redis中已经存在 lockKey 这个key了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
        if(!result){
            // 代表已经加锁了
            return "error_code";
        }
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }finally {
        // 释放锁
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

经过改进后的代码是否还存在问题呢?我们思考正常执行的情况下应该是没有问题,但我们假设请求在执行到业务代码时服务突然宕机了,或者正巧你的运维同事重新发版,粗暴的 kill -9 掉了呢,那代码还能执行 finally 吗?

案例4 - 加入过期时间

针对想到的问题,对代码再次进行优化,加入过期时间,这样即便出现了上述的问题,在时间到期后锁也会自动释放掉,不会出现“死锁”的情况。

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";

    try{
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
        if(!result){
            // 代表已经加锁了
            return "error_code";
        }
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }finally {
        // 释放锁
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

事实上给锁加入过期时间后就可以了?超时时间设置的10s真的合适吗?
在这里插入图片描述
假设同一时间有三个请求。

请求1首先加锁后需执行15秒,但在执行到10秒时锁失效释放。

请求2进入后加锁执行,在请求2执行到5秒时,请求1执行完成进行锁释放,但此时释放掉的是请求2的锁。

请求3在请求2执行5秒时开始执行,但在执行到3秒时请求2执行完成将请求3的锁进行释放。

现在只是模拟3个请求便可看出问题,如果在真正高并发的场景下,可能锁就会面临“一直失效”或“永久失效”。

那么具体问题出在哪里?

  • 1.存在请求释放锁时释放掉的并不是自己的锁
  • 2.超时时间过短,存在代码未执行完便自动释放

针对问题对应的解决方法:

  • 针对问题1,想到在请求进入时生成一个唯一id,使用该唯一id作为锁的value值,释放时先进行获取比对,比对相同时再进行释放,这样就可以解决释放掉其它请求锁的问题。
  • 针对问题2,不断的延长过期时间真的合适吗?设置短了存在超时自动释放的问题,设置长了又会出现宕机后一段时间锁无法释放的问题,虽然不会再出现“死锁”。但是…

案例5 - Redisson分布式锁

package com.wangcp.redisson;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IndexController {

    @Autowired
    private RedissonClient redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 模拟下单减库存的场景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        String lockKey = "product_001";
        // 1.获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        try{
            // 2.加锁
            redissonLock.lock();  // 等价于 setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
            // 从redis 中拿当前库存的值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            // 3.释放锁
            redissonLock.unlock();
        }
        return "end";
    }
}

Redisson 分布式锁原理解析

在这里插入图片描述

Redisson 底层源码

点击 lock() 方法,查看源码,最终看到以下代码

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

加锁最终执行的这段 lua 脚本

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end;

脚本的主要逻辑为:

  • exists 判断 key 是否存在
  • 当判断不存在则设置 key
  • 然后给设置的key追加过期时间

这样来看其实和前面案例中的实现方法好像没什么区别,但实际上并不是。

这段lua脚本命令在Redis中执行时,会被当成一条命令来执行,能够保证原子性,要么都成功,要么都失败。

在源码中看到Redssion的许多方法实现中很多都用到了lua脚本,这样能够极大的保证命令执行的原子性。

Redisson锁自动“续期”

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                                                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                                                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                                                     "return 1; " +
                                                                     "end; " +
                                                                     "return 0;",
                                                                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

这段代码是在加锁后开启一个守护线程进行监听。Redisson超时时间默认设置30s,线程每10s调用一次判断锁还是否存在,如果存在则延长锁的超时时间。

现在再回过头来看看案例5中的加锁代码与原理图,其实到这种程度已经可以满足很多公司的使用了,并且很多公司也确实是这样用的。

实际上还是存在问题的, 例如:

  • Redis 在实际部署使用时都是集群部署的,在高并发场景下加锁,当把key写入到master节点后,master还未同步到slave节点时master宕机了,原有的slave节点经过选举变为了新的master节点,此时可能就会出现锁失效问题。
  • 通过分布式锁的实现机制可以知道,高并发场景下只有加锁成功的请求可以继续处理业务逻辑。那就出现了多个线程都来加锁,但有且仅有一个加锁成功了,剩余的都在等待。其实分布式锁与高并发在语义上就是相违背的,我们的请求虽然都是并发,但Redis帮我们把请求进行了排队执行,也就是把并行转为了串行。串行执行的代码肯定不存在并发问题了,但是程序的性能肯定也会因此受到影响。

针对这些问题,有什么好的解决方案呢?

互联网产品需要满足CAP原则(一致性、可用性、分区容错性),Redis满足AP(可用性、分区容错性),如果想要解决上述问题就需要寻找满足CP(一致性、分区容错性)的技术。如zookeeper,zookeeper的集群间数据同步机制是当主节点接收数据后不会立即返回给客户端成功的反馈,它会先与子节点进行数据同步,半数以上的节点都完成同步后才会通知客户端接收成功。并且如果主节点宕机后,根据zookeeper的Zab协议(Zookeeper原子广播)重新选举的主节点一定是已经同步成功的。

Redisson与zookeeper分布式锁我们如何选择呢?

如果并发量没有那么高,可以用zookeeper来做分布式锁,但是它的并发能力远远不如Redis。
如果你对并发要求比较高的话,那就用Redis,偶尔出现的主从架构锁失效的问题其实是可以容忍的。

关于提升性能,可以参考ConcurrentHashMap的锁分段技术的思想,例如我们代码的库存量当前为1000,那可以分为10段,每段100,然后对每段分别加锁,这样就可以同时执行10个请求的加锁与处理,当然有要求的同学还可以继续细分。但其实Redis的Qps已经达到10W+了,没有特别高并发量的场景下也是完全够用的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92467.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

以流量为王的时代,如何获得不错的流量,泰山众筹如何脱颖而出?

由于互联网、疫情等因素的影响&#xff0c;实体业务变得越来越困难。许多实体店已经开始转向在线电子商务&#xff0c;但运营一个好的电子商务平台并不容易。没有稳定的流量和忠实的用户&#xff0c;很难达到理想的效果。那到底如何才能获得不错的“流量”呢&#xff1f;泰山众…

第十四届蓝桥杯集训——JavaC组第十三篇——for循环

第十四届蓝桥杯集训——JavaC组第十三篇——for循环 目录 第十四届蓝桥杯集训——JavaC组第十三篇——for循环 for循环(重点) 倒序迭代器 for循环死循环 for循环示例 暴力循环 等差数列求和公式 基础循环展开 循环控制语句 break结束 continue继续 for循环(重点) f…

【图像融合】多尺度奇异值分解图像融合【含Matlab源码 2040期】

⛄一、多尺度奇异值分解的偏振图像融合去雾算法简介 立足于提高传统算法的适应性&#xff0c;提高去雾图像的质量&#xff0c;本文设计了如图 2 所示的去雾算法流程。首先&#xff0c;使用基于最小二乘方法计算出更加精确的偏振信息&#xff0c;改善了以往偏振信息计算不准确的…

基于Qt(C++)实现(PC)学生信息管理系统【100010043】

学生信息管理系统 一、系统指南 本系统为表格式的学生信息管理系统&#xff0c;提供了文件新建、打开及保存功能&#xff0c;还可在表格中对数据进行增加、删除、修改、搜索&#xff0c;下面将一一介绍这些功能 1、新建文件 新建文件将会产生一个全新的空表格&#xff0c;…

基于java+springmvc+mybatis+vue+mysql的少儿编程管理系统

项目介绍 在国家重视教育影响下&#xff0c;教育部门的密确配合下&#xff0c;对教育进行改革、多样性、质量等等的要求&#xff0c;使教育系统的管理和运营比过去十年前更加理性化。依照这一现实为基础&#xff0c;设计一个快捷而又方便的网上少儿编程教育网站系统是一项十分…

原来这就是BFC,遇到样式问题别瞎搞了

看到一篇前端面试题&#xff0c;第一个问题是 什么是BFC &#xff1f;&#xff0c;一下子唤起了我的辛酸回忆&#xff0c;那是在七月&#xff0c;在沪漂找工作的路上&#xff0c;预约的一个电话面试&#xff0c;眼看着时间就要到了&#xff0c;人生第一次进星巴克&#xff0c;提…

leetcode 337. 打家劫舍 III-[python3图解]-递归+记忆化搜索

题目 小偷又发现了一个新的可行窃的地区。这个地区只有一个入口&#xff0c;我们称之为root。除了root之外&#xff0c;每栋房子有且只有一个“父“房子与之相连。一番侦察之后&#xff0c;聪明的小偷意识到“这个地方的所有房屋的排列类似于一棵二叉树”。 如果 两个直接相连…

【Python百日进阶-数据分析】Day130 - plotly柱状图(条形图):go.bar()实例1

文章目录4.2 plotly.graph_objects条形图4.2.1 go的基本条形图4.2.2 分组条形图4.2.3 堆叠条形图4.2.4 带悬停文本的条形图4.2.5 带直接文本标签的条形图4.2.6 使用uniformtext控制文本大小4.2.7 旋转条形图标签4.2.8 自定义单个条颜色4.2.9 自定义单个条的宽度4.2.10 自定义单…

NetInside网络分析为企业IT工作保驾护航(二)

前言 某企业的DMS经销商在线系统&#xff0c;最近一段时间运维人员经常接到反馈&#xff0c;DMS使用出现大量访问慢的情况,针对此情况进行监测分析。 该企业已部署NetInside流量分析系统&#xff0c;使用流量分析系统提供实时和历史原始流量&#xff0c;重点针对DMS系统性能进…

MobileNetV3基于NNI剪枝操作

NNI剪枝入门可参考&#xff1a;nni模型剪枝_benben044的博客-CSDN博客_nni 模型剪枝 1、背景 本文的剪枝操作针对CenterNet算法的BackBone&#xff0c;即MobileNetV3算法。 该Backbone最后的输出格式如下&#xff1a; 假如out model(x)&#xff0c;则x[-1][hm]可获得heatma…

Spring框架04(Spring框架中AOP)

一、spring中bean的生命周期 1.singleton 容器启动的时候创建对象&#xff0c;容器正常关闭时销毁对象 2.prototype 获取对象的时候创建对象&#xff0c;spring容器不负责对象的销毁 生命周期的过程&#xff1a; 1.调用无参创建对象 2.调用set方法初始化属性 3.调用初始化…

知识付费系统源码,可直接打包成app、H5、小程序

知识付费&#xff0c;在近几年来&#xff0c;越来越受到大家的关注。知识付费系统源码是将知识通过互联网渠道变现的方式。以知识为载体&#xff0c;通过付费获得在线知识以及在线学习所带来的收益。知识付费平台主要以分享知识内容&#xff0c;内容分为直播、录播、图文等形式…

【从零开始学爬虫】采集收视率排行数据

l 采集网站 ​【场景描述】采集收视率排行数据。 【源网站介绍】收视率排行网提供收视率排行,收视率查询,电视剧收视率,综艺节目收视率和电视台收视率信息。 【使用工具】前嗅ForeSpider数据采集系统 【入口网址】http://www.tvtv.hk/archives/category/tv 【采集内容】 …

产线工控安全

场景描述 互联网飞速发展&#xff0c;工业4.0的大力推行&#xff0c;让工控产线更加智能化&#xff0c;生产网已经发展成一个组网的计算机环境。这些工控产线组网中的所有工控设备现在统称为主机。 信息化虽然提高各大企业的生产效率&#xff0c;但也会遭遇各类安全问题&…

Problem B: 算法10-15~10-17:基数排序

Problem Description 基数排序是一种并不基于关键字间比较和移动操作的排序算法。基数排序是一种借助多关键字排序的思想对单逻辑关键字进行排序的方法。 通过对每一个关键字分别依次进行排序&#xff0c;可以令整个关键字序列得到完整的排序。而采用静态链表存储记录&#xf…

FAST-LIO论文阅读

1. 摘要 本文提出一个开销较小且鲁棒的激光惯性里程计框架。使用迭代扩展卡尔曼滤波器来实现激光雷达特征点和IMU的紧耦合&#xff0c;可以在快速运动、有噪声或重复纹理等退化环境中鲁棒地定位。为了在测量数据量很大的情况下降低开销&#xff0c;提出了计算卡尔曼增益的新公…

如何做电商运营

电商是通过电子设备和网络技术进行的商业模式&#xff0c;通俗的来说也就是通过网络结识买家完成最终交易。电子商务凭借它便宜&#xff0c;丰富和方便的特性&#xff0c;迅速占领了中国一大半的经济市场&#xff0c;作为个人怎么才能做好电商呢&#xff1f;掌握这几个要点就不…

物联网开发笔记(63)- 使用Micropython开发ESP32开发板之控制ILI9341 3.2寸TFT-LCD触摸屏进行LVGL图形化编程:显示中文

一、目的 这一节我们学习如何使用我们的ESP32开发板来控制ILI9341 3.2寸TFT-LCD触摸屏进行LVGL图形化编程的第一步&#xff1a;显示中文。 二、环境 ESP32 3.2寸 ILI9341触摸屏 Thonny IDE 几根杜邦线 Win10 接线方法&#xff1a;请看上一篇文章。 三、流程介绍 …

Verilog刷题HDLBits——Conwaylife

Verilog刷题HDLBits——Conwaylife题目描述代码结果题目描述 Conway’s Game of Life is a two-dimensional cellular automaton. The “game” is played on a two-dimensional grid of cells, where each cell is either 1 (alive) or 0 (dead). At each time step, each c…

【图像融合】小波变换(加权平均法+局域能量+区域方差匹配)图像融合【含Matlab源码 1819期】

⛄一、小波变换彩色图像融合简介 1 前言 图像融合是将不同传感器所获得的多个图像根据某种算法进行融合处理,取长补短,使一幅图像能够更清楚、更准确地反映多幅图像的信息,多聚焦彩色图像融合是图像融合的一个分支。目前在各种图像采集与分析系统中已使用的CCD数码相机,对于聚…