目录
四、 优惠卷秒杀系列功能实现
4.1 全局ID生成器
4.1.1 全局ID生成器的选型
4.1.2 全局ID生成器的实现
4.1.3 全局ID生成器的测试
4.1.4 其他ID生成器的拓展
4.2 利用PostMan模拟管理员后台添加秒杀优惠卷信息
【代码实现】
【PostMan测试】
4.3 优惠卷秒杀下单功能基础逻辑实现
【功能说明】
【代码实现】
【功能测试】
4.4 (优化)解决超卖问题
4.4.1 超卖现象重现及其原因
4.4.2 解决超卖问题的方案比较
悲观锁方案
乐观锁方案
4.4.3 乐观锁两大方案比较
版本号法(涉及ABA问题时使用)
CAS法(不涉及ABA问题时使用)
4.4.4 基于CAS法解决超卖问题
4.5 (优化)实现单机情况下 限购一人一单
4.5.1 限购功能整体概述
4.5.2 限购功能实现及说明
【基础实现】
【基础锁实现】
【降低锁粒度实现】
【关于如何锁用户】
【扩大锁范围实现】
【解决代理对象事务】
4.5.3 限购功能测试检验
4.6 (拓展)集群模式下限购一人一单
4.6.1 单体模式下限购功能实现的局限性
4.6.2 配置集群环境,测试限购代码的并发问题
4.6.3 解决策略——分布式锁
4.7 (知识点)分布式锁
4.7.1 分布式锁介绍
4.7.2 基于Redis的分布式锁方案
4.7.3 基于Redis实现分布式锁的初级版本
4.7.3.1 代码实现
4.7.3.2 接口测试
4.7.4 (优化)解决分布式锁出现的误删问题
4.7.4.1 出现误删的原因判断
4.7.4.2 解决方案——检查锁是不是自己的:
4.7.4.3 代码实现及其测试
4.7.5 (优化)解决分布式锁的原子性问题
4.7.5.1 非原子性操作带来的并发安全问题
4.7.5.2 解决方案——引入Rua脚本实现命令原子化
4.7.5.3 Rua脚本在Redis中的基本使用
4.7.5.4 使用Rua编写释放锁脚本
4.7.5.5 在IDEA编写使用lua脚本的代码
4.7.5.6 功能测试
4.8 (拓展知识点) Redisson ——成熟的锁工具包
4.8.1 基于setnx实现的分布式锁还存在的问题
4.8.2 Redisson介绍
4.8.3 Redisson入门
4.8.4 Redisson的可重入原理
4.8.5 (TODO)Redisson的可重试原理
4.8.6(TODO) Redisson的超时释放原理
4.8.7 Redisson的主从一致性原理
4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题
4.9 Redis优化秒杀系列功能实现
4.9.1 业务性能瓶颈分析
4.9.2 优化流程分析
4.9.3 改进秒杀业务代码改造及详细解释
1. 将优惠卷信息存入Redis中
2. 基于Lua脚本,判断秒杀资格
3. 抢购成功,封装信息到阻塞队列中,实现异步下单
4.9.4 Redis优化秒杀功能测试
1. 简单测试功能是否实现,使用postman发送请求
2. 利用jmeter测试高并发情况
4.9.5 总结
4.10(知识点) 学习Redis消息队列实现异步秒杀
4.10.1 基于List类型模拟的消息队列原理及其使用
4.10.2 基于PubSub的消息队列原理及其使用
4.10.3(三者之最) 基于Stream的消息队列原理及其使用
4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能
4.10.4.1 创建Stream消息队列
4.10.4.2 修改秒杀资格判断的Lua脚本
4.10.4.3 完善秒杀下单代码及其代码说明
1. 首先是修改预加载Lua脚本信息
2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本
3. 重头戏,开启线程任务!!
4.10.4.4 秒杀下单功能测试
1. 简单测试功能是否实现,使用postman发送请求
2. 利用jmeter测试高并发情况
4.11 总结情况
四、 优惠卷秒杀系列功能实现
4.1 全局ID生成器
4.1.1 全局ID生成器的选型
为什么需要全局ID生成器呢?
由于秒杀业务需要,有时候我们会生成数量极为庞大的业务数据。但是对于单张表,往往会存在存储数据的上限(随着存储数据的增加,查询的效率会越来越低),我们往往会进行分表操作。
这么一来,分表带来的 ID不唯一、ID自增的规律性太强导致安全性低等问题就产生了。为了解决这些问题,我们可以利用全局ID生成器进行开发。
全局ID生成器需要具备的特征:
使用Redis符合上述五点需求
高可用、高性能: Redis的查询能力本身就比数据库要快得多
唯一性: 由于Redis数据库可以全局通用一张表,因此可以更好地维护ID的唯一性
递增性:Redis拥有自增操作,INCREMENT
安全性:通过设置ID的自增规律,可以提高ID的安全性
全局ID生成器的格式规范:
4.1.2 全局ID生成器的实现
步骤:
1. 获取一个基准时间戳,标记为BEGIN_TIMESTAMP
2. 获取当前时间戳标记为now
3. 计算全局ID的时间戳部分
4. 以天为单位,分割key【可以更好的统计每天的订单数量】
5. 获取全局ID的序列号【调用Redis的自增方法】
6. 利用运算符将时间戳 与 序列号 进行拼接并返回
在utils包下新建RedisWorkers用于实现全局ID生成:
@Component
public class RedisIdWorker {
// 开始时间戳 2024-09-12-0-0-0
private static final long BEGIN_TIMESTAMP = 1726099200L;
// 序列号位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public Long nextId(String keyPrefix){
// 1. 时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond-BEGIN_TIMESTAMP;
// 2. 序列号
//2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接返回
return timestamp << COUNT_BITS | count;
}
/**
* 获取当前时间戳
* @param args
*/
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2024,9,12,0,0,0);
Long second = time.toEpochSecond(ZoneOffset.UTC); // 1726099200
System.out.println("second=" + second);
}
}
4.1.3 全局ID生成器的测试
开启3000个线程任务,每个线程插入100条订单数据,运行查看效果
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3000);
Runnable task = () ->{
for(int i=0;i<100;i++){
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for(int i=0;i<3000;i++){
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end-begin));
}
}
4.1.4 其他ID生成器的拓展
除了采用Redis自增生成全局ID,还有以下方法可行:
1. UUID 【无序】
2. 数据库自增 【数据量不能太多】
3. 雪花算法 (本次项目使用的Redis自增实际上就是运用了雪花算法的思想)
Redis自增策略
1. 每天一个key,方便统计订单量
2. ID构造是 时间戳 + 计数器
4.2 利用PostMan模拟管理员后台添加秒杀优惠卷信息
【代码实现】
/**
* 新增秒杀优惠券
* 多表添加 加事务
* @param voucher
*/
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
// 关联优惠券id
seckillVoucher.setVoucherId(voucher.getId());
// 秒杀库存
seckillVoucher.setStock(voucher.getStock());
// 开始时间
seckillVoucher.setBeginTime(voucher.getBeginTime());
// 结束时间
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
【PostMan测试】
4.3 优惠卷秒杀下单功能基础逻辑实现
【功能说明】
实现基本的下单逻辑
请求方式 | POST |
请求路径 | /voucher-order/seckill/{id} |
请求参数 | 优惠卷id |
返回值 | 订单Id |
【代码实现】
利用全局Id生成订单id
@Resource
private ISeckillVoucherService seckillVoucherService;
/**
* 全局ID生成器
*/
@Resource
private RedisIdWorker redisIdWorker;
/**
* 秒杀优惠券下单
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1. 提交优惠卷id 查询优惠卷id信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("优惠券不存在!");
}
//2. 判断优惠卷是否在抢购时间内
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
long stockCount =voucher.getStock();
if(beginTime.isAfter(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀尚未开始!");
}
if(endTime.isBefore(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀已经结束!");
}
//3. 判断优惠卷库存是否充足
if(stockCount <= 0){
// 否----> 返回异常信息---->结束
return Result.fail("库存不足!");
}
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).update();
if(!success){
return Result.fail("库存不足!");
}
//5. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//5.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//5.2 用户id 【当前登录用户】
long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//5.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//6. 返回订单id ----> 结束
return Result.ok(orderId);
}
【功能测试】
点击抢购,成功生成订单信息,库存扣减1
4.4 (优化)解决超卖问题
4.4.1 超卖现象重现及其原因
步骤:
利用jmeter进行多线程模拟高并发抢购优惠卷时的情况
现象:
发现优惠卷有超卖的现象
原因:
在多线程高并发访问的环境下,当库存为1时,正常情况下最后一个线程在执行完最后一个扣减操作的过程中,有其余的线程涌入,此时由于扣减操作未完成,库存量还是为1,从而误判库存剩余,最后导致超卖。
现象回顾
回看数据库报告
正常来说异常为50%才是正确的,但是这里的异常没有到50%。在高并发访问的过程中出现了超卖现象,即同一时刻两个线程同时完成导致优惠卷最后的数量异常。
4.4.2 解决超卖问题的方案比较
悲观锁方案
- 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:
synchronized
、lock
- 悲观锁,适合写入操作较多、冲突频繁的场景
乐观锁方案
- 乐观锁,认为线程安全问题不一定发生,因此不加锁,而是判断有没有其他线程在自己进行操作时修改了数据,有则重试。常见的实现方式有:版本号法、CAS操作.
- 乐观锁,适合读取操作较多、冲突较少的场景。
4.4.3 乐观锁两大方案比较
版本号法(涉及ABA问题时使用)
首先,我们要为数据库表新增一个版本号字段 version
然后,线程开始查询库存及其库存版本号,记录版本号信息。
接着,在通过判断后,执行扣减时,检查当前版本号与先前记录的版本号信息是否一致,如若一致(例如线程1),则可以顺利执行扣减更新语句。如若不一致(例如线程2),则无法执行扣减更新语句。
CAS法(不涉及ABA问题时使用)
在版本号的基础上,可以发现其实库存量直接可以充当版本号的作用,于是就出现了更加简洁,不用额外添加字段的方法——CAS法。
尽管CAS操作具有诸多优点,但它也存在一个潜在的问题,即ABA问题。ABA问题是指在CAS操作过程中,一个线程a将数值改成了b,接着又改成了a,此时CAS认为是没有变化,其实是已经变化过了。这个问题在多线程环境下可能会导致程序行为不符合预期,从而引发并发错误。
4.4.4 基于CAS法解决超卖问题
只需要在扣减库存这里加多一个库存值的判断,同时记得如果判断出问题了要重试!
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).eq("stock",voucher.getStock())
.update();
上述代码的异常率特别高
解决失败率高的问题
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).gt("stock",0) // 库存大于0就行了
.update();
4.5 (优化)实现单机情况下 限购一人一单
高能瞬间:实战篇-07.优惠券秒杀-实现一人一单功能_哔哩哔哩_bilibili
4.5.1 限购功能整体概述
为什么要一人一单?
对于秒杀优惠卷,我们当然不希望一个人承包大部分甚至所有的优惠卷,这种行径无疑是电商黄牛。我们希望每个顾客至多只能抢购一张券。
限购一人一单实现逻辑?
在扣减库存之前,我们对优惠卷数据进行查询,查看当前用户(用户id)是否在数据库中已经存在抢购数据,如果有了,则不再给予抢购机会。否则才会进行扣减库存的操作
实现限购功能需要克服的潜在风险?
存在线程并发问题: 假设有100个相同用户标识的线程同时查询数据库,发现数据库没有数据时,100个线程同时通过了判断,结果导致1个用户抢了多单
4.5.2 限购功能实现及说明
【基础实现】
根据前面的概述,我们大概明白了整个功能的逻辑,实际上就是简单的增加一个判断。
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
【基础锁实现】
但是单单只添加判断逻辑,还会造成线程并发问题,因此我们需要给判断过程加锁,最简单的方式就是将一人一单功能抽取成一个方法,在方法上添加synchronized关键字
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
//5. 扣减库存——解决超卖问题【乐观锁方案】
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
.update();
if (!success) {
return Result.fail("库存不足!");
}
//6. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id 【当前登录用户】
// long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7. 返回订单id ----> 结束
return Result.ok(orderId);
}
【降低锁粒度实现】
以上代码还无法实现全部功能,由于我们将整个方法都锁住了,相当于每个用户进行抢购时,都需要判断该方法是否已经被执行,这样会导致整个抢购秒杀过程退化成串行执行。 而我们想实现的是同一个用户标识线程串行执行,不同用户之间并行执行。因此我们需要降低锁的粒度,将锁作用在用户上。
【关于如何锁用户】
userId.toString().intern()
由于同一个用户标识的不同线程创建的User对象实际上并不是同一个,为了能达成用户级锁,我们必须是按值锁,因此需要调用toString().intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
//5. 扣减库存——解决超卖问题【乐观锁方案】
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
.update();
if (!success) {
return Result.fail("库存不足!");
}
//6. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id 【当前登录用户】
// long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7. 返回订单id ----> 结束
return Result.ok(orderId);
}
}
【扩大锁范围实现】
上述代码实现了将锁的粒度成功的缩小到了用户级,但是还不是最优的代码,还存在一定的风险。具体的,由于@Transactional事务是作用在方法上的,而现在我们的锁仅仅是作用在方法内部,这样一来就会存在一种异常情况:当方法内部都执行完毕了,锁会先释放,然后才是Spring自动提交事务。在提交事务完成之前,锁已经释放掉了,这也就意味着在这期间其他的线程完全有机会趁虚而入。为了解决这个问题,我们必须把锁的范围重新扩大到将方法包裹其中。
/**
* 秒杀优惠券下单
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1. 提交优惠卷id 查询优惠卷id信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("优惠券不存在!");
}
//2. 判断优惠卷是否在抢购时间内
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
long stockCount =voucher.getStock();
if(beginTime.isAfter(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀尚未开始!");
}
if(endTime.isBefore(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀已经结束!");
}
//3. 判断优惠卷库存是否充足
if(stockCount <= 0){
// 否----> 返回异常信息---->结束
return Result.fail("库存不足!");
}
/* 为什么要加在createVoucherOrder方法中?
* 因为createVoucherOrder方法中已经加了事务,如果加了锁,事务回滚,锁也会释放,这样就会导致锁失效
* 为什么要指定锁用户id?
* 将锁的粒度降低,将锁的粒度降低到用户粒度,这样就可以保证一个用户只会有一个订单
* 添加.intern() 确保是按值加锁 而不是按对象加锁
* 为什么要使用代理对象?
* @Transactional注解是Spring的事务注解,它只能在Spring管理的Bean中生效,
* 如果直接在ServiceImpl中调用createVoucherOrder方法,
* 那么@Transactional注解就不会生效,
* 因为createVoucherOrder方法不是在Spring管理的Bean中调用的,
* 所以需要使用代理对象来调用createVoucherOrder方法,这样@Transactional注解就会生效
*/
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
}
【解决代理对象事务】
上述代码看起来很完美了,但是事实上还是存在问题@Transactional事务机制是失效的。这是由于我们创建的createVoucherOrder()方法并不受Spring管理,所以没有办法使用Spring的事务机制。要解决这个问题,我们需要引入Spring中AspectJ注解支持来暴露代理对象,通过代理对象机制实现事务功能。具体操作如下:
1. 导入AspectJ注解支持依赖
2. 获取createVoucherOrder()方法的代理对象
3. 通过代理对象调用方法
4. 在启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解,暴露代理对象
<!--动态代理模式-->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
/**
* 启动类添加@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
*
*/
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
// 开启AspectJ注解支持 暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
4.5.3 限购功能测试检验
两百个线程共用一个token向后端发送请求,来模拟同一用户高并发抢购的情况
200个线程只有1个通过了请求,数据库中优惠卷剩余99张,优惠卷订单只有1个。证明我们的功能实现了
4.6 (拓展)集群模式下限购一人一单
4.6.1 单体模式下限购功能实现的局限性
在4.5时,我们成功的完成了限购一人一单的业务功能。事实上还是有相对较大的局限性。由于秒杀业务的高并发性,为了防止单台服务器压力过大的原因,我们会考虑将多台服务器部署成一个集群环境。这时候,我们在单体环境中写的加用户锁方案就有了并发安全问题。
如下图,我们知道。Java提供的锁方案事实上是由JVM下的锁监视器去进行监听的。因此在单体环境中,由于锁监视器唯一,可以正常的执行线程监听任务。但是放在集群环境中,每一个服务器都有一个JVM平台,都有一个锁监视器。在nginx服务器发送轮询请求时,可以有多个线程同时获得锁,同时执行。于是乎限购功能出现了并发安全问题。
4.6.2 配置集群环境,测试限购代码的并发问题
配置两台Tomcat服务器
配置nginx服务器轮询访问
并重新启动nginx服务器
测试集群环境
访问:localhost:8080/api/voucher/list/1 两次
8081、8082 各被轮询了1次
4.6.3 解决策略——分布式锁
解决策略无非是将监视器统一,且必须要在集成环境中唯一共享。分布式锁正是这样一种策略。
我们将会在下一小结详细介绍分布式锁的相关知识,并使用分布式锁在集群环境下解决并发安全问题
4.7 (知识点)分布式锁
4.7.1 分布式锁介绍
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的特点:
1、高可用的获取锁与释放锁;
2、高性能的获取锁与释放锁;
3、多进程可见
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
分布式锁的实现方式:
1. 基于数据库的实现:
使用数据库的事务和锁机制来实现分布式锁。可靠性高,支持事务机制和锁机制,但是性能较低,对数据库性能有一定影响。适用于需要强一致性和高可靠性的场景。
2. 基于缓存的实现:
使用缓存系统(如Redis)的原子操作来实现分布式锁。性能较高,支持原子操作,但是可靠性相对较低,需要处理缓存故障和失效的情况。适用于高并发场景。
3. 基于分布式协调服务的实现:
使用分布式协调服务(如ZooKeeper、etcd)来实现分布式锁。通过创建临时顺序节点和监听节点变化来控制锁的获取和释放。可靠性高,支持分布式环境,但是性能较低,对分布式协调服务的性能有一定影响。适用于需要高可靠性和一致性的场景。
4. 基于消息队列的实现:
使用消息队列来实现分布式锁。通过发送和接收消息来控制锁的获取和释放。
优点简单易用,适用于简单的场景。缺点性能较低,不适用于高并发和高吞吐量的场景。
4.7.2 基于Redis的分布式锁方案
获取锁 + 添加过期时间 【且确保原子性操作】
EX 设置过期时间 NX 不存在才添加
SETNX lock thread1 EX 10 NX
释放锁
DEL key
4.7.3 基于Redis实现分布式锁的初级版本
利用Redis,完成基本的分布式锁构建
4.7.3.1 代码实现
创建Lock接口
//父类
public interface ILock {
/**
* 获取锁
* @param timeoutSec
* @return
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
实现Redis分布式锁方法
//实现 ILock接口
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁前缀
private static final String KEY_PREFIX = "lock:";
/**
* 获取锁
* @param timeoutSec
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name ,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放锁
*/
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
使用分布式锁
/*
* 分布式锁方案
*/
// 1. 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 2. 尝试获取锁
boolean isLock = lock.tryLock(1200);
// 3. 判断锁是否获取成功
if(! isLock){
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 4. 释放锁
lock.unlock();
}
4.7.3.2 接口测试
利用postman进行测试
发送两个请求,获取锁成功,一个获取锁失败,线程Id为72
4.7.4 (优化)解决分布式锁出现的误删问题
4.7.4.1 出现误删的原因判断
线程1获取分布式锁后运行,结果发生了业务阻塞,阻塞时间大于设定的超时释放锁时间。于是在线程1还未执行完业务时,分布式锁已经被释放了。
这时候线程2获取锁开始执行业务,恰好线程1结束了阻塞,也顺利的结束了业务,于是将锁释放了。这样一来线程2的锁又没有了,线程3恰巧又来了......如此反复,导致线程1释放线程2拿到的的锁;线程2释放线程3拿到的锁,造成并发安全问题
4.7.4.2 解决方案——检查锁是不是自己的:
如何知道这个锁是不是自己的呢? -----在获取锁时存入线程标识【UUID】
流程:
4.7.4.3 代码实现及其测试
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();// 判断锁是不是自己的
if(threadId.equals(id)){ // 如果是自己的锁,则释放锁
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
// 如果不是自己的锁,则不管
/**
* 获取锁
* @param timeoutSec
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name ,threadId,timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放锁
*/
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断锁是不是自己的
if(threadId.equals(id)){ // 如果是自己的锁,则释放锁
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
// 如果不是自己的锁,则不管
}
集群环境测试
4.7.5 (优化)解决分布式锁的原子性问题
4.7.5.1 非原子性操作带来的并发安全问题
在我们完善了Redis分布式锁误删问题后,程序还是没有达到完美状态。以下这种情况依旧存在并发安全问题:
线程1获取锁执行业务,在执行业务完成后判断当前锁的标识是否和自己的一致,发现一致后,执行释放锁操作。但恰好在执行释放锁操作时出现了阻塞,知道超过了锁的超时释放时间。锁被释放了,于是乎线程2提前拿到了锁,结果线程1阻塞结束,又把线程2刚刚拿到的锁给释放掉了
4.7.5.2 解决方案——引入Rua脚本实现命令原子化
出现上述问题,究其原因就是 判断锁标识 和 释放锁 是两个动作,不符合原子性。因此若是在两者间发生了阻塞,就会造成并发安全问题。
所以解决策略也很简单, 将判断锁的操作和释放锁的操作组合成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过
在Redis中, 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。
4.7.5.3 Rua脚本在Redis中的基本使用
1. 执行Redis命令
redis.call('命名名称','key','其他参数') redi.call('set','name','jack')
2. 执行脚本
EVAL "脚本内容" 脚本需要的key类型的参数个数 # 执行无参脚本 EVAL "return redis.call('set','name','jack') " 0 # EVAL执行带参脚本 redis 127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second 1) "key1" 2) "key2" 3) "first" 4) "second"
4.7.5.4 使用Rua编写释放锁脚本
-- 释放锁的业务
-- 1. 获取锁中的线程标识
-- 2. 判断是否与指定的标识相同
-- 3. 一致则释放锁
----------------------开始脚本-------------------------
-- 锁的key
--local key = "lock:order:5"
local key = KEYS[1]
-- 当前线程标识
--local threadId = "asdasdasdasd"
local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call("GET", key)
-- 比较锁中的线程标识与当前线程标识是否一致
if(id == threadId) then
-- 释放锁
redis.call("DEL", key)
end
return 0
----------------------结束脚本-------------------------
4.7.5.5 在IDEA编写使用lua脚本的代码
在SimpleRedisLock方法内修改:
// 提前读取脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定脚本位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
// 指定返回类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
/**
* 释放锁 --- 基于lua脚本
*/
@Override
public void unlock() {
/*
* 调用Lua脚本
* 参数1: 预加载的lua脚本
* 参数2: KEYS[]
* 参数3: ARGV[]
*/
String key = KEY_PREFIX + name;
String argv = ID_PREFIX + Thread.currentThread().getId();
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key),
argv
);
}
4.7.5.6 功能测试
1.启动集群环境,先让8081 获取锁成功
2. 模拟锁超时,将Redis中的锁删掉,让8082获得一把新的锁
3. 让失去锁的8081执行lua脚本,观察8082的锁会不会被清除
结果:不会被清除
4. 让获得锁的8082执行lua脚本,观察8082的锁会不会被清除
结果: 被清除了
4.8 (拓展知识点) Redisson ——成熟的锁工具包
4.8.1 基于setnx实现的分布式锁还存在的问题
1. 不可重入问题:同一个线程无法多次获取相同的一把锁
2. 不可重试问题:获取锁只尝试一次就返回false,没有重试机制
3. 超时释放问题:业务耗时过长,还是有可能导致锁释放,设置超时时间很讲究
4. 主从一致性问题:主从同步存在延迟,如果在主节点设置锁,在还没有同步到从节点时,主节点宕机。
当然,以上问题指针对部分有需要的业务才会有问题,我们前面完善的Redis + lua脚本的分布式锁方案已经很优秀了
4.8.2 Redisson介绍
官网:Home · redisson/redisson Wiki (github.com)
Redisson 是一个在 Redis 的基础上实现的一个 Java 驻内存数据网格(In-Memory Data Grid, IMDG)。它提供了丰富的分布式和可扩展的 Java 数据结构,包括分布式锁和同步器。Redisson 的可重试锁(Retryable Lock)是基于 Redis 的特性来实现的一种锁机制,它允许在锁不可用时进行重试,直到成功获取锁或者达到重试次数上限。
4.8.3 Redisson入门
1. 引入依赖
<!--Redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
2. 创建Config文件
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); // 单机模式 // 集群模式:config.useClusterServers().addNodeAddress("redis://192.168.186.136:7000"); config.useSingleServer().setAddress("redis://192.168.186.136:6379").setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); } }
3. 使用Redisson——以秒杀优惠卷为例
/* * Redisson方案 */ //1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ return Result.fail("不允许重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { // 4. 释放锁 lock.unlock(); }
4.8.4 Redisson的可重入原理
不可重入锁的基本流程Demo:
在method1中获取锁成功后,其调用的method2没办法获取锁。在这种嵌套的业务中,不可重入锁具有局限性
如何实现可重入?
添加一个当前获取锁的次数字段 —— hash结构存储
同一个线程内的业务方法在获取锁的时候,发现如果锁存在了就将锁的次数+1,锁的次数就代表了当前获取锁的方法个数。在进行删除锁时,需要先检查锁的重入次数-1 后是否为 0 ,是才能释放锁,否则只做-1操作。可重入锁的设计流程
可重入锁的脚本实现
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/15 16:30 --- ------------------------获取可重入锁的Lua脚本------------------------------------- local key = KEYS[1] -- 获取锁的key local threadId = ARGV[1] -- 获取线程唯一标识 local releaseTime = ARGV[2] -- 获取锁的自动释放时间 -- 判断锁是否存在 -- 不存在的逻辑 if(redis.call('exists',key) == 0 ) then -- 不存在则,获取锁,标记获取锁的次数 redis.call('hset', key, threadId, '1'); -- 设置过期时间 redis.call('expire', key, releaseTime); -- 返回获取锁成功 return 1; end; -- 锁已经存在,判断线程标识是不是自己的 -- 是自己的逻辑 if(redis.call('hexists', key, threadId) == 1) then -- 是自己的,则获取锁的次数+1 redis.call('hincrby', key, threadId, 1); -- 重置有效期 redis.call('expire', key, releaseTime); -- 返回获取锁成功 return 1; end; -- 走到这里说明获取到的锁并不是自己的 return 0; ----------------释放可重入锁的Lua脚本----------------------------- local key = KEYS[1] --获取锁的key local threadId = ARGV[1] --获取线程唯一标识 local releaseTime = ARGV[2] --获取锁的自动释放时间 -- 判断锁是不是自己的 -- 锁不是自己的逻辑 if(redis.call('hexists', key, threadId) == 0 ) then return nil; -- 锁不是自己的,直接返回nil,表示自己的锁已经被释放了 end; -- 锁是自己的逻辑 -- 将锁的计数器-1 local count = redis.call('hincrby', key, threadId, -1); -- 判断锁的计数器是否为0 if(count > 0) then -- 大于0,说明还有别的方法获取了锁,不用释放锁,只需要重置过期时间,让其他业务有足够的时间执行 redis.call('expire', key, releaseTime); return nil; else -- 等于0,说明最外层业务已经完成了,可以释放锁 redis.call('del', key); return nil; end;
4.8.5 (TODO)Redisson的可重试原理
基于看门狗 + 消息订阅模型实现
(看晕了,源码留后面啃)
实战篇-19.分布式锁-Redisson的可重入锁原理_哔哩哔哩_bilibili
4.8.6(TODO) Redisson的超时释放原理
基于看门狗的定时续约机制
简单来说,在成功获取锁之后,系统会立即启动一个自动续期机制(通常通过
scheduleExpirationRenewal(threadId)
方法实现),该机制利用一个映射(map)来关联业务名称与定时任务。这个定时任务被设置为每隔一定时间(例如10秒)执行一次,其主要职责是重置锁的最大超时时间。通过递归或循环调用重置锁时间的逻辑,确保锁在业务执行期间不会因为超时而被自动释放,从而实现了锁的“永久持有”效果,直到业务逻辑执行完毕。当业务逻辑执行完成并显式释放锁时,系统会同时取消之前设置的定时任务,以避免不必要的资源消耗。这种设计确保了锁的持有与业务执行周期紧密相关,既提高了系统的灵活性,又增强了资源使用的效率。
实战篇-20.分布式锁-Redisson的锁重试和WatchDog机制_哔哩哔哩_bilibili
4.8.7 Redisson的主从一致性原理
Redis主从集群模式造成一致性问题的原因
Java应用发送命令到主节点,主节点崩溃,Redis哨兵机制选取更新最近主节点的从节点,将其变成新的主节点。但是还是存在先前存储的命令失效问题
Redisson解决主从一致性问题方案——连锁方案
取消单主多从方案,采用高可用集群 + 主从分布
4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题
实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili
太吃电脑性能了,得晚点实现,或者等上docker
4.9 Redis优化秒杀系列功能实现
4.9.1 业务性能瓶颈分析
优化前的方案
业务耗时为Tomcat六步总和,其中查询优惠卷、查询订单、减库存、创建订单都去数据库查询,效率慢。此外,目前我们还没实现MySQL集群,就更慢了
优化后的方案
将校验秒杀资格的两个环节提前到Redis异步执行,从而减少处理的环节,提高整体效率。
4.9.2 优化流程分析
Redis部分——lua脚本实现
1. 判断秒杀库存------使用String结构存储
2. 校验一人一单------使用Set结构存储userIdJava部分
1. 判断lua脚本执行结果
2. 返回“小票”,优惠卷id、用户id、订单id
4.9.3 改进秒杀业务代码改造及详细解释
1. 将优惠卷信息存入Redis中
/** * 新增秒杀优惠券 * 多表添加 加事务 * @param voucher */ @Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); // 关联优惠券id seckillVoucher.setVoucherId(voucher.getId()); // 秒杀库存 seckillVoucher.setStock(voucher.getStock()); // 开始时间 seckillVoucher.setBeginTime(voucher.getBeginTime()); // 结束时间 seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 保存库存到Redis stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString()); }
2. 基于Lua脚本,判断秒杀资格
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/15 22:30 --- -----------判断秒杀资格的lua脚本--------------------------------- local voucherId = ARGV[1] -- 优惠卷id local userId = ARGV[2] -- 用户id local stockKey = 'seckill:stock:' .. voucherId -- 库存key local orderKey = 'seckill:order:' .. voucherId -- 订单key -- 脚本开始 -- 1. 判断库存是否充足 if( tonumber(redis.call('get', stockKey)) <=0) then -- 库存不足返回1 return 1 end -- 2. 判断用户是否下单 if( redis.call('sismember', orderKey, userId) == 1 ) then -- 用户已经下单返回2 return 2 end -- 3. 扣减库存 incrby -1 redis.call('incrby', stockKey, -1) -- 4. 下单(保存用户) redis.call('sadd', orderKey, userId) -- 5. 返回0表示下单成功 return 0
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本 * @param voucherId * @return */ @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } long orderId = redisIdWorker.nextId("order"); //TODO 4.为0,代表有购买资格,将下单信息保存至阻塞队列 //5.返回订单id return Result.ok(orderId); }
3. 抢购成功,封装信息到阻塞队列中,实现异步下单
调用创建订单逻辑,首先会进入到seckillVoucher秒杀优惠卷,进入seckillVouvher方法后,首先调用Lua脚本去判断该用户请求是否具备秒杀资格,如果具备秒杀资格,则创建订单信息,并将订单信息保存到阻塞队列中去.
/** * 预加载lua脚本 */ private static DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/Seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程 * @param voucherId * @return */ private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } long orderId = redisIdWorker.nextId("order"); //4.为0,代表有购买资格,将下单信息保存至阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); // 放入阻塞队列 orderTasks.add(voucherOrder); //提前 获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); //5.返回订单id return Result.ok(orderId); }
注意标号顺序,我们提前定义了阻塞队列和线程池,并将线程任务放到初始化方法中,只要程序一启动,就会执行init()中的方法。
当seckillVoucher方法执行成功将订单信息存储到阻塞队列中后。只要阻塞队列有值,我们就会通过线程任务方法VoucherOrderHandler()方法取出队首元素,并且调用handleVocherOrder()方法创建订单。
进入到handleVocherOrder()方法,我们再次判断是否正常,判断通过后调用创建订单方法createVoucherOrder()
由于该方法添加了事务,在子线程中事务及其代理对象都失效了,为了能继续使用代理对象进行调用,我们在主方法seckillVoucher()中提前声明并赋值了代理对象,这样在子线程任务中就可以获取到代理对象执行创建订单的方法了。
进入到最后的创建订单方法createVoucherOrder() ,首先对一人一单、库存超卖问题再次进行判断,均通过以后创建订单。
通过上述步骤实现了异步创建订单,提高了并发效率//1. 创建-- 阻塞队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //2,创建-- 秒杀线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //5. 初始化方法 一初始化就执行 @PostConstruct public void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //4. 创建-- 订单外部类 private void handleVocherOrder(VoucherOrder voucherOrder){ // 获取用户 Long userId = voucherOrder.getUserId(); // 1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ log.error("不允许重复下单"); } try { proxy.createVoucherOrder(voucherOrder); } finally { // 4. 释放锁 lock.unlock(); } } //3. 创建-- 秒杀线程任务 private class VoucherOrderHandler implements Runnable{ @Override public void run() { while (true) { try { //1. 获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); //2. 创建订单 handleVocherOrder(voucherOrder); } catch (Exception e) { log.error("获取订单异常",e); } } } }
/** * 秒杀优惠券下单------秒杀优化代码----创建订单 * @param voucherOrder */ @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { //4. 限制一人一单【悲观锁方案】 Long userId = voucherOrder.getUserId(); //4.1 查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); //4.2 判断订单是否存在 // 是 -----> 返回异常信息---->结束 if (count > 0) { log.error("用户已经购买了一次了"); } //5. 扣减库存——解决超卖问题【乐观锁方案】 boolean success = seckillVoucherService.update() .setSql("stock = stock-1") .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // 库存大于0就行了 .update(); if (!success) { log.error("库存不足"); } //6. 创建订单 save(voucherOrder); }
4.9.4 Redis优化秒杀功能测试
1. 简单测试功能是否实现,使用postman发送请求
【错误提醒】千万不要把Redis中的 seckill:stock:11 删掉啊!这个优惠卷信息是我们创建优惠卷时顺便存进去的,你要是将它清除了,后面的测试就进行不了了,lua脚本会报nil错误的(俺排了大半个小时)
2. 利用jmeter测试高并发情况
这里我生成了1000个token的文件,实际上是10个token重复了100次,用于模拟1000个请求(10个用户),同时请求jmeter的情况。结果和视频中做出来的不太一样,但是因为没有1000不同用户,所有我也没办法求证我的程序性能到底得到优化没有
优惠卷改成了5张
1000个token
开启测试
测试1s内发送1000个请求 10个不同用户token,抢购5张券
测试 1s内发送100个请求10个用户 抢5张券
测试5秒内 发送10000个请求 10个不同用户token 抢5张券
测试1秒内 发送10000个请求 10个不同用户token 抢5张券【有点不行】
4.9.5 总结
简单来说,就是Java阻塞队列还不够好
4.10(知识点) 学习Redis消息队列实现异步秒杀
消息队列(Message Queue):存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型基础的消息队列模型
4.10.1 基于List类型模拟的消息队列原理及其使用
在Day2中我就总结过Redis中各个数据结构使用的业务场景,其中基于双向链表实现的list结构则可以用于模拟消息队列的功能(Redis学习Day2——Redis基础使用-CSDN博客)
实现原理
- 使用 LPUSH + RPOP 或 RPUSH + LPOP 实现消息队列
- 使用 BLPOP 或 BRPOP 实现带有阻塞效果的消息队列
实现效果
基于List结构实现的消息队列比较简单,只支持单消费者的模式
优点
- 相比于java实现的阻塞队列,不会受限于JVM的存储上限,没有未知的内存溢出风险
- 属于Redis的基础数据结构,可以实现持久化存储,数据安全性也有所保障
- List队列,先进先出,消息处理有序
缺点
- 消息丢失无法避免
- 只支持单消费者模式
缺点说明
假设从Redis中取出一条消息,但是还没来得及处理就挂掉了,结果等到恢复正常后,刚从队列中取出来的消息已经不可复现了,因此存在消息丢失的风险。
由于消息一旦被某个消费者拿走了,就会从消息队列中移除,因此该模式仅仅支持单消费者
常用命令
- 阻塞监听
- 存放消息
4.10.2 基于PubSub的消息队列原理及其使用
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
实现原理
- 使用 SUBSCRIBE channel [channel] :订阅一个或多个频道
- 使用 PUBLISH channel msg :向一个频道发送消息
- 使用 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
实现效果
发布者 + 订阅者模式,也就是说,可以实现多消费者的消息队列模型
优点
- 采用发布订阅模型,支持多生产者、多消费者的模型
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出是数据丢失
缺点说明
PubSub本身设计出来是用于消息发送的,不具备存储功能,因而没有持久化策略。当某个频道没有任何人订阅,在该频道发送的数据直接丢失了。此外PubSub发送的信息过多未处理完容易造成堆积丢失。因此PubSub不适用于可靠性要求高的场景。
常见命令使用
- 订阅单个频道【自带阻塞】
- 订阅多个频道
- 发布消息
4.10.3(三者之最) 基于Stream的消息队列原理及其使用
Stream是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
主要特征
数据结构:Redis Stream是一个由有序消息组成的日志数据结构,每个消息都有一个全局唯一的ID,确保消息的顺序性和可追踪性。
消息ID:消息的ID由两部分组成,分别是毫秒级时间戳和序列号。这种设计确保了消息ID的单调递增有序性。
消费者组:Redis Stream新增消费者组的概念,允许多个消费者以组的形式订阅Stream,并且每个消息只会被组内的一个消费者处理,避免了消息的重复消费。
优点
- 消息可回溯,可被多个消费者读取,可阻塞读取
- 消息读完了不消失,会永久的保持在队列当中。
缺点
- 任然有消息漏读的风险,在处理消息的过程中,如果同时来了多条消息,最后可能只能读到最后一条新消息,从而造成了消息漏读
常见命令使用
- 发送消息 —— XADD
- 读取消息—— XREAD
- 读取第一条消息,且可重复读
- 读取最新消息
- 读取并阻塞等待最新消息
- BUG——漏读消息
基于Stream的消息队列——消费者组
消费者组(Consumer Group):
将多个消费者划分到一个组中,监听同一个队列。具备下列的特点:
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。【解决消息丢失问题】
消费者组常见命令
- 创建消费者组——XGROUP CREATE
- 删除消费者组——XGROUP DESTORY
- 添加消费者到消费组——XGROUP CREATECONSUMER
- 将消费者移除消费组——XGROUP DELCONSUMER
- 从消费者组读取信息——XREADGROUP
4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能
【步骤说明】
①创建一个Stream类型的消息队列,名为stream.orders
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
4.10.4.1 创建Stream消息队列
1. 登录redis
2. 创建队列和消费者组
4.10.4.2 修改秒杀资格判断的Lua脚本
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/16 20:32 --- -----------判断秒杀资格的lua脚本--------------------------------- local voucherId = ARGV[1] -- 优惠卷id local userId = ARGV[2] -- 用户id local orderId = ARGV[3] -- 订单id local stockKey = 'seckill:stock:' .. voucherId -- 库存key local orderKey = 'seckill:order:' .. voucherId -- 订单key -- 脚本开始 -- 1. 判断库存是否充足 if( tonumber(redis.call('get', stockKey)) <=0 ) then -- 库存不足返回1 return 1 end -- 2. 判断用户是否下单 if( redis.call('sismember', orderKey, userId) == 1 ) then -- 用户已经下单返回2 return 2 end -- 3. 扣减库存 incrby -1 redis.call('incrby', stockKey, -1) -- 4. 下单(保存用户) redis.call('sadd', orderKey, userId) -- 5. 发送消息到队列中 XADD [队列名]stream.orders * k1 v1 k2 v2 ... redis.call('XADD','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId) -- 6. 返回0 return 0
4.10.4.3 完善秒杀下单代码及其代码说明
1. 首先是修改预加载Lua脚本信息
/** 方案二、三公共代码 * 预加载lua脚本 */ private static DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); // 这是第二种方案需要执行的lua脚本 // SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua")); // 这是第三种方案需要执行的lua脚本 SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/streamSeckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程---使用Redis stream的消息队列完成的 */ private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); // 获取订单id long orderId = redisIdWorker.nextId("order"); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId) ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } //5.返回订单id return Result.ok(orderId); }
3. 重头戏,开启线程任务!!
【注意特意标明】【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客
项目启动时,init()方法执行,开启线程任务VoucherOrderHandler()方法。
进入到VoucherOrderHandler()方法,我们不断循环尝试去去消息队列中的信息,
获取成功: 执行handleVocherOrder()方法创建订单;
获取失败: 说明没有消息 ---->继续循环
出现异常: 执行handlePendingList()方法处理PendingList的异常
handleVocherOrder()方法和之前java阻塞队列方案写法一致,不过多赘述。
进入到handlePendingList()方法后,一样循环获取PendingList中的消息
获取成功: 那就反过来调用handleVocherOrder()方法,执行订单创建
获取失败: 说明Pending List没有消息 ---->结束循环
出现异常: 休眠一段时间后自动回到VoucherOrderHandler()方法中的下一次循环
// 1,创建-- 秒杀线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //2. 初始化方法 一初始化就执行 @PostConstruct public void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //3. 创建线程任务用于接收消息队列的信息 private class VoucherOrderHandler implements Runnable{ // 消息队列名称 private String queueName = "stream.orders"; @Override public void run() { while (true) { try{ //1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes > // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置 List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); //2. 判断消息获取是否成功 if( list == null || list.isEmpty()){ //2.1 获取失败 说明没有消息 ---->继续循环 continue; } // 解析消息中的订单信息 MapRecord<String,Object,Object> record = list.get(0); // 获取键值对集合 Map<Object,Object> values = record.getValue(); // 获取订单信息 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //3. 获取成功,执行订单创建 handleVocherOrder(voucherOrder); //4. ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); }catch (Exception e) { // 消息没有被ACK确认 进入Pending List log.error("订单处理出现异常",e); handlePendingList(); } } } // 5.取不到订单————— 处理Pending List中的订单信息 private void handlePendingList(){ while (true) { try { //1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.oredes 0 // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); //2. 判断消息获取是否成功 if (list == null || list.isEmpty()) { //2.1 获取失败 说明Pending List没有消息 ---->结束循环 break; } // 解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); // 获取键值对集合 Map<Object, Object> values = record.getValue(); // 获取订单信息 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //3. 获取成功,执行订单创建 handleVocherOrder(voucherOrder); //4. ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("Pending List订单处理出现异常", e); try { Thread.sleep(20); }catch (InterruptedException interruptedException){ interruptedException.printStackTrace(); } } } } } // 4. 取到了订单—————创建订单 private void handleVocherOrder(VoucherOrder voucherOrder){ // 获取用户 Long userId = voucherOrder.getUserId(); // 1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ log.error("不允许重复下单"); } try { proxy.createVoucherOrder(voucherOrder); } finally { // 4. 释放锁 lock.unlock(); } }
4.10.4.4 秒杀下单功能测试
1. 简单测试功能是否实现,使用postman发送请求
2. 利用jmeter测试高并发情况
老规矩10个token
这次我们先测试一下 10个线程请求 10个不同用户 抢5张卷的情况
测试一下 1000个线程请求 10个不同用户 抢5张卷的情况
测试一下 10000个线程请求 10个不同用户 抢5张卷的情况
4.11 总结情况
思路好像跟着来的,但是耗时和吞吐量始终是上不去,我上网看了看大家做的情况,好像都没有黑马一千多甚至两千的吞吐量。我感觉其实stream实现的消息队列相比java实现的阻塞队列性能上其实也没有优化到哪去,只是更加灵活可靠了吧。