【Redis】Redis 的学习教程(十一)之使用 Redis 实现分布式锁

news2025/1/11 4:20:51

1. 分布式锁概念

在多线程环境下,为了保证数据的线程安全,锁保证同一时刻,只有一个可以访问和更新共享数据。在单机系统我们可以使用 synchronized 锁、Lock 锁保证线程安全。

synchronized 锁是 Java 提供的一种内置锁,在单个 JVM 进程中提供线程之间的锁定机制,控制多线程并发。只适用于单机环境下的并发控制。

想要在多个节点中提供锁定,在分布式系统并发控制共享资源,确保同一时刻只有一个访问可以调用,避免多个调用者竞争调用和数据不一致问题,保证数据的一致性,就需要分布式锁

分布式锁:控制分布式系统不同进程访问共享资源的一种锁的机制。不同进程之间调用需要保持互斥性,任意时刻,只有一个客户端能持有锁。

共享资源包含:

  • 数据库
  • 文件硬盘
  • 共享内存

分布式锁特性:

  • 互斥性:锁只能被持有的客户端删除,不能被其他客户端删除
  • 锁超时释放:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁
  • 可重入性:一个线程如果获取了锁之后,可以再次对其请求加锁。
  • 高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效
  • 安全性:锁只能被持有的客户端删除,不能被其他客户端删除

模拟并发环境下单

①:添加 Redis 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

②:添加配置:

spring:
  redis:
    host: localhost
    port: 6379
    password:
    timeout: 2000s
    # 配置文件中添加 lettuce.pool 相关配置,则会使用到lettuce连接池
    lettuce:
      pool:
        max-active: 8  # 连接池最大连接数(使用负值表示没有限制) 默认为8
        max-wait: -1ms # 接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1ms
        max-idle: 8    # 连接池中的最大空闲连接 默认为8
        min-idle: 0    # 连接池中的最小空闲连接 默认为 0

③:Redis 配置类:

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        // json 序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String 序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // 所有的 key 采用 string 的序列化
        template.setKeySerializer(stringRedisSerializer);
        // 所有的 value 采用 jackson 的序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash 的 key 采用 string 的序列化
        template.setHashKeySerializer(stringRedisSerializer);
        // hash 的 value 采用 jackson 的序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

}

Redis 工具类:

@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 普通缓存获取
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    // 普通缓存放入
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));
    }

    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                //springboot2.4后用法
                redisTemplate.delete(Arrays.asList(key));
            }
        }
    }

}

④:添加下单接口

@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private RedisUtil redisUtil;

    @GetMapping("/initProductStock")
    public void initProductStock() {
        redisUtil.set("stock", 100);
    }

    @GetMapping("/create_order")
    public void createOrder() {
        // 获取当前库存
        int stock = (Integer) redisUtil.get("stock");
        if (stock > 0) {
            // 减库存
            int realStock = stock - 1;
            redisUtil.set("stock", realStock);
            // TODO 添加订单记录
            log.info("扣减成功,剩余库存:" + realStock);
            return;
        }
        log.error("扣减失败,库存不足");
    }

}

接口说明:

  • /order/initProductStock:先向 Redis 中初始化一个库存
  • /order/create_order:下单接口:先从缓存获取库存,如果库存大于 0,则库存减 1

⑤:并发测试

使用 JMeter 进行并发环境测试,10 个线程,循环 5 次。

在这里插入图片描述

⑥:测试结果,打印日志如下:

在这里插入图片描述
使用 JMeter 调用了 50 次接口后,按照正常情况下,库存应该为:50 = 100 - 50。

但通过日志显示,最终库存为:95。

这是因为在并发环境下,多个线程下单操作,前面的线程还未更新库存,后面的线程已经请求进来,并获取到了未更新的库存,后续扣减库存都不是扣减最近的库存。线程越多,扣减的库存越少。这就是在高并发场景下发生的超卖问题。

很明显,上述问题是出现了线程安全的问题,我们首先能想到的肯定是给它加 synchronized 锁。

是的,没问题,但是我们知道,synchronized 锁是属于JVM 级别的,也就是我们所谓的“单机锁”,如果是多机部署的环境中,还能保证数据的一致性吗?

答案肯定是不能的。这个时候,就需要用到了我们 Redis 分布式锁

用 Redis 实现分布式锁的几种方案,都是用 SETNX 命令(设置 key 等于某 value)。只是高阶方案传的参数个数不一样,以及考虑了异常情况

SETNX 是SET IF NOT EXISTS的简写。命令格式:SETNX key value,如果 key不存在,则 SETNX 成功返回 1,如果这个 key 已经存在了,则返回 0

setIfAbsent() 是 setnx + expire 的合并命令

2. Redis分布式锁方案一:SETNX + EXPIRE

问题:为什么要加过期时间

如果在释放锁之前 Redis 宕机了,就会造成一直死锁。

setnx 命令 和 expire 命令一定要是原子操作。

伪代码如下:

if(jedis.setnx(key_resource_id,lock_value) == 1{ //加锁
    expire(key_resource_id,100; //设置过期时间
    try {
        do something  //业务请求
    }catch(){
  }
  finally {
       jedis.del(key_resource_id); //释放锁
    }
}

setnxexpire 两个命令分开了,「不是原子操作」。如果执行完 setnx 加锁,正要执行 expire 设置过期时间时,进程 crash 或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」

@GetMapping("/create_order")
public void createOrder() {
    String key = "lock_key";
    // 1.加锁
    boolean lock = tryLock(key, 1, 60L, TimeUnit.SECONDS);
    if (lock) {
        try {
            // 获取当前库存
            int stock = (Integer) redisUtil.get("stock");
            if (stock > 0) {
                // 减库存
                int realStock = stock - 1;
                redisUtil.set("stock", realStock);
                // TODO 添加订单记录
                log.info("扣减成功,剩余库存:" + realStock);
                return;
            }
            log.error("扣减失败,库存不足");
        } catch (Exception e) {
            log.error("扣减库存失败");
        } finally {
            // 3.解锁
            unlock(key);
        }
    } else {
        log.info("未获取到锁...");
    }
}

public boolean tryLock(String key, Object value, long timeout, TimeUnit unit) {
    return redisUtil.setIfAbsent(key, value, timeout, unit);
}

public void unlock(String key) {
    redisUtil.del(key);
}

使用 JMeter 运行后,结果如下:

在这里插入图片描述

获取到锁的线程已成功扣除库存,没有获取到锁的线程只打印日志。

3. Redis分布式锁方案二:SETNX + EXPIRE + 校验唯一随机值

方案一还是有一定的缺陷的:假设线程 A 获取锁成功,一直在执行业务逻辑,但是 60s 过去了,还没执行完。但是,此时,锁已经过期了。线程 B 又请求过来了,显然,线程 B 也可以获取锁成功,也开始执行业务逻辑代码。那么问题就来了:在线程 B 执行过程中,线程 A 已经执行完了,就会把线程 B 的锁给释放掉!

既然锁可能被别的线程误删,那我们给 value 值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,就OK了

@GetMapping("/create_order")
public void createOrder() {
    String key = "lock_key";
    String value = "ID_PREFIX" + Thread.currentThread().getId();
    // 1.加锁
    boolean lock = tryLock(key, value, 60L, TimeUnit.SECONDS);
    if (lock) {
        // ...
}

public void unlock(String key, String value) {
    String currentValue = (String)redisUtil.get(key);
    if (StringUtils.hasText(currentValue) && currentValue.equals(value)) {
        redisUtil.del(key);
    }
}

这里需要注意的是:释放锁时,先 get 再删除,这并不是原子操作,无法保证进程安全。为了更严谨,这里用 lua 脚本代替

lua 脚本

Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过 redis 的 eval/evalsha 命令来运行,把操作封装成一个 Lua 脚本,如论如何都是一次执行的原子操作

lockDel.lua如下:resources/lua/lockDel.lua

if redis.call('get', KEYS[1]) == ARGV[1]
    then
  -- 执行删除操作
        return redis.call('del', KEYS[1])
    else
  -- 不成功,返回0
        return 0
end
public void unlock(String key, String value) {
    // 解锁脚本
    DefaultRedisScript<Long> unlockScript = new DefaultRedisScript<>();
    unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/lockDel.lua")));
    unlockScript.setResultType(Long.class);
    // 执行lua脚本解锁
    Long execute = redisTemplate.execute(unlockScript, Collections.singletonList(key), value);
}

或者:

public void unlock(String key, String value) {
    // 解锁脚本
    String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
    redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Collections.singletonList(key), value);
}

4. Redis分布式锁方案三:Redisson

方案二还存在问题:「锁过期释放,业务没执行完」。 如果设置的超时时间比较短,而业务执行的时间比较长。比如超时时间设置5s,而业务执行需要10s,此时业务还未执行完,其他请求就会获取到锁,两个请求同时请求业务数据,不满足分布式锁的互斥性,无法保证线程的安全

4.1 Redisson 概念

其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。当前开源框架 Redisson 解决了这个问题。Redisson 底层原理图如下:

在这里插入图片描述

只要线程一加锁成功,就会启动一个 watch dog 看门狗,它是一个后台线程,会每隔 10 秒检查一下,如果线程 1 还持有锁,那么就会不断的延长锁 key 的生存时间。因此,Redisson 就是使用 watch dog 解决了「锁过期释放,业务没执行完」问题

Redis 虽然作为分布式锁来说,性能是最好的。但是也是最复杂的。上面总结 Redis 主要有下面几个问题:

  • 未设置过期时间,会死锁
  • 设置过期时间
    • 锁误删
    • 业务还继续执行,导致多个线程并发执行

线上都是用 Redission 实现分布式锁,Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。Redisson 是基于 netty 通信框架实现的,所以支持非阻塞通信,性能优于 Jedis。

Redisson 分布式锁四层保护:

  • 防死锁
  • 防误删
  • 可重入(一个线程可以在获取锁之后再次获取同一个锁,而不需要等待锁释放)
  • 自动续期

Redisson 实现 Redis 分布式锁,支持单机和集群模式

4.2 Redisson 实现

使用 Redission 分布式锁,分成三个步骤:

  1. 获取锁 redissonClient.getLock("lock")
  2. 加锁 rLock.lock()
  3. 解锁 rLock.unlock()

引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.20.0</version>
</dependency>

Redisson 配置类:

@Configuration
public class RedissionConfig {

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.password}")
    private String password;

    private int port = 6379;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        // 单机版
        config.useSingleServer()
                .setAddress("redis://" + redisHost + ":" + port);
        //.setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }

}

集群版:

@Bean
public RedissonClient getRedisson() {
    Config config = new Config();
    config.useClusterServers()
            .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
            //可以用"rediss://"来启用SSL连接
            .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
            .addNodeAddress("redis://127.0.0.1:7002");
    return Redisson.create(config);
}

下单接口:

@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/create_order")
    public void createOrder() {
        String key = "lock_key";
        RLock rLock = redissonClient.getLock(key);
        // 1.加锁
        rLock.lock();
        try {
            // 获取当前库存
            int stock = (Integer) redisUtil.get("stock");
            if (stock > 0) {
                // 减库存
                int realStock = stock - 1;
                redisUtil.set("stock", realStock);
                // TODO 添加订单记录
                log.info("扣减成功,剩余库存:" + realStock);
                return;
            }
            log.error("扣减失败,库存不足");
        } catch (Exception e) {
            log.error("扣减库存失败");
        } finally {
            // 3.解锁
            rLock.unlock();
        }
    }
}

使用 JMeter 并发运行后:

在这里插入图片描述

Redission 实现的分布式锁,直接调用,不需要锁异常、超时并发、锁删除等问题,它把处理上面的问题的代码都封装好了,直接调用即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1030630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

北工大汇编——综合题(1)

题目要求 统计字符数。从键盘输入一行字符&#xff0c;统计字母、空格、数字、其他宇符的个数&#xff0c;并显示。要求&#xff1a;提示输入一行宇符串&#xff1b;键盘输入宇符串&#xff0c;Enter 键结束输入&#xff0c;并换行显示结果。 题目代码 DATAS SEGMENT;此处输…

Node.js 调用 fluent-ffmpeg

最近开发H5资源在线裁剪&#xff0c;最终在资源合成的步骤&#xff0c;选择 ffmpeg 作为合成的插件&#xff0c;记录下使用方式。 一、介绍 ffmpeg 一款跨平台多媒体处理工具&#xff0c;可以进行视频转码、裁剪、合成、音视频提取、推流等操作。 二、安装 Node js 可以利用…

qt+ffmpeg视频播放器实现音视频倍速功能

目录 一、前言 二、开发环境参考源码 开发环境&#xff1a; 参考源码&#xff1a; 三、添加倍速控件 四、倍速调节代码 五、视频倍速调节 六、音频倍速方案一 七、音频倍速方案二 八、最终效果 九、参考文献 十、结语 一、前言 参考了云天之巅的FFMPEG Qt视频播放器…

Tomcat常见报错以及手动实现Tomcat

一.Tomcat的简单启动 1.安装Tomcat 2.Tomcat启动 1. 双击 bin 目录下的 startup.bat 文件 2. 输入 http://localhost:8080/&#xff0c;显示如下界面代表安装成功, 默认在 8080 端口 3. 注意&#xff0c;不要关闭黑窗口&#xff0c;关闭了&#xff0c;tomcat 服务就停止了…

LabVIEW开发基于物联网的多功能功率分析仪

LabVIEW开发基于物联网的多功能功率分析仪 根据技术规则&#xff0c;电气元件网络中的单个被创建为在标称正弦波振动制造频率下运行。失真顺序的电流和电压波与正弦波不同&#xff0c;它们或多或少地扭曲成形状。它是由交流网络中非线性组件的存在引起的&#xff0c;例如静态转…

R语言进行孟德尔随机化+meta分析(1)---meta分析基础

目前不少文章用到了孟德尔随机化meta分析&#xff0c;今天咱们也来介绍一下&#xff0c;孟德尔随机化meta其实主要就是meta分析的过程&#xff0c;提取了孟德尔随机化文章的结果&#xff0c;实质上就是个meta分析&#xff0c;不过多个孟德尔随机化随机化的结果合并更加加强了结…

月木学途开发 4.公告模块

概述 效果图 数据库设计 DROP TABLE IF EXISTS announcement; CREATE TABLE announcement (announcementId int(11) NOT NULL AUTO_INCREMENT,announcementTitle varchar(255) DEFAULT NULL,announcementTime varchar(255) DEFAULT NULL,announcementContent longtext,PRIMAR…

计算机组成原理——基础入门总结(二)

上一期的路径&#xff1a;基础入门总结&#xff08;一&#xff09; 目录 一.输入输出系统和IO控制方式 二.存储系统的基本概念 三.cache的基本概念和原理 四.CPU的功能和基本结构 五.总线概述 一.输入输出系统和IO控制方式 IO设备又可以被统一称为外部设备~ IO接口&…

Jetpack:在数据变化时如何优雅更新Views数据

本文讲的是关于Jetpack的架构组件LiveData&#xff0c;LiveData是Lifecycle-aware 组件的一个应用&#xff0c;这意味着LiveData遵守Activity、Fragment和Service等组件的生命周期&#xff0c;在它们生命周期处于活跃状态&#xff08;CREATED和RESUMED&#xff09;才进行更新Vi…

《计算机视觉中的多视图几何》笔记(8)

8 More Single View Geometry 本章主要讲述除了点以外的几何体&#xff0c;在投影变换下的性质。这些几何体包括&#xff1a;平面&#xff0c;线&#xff0c;圆锥曲线&#xff0c;二次曲线。 讲到这里就明白了&#xff0c;为什么投影几何这么重要&#xff0c;因为摄像机就是一…

VLANIF配置

目录 实验原理&#xff1a; 案例&#xff1a; 设备配置 用ping验证不同vlan之间实现相互通信 实验原理&#xff1a; VLANIF接口是一种第三层的逻辑接口&#xff0c;用于在第三层实现不同VLAN 之间的通信。 每个VALN有一个VLANIF接口&#xff0c;并通过该接口在网络层转发…

【操作系统笔记】缓存一致性

CPU 核心之间数据如何传播 高速缓存中的值被修改了&#xff0c;那么怎么同步到内存中呢&#xff1f; ① 写直达&#xff08;Write-Through&#xff09;② 写回&#xff08;Write-Back&#xff09; 写直达&#xff08;Write-Through&#xff09; 简单&#xff0c;但是很慢&am…

《Kubernetes部署篇:Ubuntu20.04基于containerd二进制部署K8S 1.25.14集群(多主多从)》

一、架构图 如下图所示&#xff1a; 二、部署说明 2.1、部署流程 1、系统环境初始化&#xff0c;主要包括 主机名设置、主机hosts解析、关闭防火墙、关闭swap分区、修改系统参数、时间时区同步、修改内核参数、启用ipvs模式。 2、使用一键生成K8S集群证书工具创建证书文件。…

vue页面嵌入飞书网页组件,用于在类似ERP,OA等系统中展示在线文档

先展示最终效果(就是在vue页面中,内嵌了一块ifream页面): 1. 注册进入飞书开放平台,地址为: 飞书开放平台 2.进入开放平台后,选择--创建企业自建应用--创建网页应用,然后在主页面记住该应用的appId和appSecret参数,后面要用 3.然后注意一点的是,因为后面的授权等逻辑我们一般…

Android设计支持库

本文所有的代码均存于 https://github.com/MADMAX110/BitsandPizzas 设计支持库&#xff08;Design Support Library&#xff09;是 Google 在 2015 年的 I/O 大会上发布的全新 Material Design 支持库&#xff0c;在这个 support 库里面主要包含了 8 个新的 Material Design …

clickhouse简单安装部署

目录 前言(来源于官方文档)&#xff1a; 一.下载并上传 1.下载地址&#xff1a;点我跳转下载 2.上传至Linux 二.解压和配置 1.解压顺序 注意&#xff1a;必须按照以下顺序解压&#xff0c;并且每解压一个都要执行该解压后文件的install/doinst.sh文件 解压步骤&#xff…

antd-design-vue Table组件全局配置(分页器...)

描述&#xff1a;该框架许多默认配置好像还不支持&#xff0c;一般都是挨个使用挨个配置。我的项目中也遇到了类似的情况&#xff0c;但是当需求发生变化时&#xff0c;代码所有的组件使用则都需要修改&#xff0c;这种方式真的很不礼貌。 《我为了一口醋包了顿饺子》 需求是将…

MQ - 19 安全_限流方案的设计

文章目录 导图Pre概述集群中的数据加密加密算法分类消息队列限流机制思考单机限流全局限流全局限流还是单机限流?对哪些资源和维度进行限流发生限流后怎么处理消息队列全局限流设计单机限流方案全局限流方案消息队列的服务降级配置 Broker 的 CPU 或内存的使用率额度配置磁盘保…

C语言每日一题(5):求两个数二进制中不同位的个数

文章主题&#xff1a;求两个数二进制中不同位的个数&#x1f525;所属专栏&#xff1a;C语言每日一题&#x1f4d7;作者简介&#xff1a;每天不定时更新C语言的小白一枚&#xff0c;记录分享自己每天的所思所想&#x1f604;&#x1f3b6;个人主页&#xff1a;[₽]的个人主页&a…

BOA服务器移植

BOA服务器移植 1、源码下载 http://www.boa.org/ News! (last updated 23 February 2005) Latest Released Version (0.94.13) here (signature here) --- 下载地址1.1 boa简介&#xff1a; 其可执行代码只有大约60KB左右&#xff0c;Boa是一个单任务的HTTP服务器&#xff…