Redis-Redis 高并发分布式锁

news2024/11/29 22:53:42

集群分布式场景高并发

1.negix配置代理和路由

高并发场景超卖问题

1.使用原生redis控制超卖时(若是商品,则可以将商品id作为锁对象),会遇到的问题

问题一:若直接使用:将获取锁的对象和设置的超时的时间分开,则不能控制原子性,如下所示

         Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

 问题二:若直接使用:将获取锁的对象和设置的超时的时间放在一个原子操作里执行时,在临界条件下,当程序执行到最后准备释放锁时候,锁的超时时间已到,则此时的锁成为已过期,则释放不了锁而当下一个线程也来执行任务时,前一个任务将这个任务所拿的所给释放掉了(释放掉不属于自己的锁对象);则引入redisson分布式锁来解决当前的问题,redisson具有锁续命机制

@RestController
public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() {
        String lockKey = "lock:product_101";
        //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
       String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
        if (!result) {
            return "error_code";
        }
  
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
           if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }
        }


        return "end";
    }

使用分布式锁redisson

redisson使用

引入对应的redission的jar包

        <dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.6.5</version>
		</dependency>

 设置redission配置


@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Redisson redisson() {
        // 此为单机模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

}

 redission的基本使用


@RestController
public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() {
        String lockKey = "lock:product_101";
        //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
        /*String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
        if (!result) {
            return "error_code";
        }*/
        //获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        //加分布式锁
        redissonLock.lock();  //  .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            /*if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }*/
            //解锁
            redissonLock.unlock();
        }


        return "end";
    }

Redission执行的逻辑流程

 Redission分布式锁加锁源码分析

        redissonLock.lock(); 

加锁 

 @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

执行 lockInterruptibly加锁逻辑

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId); //尝试去加锁,返回的时加锁后的过期时间
        // lock acquired
        if (ttl == null) {  //若ttl为null ,则表示加锁成功 ;若ttl不为null ,则往下走
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId); //发布订阅,订阅前者执行的任务若提前执行完,则唤醒机制,去重新获取锁
        commandExecutor.syncSubscription(future);

        try {
            while (true) { //进入循环
                ttl = tryAcquire(leaseTime, unit, threadId); //再次尝试获取锁,返回加锁成功后的过期时间
                // lock acquired
                if (ttl == null) {  //若ttl为null ,则表示加锁成功 ;若ttl不为null ,则往下走
                    break;
                }

                // waiting for message
                if (ttl >= 0) { 若ttl大于o
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
//进行间歇性锁自旋逻辑,不占用cpu资源
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

执行tryAcquire(leaseTime, unit, threadId)尝试加锁逻辑

 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
//leaseTime 默认设置为-1
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//执行加锁逻辑
        ttlRemainingFuture.addListener(new FutureListener<Long>() { //异步执行,锁续命逻辑
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) { //若加锁不成功,则退出
                    return;
                }

                Long ttlRemaining = future.getNow(); //若加锁成功,则ttlRemaining 为null
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId); //加锁成功,则执行锁续命逻辑
                }
            }
        });
        return ttlRemainingFuture;
    }

执行tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 加锁逻辑

  <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//通过lua脚本来执行加锁逻辑,来保证原子性
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            //执行加锁逻辑 (key,argv与下面所传参数一一对应)
                  "if (redis.call('exists', KEYS[1]) == 0) then " +      
            //KEY[1]表示下面的getName(),
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +    
            //ARGV[2]表示下面的getLockName(threadId)
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " + 
            //ARGV[1]表示下面的internalLockLeaseTime
                      "return nil; " +
                  "end; " +
//执行锁重入逻辑(一个线程对同一个锁对象进行多次加锁,此为重入锁逻辑)
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);", //设置返回过期时间
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

执行scheduleExpirationRenewal(threadId);锁续命逻辑

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //执行锁续命逻辑
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//KEYS[1]表示getName(),ARGV[2]表示锁对象getLockName(threadId), ARGV[1]表示过期时间
//判断当前锁的对象,为当前的线程对象,那么则当前的锁的对象设置原始的过期时间,以达到续命效果
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                //异步监听执行        
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        //判断是否任然持有锁,是的话,则getNow()为null
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);//再次执行续命逻辑
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

在外层未获取到锁的线程  getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);间隙自旋获取锁对象

getLatch()表示信号量,信号量为1,则表示阻塞状态,最终通过发布订阅方式来唤醒当前被阻塞的线程,唤醒后则执行获取锁的逻辑 doAcquireSharedNanos(arg, nanosTimeout);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1253752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟机可ping树莓派树莓派无法ping虚拟机 的解决办法

问题描述 在学习交叉编译的过程中&#xff0c;发现了树莓派无法ping通虚拟机的问题。所以我尝试了各种ping&#xff0c;发现&#xff1a; 虚拟机可以ping通树莓派和主机树莓派可以ping通主机主机可以ping通树莓派和虚拟机唯独树莓派没法ping通虚拟机 尝试各种方法后找到一种…

Docker Swarm总结+Jenkins安装配置与集成(4/4)

博主介绍&#xff1a;Java领域优质创作者,博客之星城市赛道TOP20、专注于前端流行技术框架、Java后端技术领域、项目实战运维以及GIS地理信息领域。 &#x1f345;文末获取源码下载地址&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb;…

Python中zip()函数用法解析

打包 zip() 函数是 Python 中一个非常有用的函数&#xff0c;它用于将多个可迭代对象组合成一个元组序列&#xff0c;依次将来自每个可迭代对象的元素打包在一起。 基本的语法是 zip(iterable1, iterable2, ...)&#xff0c;其中 iterable1, iterable2, ... 是要合并的可迭代…

python游戏开发pygame初步

文章目录 安装和示例移动物体优化 安装和示例 顾名思义&#xff0c;PyGame就是用来做游戏的Python库&#xff0c;提供了许多游戏开发功能&#xff0c;如图像处理、音频播放、事件处理、碰撞检测等等。从这个角度来说&#xff0c;pygame不仅是一个游戏库&#xff0c;同时也是一…

【代码】基于量子粒子群算法(QPSO)优化LSTM的风电、负荷等时间序列预测算法matlab

程序名称&#xff1a;基于量子粒子群算法&#xff08;QPSO&#xff09;优化LSTM的风电、负荷等时间序列预测算法 实现平台&#xff1a;matlab 代码简介&#xff1a;代码是基于QPSO-LSTM的负荷、光伏、风电等时间序列预测&#xff0c;MATLAB编写。包含LSTM&#xff08;长短时记…

leetcode刷题详解四

25. K 个一组翻转链表 这道题本质上还是用的反转前n个链表的思想。 具体细节如下&#xff1a; 先调用一次函数&#xff0c;使用一个newHead接受返回值&#xff0c;这个是为了方便最后函数的返回。 调用reverseN这个函数的时候&#xff0c;要标记反转这段链表的前置节点和后置节…

基于C#实现三元组

我们知道矩阵是一个非常强大的数据结构&#xff0c;在动态规划以及各种图论算法上都有广泛的应用&#xff0c;当然矩阵有着不足的地方就是空间和时间复杂度都维持在 N2 上&#xff0c;比如 1w 个数字建立一个矩阵&#xff0c;在内存中会占用 1w*1w1 亿的类型空间&#xff0c;这…

Hadoop实践指南:揭秘HDFS元数据并解析案例

1.什么是元数据 元数据&#xff08;Metadata&#xff09;&#xff0c;描述数据的数据&#xff08;data about data&#xff09;。 1.1 HDFS元数据 元数据&#xff1a;关于文件或目录的描述信息&#xff0c;如文件所在路径、文件名称、文件类型等等&#xff0c;这些信息称为文…

二进制数据转换成十六进制表示 binascii.hexlify()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 二进制数据转换成十六进制表示 binascii.hexlify() 选择题 binascii.hexlify()参数的数据类型可以是&#xff1f; import binascii number 11 byte_data number.to_bytes() hex_data bin…

NX二次开发UF_CURVE_ask_conic_data 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_conic_data Defined in: uf_curve.h int UF_CURVE_ask_conic_data(tag_t conic, UF_CURVE_conic_t * conic_data ) overview 概述 Reads the data from the conic ar…

SSF-CNN:空间光谱融合的卷积光谱图像超分网络

SSF-CNN: SPATIAL AND SPECTRAL FUSION WITH CNN FOR HYPERSPECTRAL IMAGE SUPER-RESOLUTION 文章目录 SSF-CNN: SPATIAL AND SPECTRAL FUSION WITH CNN FOR HYPERSPECTRAL IMAGE SUPER-RESOLUTION简介解决问题网络框架代码实现训练部分运行结果 简介 ​ 本文提出了一种利用空…

基于STC12C5A60S2系列1T 8051单片读写掉电保存数据IIC总线器件24C02多字节并显示在液晶显示器LCD1602上应用

基于STC12C5A60S2系列1T 8051单片多字节读写掉电保存数据IIC总线器件24C02多字节并显示在液晶显示器LCD1602上应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍IIC通…

pytorch分布式训练

1 基本概念 rank&#xff1a;进程号&#xff0c;在多进程上下文中&#xff0c;我们通常假定rank 0是第一个进程或者主进程&#xff0c;其它进程分别具有1&#xff0c;2&#xff0c;3不同rank号&#xff0c;这样总共具有4个进程 node&#xff1a;物理节点&#xff0c;可以是一个…

Java游戏之王者荣耀

首先创建类&#xff1a; 游戏运行结果如下&#xff1a; GameFrame类 所需图片&#xff1a; GameObject类 Turret类 所需图片&#xff1a; TurretBlue类 TurretRed类 Champion类 所需图片&#xff1a; 单个&#xff1a; move包: ChampionDaji类 所需图片&#xff1a; Minio…

css加载会造成阻塞吗??

前言 前几天面试问到了这个问题&#xff0c;当时这个答得不敢确定哈哈&#xff0c;虽然一面还是过了 现在再分析下这个&#xff0c;总结下&#xff0c;等下次遇到就能自信得回答&#xff0c;666 准备工作 为了完成本次测试&#xff0c;先来科普一下&#xff0c;如何利用chr…

输出后,我悟了!

大家好&#xff0c;我是木川 今天和前同事吃饭聊天&#xff0c;谈到了输出&#xff0c;今天简单谈下关于输出的重要性 一、为什么要输出 1、不输出容易忘&#xff0c;如果不输出很容易就忘记了&#xff0c;如果再遇见一次&#xff0c;还是需要重新学习&#xff0c;实际上是浪费…

【如何学习Python自动化测试】—— Python 的 unittest 框架

10 、Python 的 unittest 框架 10.1 Unittest 框架介绍 Unittest是Python语言中的一种测试框架&#xff0c;是Python标准库中的一个模块。它可以帮助开发者编写自动化测试&#xff0c;可以进行单元测试、集成测试、功能测试等各种类型的测试。 Unittest的特点是简单易学&#…

用 Addon 增强 Node.js 和 Electron 应用的原生能力

前言 Node.js Addon 是 Node.js 中为 JavaScript 环境提供 C/C 交互能力的机制。其形态十分类似 Java 的 JNI&#xff0c;都是通过提供一套 C/C SDK&#xff0c;用于在 C/C 中创建函数方法、进行数据转换&#xff0c;以便 JavaScript / Java 等语言进行调用。这样编写的代码通常…

突破技术障碍:软件工程师如何应对项目中的难题?

在软件开发项目中&#xff0c;工程师常常会遇到各种技术难题。这些难题可能涉及到复杂的算法、不兼容的系统、难以预见的软件行为&#xff0c;或者其他许多方面。 以下是一些策略和方法&#xff0c;可以帮助软件工程师有效地应对这些挑战&#xff1a; 1、理解问题&#xff1a;…