文章目录
- 一、优惠券秒杀
- 1.1 全局唯一ID概述
- 1.2 Redis实现全局唯一ID
- 1.3 添加优惠券
- 1.3.1 entity
- 1.3.2 Controller
- 1.3.3 Service层
- 1.3.4 测试
- 1.4 优惠券秒杀下单
- 1.4.1 entity
- 1.4.2 Controller
- 1.4.3 Service
- 1.4.3 测试
- 1.5 库存超卖问题
- 1.5.1 库存超卖原因
- 1.5.2 介绍乐观锁
- 1.5.3 乐观锁解决库存超卖
- 1.6 实现一人一单功能
- 1.6.1 悲观锁解决“一人一单”多线程并发问题
- 1.7 集群下的线程并发安全问题
- 二、分布式锁
- 2.1 基本原理
- 2.2 实现方案对比
- 2.3 Redis分布式锁实现思路
- 2.4 Redis 分布式锁实现 - 初级版本
- 2.4.1 定义ILock接口
- 2.4.2 定义SimpleRedisLock实现类
- 2.4.3 修改“一人一单业务”
- 2.4.4 问题分析
- 2.5 Redis分布式锁实现 - 改进
- 2.5.1 修改SimpleRedisLock实现类
- 2.5.2 问题分析
- 2.6 Lua脚本解决多条命令原子性问题
- 2.6.1 编写Lua脚本
- 2.6.2 Java调用lua脚本 - 改进Redis分布式锁
- 2.7 总结
- 三、Redisson
- 3.1 Redisson快速入门
- 3.2 可重入锁原理
- 3.3 Redisson锁重试和WatchDog机制
- 3.1.1 锁重试
- 3.1.2 锁超时
- 3.1.3 总结
- 3.4 multiLock原理
- 3.4.1 解析
- 3.4.2 测试
- 3.5 总结
一、优惠券秒杀
1.1 全局唯一ID概述
每个店铺都可以发布优惠券
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题
-
id规律性太明显
用户可以根据id猜测到一些信息。
比如我昨天下单了一个商品,id为10,今天下单了一个商品,id为100,那我就能猜到这段时间内售出了90件商品
-
受单表数据量的限制
用户只要有订单的行为,就会不断的产生新的订单。
当网站达到一定规模后,一年会有上千万的订单,几年后单张表就保存不了如此大规模的数据。
我们只能分成单张表存储,每张表都采用id自增的方式,显然实现不了,因为mysql每张表都是各自计算自己自增长,都从“1”开始增长,那这样的话这几张表中某些数据id可能会重复
如果id重复,以后查询订单会出现问题
为了解决上述问题,我们可以使用全局ID生成器
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具
全局的含义:在同一个业务下,不管分布式系统将来有多少个服务、多少个节点、业务下分成了多少张不同的表,只要使用全局ID生成器得到的id,一定是当前业务内唯一id,不会出现冲突
当然不同业务的id出现冲突后也没有关系
单体项目也会用到全局ID生成器
一般要满足下列特性
-
唯一性
-
高可用
起码用着用着不能挂掉
-
高性能
正确的生成id,并且生成id的速度要快
-
递增新
要有一个单调的递增的特性。虽然不是数据库那样自增,但是我们要确保整体的id是逐渐增大的一个过程,有利于数据库创建索引
-
安全性
规律性不能太明显
我们Redis命令中有一个incr命令的
Redis命令——通用命令、String类型、Key层级结构、Hash类型、List类型、Set类型、SortedSet类型_redis多个层级命令查询_我爱布朗熊的博客-CSDN博客
首先能确保唯一性,因为Redis独立于数据库之外的,不论数据库有几张表、几个数据库,Redis只有一个,那这样的话所有人来访问Redis时,它的自增一定是唯一的
高可用、高性能、递增性不解释了
安全性比较麻烦一点,关键看我们采用什么自增,如果还是1,2,3,4,…的话安全性肯定差
为了保证安全性,我们可以拼接一些其他信息
考虑到数据库的性能,我们id仍然采用数值类型(Long)
第一位永远是0,代表我们的符号永远是正数
中间一部分放31bit的时间戳,其实就是增加id复杂性的,为什么是31位,因为我们想要以秒为单位(我们要定义一个初始的时间为基准,比如2000年,这31位bit代表秒,算出与2000年相差多少时间即可,31位bit最多可代表69年)
最后一部分放置序列号,也就是Redis命令incr自增的值
这样规划的话,一秒内可支持2的32次方个订单量
全局唯一ID生成策略
-
UUID:
使用JDK自带工具类就可以生成16进制的一长串的数值,因为是16进制,所以返回的是一个字符串结构,也没有单调递增的特性
-
Redis自增
上面已经分析了
-
snowflake 雪花算法
也是采用的long类型的64位的数字,自增采用当前机器的自增,内部维护的,所以要维护一个机械id
-
数据库自增
Redis自增ID策略
- 每天一个key,方便统计订单量
- 限定自增的值,让其不超过上限
- ID构造是时间戳+计数器
1.2 Redis实现全局唯一ID
位运算学习Redis全局ID自增的时候,为什么return的时候使用了或运算就可以了完成这个操作-大数据-CSDN问答
实现代码
@Component
public class RedisIdWorker {
// 2022年1月1日0时0分0秒 的 秒数
private static final long BEGIN_TIMESTAMP = 1640995200L;
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* id生成策略
*
* @param keyPrefix 业务前缀
* @return 生成的id
*/
public long nextId(String keyPrefix) {
// TODO 1.生成时间戳(当前时间减去我们规定的开始时间的秒数)
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// TODO 2.生成序列号
// 同一个业务不要使用同一个key,因为incr自增有限度,2的64次方
// 为了预防订单数日积月累超过2的64次方,我们可以再拼接一个日期字符串,这样做还能方便以后统计
// TODO 2.1 获取当前日期
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// TODO 2.2 自增长
// 返回值是Long,但是我们改成了long,改完出现警告:会有空指针问题
// 但是并不会出现空指针问题,加入此key不存在,它会在自动帮我们创建一个key
long count = stringRedisTemplate.opsForValue().increment("icr" + keyPrefix + ":" + date);
// TODO 拼接两部分
// 我们的返回值是long,直接拼接timestamp与count就是字符串了,不能直接拼接
// 为了解决这个问题,我们使用位运算
// timestamp<<32 时间戳向左移动32位,把redis自增的数创建出来,空出来的数以0位补充
// | 代表或运算,一个为真,就是真 0|0=0, 0|1=1,因为后面32位都是0,所以还是count本身
return timestamp<<32 | count ;
}
// public static void main(String[] args) {
// LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
// 传入一个时区作为参数
// long second = time.toEpochSecond(ZoneOffset.UTC);
// System.out.println(second);
// }
}
下面进行测试
测试类中进行测试,如果一直转圈的话,将Redis客户端关闭再重新连接即可
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
System.out.println("laizhelile");
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
1.3 添加优惠券
平价券与特价券。平价券可以任意购买,而特价券需要秒杀抢购
-
tb_voucher: 平价优惠券的基本信息,优惠券金额、使用规则
下表中并没有库存,任意的人都可以随意购买
- tb_seckill_voucher:特价优惠券库存、开始抢购时间,结束抢购时间
1.3.1 entity
Voucher类,里面也包含了秒杀券的关键信息,库存、开始时间、结束时间
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher")
public class Voucher implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商铺id
*/
private Long shopId;
/**
* 代金券标题
*/
private String title;
/**
* 副标题
*/
private String subTitle;
/**
* 使用规则
*/
private String rules;
/**
* 支付金额
*/
private Long payValue;
/**
* 抵扣金额
*/
private Long actualValue;
/**
* 优惠券类型
*/
private Integer type;
/**
* 优惠券类型
*/
private Integer status;
/**
* 库存
*/
@TableField(exist = false)
private Integer stock;
/**
* 生效时间
*/
@TableField(exist = false)
private LocalDateTime beginTime;
/**
* 失效时间
*/
@TableField(exist = false)
private LocalDateTime endTime;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
SeckillVoucher类,秒杀券类,目的补充Voucher类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_seckill_voucher")
public class SeckillVoucher implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 关联的优惠券的id
*/
@TableId(value = "voucher_id", type = IdType.INPUT)
private Long voucherId;
/**
* 库存
*/
private Integer stock;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 生效时间
*/
private LocalDateTime beginTime;
/**
* 失效时间
*/
private LocalDateTime endTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
1.3.2 Controller
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
注意! 我们使用Voucher接收要添加的实体类的信息,包含秒杀信息
1.3.3 Service层
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
1.3.4 测试
请求地址:http://localhost:8081/voucher/seckill、
添加请求头:authorization,具体的值去页面复制即可
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一至周五均可使用",
"rules": "全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 10,
"beginTime": "2022-01-25T10:09:17",
"endTime": "2025-01-26T12:09:04"
}
再看一下页面的展示效果
1.4 优惠券秒杀下单
涉及表 tb_voucher_order 订单表
目前来说对我们最重要的就是用户id,代金券id
案例:下单时需要判断两点
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
1.4.1 entity
优惠券订单实体类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher_order")
public class VoucherOrder implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.INPUT)
private Long id;
/**
* 下单的用户id
*/
private Long userId;
/**
* 购买的代金券id
*/
private Long voucherId;
/**
* 支付方式 1:余额支付;2:支付宝;3:微信
*/
private Integer payType;
/**
* 订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款
*/
private Integer status;
/**
* 下单时间
*/
private LocalDateTime createTime;
/**
* 支付时间
*/
private LocalDateTime payTime;
/**
* 核销时间
*/
private LocalDateTime useTime;
/**
* 退款时间
*/
private LocalDateTime refundTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
1.4.2 Controller
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherOrderService voucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}
1.4.3 Service
优惠券的id与秒杀优惠券的id相同
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// TODO 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// TODO 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// TODO 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// TODO 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// TODO 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId)
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
// TODO 6.创建订单
// 我们只管订单id,代金券id,下单用户id
VoucherOrder voucherOrder = new VoucherOrder();
// TODO 6.1 订单id
// 使用自定义id生成器生成id
long orderID = redisIdWorker.nextId("order");
voucherOrder.setId(orderID);
// TODO 6.2 用户id
// 我们之前编写的登录拦截器
UserDTO user = UserHolder.getUser();
voucherOrder.setUserId(user.getId());
// TODO 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// TODO 7.返回订单id
return Result.ok(orderID);
}
可以看一下这篇文章,有对UserHolder登录拦截器的讲解
基于Session实现短信登录_c# 短信验证码登录_我爱布朗熊的博客-CSDN博客
1.4.3 测试
订单表中出现了数据
并且秒杀优惠券的数量也减少了1
1.5 库存超卖问题
真实的秒杀场景下,有无数的用户一起抢购,点击按钮,每秒钟的并发量可能成百上千
我们使用Jmeter来测试一下刚刚的程序,创建200个线程抢购优惠券,但是数据库中优惠券的数量为100,按理说应该有100个人抢不到
但是看一下最终结果图,发现有45%的人失败了,按理说应该是50%呀
看一下数据库的量,库存成-9了
在高并发的场景下出现了库存超卖问题,这是秒杀场景下很容易出现的问题,并且也是不可接收的一个问题
1.5.1 库存超卖原因
高并发场景不变,假设我们的库存量还有1件
线程2在线程1查询完后又进行查询,发现还有库存
这仅仅是两个线程,如果是更多线程的话更麻烦,扣减的更多
超卖问题使典型的多线程安全问题,针对这一问题,常见的解决方案就是加锁
悲观锁与乐观锁并不是一种真正的锁,而是一种锁设计的理念
悲观锁很影响效率,在抢购这种多线程并发的条件下,并不好用
而且是我们只是极少数情况会出现多线程并发问题,使用悲观锁并不是很好
乐观锁那是相当的乐观啊,那我们就是用乐观锁
1.5.2 介绍乐观锁
乐观锁的关键是判断之前查询得到的数据是否被修改过,常见的方式有两种:
-
版本号发
应用最广泛、普遍。
字段中多增加一个version版本,每当数据做一次修改,版本号就会+1,想要判断数据有没有被修改过,比对一下版本号就可以了
- CAS法
其实是对版本号法做了一些优化,版本号发其实就是使用version标注数据有没有被修改
我们其实可以用库存代替版本,在更新时直接比较库存的数量有没有发生变化即可
1.5.3 乐观锁解决库存超卖
我们采用CAS法,修改之前的业务逻辑
只需要修改第五步,新增加一个条件
// TODO 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.eq("stock",voucher.getStock())//where stock= stock
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
修改之后发现异常比率高达89%,失败的人大大增加了,还不如不改嘞
观察数据库,发现库存并没有多卖
为什么没卖完,就这么错错误呢?
牵扯 乐观锁一个弊端,成功率太低,如下图所示
对乐观锁进行改进
对库存来说非常好改,只需要库存大于0就行
// TODO 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.gt("stock",0)//where stock>0
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
完美达到我们的预期
除此之外,为了提高成功率,可以采用分批加锁的方式,或者是分段加锁方式
我们可以把数据类资源分成多份。比如说库存总共100分,将其存到十张表中,每个表中库存量是10,用户在抢的时候可以去多张表里面去抢,成功率提高了10倍
1.6 实现一人一单功能
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
比之前多了一个判断而已,没什么大不了的!!!!!
TODO注释标识的内容就是我们新增的,一定要先判断订单是否存在,再决定是够扣减库存
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// TODO 新增一人一单的判断
UserDTO user = UserHolder.getUser();
// TODO 查询订单
int count = query().eq("user_id", user.getId())
.eq("voucher_id", voucherId).count();
// TODO 判断是否存在
if (count > 0) {
// TODO 用户至少下过一单,不能再下了
return Result.fail("一人一单,不可重复下单");
}
// TODO 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
// 6.创建订单
// 我们只管订单id,代金券id,下单用户id
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单id
// 使用自定义id生成器生成id
long orderID = redisIdWorker.nextId("order");
voucherOrder.setId(orderID);
// 6.2 用户id
// 我们之前编写的登录拦截器
voucherOrder.setUserId(user.getId());
// 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderID);
}
但是!!!在高并发线程下(200个线程,同一个用户),异常率95%,显然有问题,有10个线程下单成功了
虽然在代码中做了一人一单的判断,但是问题依然存在
与库存超卖相同,在高并发的场景下,可能会存在一些线程差不多同时执行下面标红的内容,也就是说通过了“一人一单”的校验,那既然通过了,就能生成订单了,从而导致实际没有完成“一人一单”的需求
1.6.1 悲观锁解决“一人一单”多线程并发问题
在哪一段添加悲观锁?
校验一人一单、扣减库存、创建订单。
注意我们加锁的位置!!
-
不能加在createVoucherOrder方法上
如果在方法上添加synchronized,说明同步锁是this,当前对象
不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁 -
不能加在createVoucherOrder方法里面
加载方法里面后,锁定的范围有点小了,因为我们还没有提交事务,锁就被释放了,再提交事务修改数据库数据之前,很有可能有其他线程进入createVoucherOrder方法实现一人一单的判断,查询订单发现不存在,不存在的原因很可能是没有提交事务
为了避免这种情况,不能把锁加在createVoucherOrder方法里面
-
锁加在seckillVoucher方法返回值上
等createVoucherOrder方法全部执行完再释放锁,已经写入数据库了,很完美了
synchronized (user.getId().toString().intern()) { return createVoucherOrder(voucherId); }
有关锁的内容可以查看下面这篇文章JavaSE——多线程详细_javase多线程_我爱布朗熊的博客-CSDN博客
为什么createVoucherOrder会存在事物失效的情况?
seckillVoucher在调用createVoucherOrder方法时其实是return this.createVoucherOrder(voucherId);,this指的是当前对象,而不是它的代理对象,而事物要想生效,是因为Spring对当前的类做了动态代理,拿到了它的代理对象去做事物处理
但是this当前对象是没有事物功能的(这就是Spring事物失效的几种可能性之一)
解决办法之一就是获取当前对象的代理对象,如下所示
synchronized (user.getId().toString().intern()) {
// 获取当前对象的代理对象 强转
VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
这么做需要导入下面的依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
并且还需要启动类暴露这个代理对象,如果不暴露的话是获取不到的
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
....
}
完整代码
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
UserDTO user = UserHolder.getUser();
synchronized (user.getId().toString().intern()) {
// 获取当前对象的代理对象 强转
VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
// 如果在方法上添加synchronized,说明同步锁是this,当前对象
// 不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
// 所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
@Transactional
public Result createVoucherOrder(Long voucherId) {
// TODO 新增一人一单的判断
UserDTO user = UserHolder.getUser();
// user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
// 我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回
// TODO 查询订单
int count = query().eq("user_id", user.getId())
.eq("voucher_id", voucherId).count();
// TODO 判断是否存在
if (count > 0) {
// TODO 用户至少下过一单,不能再下了
return Result.fail("一人一单,不可重复下单");
}
// TODO 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
// 6.创建订单
// 我们只管订单id,代金券id,下单用户id
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单id
// 使用自定义id生成器生成id
long orderID = redisIdWorker.nextId("order");
voucherOrder.setId(orderID);
// 6.2 用户id
// 我们之前编写的登录拦截器
voucherOrder.setUserId(user.getId());
// 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderID);
}
补充:@Transactional修饰的方法为什么只能是public
- 访问权限:非公共方法(如私有方法、受保护的方法)对外部调用者不可见,因此不具备可以被事务切面拦截和织入的能力。而公共方法可以被任何调用者访问和调用,因此可以被事务切面拦截。
- 代理方式:Spring框架通过AOP(面向切面编程)来实现
@Transactional
的功能,通常使用动态代理技术。而动态代理只能为公共方法创建代理对象,无法为非公共方法创建代理对象。
1.7 集群下的线程并发安全问题
我们刚刚“一人一单”的操作,只适合在单机情况下使用
如果我们的项目面对高并发的,我们会把一个项目部署到多个机器,从而形成一个负载均衡集群,在集群模式下,悲观锁synchronize的就会受到挑战,出现一些问题
将服务启动两份,端口分别为8081,8082(我们之前使用的就是8081)
1.配置一下8082端口 -Dserver.port会将配置文件中的端口覆盖掉
启动完成后形成一个小的集群
2.修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡
标红的地方是修改的地方
3.重新加载nginx配置文件
4.在悲观锁处设立断点
5.准备两个如下请求,连着发送
token都是一样的,说明是用一个用户
发现:两个请求都能进入synchronized悲观锁
造成后果:count为0,那相当于一人下了两单,并没有完成“一人一单”的需求
现在我们是两台JVM
在一个JVM内部锁的原理是在JVM内部维护了一个锁的监视器对象,而现在锁的对象用的是用户id,并且在常量池里面,并且一个JVM中只有这一个池子,那当id相同的情况下,说明就是同一个锁,锁的监视器是同一个
但是!当我们集群部署的时候,一个新的部署就意味着这是一个全新的tomcat,也就意味着是一个全新的JVM。JVM之间的常量池是各自独立的
如下图所示
二、分布式锁
要想解决集群下的线程并发安全问题,必须使用分布式锁
2.1 基本原理
经过1.7的分析,我们希望让所有的JVM都使用同一个锁监视器,所以这个锁监视器一定是一个在JVM外部的,多JVM进程都可以看到的锁监视器,此时便能实现多进程之间的互斥
如下图所示
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
-
多线程可见:多个JVM都能够看得到
这个其实很容易做到,比如使用Redis、MySQL
-
互斥:不管一个人访问多少次,只能有一个成功,其他人都得失败
-
高可用:不管什么技术,这都是一个通用的特性
要确保获取锁的动作不能经常出现问题
-
高性能:不管什么技术,这都是一个通用的特性
加了锁之后,变成串行化执行,而如果获取锁的动作也很慢,那这性能只能是更差劲
-
安全性:
获取锁的时候要考虑一些异常情况,比如获取锁完了,还没有释放,挂掉了怎么办,会不会产生死锁
2.2 实现方案对比
核心:实现多线程之间互斥
MySQL数据库或者其他数据库,都具备一个事物机制,我们在写的时候,会自动给我们分配一个互斥锁,这样一来在多个事物之间就是互斥的,只有一个人能够去执行,能够借助这个原理来实现。我们可以在业务执行前,先去MySQL里申请一个互斥锁,再去执行业务,当业务执行完后提交事务,释放锁,当我们的业务发生异常的时候能够自动回滚
MySQL支持主从模式,可用性挺高的
安全性也是可以的,出现异常我们的锁也是能够及时释放的,断开连接后,会自动释放锁
Redis的实现就是利用setnx这种互斥命令,向redis存放数据的时候,只有数据不存在的时候存放才能成功,释放锁也很简单,把key删掉就可以
可用性挺高的,支持集群和主从
性能也挺好,远远高于MySQL
安全性好好考虑,万一宕机后无法删除key,就出现死锁了,但是我们可以利用key的过期机制,到期后自动释放,但是过期时间具体是多久,后续会讨论
Zookeeper利用节点的唯一性和有序性实现互斥
我们可以利用有序性实现互斥,比如说很多线程都去Zookeeper中创建线程,这样一来每一个节点的id是单调递增的,我们可以人为规定id小的那个才能获取锁成功(最小的只有一个)
或者也能利用唯一性,大家都去创建节点,并且节点名称大家都一样,这样一来只有一个人成功
释放锁很简单,把节点删除就行
可用性不错,支持集群
性能不太好,因为Zookeepor强调强的一致性,这样会导致主从沪指之间做数据同步的时候消耗一定的时间,性能比redis差一点
安全性很好,节点都是临时节点,一旦出故障,断开连接之后会自动释放
2.3 Redis分布式锁实现思路
实现分布式锁时需要实现两个基本方法,获取锁与释放锁
-
获取锁
-
互斥:确保只能有一个线程获取锁
可以利用setNx互斥特性,如果缓存中有key=lock的数据,那无法再添加
setNx lock thread1
在讨论的时候建议添加一个超时时间,避免服务宕机引起死锁,时间不能设计的太短,否则业务没执行完锁就被释放了
expire lock 10
我们建议上面两条语句一块之前,避免执行expire之前发生宕机,其中ex后面跟的参数代表10s
set lock thread1 EX 10 NX
-
-
释放锁
-
手动释放:删除对应key即可
del lock
-
超时释放:获取锁时添加一个超时时间
expire lock 10
-
获取锁的时候要么成功,要么失败,那失败之后我们应该怎么做呢?
JDK提供的锁有两种机制
- 阻塞式获取,获取锁失败了然后堵塞等待,一直等到有人释放锁
- 非阻塞试获取,尝试获取锁,如果获取失败了,会立即结束返回一个结果,而不是一直等待
在这里我们采用非阻塞式,因为阻塞式对CPU有一定的浪费,而且阻塞式实现起来相对麻烦一点
2.4 Redis 分布式锁实现 - 初级版本
2.4.1 定义ILock接口
public interface ILock {
/**
* 尝试获取锁,非阻塞方式
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
2.4.2 定义SimpleRedisLock实现类
public class SimpleRedisLock implements ILock {
// 业务名称,为了获取锁
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁的前缀
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// TODO 获取线程标识
long threadId = Thread.currentThread().getId(); //线程id,同一个JVM线程id是不会重复的
// TODO 获取锁
// setIfAbsent是setnx
// 此处的value比较特殊,要加上线程的标识
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
// 直接返回success自动拆箱,会有安全风险。比如success为null,那拆箱后就是空指针
// 所以采取下面这种情况
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// TODO 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
2.4.3 修改“一人一单业务”
可以把VoucherOrderServiceImpl类中的seckillVoucher方法中的synchronized关键字删除了,我们要使用自定义的锁
带有TODO标识的,便是我们新增的
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
UserDTO user = UserHolder.getUser();
// TODO 创建锁对象
// 锁定的范围与之前一样。切记不能把order锁住,范围太大了,以后有关order的都被锁住了
SimpleRedisLock lock = new SimpleRedisLock("order:" + user.getId(), stringRedisTemplate);
// TODO 获取锁
// 订单大概是500ms,我们这里可以设定为5秒
boolean isLock = lock.tryLock(5);
// TODO 判断是否获取锁成功
if (!isLock) {
// TODO 不成功
// 我们要避免一个用户重复下单,既然获取锁失败,说明在并发执行,我们要避免并发执行
return Result.fail("不允许重复下单");
}
// TODO 成功
// createVoucherOrder方法执行过程中可能会有异常,我们放到try...catch中
try {
// 获取当前对象的代理对象 强转
VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
// TODO 出现异常做锁的释放
lock.unlock();
}
}
// 如果在方法上添加synchronized,说明同步锁是this,当前对象
// 不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
// 所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 新增一人一单的判断
UserDTO user = UserHolder.getUser();
// user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
// 我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回
// 查询订单
int count = query().eq("user_id", user.getId())
.eq("voucher_id", voucherId).count();
// 判断是否存在
if (count > 0) {
// 用户至少下过一单,不能再下了
return Result.fail("一人一单,不可重复下单");
}
// 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
// 6.创建订单
// 我们只管订单id,代金券id,下单用户id
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单id
// 使用自定义id生成器生成id
long orderID = redisIdWorker.nextId("order");
voucherOrder.setId(orderID);
// 6.2 用户id
// 我们之前编写的登录拦截器
voucherOrder.setUserId(user.getId());
// 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderID);
}
进行测试
一个是true,一个是false,说明只能8081端口获取锁成功
2.4.4 问题分析
上面的代码在大多数环境下能够正常运行,但是依然存在一些问题
分析问题产生的原因:
- 线程1因为业务堵塞,导致锁的过期时间到了,自动删除
- 在线程1堵塞中,锁删除后,线程2成功获取到锁执行业务,但是还没执行完
- 此时线程1苏醒,执行手动删除key功能,将线程2的锁删掉了
- 此时新来线程3,发现没有后获取锁执行业务
这样以来,线程2和线程3出现线程安全问题
最重要的原因就是线程1在释放锁的时候直接把别人的删除了
解决方案:删除锁的时候看一下这个锁是不是自己线程的
如果不是我们的锁,不用管就行了
2.5 Redis分布式锁实现 - 改进
关键点两个
- 第一个:在获取锁时存入线程标识(可以用UUID标识)
我们之前使用的是id,现成的线程id
long threadId = Thread.currentThread().getId();
线程的id其实就是一个递增数字,在JVM内部每创建一个线程,这个数字就会递增
但是集群下有多个JVM,那每个JVM都会维护一个递增数字,很有可能会冲突
为了解决冲突,我们可以使用一个UUID+线程id的方式,两者结合,使用UUID区分不同的JVM,再使用线程id来区分不同线程
- 第二个:在释放锁时先获取锁中线程标识,判断是否与当前线程标识一致。一致则释放锁,不一致则不释放锁
2.5.1 修改SimpleRedisLock实现类
public class SimpleRedisLock implements ILock {
// 业务名称,为了获取锁
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁的前缀
private static final String KEY_PREFIX = "lock:";
//TODO UUID,使用胡图工具类, true表示将UUID生成的横线去掉
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
// TODO 获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId(); //线程id,同一个JVM线程id是不会重复的
// 获取锁
// setIfAbsent是setnx
// 此处的value比较特殊,要加上线程的标识
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 直接返回success自动拆箱,会有安全风险。比如success为null,那拆箱后就是空指针
// 所以采取下面这种情况
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// TODO 获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
// TODO 获取锁中标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// TODO 判断标识是否一致
if (threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
最终效果图
2.5.2 问题分析
到这里还不是一个完美的程序,在某些极端的情况下,还有一些问题
判断锁和释放锁是两个动作,假设释放锁的时候发生了堵塞,如果堵塞时间很长,锁自己超时释放了,其他线程如线程2就能够获取锁了
这怎么会堵塞呢?判断之后紧接着就释放,中间并没有任何代码操作,怎么会堵塞呢?
JVM中有一个机制叫做垃圾回收机制,当做这种操作的时候会阻塞我们的所有的代码。所以说不是代码业务堵塞,而是JVM本身产生的阻塞
而就在线程2获取锁成功后,线程1堵塞结束,恢复运行,直接执行释放锁的动作(不再判断了,是在判断后出现的堵塞),这样就把线程2的锁干掉了
此时线程3趁虚而入,执行业务
线程2与线程3发生了线程安全问题
解决方案:判断锁动作与释放锁的动作成为一个原子性操作,一块执行,不能出现间隔
2.6 Lua脚本解决多条命令原子性问题
Redis提供了一个Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
脚本可以理解为一个函数,里面有很多的代码
Lua是一种编程语言,基本语法参考:https://www.runoob.com/lua/lua-tutorial.html
重点介绍Redis提供的调用函数
# 执行Redis命令
redis.call('命令名称','key','其他参数',....)
例如,我们执行 set name jack,则脚本是
redis.call('set','name','jack')
例如,执行多个命令,先执行set name Rose,再执行get name,则脚本如下
# 先执行set name Jack
redis.call('get','name','jack')
# 再执行 get name
local name = redis.call('get','name')
# 返回
return name
脚本写好后,怎么调用脚本?
下面的script就是脚本
例如,我们要执行redis.call(‘set’,‘name’,‘jack’)这个脚本,语法如下
#调用脚本, 脚本本身是一个字符串,用双引号引起来
# 0 代表着参数的数量,numkeys,因为脚本中是写死的,没有任何的参数
EVAL "return redis.call('set','name','jack')" 0
脚本是可以传参的,也可以是不传参的
带参数的脚本语言怎么写?
Redis里面的参数,分为两类,一类是key类型的参数(如name),另一类是其他类型的参数(Jack)
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数
下面这个语句的key、value都没有写死。
在lua语言中,数组的角标是从1开始的
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name Rose
2.6.1 编写Lua脚本
回顾一下释放锁的业务流程
- 获取锁中的线程标识
- 判断是否与指定的标识(当前线程标识)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
注意!key不能写死,不同的业务的key不同
-- 锁的key
--local key = "lock:order:5" 但是不能写死
local key = KEYS[1]
-- 当前线程标识
--local threadId = "UUID-10086" 但是不能写死
local key =ARGV[1]
-- 获取锁中的线程标识,也就是一个get命令
local id = redis.call('get',key)
-- 比较线程标识与锁中的标识是否一致
if(id == threadId) then
-- 释放锁 del key,删除成功return 1
return redis.call('del',key)
end
-- if不成立 return 0
return 0
2.6.2 Java调用lua脚本 - 改进Redis分布式锁
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下
结合EVAL进行分析:
execute函数等于EVAL,RedisScript等于下面的script,List<K>,就是Key类型的参数,Object args 其实就是ARGV参数
释放锁的代码如下图所示,在脚本中执行了部分代码,能够满足原子性
// TODO 提前获取锁的脚本
// RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
// TODO 静态代码块做初始化
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// TODO 借助Spring提供的方法区resource下找
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// TODO 配置一下返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
// TODO 调用脚本
// Key是一个集合,Collections.singletonList(KEY_PREFIX + name)可以快速生成一个集合
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
// 不要执行结果的返回值了,成功就是成功,不成功就是被删掉了
}
2.7 总结
基于Redis的分布式锁实现思路
- 利用 set nx ex获取锁,并这是过期时间,保存线程标识
- 释放锁时先判断线程标识是否与自己一致,一致则删除锁
特性
- 利用set nx 满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
三、Redisson
基于setnx实现的分布式锁存在下面问题
-
不可重入
不可重入:同一个线程无法多次获取同一把锁
比如一个方法a调用方法b
在a中要先获取锁,执行业务再调用b,而在b中又要获取同一把锁,在这种情况下如果锁时不可重入的,b中无法继续执行业务逻辑
在这种情况下,我们要求锁可重入
可重入:同一个线程可以多次获取同一把锁
-
不可重试
获取锁只尝试一次就返回false,没有重试机制
我们之前实现的锁是非阻塞式的,如果获取锁失败就会立即返回false,没有重试机制,但是在很多业务下不能说立即失败
我们希望的时候发现锁被占用的时候,我等一等,如果最后成功了再去执行业务
-
超时释放
锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患(因超时释放导致的误删问题,也有其他的)
-
主从一致性
如果Redis提供了主从集群,主从同步存在延迟(还未同步给从Redis库),此时“主”宕机时,我们会选一个从库作为新的主库,但是新的主库上还未同步完之前的数据,是没有锁的标识的,也就是说其他线程能够趁虚而入拿到锁
可能会在极端情况下出现安全问题,但是概率比较低
我们可以借助框架对其进行改善
Redisson是一个在Redis的基础上实现的Java驻内存数据网格 (ln-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
在redis基础上实现的一个分布式工具集合。在分布式系统下用到的各种各样的工具他都有,包括分布式锁
官网:Redisson: Easy Redis Java client with features of In-Memory Data Grid
GitHub地址:GitHub - redisson/redisson: Redisson - Easy Redis Java client with features of In-Memory Data Grid. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Publish / Subscribe, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, MyBatis, RPC, local cache …
3.1 Redisson快速入门
可以导入坐标,再配置Redis客户端,或者是在yaml文件中配置,并且提供了一个Redisson-starter
第二种方式不太推荐,因为会替代Spring官方对Redisson的配置和实现,所以建议使用第一种,自己配置一个Redisson
Maven坐标
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient(){
// TODO 配置类
Config config = new Config();
// TODO 添加Redis地址,如单点地址(非集群)
// 我没有密码,如果有密码的话可以设.setPassword("")
// 如果是集群的话使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// TODO 创建客户端
return Redisson.create(config);
}
}
使用Redisson分布式锁改造之前的业务
lock.tryLock(1,10,TimeUnit.SECONDS)
参数1:获取锁的等待时间,也就是说tryLock这个动作是阻塞式的,如果获取锁失败后会等待一段时间再重试,如果重试后还没有获取到锁,就返回false(重试机制)
参数2:锁自动释放时间,超过这个时间没有释放锁的话,就自动释放
参数3:时间单位
lock.tryLock(),没有参数的话,表示不等待,失败直接返回
代码知识更改了一下客户端而已,其他的不需要动
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
UserDTO user = UserHolder.getUser();
// 创建锁对象
// 锁定的范围与之前一样。切记不能把order锁住,范围太大了,以后有关order的都被锁住了
// 之前的方式:SimpleRedisLock lock = new SimpleRedisLock("order:" + user.getId(), stringRedisTemplate);
// TODO 使用Redisson客户端获取锁对象
RLock lock = redissonClient.getLock("lock:order:" + user.getId());
// 获取锁
// 订单大概是500ms,我们这里可以设定为秒
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if (!isLock) {
// 不成功
// 我们要避免一个用户重复下单,既然获取锁失败,说明在并发执行,我们要避免并发执行
return Result.fail("不允许重复下单");
}
// 成功
// createVoucherOrder方法执行过程中可能会有异常,我们放到try...catch中
try {
// 获取当前对象的代理对象 强转
VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
// 出现异常做锁的释放
lock.unlock();
}
}
3.2 可重入锁原理
Redisson可以实现可重入
不可重入:同一个线程无法多次获取同一把锁
可重入:同一个线程可以多次获取同一把锁
比如一个方法a调用方法b
在a中要先获取锁,执行业务再调用b,而在b中又要获取同一把锁,在这种情况下如果锁时不可重入的,b中无法继续执行业务逻辑
在这种情况下,我们要求锁可重入
我们之前自定义的锁无法实现可重入,可以看下图:
method1首先获取锁,并且成功,调用了method2,method2也来获取锁,然后获取失败了,因为redis中对应的key已经有值了,setnx不会再成功了
如果要实现可重入锁,我们需要将之前String结构修改为Hash结构
不仅要记录线程标识,也要记录这是第几次获取锁,获取锁的时候value+1,释放锁的时候value-1
我们什么时候才能真正的释放锁,把锁删除呢?
等到释放锁的时候value对应的值为0,便可以删除
我们从String结构改Hash结构后,获取锁和释放锁有了很大的差别
String类型时采用setnx ex命令,setnx 命令用来判断是否存在互斥,获取锁能不能成功,而ex命令设置过期时间
但是现在采用Hash后,没有这样的组合命令,只能先判断锁是否存在,再手动设置过期时间,流程图如下所示
流程图
对于获取锁和释放锁,我们依然要借助Lua脚本实现原子性
获取锁的Lua脚本
释放锁的Lua脚本
3.3 Redisson锁重试和WatchDog机制
下面我们来讨论一下Redisson怎么解决不可重试、超时释放
lock.tryLock(1,10,TimeUnit.SECONDS)
参数1:获取锁的等待时间,也就是说tryLock这个动作是阻塞式的,如果获取锁失败后会等待一段时间再重试,如果重试后还没有获取到锁,就返回false(重试机制)
参数2:锁自动释放时间,超过这个时间没有释放锁的话,就自动释放
参数3:时间单位
lock.tryLock(),没有参数的话,表示不等待,失败直接返回
**lock.tryLock(1L, TimeUnit.SECONDS);**两个参数的话,第一个参数表示等待时间,,第二个是时间单位,这句代码表示,1s中内拿不到锁就不等待了
3.1.1 锁重试
我们可以看一下**lock.tryLock(1L, TimeUnit.SECONDS);**方法
在tryLock方法中调用了一个全参的tryLock方法,点进去看一看
在这个方法中又调用了tryAcquire方法
继续往下走,看一下tryAcquireAsync方法
因为我们没有设置过期时间,会有一个默认的过期时间,如下图所示
getLockWatchdogTimeout()方法的值是30000L,单位是毫秒,也就是30s
再往下执行,就是到了下图的方法
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
命令return redis.call(‘pttl’, KEYS[1]);是获取有效期还是多久,ttl返回的是以秒为单位,而pttl返回的是以毫秒为单位
拿到锁的剩余有效期有什么用呢?
回到方法private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId
在这里获取了一个Future对象
return之后继续倒着看,到了下图这里,得到了剩余的有效期,然后接着返回
之后回到了最初的地方
如果ttl是null,表示没有其他线程获取锁,我们可以获取这个锁,返回true
如果不是null,代表获取锁失败,失败就需要尝试
time表示设置的过期时间还剩余多少,如果有剩余就尝试,如果没有的话直接返回false
假如time>0,还没有到达过期时间,我们就等待尝试一下,但是这个尝试也不是立即尝试,而是执行了一个subscribe方法,一个订阅的方法
那订阅的什么呢?订阅的别人释放了锁的信号
之前在Lua脚本中,发布了一个消息,我们现在订阅的就是这一条
然后下面这个代码很巧妙,我们收到通知的时间是不确定的,所以await一下,但是await多久呢? 就是time,剩余的锁释放时间
如果await的等待时间超过time时间,我们就unsubscribe取消订阅,因为已经没有任何意义了
如果await等待时间没有超时的话,就继续往下走,再计算一下time,看看是否小于零,小于0就是超时了,返回false(源码时间计算很严谨,一直在那算)
如果时间充足的话就进入while语句(只要time时间有剩余,就一直循环),又执行tryAcquire方法,与之前的一样
3.1.2 锁超时
我听不太懂,-20.分布式锁-Redisson的锁重试和WatchDog机制
我们应该确保是业务执行玩而释放锁而不是业务堵塞后锁超时了释放
回到tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) 方法
ttlRemaining==null,说明我们获取锁成功了,这个时候我们要去解决有效期的问题
怎么解决有效期的问题?调用的scheduleExpirationRenewal方法
其中renewExpiration方法更新有效期,下面看一下
Timeout是一个超时任务或者定时任务,执行了renewExpirationAsync方法
我们看一下renewExpirationAsync方法
执行下面一个脚本
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
3.1.3 总结
-
可重入:利用Hash结构记录线程id和重入次数
Hash结构代替了原来自定义所使用的String结构
Hash结构的一个字段记录线程标识,;另一个字段记录重录的次数
每一次获取锁的时候,先判断锁是否存在,如果不存在就直接获取,如果已经存在,此时不一定会失败,如果发现线程标识就是当前线程,代表可以再次获取,把重入次数加1即可。
释放锁的时候,每释放一次次数就减1,直到重入次数减到0,证明走到了最外层,所有的业务都结束了,真正的去释放锁
-
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
在第一次获取锁失败后,并不是立即失败,而是去做一个等待,等待释放锁的一个消息,得到消息后就可以去重新获取锁了,如果获取又失败,继续等待
当然也不是无限制的重试,会有一个等待时间,超过了这个时间就不再重试了
-
超时续约:利用了watchDog,每隔一段时间(releaseTime/3),重置超时时间
获取锁成功以后会开启一个定时任务,这个任务每隔一段时间就会去重置锁的超时时间,重新配置一下expire,这样以后锁的尝试时间就会重新计时
3.4 multiLock原理
3.4.1 解析
下面来看一下“主从一致性”的问题
**先来看看什么叫做主从?**其实就是多台Redis,只不过各自角色不同
我们可以把其中一台作为主节点Redis Master,其他的作为从节点Redis Slave。
除此之外主从的功能也不一样,往往会做读写的分离。主库做增删改操作,从库做读的一些操作
既然主节点做写操作,从节点做读的操作,那这样的话,从节点应该没有数据呀。所以,主和从之间需要做数据的同步,主节点会不断的把自己的数据同步给从节点,确保主从之间数据是一致的,但是主从之间会有一定的延迟,所以主从的同步也会存在一定的延迟,主从同步一致性正是因为这样的延迟所存在
假设主库发生问题,宕机,Redis中会有哨兵监控集群状态,发现主机宕机后会和他断开,之后从其他的从节点之中选出一个库作为新主库。
因为之前主从同步未完成,锁还没有同步到新主库之中,此时java应用再来访问新的主节点会发现锁已经没有了,可以获取锁,出现了并发的安全问题
Redis怎么解决这个问题的呢?
简单粗暴,不要主从了,所有的Redis主从都变成单个独立的Redis节点,相互之间没有任何关系,都可以去做读写。
此时我们获取锁的方式就变了,之前是到Master中获取锁,现在我们需要依次去各个Redis节点获取锁,各个Redis库中都有锁的标识。
这个情况什么时候叫获取锁成功呢? 每个Redis Master都获取锁成功,才叫成功
然后我们可以对这几个Master节点建立主从同步,也不用担心线程安全问题
如下所示,加入最上面的Node挂掉后,后面的从节点成为新主节点,很显然不会有锁,但是其他两个Node节点都有锁,这样说及时两个有锁一个没锁,获取失败
也就是说有一个节点有锁,其他线程就不可能拿到锁
这套方案在Redisson中叫做multiLock
3.4.2 测试
重新配置Redisson,端口6379,6380,6381
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// TODO 配置类
Config config = new Config();
// TODO 添加Redis地址,如单点地址(非集群)
// 我没有密码,如果有密码的话可以设,如果是集群的话使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// TODO 创建客户端
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381");
return Redisson.create(config);
}
}
测试代码
@Slf4j
@SpringBootTest
class RedissonTest {
// TODO 三个注入客户端
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp() {
// TODO
RLock lock1 = redissonClient.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");
// TODO 创建联锁,用哪个客户端调用没有区别
lock = redissonClient.getMultiLock(lock1,lock2,lock3);
}
// TODO 使用方式和之前没有区别,代码不用动
@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}
3.5 总结
-
不可重入Redis分布式锁
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时线程标示
- 缺陷:不可重入、无法重试、锁超时失效
-
可重入的Redis分布式锁
- 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
-
Redisson的multiLock
- 原理:利用多个独立的Redis节点,必须在所有节点都获取重入锁,才能算获取锁成功