1. 分布式锁概念
在多线程环境下,为了保证数据的线程安全,锁保证同一时刻,只有一个可以访问和更新共享数据。在单机系统我们可以使用 synchronized 锁、Lock 锁保证线程安全。
synchronized 锁是 Java 提供的一种内置锁,在单个 JVM 进程中提供线程之间的锁定机制,控制多线程并发。只适用于单机环境下的并发控制。
想要在多个节点中提供锁定,在分布式系统并发控制共享资源,确保同一时刻只有一个访问可以调用,避免多个调用者竞争调用和数据不一致问题,保证数据的一致性,就需要分布式锁。
分布式锁:控制分布式系统不同进程访问共享资源的一种锁的机制。不同进程之间调用需要保持互斥性,任意时刻,只有一个客户端能持有锁。
共享资源包含:
- 数据库
- 文件硬盘
- 共享内存
分布式锁特性:
- 互斥性:锁只能被持有的客户端删除,不能被其他客户端删除
- 锁超时释放:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁
- 可重入性:一个线程如果获取了锁之后,可以再次对其请求加锁。
- 高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效
- 安全性:锁只能被持有的客户端删除,不能被其他客户端删除
模拟并发环境下单
①:添加 Redis 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
②:添加配置:
spring:
redis:
host: localhost
port: 6379
password:
timeout: 2000s
# 配置文件中添加 lettuce.pool 相关配置,则会使用到lettuce连接池
lettuce:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制) 默认为8
max-wait: -1ms # 接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1ms
max-idle: 8 # 连接池中的最大空闲连接 默认为8
min-idle: 0 # 连接池中的最小空闲连接 默认为 0
③:Redis 配置类:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// json 序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String 序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// 所有的 key 采用 string 的序列化
template.setKeySerializer(stringRedisSerializer);
// 所有的 value 采用 jackson 的序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash 的 key 采用 string 的序列化
template.setHashKeySerializer(stringRedisSerializer);
// hash 的 value 采用 jackson 的序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
Redis 工具类:
@Component
public class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 普通缓存获取
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
// 普通缓存放入
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));
}
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
//springboot2.4后用法
redisTemplate.delete(Arrays.asList(key));
}
}
}
}
④:添加下单接口
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private RedisUtil redisUtil;
@GetMapping("/initProductStock")
public void initProductStock() {
redisUtil.set("stock", 100);
}
@GetMapping("/create_order")
public void createOrder() {
// 获取当前库存
int stock = (Integer) redisUtil.get("stock");
if (stock > 0) {
// 减库存
int realStock = stock - 1;
redisUtil.set("stock", realStock);
// TODO 添加订单记录
log.info("扣减成功,剩余库存:" + realStock);
return;
}
log.error("扣减失败,库存不足");
}
}
接口说明:
/order/initProductStock
:先向 Redis 中初始化一个库存/order/create_order
:下单接口:先从缓存获取库存,如果库存大于 0,则库存减 1
⑤:并发测试
使用 JMeter 进行并发环境测试,10 个线程,循环 5 次。
⑥:测试结果,打印日志如下:
使用 JMeter 调用了 50 次接口后,按照正常情况下,库存应该为:50 = 100 - 50。
但通过日志显示,最终库存为:95。
这是因为在并发环境下,多个线程下单操作,前面的线程还未更新库存,后面的线程已经请求进来,并获取到了未更新的库存,后续扣减库存都不是扣减最近的库存。线程越多,扣减的库存越少。这就是在高并发场景下发生的超卖问题。
很明显,上述问题是出现了线程安全的问题,我们首先能想到的肯定是给它加 synchronized 锁。
是的,没问题,但是我们知道,synchronized 锁是属于JVM 级别的,也就是我们所谓的“单机锁”,如果是多机部署的环境中,还能保证数据的一致性吗?
答案肯定是不能的。这个时候,就需要用到了我们 Redis 分布式锁
用 Redis 实现分布式锁的几种方案,都是用 SETNX 命令(设置 key 等于某 value)。只是高阶方案传的参数个数不一样,以及考虑了异常情况
SETNX 是SET IF NOT EXISTS的简写。命令格式:SETNX key value,如果 key不存在,则 SETNX 成功返回 1,如果这个 key 已经存在了,则返回 0
setIfAbsent()
是 setnx + expire 的合并命令
2. Redis分布式锁方案一:SETNX + EXPIRE
问题:为什么要加过期时间
如果在释放锁之前 Redis 宕机了,就会造成一直死锁。
setnx
命令 和 expire
命令一定要是原子操作。
伪代码如下:
if(jedis.setnx(key_resource_id,lock_value) == 1){ //加锁
expire(key_resource_id,100); //设置过期时间
try {
do something //业务请求
}catch(){
}
finally {
jedis.del(key_resource_id); //释放锁
}
}
setnx
和 expire
两个命令分开了,「不是原子操作」。如果执行完 setnx
加锁,正要执行 expire
设置过期时间时,进程 crash 或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」
@GetMapping("/create_order")
public void createOrder() {
String key = "lock_key";
// 1.加锁
boolean lock = tryLock(key, 1, 60L, TimeUnit.SECONDS);
if (lock) {
try {
// 获取当前库存
int stock = (Integer) redisUtil.get("stock");
if (stock > 0) {
// 减库存
int realStock = stock - 1;
redisUtil.set("stock", realStock);
// TODO 添加订单记录
log.info("扣减成功,剩余库存:" + realStock);
return;
}
log.error("扣减失败,库存不足");
} catch (Exception e) {
log.error("扣减库存失败");
} finally {
// 3.解锁
unlock(key);
}
} else {
log.info("未获取到锁...");
}
}
public boolean tryLock(String key, Object value, long timeout, TimeUnit unit) {
return redisUtil.setIfAbsent(key, value, timeout, unit);
}
public void unlock(String key) {
redisUtil.del(key);
}
使用 JMeter 运行后,结果如下:
获取到锁的线程已成功扣除库存,没有获取到锁的线程只打印日志。
3. Redis分布式锁方案二:SETNX + EXPIRE + 校验唯一随机值
方案一还是有一定的缺陷的:假设线程 A 获取锁成功,一直在执行业务逻辑,但是 60s 过去了,还没执行完。但是,此时,锁已经过期了。线程 B 又请求过来了,显然,线程 B 也可以获取锁成功,也开始执行业务逻辑代码。那么问题就来了:在线程 B 执行过程中,线程 A 已经执行完了,就会把线程 B 的锁给释放掉!
既然锁可能被别的线程误删,那我们给 value 值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,就OK了
@GetMapping("/create_order")
public void createOrder() {
String key = "lock_key";
String value = "ID_PREFIX" + Thread.currentThread().getId();
// 1.加锁
boolean lock = tryLock(key, value, 60L, TimeUnit.SECONDS);
if (lock) {
// ...
}
public void unlock(String key, String value) {
String currentValue = (String)redisUtil.get(key);
if (StringUtils.hasText(currentValue) && currentValue.equals(value)) {
redisUtil.del(key);
}
}
这里需要注意的是:释放锁时,先 get 再删除,这并不是原子操作,无法保证进程安全。为了更严谨,这里用 lua 脚本代替
lua 脚本
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过 redis 的 eval/evalsha 命令来运行,把操作封装成一个 Lua 脚本,如论如何都是一次执行的原子操作
lockDel.lua如下:resources/lua/lockDel.lua
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
public void unlock(String key, String value) {
// 解锁脚本
DefaultRedisScript<Long> unlockScript = new DefaultRedisScript<>();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/lockDel.lua")));
unlockScript.setResultType(Long.class);
// 执行lua脚本解锁
Long execute = redisTemplate.execute(unlockScript, Collections.singletonList(key), value);
}
或者:
public void unlock(String key, String value) {
// 解锁脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Collections.singletonList(key), value);
}
4. Redis分布式锁方案三:Redisson
方案二还存在问题:「锁过期释放,业务没执行完」。 如果设置的超时时间比较短,而业务执行的时间比较长。比如超时时间设置5s,而业务执行需要10s,此时业务还未执行完,其他请求就会获取到锁,两个请求同时请求业务数据,不满足分布式锁的互斥性,无法保证线程的安全
4.1 Redisson 概念
其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。当前开源框架 Redisson 解决了这个问题。Redisson 底层原理图如下:
只要线程一加锁成功,就会启动一个 watch dog
看门狗,它是一个后台线程,会每隔 10 秒检查一下,如果线程 1 还持有锁,那么就会不断的延长锁 key 的生存时间。因此,Redisson 就是使用 watch dog 解决了「锁过期释放,业务没执行完」问题
Redis 虽然作为分布式锁来说,性能是最好的。但是也是最复杂的。上面总结 Redis 主要有下面几个问题:
- 未设置过期时间,会死锁
- 设置过期时间
- 锁误删
- 业务还继续执行,导致多个线程并发执行
线上都是用 Redission
实现分布式锁,Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。Redisson 是基于 netty 通信框架实现的,所以支持非阻塞通信,性能优于 Jedis。
Redisson 分布式锁四层保护:
- 防死锁
- 防误删
- 可重入(一个线程可以在获取锁之后再次获取同一个锁,而不需要等待锁释放)
- 自动续期
Redisson 实现 Redis 分布式锁,支持单机和集群模式
4.2 Redisson 实现
使用 Redission 分布式锁,分成三个步骤:
- 获取锁
redissonClient.getLock("lock")
- 加锁
rLock.lock()
- 解锁
rLock.unlock()
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
Redisson 配置类:
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
// 单机版
config.useSingleServer()
.setAddress("redis://" + redisHost + ":" + port);
//.setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
集群版:
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
//可以用"rediss://"来启用SSL连接
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
return Redisson.create(config);
}
下单接口:
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private RedissonClient redissonClient;
@GetMapping("/create_order")
public void createOrder() {
String key = "lock_key";
RLock rLock = redissonClient.getLock(key);
// 1.加锁
rLock.lock();
try {
// 获取当前库存
int stock = (Integer) redisUtil.get("stock");
if (stock > 0) {
// 减库存
int realStock = stock - 1;
redisUtil.set("stock", realStock);
// TODO 添加订单记录
log.info("扣减成功,剩余库存:" + realStock);
return;
}
log.error("扣减失败,库存不足");
} catch (Exception e) {
log.error("扣减库存失败");
} finally {
// 3.解锁
rLock.unlock();
}
}
}
使用 JMeter 并发运行后:
Redission 实现的分布式锁,直接调用,不需要锁异常、超时并发、锁删除等问题,它把处理上面的问题的代码都封装好了,直接调用即可