本文属于看黑马的redis的学习笔记,记录了思路和优化流程,精简版最终版请点击这里查看。
文章目录
- 一、全局ID生成器
- 1.1 理论
- 1.1.1 全局唯一ID生成策略
- 1.2 代码(Redis自增)
- 二、实现优惠券秒杀下单
- 2.1 SQL
- 2.2 SQL对应实体类
- 2.2.1 普通券实体类
- 2.2.2 秒杀券实体类
- 2.2.3 秒杀订单实体类
- 2.3 业务逻辑
- 2.3.1 新增秒杀券
- 2.3.2 秒杀下单
- 2.3.2.1 秒杀下单功能分析
- 2.3.2.2 超卖问题
- 2.2.2.3 秒杀逻辑代码(CAS法乐观锁)
- 2.3.3 一人一单(修改秒杀业务,要求同一个优惠券,一个用户只能下一单)
- 2.3.3.1加锁和事务所产生的一系列问题
- 2.3.3.2 代码
- 2.3.4 分布式锁
- 2.3.4.1 分布式锁概念
- 2.3.4.2 基于Redis的分布式锁
- 2.3.4.2.1 实现思路
- 2.3.4.2.2 存在问题
- 2.3.4.2.3 具体代码
- 2.3.4.3 Redisson
- 2.3.4.3.1 Redisson相关配置
- 2.3.4.3.2 Redisson可重入锁原理
- 2.3.4.4 总结
- 2.3.4 Redis秒杀优化
- 2.3.4.1 流程
- 2.3.4.2 代码
- 2.3.4.2.1 Controller
- 2.3.4.2.2 Service
- 2.3.4.2.3 LUA脚本
- 2.3.4.3 总结
- 2.3.5 Redis消息队列实现异步秒杀
- 2.3.5.1 Redis消息队列
- 2.3.5.2 基于List结构模拟消息队列(理论)
- 2.3.5.3 基于PubSub的消息队列
- 2.3.5.4 基于Stream的消息队列
- 2.3.5.5 Redis各种类型消息队列对比
- 2.3.5.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
- 2.3.5.6.1 命令行创建消息队列
- 2.3.5.6.2 LUA脚本
- 2.3.5.6.3 业务逻辑
一、全局ID生成器
1.1 理论
当用户抢购优惠券时,如果订单表使用数据库自增ID就会存在一些问题:
① id的规律性太明显,导致用户根据id猜测到一些信息
②受表单数据量的限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下特征:
唯一性、高可用、高性能、递增性、安全性。
redis可以很好的实现这一点。
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息:
ID组成部分(Long型):
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以用69年
- 序列号:32bit,支持每秒产生 2 32 {2^{32}} 232个不同ID
1.1.1 全局唯一ID生成策略
- UUID
- Redis自增
- snowflake算法
- 数据库自增
Redis自增策略
- 每天一个key,方便统计订单量
- ID构造器是时间戳+计数器
1.2 代码(Redis自增)
@Component
public class RedisIDWorker {
//开始时间戳,2023.1.1 00:00:00
private static final long BEGIN_TIMESTAMP = 1672531200L;
//序列号位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIDWorker(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public Long nextId(String keyPrefix){//前缀用于区分不同业务
//1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
//2. 生成序列号
//2.1 获取当前日期,获取到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2 自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3. 拼接并返回
return timeStamp << COUNT_BITS | count;
}
}
二、实现优惠券秒杀下单
2.1 SQL
优惠券表(任意券,平价/秒杀券):
CREATE TABLE `voucher` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`shop_id` bigint unsigned DEFAULT NULL COMMENT '商铺id',
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '代金券标题',
`sub_title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '副标题',
`rules` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '使用规则',
`pay_value` bigint unsigned NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',
`actual_value` bigint NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',
`type` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0,普通券;1,秒杀券',
`status` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '1,上架; 2,下架; 3,过期',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT;
秒杀券(主键就是任意券的id,即本表是任意券的扩展表):
CREATE TABLE `seckill_voucher` (
`voucher_id` bigint unsigned NOT NULL COMMENT '关联的优惠券的id',
`stock` int NOT NULL COMMENT '库存',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT COMMENT='秒杀优惠券表,与优惠券是一对一关系';
秒杀订单表
CREATE TABLE `tb_voucher_order` (
`id` bigint NOT NULL COMMENT '主键',
`user_id` bigint unsigned NOT NULL COMMENT '下单的用户id',
`voucher_id` bigint unsigned NOT NULL COMMENT '购买的代金券id',
`pay_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',
`status` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
`pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
`use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
`refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT;
2.2 SQL对应实体类
2.2.1 普通券实体类
其中exist=false的是秒杀表对应信息
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("voucher")
public class Voucher implements Serializable {
private static final long serialVersionUID = 1L;
/** 主键 */
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/** 商铺id */
private Long shopId;
/** 代金券标题 */
private String title;
/** 副标题 */
private String subTitle;
/** 使用规则 */
private String rules;
/** 支付金额 */
private Long payValue;
/** 抵扣金额 */
private Long actualValue;
/** 优惠券类型 */
private Integer type;
/** 优惠券状态 */
private Integer status;
/** 库存 */
@TableField(exist = false)
private Integer stock;
/** 生效时间 */
@TableField(exist = false)
private LocalDateTime beginTime;
/** 失效时间 */
@TableField(exist = false)
private LocalDateTime endTime;
/** 创建时间 */
private LocalDateTime createTime;
/** 更新时间 */
private LocalDateTime updateTime;
}
2.2.2 秒杀券实体类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("seckill_voucher")
public class SeckillVoucher implements Serializable {
private static final long serialVersionUID = 1L;
/** 关联的优惠券的id */
@TableId(value = "voucher_id", type = IdType.INPUT)
private Long voucherId;
/** 库存 */
private Integer stock;
/** 创建时间 */
private LocalDateTime createTime;
/** 生效时间 */
private LocalDateTime beginTime;
/** 失效时间 */
private LocalDateTime endTime;
/** 更新时间 */
private LocalDateTime updateTime;
}
2.2.3 秒杀订单实体类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("voucher_order")
public class VoucherOrder implements Serializable {
private static final long serialVersionUID = 1L;
/** 主键 */
@TableId(value = "id", type = IdType.INPUT)
private Long id;
/** 下单的用户id */
private Long userId;
/** 购买的代金券id */
private Long voucherId;
/** 支付方式 1:余额支付;2:支付宝;3:微信 */
private Integer payType;
/** 订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款 */
private Integer status;
/** 下单时间 */
private LocalDateTime createTime;
/** 支付时间 */
private LocalDateTime payTime;
/** 核销时间 */
private LocalDateTime useTime;
/** 退款时间 */
private LocalDateTime refundTime;
/** 更新时间 */
private LocalDateTime updateTime;
}
2.3 业务逻辑
2.3.1 新增秒杀券
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存秒杀库存到Redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
}
2.3.2 秒杀下单
2.3.2.1 秒杀下单功能分析
下单时应判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
流程:
- 点击秒杀
- 提交优惠券id
- 查询优惠券信息
- 判断秒杀是否开始或结束,库存是否充足,不满足条件则返回异常
- 如果符合条件,则进行扣减库存,创建订单,并返回订单id
2.3.2.2 超卖问题
其实就是线程安全问题的一种,多个线程同时共享同一个资源,且穿插进行,产生的问题。
假如资源还有两份,但是有三个线程同时进入判断资源是否能够获取的阶段,暂时还没有线程进入扣除资源阶段,那么这三个线程即同时获取到了资源,然后都对资源进行了扣除,即2-1-1-1 = -1,这就是超卖。
常见解决方案就是加锁。
悲观锁:认为线程安全问题一定会发生,因此操作数据前先获取锁,确保线程串行执行。例如Synchronized、Lock、数据库互斥锁
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。如果没有修改则认为是安全的,自己才更新数据;如果已经被修改说明发生了线程安全问题,此时可以重试或异常。
版本号法:给数据加一个版本号,每次修改版本号+1,存库的时候判断其版本号是否与存库之前一致
CAS法(compare and swap):存库的时候判断库中内容是否与改之前拿到的一致
2.2.2.3 秒杀逻辑代码(CAS法乐观锁)
CAS用的话是判断库存是否改变,但是这样会导致大量失败(只卖一点),所以优化为判断库存是否大于0
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIDWorker redisIDWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2. 判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
//3. 判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock()<1) {
return Result.fail("库存不足!");
}
//5. 扣减库存
boolean updateFlag = seckillVoucherService.lambdaUpdate()
.setSql("stock = stock -1")
.eq(SeckillVoucher::getVoucherId, voucherId)
// .eq(SeckillVoucher::getStock,seckillVoucher.getStock())//CAS方式乐观锁,会导致只卖一点
.gt(SeckillVoucher::getStock,0)//CAS方式乐观锁,因为mysql在update的时候有行锁是串行的,所以可以
.update();
if(!updateFlag){//扣减失败
return Result.fail("库存不足!");
}
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单ID--全局唯一ID生成器
Long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7. 返回订单id
return Result.ok(orderId);
}
}
2.3.3 一人一单(修改秒杀业务,要求同一个优惠券,一个用户只能下一单)
思路: 如果库存充足,根据优惠券id和用户id查询订单,判断订单是否存在
目前的流程就是:
(判断)
1、判断当前条件是否开始或者已经结束
2、判断库存是否充足
(一人一单)
3、查库判断当前用户是否有过秒杀订单
(扣减库存以及存库/事务)
4、扣减库存(优化CAS乐观锁保证不会超卖)
5、生成订单信息,并存库
(事务结束,return)
6、返回订单id
我们可以将一人一单以后的代码抽取出来,单独写一个方法,加@Transactional
来保证事务。
然而一人一单也会存在超买问题,所以需要处理,但是上文解决超卖问题的方法不能成功解决,因为它是insert语句而不是update,所以就要加锁(悲观锁)。
2.3.3.1加锁和事务所产生的一系列问题
要限制一人一单,给新生成的这个带事务的方法上加synchronized
是不友好的,因为这把锁只需要限制同一个用户同时产生的多条线程只会有一个生效,即只限制单用户,多个不同的用户是不受限的。
所以,可以在方法内添加一个synchronized(userId){//业务逻辑+return}
。
但是此时会再次产生一个问题,在这个方法还未结束,但是锁已经释放了的期间,此时事务还没有提交,此时再恰巧进来一个线程,又拿到了锁,还是会产生并发安全问题。
所以要将synchronized锁覆盖整个事务方法:
//伪代码
public Return createVoucherOrder(Long voucherId){
//判断逻辑
synchronized(userId){
return this.createVoucherOrder(voucherId);
}
}
@Transactional
@Override
public Result createVoucherOrder(Long voucherId) {
//事务代码
}
但此时又会产生一个问题,this指向的是当前的类对象而非代理对象,没有事务功能。
事务要想生效,其实是Spring对当前这个类进行动态代理,拿到代理对象来进行事务处理
所以我们要拿到事务代理对象才可以,可以用如下代码来
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
用这个方法需要引入aspectj的依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
然后启动类添加注解暴漏这个对象@EnableAspectJAutoProxy(exposeProxy = true)
才可以使用
还有一些小细节看代码注释,写的很清楚了。
此处知识点:spring框架事务失效、aop代理对象(动态代理)、synchronized锁。
2.3.3.2 代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIDWorker redisIDWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2. 判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
//3. 判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()) {//intern()是返回字符串的规范表示,从字符串常量池中取(池中没有先存池)
//this指向的是当前的VoucherOrderServiceImpl对象而非代理对象,没有事务功能
//事务要想生效,其实是Spring对当前这个类进行动态代理,拿到代理对象来进行事务处理,拿到事务代理对象才可以
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
@Override
public Result createVoucherOrder(Long voucherId) {
//5. 一人一单
Long userId = UserHolder.getUser().getId();
//5.1 查询订单
Integer count = lambdaQuery().eq(VoucherOrder::getUserId, userId).eq(VoucherOrder::getVoucherId, voucherId).count();
//5.2 判断是否存在
if (count > 0) {
//用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
//6. 扣减库存
boolean updateFlag = seckillVoucherService.lambdaUpdate()
.setSql("stock = stock -1")
.eq(SeckillVoucher::getVoucherId, voucherId)
// .eq(SeckillVoucher::getStock,seckillVoucher.getStock())//CAS方式乐观锁,会导致只卖一点
.gt(SeckillVoucher::getStock, 0)//CAS方式乐观锁,因为mysql在update的时候有行锁是串行的,所以可以
.update();
if (!updateFlag) {//扣减失败
return Result.fail("库存不足!");
}
//7. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单ID--全局唯一ID生成器
Long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2 用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8. 返回订单id
return Result.ok(orderId);
}
}
2.3.4 分布式锁
然而上文代码仅仅只能解决单机情况的一人一单安全问题,在集群模式下就不行了。
因为集群模式是多个JVM,而synchronized锁是jvm内部的锁,锁监视器是JVM内部的,一个只管一个。
2.3.4.1 分布式锁概念
分布式锁: 满足分布式系统或集群模式下多进程可见并且互斥的锁。
需要满足特性:多进程可见、互斥、高可用、高性能、安全性
MySql | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
2.3.4.2 基于Redis的分布式锁
实现分布式锁需要实现的的两个基本方法:
- 获取锁
- 互斥:保证只能有一个线程获取锁(setnx操作+expire操作
SET lock thread1 EX 10 NX
) - 非阻塞:尝试一次,成功返回true,失败返回false
- 互斥:保证只能有一个线程获取锁(setnx操作+expire操作
- 释放锁
- 手动释放 (del操作)
- 超时释放:获取锁时设置一个超时时间
流程:
①尝试获取锁
②获取锁成功,执行业务,执行完释放锁
③业务超时或服务当即则自动释放锁
2.3.4.2.1 实现思路
- 利用set nx ex获取锁,并设置过期时间,保存线程标识
- 释放锁时先判断线程标识是否与自己一致,一致则删除锁
特性
- 利用
set nx
满足互斥性 - 利用
set ex
保证故障时锁依然能够释放,避免死锁,提高线程安全性 - 利用Redis集群保证高可用和高并发特性
2.3.4.2.2 存在问题
假如说,业务阻塞时间比设定的超时释放时间要长,那么就可能会有其他的线程趁虚而入拿到锁,执行自己的业务逻辑,但当第一个业务逻辑执行完毕释放锁的时候,释放的又是这个趁虚而入的线程拿到的锁,那么就产生了问题。
改进的Redis分布式锁
①在获取锁时存入线程标识(可用UUID)
②在释放锁时先获取所种的线程标识,判断是否与当前线程标识一致,一致才释放
但还是会出现问题,即在判断线程标识的时候通过,但是还未删除之前,超时释放锁了,而线程2趁虚而入拿到锁,还是会出现线程1释放线程2的极端情况。
所以说要让 判断标识
和 释放锁
成为一个原子性操作。
使用LUA脚本再次改进
LUA脚本可以保证Redis操作原子性。
Redis使用同一个Lua解释器来执行所有命令,同时,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。
类似于在对整个脚本加锁,脚本执行完再执行其他脚本或redis指令。
--比较线程标识与锁中标识是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
2.3.4.2.3 具体代码
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
private static DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(flag);//防止自动拆箱 null就成了空指针
}
@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX+Thread.currentThread().getId()
);
}
//下文是单机程序时使用,即未使用lua脚本的unlock代码
// @Override
// public void unlock() {
// //获取线程标示
// String threadId = ID_PREFIX+Thread.currentThread().getId();
// //获取锁中标识
// String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// if(threadId.equals(id)){
// stringRedisTemplate.delete(KEY_PREFIX + name);
// }
// }
}
2.3.4.3 Redisson
基于setnx实现的分布式锁存在下面的问题:
- 不可重入:同一个线程无法多次获取同一把锁,可能造成死锁
- 不可重试:获取锁只尝试一次就返回false,没有重试机制
- 超时释放:超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现
Redisson 是一个再Redis的基础上实现的java驻内存数据网格。它不仅提供了一系列分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
2.3.4.3.1 Redisson相关配置
① 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.0</version>
</dependency>
②配置Redis客户端
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient(){
//配置类
Config config = new Config();
//添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://IP:端口").setPassword("密码");
//创建客户端
return Redisson.create();
}
}
③使用Redisson的分布式锁
@Resource
private RedissonClient redissonClient;
@Test
public void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}
2.3.4.3.2 Redisson可重入锁原理
可重入,大体意思是,在获取锁的时候,当这个锁被占,就判断占有人是否为自己(同一个线程),如果是自己,则再次获取锁,并设有计数器,记录重入次数。可以利用hash结构实现。
具体流程:
- 判断锁是否存在
- 若锁不存在,则获取锁并添加线程标识
- 若锁存在,则判断锁的标识是否为自己,若否,则获取失败;若是,则锁计数器+1
- (拿到锁之后)设置锁有效期,执行业务
- 释放锁时,判断锁是否为自己
- 若否,则锁已经被释放了(可能超时释放)
- 若是,锁计数器-1
- 判断锁计数是否为0,若否,则重置锁有效期,继续执行业务(转到4);若锁计数器是0,则释放锁
这个复杂的流程一般是用LUA脚本来实现。
获取锁的LUA脚本:
释放锁的LUA脚本:
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间(定时任务)
2.3.4.4 总结
- 不可重入的Redis分布式锁
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标识
- 缺点:不可重入、无法重试、锁超时失效
- 可重入的Redis分布式锁
- 原理:利用hash结构,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量空值锁重试等待
- 缺点:redis宕机引起锁失效问题
- Redissson的multiLock
- 原理:多个独立的Redis节点,必须在所有节点都获取到重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
2.3.4 Redis秒杀优化
2.3.4.1 流程
2.3.4.2 代码
2.3.4.2.1 Controller
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
2.3.4.2.2 Service
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIDWorker redisIDWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
//读取LUA脚本
private static DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//代理对象
private IVoucherOrderService proxy;
//程序启动就运行该异步订单的线程
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//异步线程订单存库
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
try {
//1. 获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2. 创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1. 获取用户
Long userId = voucherOrder.getUserId();
//2. 创建锁对象
RLock lock = redissonClient.getLock("order:" + userId);
//3. 获取锁 tryLock()默认 -1 30 Second
boolean isLock = lock.tryLock();
//4. 判断是否获取锁成功
if(!isLock){
//获取所失败,返回错误或重试
log.error("不允许重复下单");
return ;
}
try {
proxy.createVoucherOrder(voucherOrder);
}finally {
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
//1.执行LUA脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
//2. 判断结果是否为0
if(r != 0){
//2.1 不为0,代表没有购买资格
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存至阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
//2.3 订单ID--全局唯一ID生成器
Long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
//2.4 用户id
voucherOrder.setUserId(userId);
//2.5 代金券id
voucherOrder.setVoucherId(voucherId);
//2.6 放入阻塞队列
orderTasks.add(voucherOrder);
//3. 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//4. 返回订单id
return Result.ok(orderId);
}
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long voucherId = voucherOrder.getVoucherId();
//扣减库存
boolean updateFlag = seckillVoucherService.lambdaUpdate()
.setSql("stock = stock -1")
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.update();
if (!updateFlag) {//扣减失败
log.error("库存不足");
return;
}
//7. 创建订单
save(voucherOrder);
}
}
2.3.4.2.3 LUA脚本
-- 1.参数列表
-- 1.1优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
-- 3.2 库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
-- 3.3.存在,说明是重复下单
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
2.3.4.3 总结
秒杀业务优化思路:
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在的问题:
- 内存限制问题(jdk的阻塞队列,使用jvm内存,可能导致内存溢出/容量大小固定导致超出订单无法处理)
- 数据安全问题(数据不一致,程序宕机或重启,阻塞队列中数据都会消失)
2.3.5 Redis消息队列实现异步秒杀
2.3.5.1 Redis消息队列
消息队列, 字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色
- 消息队列:存储和管理消息,也称为消息代理。
判断秒杀时间和库存、校验一人一单、发送优惠券id和消息队列
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息。
接收消息完成下单.
Redis提供了三种不同的方式来实现消息队列:
- list结构:基于list结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
2.3.5.2 基于List结构模拟消息队列(理论)
Redis的list数据结构是一个双向链表,可以利用LPUSH结合RPOP,或者RPUSH结合LPOP实现。
但队列中没有消息时,LPOP或RPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
2.3.5.3 基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel]
:订阅一个或多个频道PUBLISH channel msg
:向一个频道发送消息PUBSCRIBE pattern[pattern]
:订阅与pattern格式匹配的所有频道- 通配符:
?
任意一个字符;*
0-n个任意字符;`[ae]只能是a或e
- 通配符:
优点:
- 采用发布订阅模型,支持多生产多消费
缺点: - 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
2.3.5.4 基于Stream的消息队列
Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:xadd
例如:
读消息的方式之一:XREAD
xread阻塞方式,读取最新的消息:
消费者组:将多个消费者划分到一个组中,监听同一个队列,具有以下特点:
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
①创建消费者组:XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
②删除指定消费者组:XGROUP DESTORY key groupName
③给指定的消费者组添加消费者:XGROUP CREATECONSUMER key groupname consumername
**④删除消费者组中指定消费者:****XGROUP DELCONSUMER key groupName consumername
⑤从消费者组读取消息:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费者组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等大时间
NOACK:无需手动ACK,获取到消息后自动确认
ID:获取消息的起始ID。>
是从下一个未消费的消息开始;其他则是根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
⑥确认消息:XACK key group ID [ID ...]
⑦查看未确认消息:XPENDING key group [[IDLE MIN-IDLE-TIME] start end count [consumer]]
IDLE:空闲时间时间超过min-idle-time的才要
start、end:起止范围 - +
代表所有
count:数量
consumer:哪个消费者的消息
总结:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
2.3.5.5 Redis各种类型消息队列对比
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 不支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
2.3.5.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
2.3.5.6.1 命令行创建消息队列
XGROUP CREATE streams.order g1 0 MKSTREAM
2.3.5.6.2 LUA脚本
-- 1.参数列表
-- 1.1优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单ID(STREAM消息队列)
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
-- 3.2 库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
-- 3.3.存在,说明是重复下单
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6.发送消息到队列中, XADD stream.order * k1 v1 k2 v2(STREAM消息队列)
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
2.3.5.6.3 业务逻辑
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIDWorker redisIDWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private IVoucherOrderService proxy;
private String queueName = "streams.order";
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1.获取队列中的订单消息 XREADGROUP GROUP g1 c1 count 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if(list == null || list.isEmpty()){
//如果获取失败则说明没有消息,继续下次循环
continue;
}
//3. 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//5.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
}
private void handlePendingList() {
while (true) {
try {
//1.获取pending-list中的订单消息 XREADGROUP GROUP g1 c1 count 1 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息获取是否成功
if(list == null || list.isEmpty()){
//如果获取失败则说明没有消息,继续下次循环
break;
}
//3. 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//5.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理penging-list异常", e);
try {
Thread.sleep(20);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1. 获取用户
Long userId = voucherOrder.getUserId();
//2. 创建锁对象
RLock lock = redissonClient.getLock("order:" + userId);
//3. 获取锁 tryLock()默认 -1 30 Second
boolean isLock = lock.tryLock();
//4. 判断是否获取锁成功
if (!isLock) {
//获取所失败,返回错误或重试
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
/**
秒杀业务逻辑
*/
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
//获取订单
Long orderId = redisIDWorker.nextId("order");
//1.执行LUA脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
//2. 判断结果是否为0
if (r != 0) {
//2.1 不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//3. 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//4. 返回订单id
return Result.ok(orderId);
}
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
//5. 一人一单
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//5.1 查询订单(这个部分其实没必要,兜底)
Integer count = lambdaQuery().eq(VoucherOrder::getUserId, userId).eq(VoucherOrder::getVoucherId, voucherId).count();
//5.2 判断是否存在
if (count > 0) {
//用户已经购买过了
log.error("用户已经购买过一次!");
return;
}
//6. 扣减库存
boolean updateFlag = seckillVoucherService.lambdaUpdate()
.setSql("stock = stock -1")
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)//CAS方式乐观锁,因为mysql在update的时候有行锁是串行的,所以可以
.update();
if (!updateFlag) {//扣减失败
log.error("库存不足");
return;
}
//7. 创建订单
save(voucherOrder);
}
}