简单解释一下个人理解的分布式锁以及主要的实现手段。
文章目录
- 什么是分布式锁
- 常用分布式锁
- 实现
什么是分布式锁
以java应用举例,如果是单应用的情况下,我们通常使用synchronized或者lock进行线程锁,主要为了解决多线程或者高并发场景下的共享资源不能按照预期结果被处理。
举个最简单的例子,总共有10件商品但是有100个人同时来消费,我们需要保证永远只卖出10件商品,否则多卖出的那件商品可就需要程序员来买单了。
那么什么又是分布式锁呢,即上述的线程锁只能保证在同个jvm层面进行管控,这个时候我们将该应用进行升级迭代为分布式架构,即上述的购物服务可能被分布式部署为10套应用,这个时候100个人来消费这件事情可能会被均匀地分摊到各个服务上。
那么我们就无法通过jvm锁来管控这若干个微服务的线程,所以就需要引入分布式锁的概念,通常需要引入第三方中间件的形式来完成分布式锁的功能。
常用分布式锁
- Redis分布式锁
常见,且推荐。 - MySQL分布式锁
特殊场景下可应用,但是不推荐。 - Zookeeper分布式锁
使用zk的节点机制实现,根据实际情况可以选用。
实现
通过上述了解,我们明白redis分布式锁是较为常见的,通常在工作过程中或者面试过程中也会经常使用到。所以下文我们主要着重就redis实现分布式锁进行阐述。
首先我们先了解为什么redis能够做分布式锁,由于redis内部机制是单线程操作,但是由于多路复用以及非阻塞io等特性我们完全可以不用担心其性能,且作为一个kv缓存数据库我们可以极快的利用其kv存值特点进行管理从各个服务注册的redis的key。
例如,服务1已经设置了一个约定好的key值,这个时候服务2拿着同样的key值想进来,这个时候就会通知服务2,其他服务已经预定的这个key值了,不好意思,你得等着。在redis中就可以用setnx方法来实现该功能,setnx即set if not exists的简写。
不知不觉,我们就用redis实现了单机下用synchronized以及lock才能实现的锁机制。
只有setnx成功的服务才能够继续执行后续业务流程,业务流程处理完成之后再进行释放锁,即使用delete(key)进行清除缓存key。
这个时候我们又引申出另一个问题,假设服务1的setnx成功了,但是在执行业务过程中崩溃了。如果只是运行时异常还能够处理,我们在finally中进行delete(key)的处理,但是如果整个服务异常中断了,这个时候就有一个很有意思的事情发生了,其他若干个服务都在等着服务1释放锁,但是由于服务1上了锁之后自己就崩溃了,他也想释放但是丞妾做不到了。
其实要想解决这个问题也比较简单,我们可以在上锁的时候设置有效时间嘛,比如设置3s,那么即使上述情况发生,那么也只会影响3s。3s后该锁被自动释放,其他的服务又可以开始愉快的争夺锁了。
说了这么多,直接show me the code吧:
前置条件,引入starter-data-redis的pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
- 首先我们使用RedisTemplate封装一个工具类
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
/**
* set nx,上锁
* @param key 一般设为lock
*@param value 一般使用uuid
*@param time 缓存时间,单位为s
*/
public boolean setNx(String key, String value, int time){
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS));
}
//未指定过期时间
/**
* 未指定过期事件的redis锁
* @param key
* @param value
* @return
*/
public boolean setNx(String key, String value){
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value));
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
- 模拟场景使用
@Slf4j
@RequestMapping("/redisLock")
@RestController
public class MyRedisLockController {
@Resource
private RedisUtil redisUtil;
/**
* 原子操作redis的数量,保证多少线程进来就执行多少次操作
*
* @return
*/
@GetMapping("/goLock")
public String goLock() {
//配置锁,设置随机uuid进行验证防止误删
String uuid = UUID.randomUUID().toString();
//设置过期时间为10s
boolean lock = redisUtil.setNx("lock", uuid, 10);
if (lock) {
log.info("lock成功.{}", uuid);
//若已经上锁
Object value = redisUtil.get("num");
//2.1判断num为空return
if (StringUtils.isEmpty(value)) {
return "key is null";
}
//2.2有值就转成成int
int num = Integer.parseInt(value + "");
//2.3把redis的num加1
redisUtil.set("num", ++num);
//2.4释放锁,del,保证锁必须被释放-->当业务执行时间小与过期时间时需要释放锁
if (uuid.equals((String) redisUtil.get("lock"))) {
redisUtil.del("lock");
return "success";
} else {
return "fail";
}
} else {
log.error("lock失败.开始重试{}", uuid);
//上锁失败
try {
Thread.sleep(100);
goLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "done";
}
/**
* 模拟实际高并发场景下的
*
* @return
*/
@GetMapping("/goShop")
public String goShop() {
// 首先判断库存量是否还有余量
Object value = redisUtil.get("num");
log.info("当前库存.{}", value);
//2.1判断num为空return
if (StringUtils.isEmpty(value)) {
log.info("尚未设置库存!");
return "尚未设置库存!";
}
//2.2有值就转成成int
int num = Integer.parseInt(value + "");
if (num == 0) {
log.info("已售罄!");
return "已售罄";
}
//配置锁,设置随机uuid进行验证防止误删
String uuid = UUID.randomUUID().toString();
//设置过期时间为10s
boolean lock = redisUtil.setNx("lock", uuid, 10);
if (lock) {
// 执行一些其他业务操作 并扣减库存...
log.info("抢购成功.{}", uuid);
//2.3扣减库存
redisUtil.set("num", --num);
//2.4释放锁,del,保证锁必须被释放-->当业务执行时间小与过期时间时需要释放锁
if (uuid.equals((String) redisUtil.get("lock"))) {
redisUtil.del("lock");
return "success";
} else {
return "fail";
}
} else {
log.error("抢购失败{}", uuid);
}
return "done";
}
}
参考资料:
- Springboot集成Redis——实现分布式锁