文章目录
- Redis优化秒杀、Redis消息队列实现异步秒杀
- 一、秒杀优化
- 1.1 回顾“一人一单”秒杀业务代码
- 1.2 异步秒杀思路
- 1.3 基于Redis完成秒杀资格判断
- 1.3.1 修改VoucherServiceImpl
- 1.3.2 Lua脚本编写
- 1.3.3 Redis+lua判断用户是否抢购成功
- 1.3.4 基于阻塞队列实现异步秒杀下单
- 1.3.4 秒杀初步优化总结
- 二、Redis消息队列
- 2.1 基于List实现消息队列
- 2.2 基于PubSub实现消息队列
- 2.3 基于Stream的消息队列
- 2.3.1 Stream 单消费模式
- 2.3.1.1 发送消息
- 2.3.1.2 读取消息
- 2.3.1.3 STREAN类型消息队列的XREAN命令特点
- 2.3.2 Stream消费者组模式
- 2.3.2.1 创建消费者组
- 2.3.2.2 读取消息
- 2.3.2.3 确认消息
- 2.3.2.4 查看pending-list
- 2.3.2.5 消费者组 - 消费者监听消息基本思路
- 2.3.2.6 STREAN类型消息队列的XREANGROUP命令特点
- 2.4 总结
- 三、基于Stream消息队列实现异步秒杀
Redis优化秒杀、Redis消息队列实现异步秒杀
承接Redis - 优惠券秒杀、库存超卖、分布式锁、Redisson文章
一、秒杀优化
1.1 回顾“一人一单”秒杀业务代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀活动尚未开始");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀活动已经结束");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
UserDTO user = UserHolder.getUser();
// 创建锁对象
// 锁定的范围与之前一样。切记不能把order锁住,范围太大了,以后有关order的都被锁住了
// 之前的方式:SimpleRedisLock lock = new SimpleRedisLock("order:" + user.getId(), stringRedisTemplate);
// TODO 使用Redisson客户端获取锁对象
RLock lock = redissonClient.getLock("lock:order:" + user.getId());
// 获取锁
// 订单大概是500ms,我们这里可以设定为秒
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if (!isLock) {
// 不成功
// 我们要避免一个用户重复下单,既然获取锁失败,说明在并发执行,我们要避免并发执行
return Result.fail("不允许重复下单");
}
// 成功
// createVoucherOrder方法执行过程中可能会有异常,我们放到try...catch中
try {
// 获取当前对象的代理对象 强转
VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
// 出现异常做锁的释放
lock.unlock();
}
}
// 如果在方法上添加synchronized,说明同步锁是this,当前对象
// 不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
// 所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 新增一人一单的判断
UserDTO user = UserHolder.getUser();
// user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
// 我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回
// 查询订单
int count = query().eq("user_id", user.getId())
.eq("voucher_id", voucherId).count();
// 判断是否存在
if (count > 0) {
// 用户至少下过一单,不能再下了
return Result.fail("一人一单,不可重复下单");
}
// 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
return Result.fail("扣减失败,可能库存不足");
}
// 6.创建订单
// 我们只管订单id,代金券id,下单用户id
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单id
// 使用自定义id生成器生成id
long orderID = redisIdWorker.nextId("order");
voucherOrder.setId(orderID);
// 6.2 用户id
// 我们之前编写的登录拦截器
voucherOrder.setUserId(user.getId());
// 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderID);
}
}
代码中有大量数据库的操作,整个业务性能并不是很好
平均耗时达到了497毫秒
1.2 异步秒杀思路
首先回顾一下之前秒杀业务的流程
-
前端发起请求到达我们的Nginx,然后Nginx会把我们的请求负载均衡到我们的tomcat
-
而在tomcat中执行各种逻辑,查询优惠券、判断秒杀库存、查询订单、校验一人一单、减库存、创建订单,整个业务的耗时就是各种逻辑耗时之和
其中查询优惠券、查询订单、减库存、创建订单四步访问数据库,数据库的并发能力本身比较差,更何况减库存和创建订单是对数据库的写操作,为了避免安全问题还加了分布式锁,也使得业务性能大打折扣
怎么改进一下呢?
将业务逻辑分成两个不同的线程,主线程的作用就是判断用户的购买资格,如果有资格,就开启一个独立的线程来处理耗时较久的减库存、创建订单操作,这样一来,效率大大提高
Redis的性能要比MYSQL的性能好,我们完全可以把判断秒杀库存操作和检验一人一单操作放到Redis中去做
主线程进来以后首先找Redis完成对于秒杀资格的判断,如果有资格就执行后续的减库存、下单操作
我们怎么在Redis中判断秒杀库存和检验一人一单呢?
我们需要把优惠券的库存信息和相关的订单信息缓存在redis中,我们应该选择一个什么样的数据结构?
-
库存比较简单,只需要一个普通String结构即可,key是优惠券的id,值是库存的值,将来在做库存判断的时候,看一下值是否大于0
当判断有库存的时候,value值要减1,先提前先减这
-
一人一单,我们需要在Redis中记录当前的优惠券被哪些用户购买过,以后再有用户来的时候,我们只需要判断此用户是否购买过
对此我们采用set集合,此可以确保元素的唯一性,可以在一个key中保存多个值,以后有用户下单成功后,我们就记录用户的id,再用更多用户来,我们依次记录就行。若发现set集合中已经存在了,那肯定不允许购买了
最终在Redis的流程图
对Redis的判断有很多的判断,业务流程比较长,我们必须确保执行时的一个原子性,这时就需要采用Lua脚本了。
这一部分不需要写java代码,需要Lua脚本
我们进入到Tomcat中需要做的是什么?
耗时较久的下单、减库存等核心写操作,并没有在当前流程中出现
什么时候做下单减库存的操作呢?
我们开启一个独立的线程来读取提前保存好的用户信息、优惠券信息,就可以完成异步数据库的写操作,当我们返回订单id给用户的那一刻,用户就可以付款了,秒杀业务已经结束了
所以我们什么时候将优惠券信息、用户信息写入到数据库里,完成下单减库存的操作,其实就没有那么重要了(实效性要求不高),根据MySQL数据库能够承受的频率去将数据写入数据库
1.3 基于Redis完成秒杀资格判断
案例:改进秒杀业务,提高并发性能
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1.3.1 修改VoucherServiceImpl
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
优惠券库存信息永久保存,没有过期时间
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// TODO 优惠券库存信息保存到Redis中
stringRedisTemplate.opsForValue().set("seckill:stock:"+voucher.getId(),voucher.getStock().toString());
}
测试一下
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一至周五均可使用",
"rules": "全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 10,
"beginTime": "2022-01-25T10:09:17",
"endTime": "2025-01-26T12:09:04"
}
第一个需求完成
1.3.2 Lua脚本编写
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
编写Lua脚本
要使用乱脚本完成下面的内容
-- 1. 参数列表
-- 1.1 优惠券id
-- 需要去redis中读取库存数量,判断是否充足,其中key的前缀seckill:stock是固定的,后面的id是需要传入的
local voucherId = ARGV[1]
-- 1.2 用户id
-- 需要知道用户id,才能判断用户之前是否下过单
local userId = ARGV[2]
-- 2.数据相关key
-- 2.1 库存key, lua中是用 .. 拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key,值是一个set集合,集合名称就是下面。内容是购买订单的用户的id,这样可以记录谁购买了谁没有购买
local orderKey = 'seckill:order:' .. voucherId
-- 3.lua脚本业务
-- 3.1 判断库存是否充足
-- redis.call('get',stockKey)得到的结果是字符串,是无法和数字比较的
if (tonumber(redis.call('get',stockKey))<=0) then
-- 3.2库存不足
return 1
end
-- 3.2判断用户是否下单
-- 借助命令SISMEMBER命令,判断一个给定的值是不是当前set集合中的一个成员,如果存在返回1,不存在返回0
if (redis.call('SISMEMBER',orderKey,userId) ==1) then
-- 3.3redis中存在,说明是重复下单
return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单,保存用户 sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 成功返回0
return 0
1.3.3 Redis+lua判断用户是否抢购成功
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列,完成下面的操作
改造VoucherOrderServiceImpl类
现在还没有编写阻塞队列
// TODO 获取脚本
// RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
// TODO 静态代码块做初始化
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// TODO 借助Spring提供的方法区resource下找
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// TODO 配置一下返回值类型
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// TODO 1.执行lua脚本
// 我们没有传入key,所以传入一个空集合即可
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
// TODO 2.判断结果是否为0(0有购买资格,不为0没有购买资格)
int r = result.intValue();
if (r!=0){
// TODO 2.1 不为0没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
// TODO 2.2 为0有购买资格,把下单的信息保存到堵塞队列,后续可以异步完成下单业务
// 生成订单id
long orderId = redisIdWorker.nextId("order");
// 组合队列有点麻烦,先写到这里
// TODO 3. 返回订单id
return Result.ok(orderId);
}
1.3.4 基于阻塞队列实现异步秒杀下单
我们刚刚编写的代码没有编写阻塞队列,下面来编写一下
完成下面功能
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
这个地方就是将抢单成功的用户id,还有优惠券的id,以及相关的订单id封装一下放到一个队列里。
队列里放的就是所有等待下单的订单的信息
而后我们会开启一个线程的任务,异步去执行相关订单的操作,这样后下单就不会影响到我们整个业务的耗时
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
//TODO 获取脚本
// RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
//静态代码块做初始化
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 借助Spring提供的方法区resource下找
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// 配置一下返回值类型
SECKILL_SCRIPT.setResultType(Long.class);
}
// TODO 创建堵塞队列
// 有一个特点:当一个线程尝试从这个队列里获取元素时,如果没有元素,这个线程就会堵塞,知道队列中有元素他才会被唤醒
// 1024*1024表示队列的长度
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
// TODO 创建一个线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct//当前类初始化完毕后执行这个方法
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// TODO 创建一个线程任务
// 什么时候执行这个任务呢?用户秒杀抢购之前,因为一旦用户开始秒杀,阻塞队列中就会有新的订单,这个任务就应该去取出订单相关信息
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
// 不断的从堵塞队列orderTasks取出信息,然后执行
while (true) {
// take就是一个阻塞方法,获取队列中的头部,如果没有就等到有
try {
// TODO 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// TODO 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
// e.printStackTrace();
log.error("处理订单异常", e);
}
}
}
}
/**
* 订单处理方法
* 我们在Redisson脚本中加了一个次锁了,在这里为什么还要再加一次锁呢?
* 做一个兜底方案,万一Redis出现问题,有个保证
*
* @param voucherOrder
*/
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// TODO 1.获取用户
Long userId = voucherOrder.getUserId();
// TODO 2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
// TODO 3.获取锁
boolean isLock = lock.tryLock();
// TODO 4.判断是否获取锁成功
if (!isLock) {
// 获取锁失败
log.error("不允许重复下单");
return;
}
// TODO
try {
// 之前这种获取代理对象的方式是获取不到的,因为不在同一个线程下面了
// VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
// return proxy.createVoucherOrder(voucherId);
// TODO 在主线程获取,这个地方使用
proxy.createVoucherOrder(voucherOrder);
} finally {
// 出现异常做锁的释放
lock.unlock();
}
}
private VoucherOrderServiceImpl proxy;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
// 我们没有传入key,所以传入一个空集合即可
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
// 2.判断结果是否为0(0有购买资格,不为0没有购买资格)
int r = result.intValue();
if (r != 0) {
// 2.1 不为0没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// TODO 2.2 为0有购买资格,把下单的信息保存到堵塞队列,后续可以异步完成下单业务
// TODO 阻塞队列,将用户id,订单id,优惠券id,保存到阻塞队列中
VoucherOrder voucherOrder = new VoucherOrder();
// TODO 2.3 生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// TODO 2.4 用户id
voucherOrder.setUserId(userId);
// TODO 2.5 代金券id
voucherOrder.setVoucherId(voucherId);
// TODO 2.6 放入阻塞队列
orderTasks.add(voucherOrder);
// TODO 3.获取代理对象
proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
// 3. 返回订单id
return Result.ok(orderId);
}
// 如果在方法上添加synchronized,说明同步锁是this,当前对象
// 不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
// 所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 新增一人一单的判断
Long userId = voucherOrder.getUserId();
// user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
// 我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回
// 查询订单
int count = query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId()).count();
// 判断是否存在
if (count > 0) {
// 用户至少下过一单,不能再下了
log.error("用户至少下过一单,不能再下了");
return;
}
// 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherOrder.getVoucherId()) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
log.error("扣减失败,可能库存不足");
return;
}
// TODO 创建订单
save(voucherOrder);
}
}
}
1.3.4 秒杀初步优化总结
-
秒杀业务的优化思路是什么?
变同步下单为异步下单
同步下单:请求来了之后判断有没有资格,有资格就自己下单,由于各种原因,耗时非常久
异步下单:业务分成两部分,一部分对于抢购资格的判断,如果有资格就立即结束,返回订单号;另一部分执行耗时较久的业务
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
-
基于堵塞队列的异步秒杀存在哪些问题?
- 内存限制问题
我们使用的是JDK中堵塞队列,使用的JVM内存,如果不加以限制,在高并发的情况下,可能会有无数的订单对象需要去创建并放到堵塞队列里面,会导致内存溢出(所以设置了一个队列的长度)
如果队列中存满了,就存不进去了
- 数据安全问题
我们现在基于内存保存这些订单信息,如果说服务突然宕机,内存中所有的订单信息就都丢失了,用户付款了但是后台没有相关的订单数据
二、Redis消息队列
消息队列(Message Queue),存放消息的队列
最简单的消息队列模型包括下列角色
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
消息队列的好处:
解除耦合,提高效率
想象成快递员、外卖柜、消费者
假设一个快递员来投递快递(没有外卖柜),结果没有快递柜就送货上门,但是消费者正在公司上班工作,此时用户就和快递员说等等我,如果快递员等的话就浪费了快递员的时间,如果不等的话用户可能找不到外卖
有了快递柜就不一样了,外卖员取到物品放到外卖柜即可,然后用户什么时候有时间,什么时候就去取,此时快递员和消费者之间就解除耦合了
对于秒杀操作相同,有人抢购商品时,我们不着急真正下单,先判断有没有资格,如果有购买的资格,这个时候不要去写入数据库,而是把信息存入到消息队列中
这个时候我们开启一个独立的线程作为消费者,他不断的从队列里获取消息,真正的完成下单功能
功能和读者队列差不多,但是有两点不同
- 消息队列时独立在JVM服务之外的独立服务,不受JVM内存限制
- 消息队列不仅仅做队列存储,还要确保数据的安全,要做持久化,不管服务宕机还是重启,数据不会丢失,消息投递给消费者以后,要求消费者做消息的确认,如果消费者没有确认,消息就会在队列中依然存在,下一次会再投递给消费者,让它继续处理,这个过程直到成功为止(确保消息至少被消费一次)
Redis提供了三种不同的方式来实现消息队列
- List结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
之前也学习过两个消息队列
RabbitMQ基础介绍及同步通讯及异步通讯_mq怎么同步_我爱布朗熊的博客
SpringAMQP-Basic Queue、Work Queue、Fanout、Direct、Topic
2.1 基于List实现消息队列
Redis的List数据结构是一个双向链表,很容易模拟出队列效果
队列时入口和出口不在一遍,因此我们可以利用LPUSH结合RPOP或者RPUSH结合LPOP实现
List有关命令:Redis命令——通用命令、String类型、Key层级结构、Hash类型、List类型、Set类型、SortedSet类型_redis指令查看层级结构
但是!!当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并代码消息
因此这里应该使用BROPO或者BLPOP来实现阻塞效果
相比与JDK中阻塞队列此种方式有什么好处?
-
独立于JVM之外的独立存储,不依赖于JVM内存
-
数据安全,Redis支持数据持久化,存储到队列中后,数据就不会丢失
-
可以满足消息有序性(队列的特性:先进先出)
缺点是什么?
-
无法避免消息丢失
如果我们从Redis将消息取出,还没有执行完,挂掉了,那这个任务在Redis中就没有了,无法再次执行了
-
只支持单消费者
一旦任务被一个消费者拿走后,就从队列移除了,其他消费者不能够拿到。无法实现一条消息被很多人消费这样的需求
2.2 基于PubSub实现消息队列
PubSub(发布订阅)是Redis 2.0版引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关信息
相关命令
- SUBSCRIBE channel [channel] : 订阅一个或多个频道
- PUBLISH channel msg:向一个频道发送消息
- PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道。pattern是通配符
?:代表一个字符
* :代表0个或多个字符
[ae]:表示可以是a字符,也可以是e字符
生产者:发送消息一方基本上没有什么变化,只不过发送消息的时候要带上通道名称
消费者:可以允许有多个消费者订阅,订阅的时候可以指定自己的频道名称
下面可以演示一下
消费者1进行订阅,发现天生就是堵塞式的
消费者2进行订阅
生产者进行发布消息,两个消费者可以收到消息
基于PubSub消息队列有哪些特点?
优点
- 采用发布订阅模型,支持多生产、多消费
缺点
-
不支持数据持久化
List结构的本质不是一个消息队列,是一个链表,只不过我们当成消息队列来用了
而PubSub本身设计出来就是来做消息发送的,如果我们发布出来的消息没有任何人订阅,那这个消息就丢失了
-
无法避免消息丢失
发布完没人收,就丢失了
-
消息堆积有上限,超出时数据丢失
2.3 基于Stream的消息队列
Stream是Redis 5.0引入的一种新数据类型
,可以实现一个功能非常完善的消息队列。
官网:Commands | Redis
2.3.1 Stream 单消费模式
2.3.1.1 发送消息
发送消息命令xadd
- key:队列名称
- NOMKSTREAM: 如果队列不存在,是否自动创建队列,默认自动创建,如果给了这个值,就不创建
- MAXLEN | MINID [= | ~] threshold [LIMIT count]:设置消息队列的最大消息数量。比如说我们将消息队列最大消息数量设置为1000,如果超过1000后,会将一些旧的信息剔除。如果不给值的话就不设置上限
- *|ID :指定消息唯一id,,*代表由Redis自动生成,格式是“时间戳-递增数字”
- field value:代表字段和值,存到消息里面的消息体,被称为Entry,格式就是多个key-value键值对,一个键值对就是一个Entry
例如:
2.3.1.2 读取消息
读取消息方式之一:xread
-
Count count:每次读取消息的最大数量,可以一次读多条,也可以读取一条
-
BLOCK milliseconds:当没有消息时,是否堵塞、堵塞时长。
BLOCK 如果不给参数就是不堵塞,如果有消息就直接返回,没有消息就返回为空;给参数是堵塞,有消息就返回,没消息就堵塞;
milliseconds的就是等待的时长,若为0,就是一直等待
-
STREAMS KEY [KEY …]:要从哪个队列读取消息,key就是队列名。可同时指定对个队列
-
ID:起始消息id,只返回大于该ID的消息。
0:代表从第一个消息开始
$:代表从最新的消息开始
测试一下
我们发现了,同一个消息可以读取两次。
读完之后不会删除,永久存在
开发中我们可以循环调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果
while(true){
//尝试读取队列中的消息,最多堵塞2秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
if(mes == null){
continue;
}
// 处理消息
handleMessage(msg);
}
注意!当我们指定其实ID为$时,代表读取最新的消息,如果我们处理一条消息过程中,有超过一条以上的消息到大队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
2.3.1.3 STREAN类型消息队列的XREAN命令特点
STREAN类型消息队列的XREAN命令特点
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
2.3.2 Stream消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备一下特点:
-
消息分流
队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度。从一定程度上可以避免消息堆积问题
-
消息标示
消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
-
消息确认
消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完后需要通过XACK来确认消息,标记消息已处理,才会从pending-list移除。这样完美解决消息丢失问题
2.3.2.1 创建消费者组
创建消费者组
XGROUP create key groupName ID [MKSTREAM]
-
key:
队列名称
-
groupName:
消费者组的名称
-
ID:
这个组监听消息的时候,从哪开始监听
0:代表从第一个消息开始
$:代表从最新的消息开始
-
MKSTREAM:队列不存在时自动创建队列
其他常见命令:
#删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
#删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupnameconsumername
一般情况下不需要自己添加消费者,当我们从组中指定一个消费者并监听消息的时候,如果发现消费者不存在,会自动帮我们创建出来
2.3.2.2 读取消息
怎么监听消息?
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
-
group:
消费组名称
-
consumer:
消费者名称,如果消费者不存在,会自动创建一个消费者
-
count:
本次查询的最大数量
-
BLOCK milliseconds:
当没有消息时,是否堵塞、堵塞时长。
BLOCK 如果不给参数就是不堵塞,如果有消息就直接返回,没有消息就返回为空;给参数是堵塞,有消息就返回,没消息就堵塞;
milliseconds的就是等待的时长,若为0,就是一直等待
-
NOACK:
参数代表不用消费者确认,如果不给这个参数代表需要消费者确认,所以一般我们不给这个参数
-
STREAMS KEY [KEY …]:
要从哪个队列读取消息,key就是队列名。可同时指定对个队列
-
ID:
起始消息id
“>”:从下一个未消费的消息开始
其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
正常情况下,我们都应该给大于号,读那些未消费的消息,如果出现了异常情况,再去pending-list中读已消费但未处理的消息
测试一下
我们上面读了好几条消息,但是从来没有确认过
一定要一个消息,确认一条消息
2.3.2.3 确认消息
怎么确认消息?
XACK key group ID [ID ...]
-
key
队列名称
-
group
组名称
-
ID
要确认哪一条消息,将待处理的消息变成正确处理(已经处理)的消息,这样的话就会从pending-List中移除
测试
比如我们下面确认s1队列、g1组中后面五个id对应的消息
2.3.2.4 查看pending-list
假如我们读取了一个消息,但是出现异常没有确认,此消息会放入pending-list中,那我们怎么查看一下pending-list
官网:XPENDING | Redis
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
-
key
队列名称
-
group
组名称
-
IDLE min-idle-time
空闲时间。获取消息以后,确认之前的这段时间。
比如说我们给了一个参数5000,那空闲时间超过5000ms以上的消息才进入pending-list队列
-
**start end **
起始范围。pending-list中有很多消息,这两个参数就是告诉它我们像获取的最小id和最大id是什么
-:减号代表最小的
+:加号代表最大的
-
count
我们要获取的数量
-
consumer
要获取哪个消费者的pending-list
读取pending-list中的第一条消息
就是正常读取
XREADGROUP GROUP g1 C1 COUNT 1 BLOCK 200 STREAMS S1 0
2.3.2.5 消费者组 - 消费者监听消息基本思路
使用while循环一直去获取消费者组中的消息
2.3.2.6 STREAN类型消息队列的XREANGROUP命令特点
首先具备XREAD单消费模式所有优点,并且弥补一些缺点
STREAN类型消息队列的XREAN命令特点
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
STREAN类型消息队列的XREANGROUP命令特点
-
消息可回溯
消息被消费完了,并不会从队列中删除,其他人也可以来拿(这是针对不同的组)
-
可以多消费者争抢消息,加快消费速度
-
可以堵塞读取
-
没有消息漏读的风险
消费者组里面会去标记,上一次消费到哪里,下一次再来的时候就可以从上一次标记之后读取
-
有消息确认机制,保证消息至少被消费一次
独立于JVM之外,不受JVM限制
可以在Redis中做持久化,安全性得到保证
2.4 总结
三、基于Stream消息队列实现异步秒杀
需求:
① 创建一个Stream类型的消息队列,名为stream.orders
MKSTREAM:队列不存在时自动创建队列
XGROUP CREATE stream.orders g1 0 MKSTREAM
② 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherld、userld、orderld
对于XADD中的参数*|ID :指定消息唯一id,,*代表由Redis自动生成,格式是“时间戳-递增数字”
改了两行代码
-- 1. 参数列表
-- 1.1 优惠券id
-- 需要去redis中读取库存数量,判断是否充足,其中key的前缀seckill:stock是固定的,后面的id是需要传入的
local voucherId = ARGV[1]
-- 1.2 用户id
-- 需要知道用户id,才能判断用户之前是否下过单
local userId = ARGV[2]
-- TODO 1.3 订单id
local orderId = ARGV[3]
-- 2.数据相关key
-- 2.1 库存key, lua中是用 .. 拼接字符串
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key,值是一个set集合,集合名称就是下面。内容是购买订单的用户的id,这样可以记录谁购买了谁没有购买
local orderKey = 'seckill:order:' .. voucherId
-- 3.lua脚本业务
-- 3.1 判断库存是否充足
-- redis.call('get',stockKey)得到的结果是字符串,是无法和数字比较的
if (tonumber(redis.call('get',stockKey))<=0) then
-- 3.2库存不足
return 1
end
-- 3.2判断用户是否下单
-- 借助命令SISMEMBER命令,判断一个给定的值是不是当前set集合中的一个成员,如果存在返回1,不存在返回0
if (redis.call('SISMEMBER',orderKey,userId) == 1) then
-- 3.3redis中存在,说明是重复下单
return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单,保存用户 sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- TODO 3.6 发送消息到队列当中,XADD stream.orders * k1 v1 k2 v2.....
redis.call("xadd","stream.orders","*","userId",userId,"voucherId",voucherId,"id",orderId)
return 0
改造seckill秒杀业务逻辑
java代码确实删减了很多
//TODO 获取脚本
// RedisScript是一个接口,我们使用一个实现类,泛型就是返回值的类型
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
//静态代码块做初始化
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 借助Spring提供的方法区resource下找
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// 配置一下返回值类型
SECKILL_SCRIPT.setResultType(Long.class);
}
private VoucherOrderServiceImpl proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userId = UserHolder.getUser().getId();
// TODO 获取订单id,这不操作移到上面
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
// 我们没有传入key,所以传入一个空集合即可
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
// 2.判断结果是否为0(0有购买资格,不为0没有购买资格)
int r = result.intValue();
if (r != 0) {
// 2.1 不为0没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
// 4. 返回订单id
return Result.ok(orderId);
}
③ 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
这一步直接看完整代码
//创建一个线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct//当前类初始化完毕后执行这个方法
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// TODO 1.获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
// 为什么返回一个List?因为count不是1的话,可能会有多条消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
// 组是属于消费者的一部分信息,在这里统一叫做Consumer(一定是Spring包下的)
Consumer.from("g1", "c1"),
// StreamReadOptions.empty()创建一个空的,count(1)每次读取一条消息
// block(Duration.ofMillis(2000)),只能传入一个Duration参数
StreamReadOptions.empty().count(1).block(Duration.ofMillis(2000)),
// queueName消息队列名字,ReadOffset.lastConsumed()最近一个未消费消息
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// TODO 2.判断消息获取是否成功
if(list ==null || list.isEmpty() ){
// TODO 2.1 如果获取失败,说明没有消息,继续下一次循环
// 如果获取失败,说明没有消息,继续下一次循环
continue;
}
// TODO 3.解析消息中的订单信息
// 明确知道count是1,所以直接获取0就行
MapRecord<String, Object, Object> record = list.get(0);
// 消息id
RecordId id = record.getId();
// 将Map转成VoucherOrder
Map<Object, Object> values = record.getValue();
// true表示出现错误忽略
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// TODO 3.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
// TODO 4.ACK确认,XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",id);
} catch (Exception e) {
log.error("处理订单异常", e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
// TODO 处理时消息时报异常了,消息没有被确认,会进入pending-list,我们在这里需要去pendingList取出来再去做处理
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// TODO 1.获取pending-list中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0
// 为什么返回一个List?因为count不是1的话,可能会有多条消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
// 组是属于消费者的一部分信息,在这里统一叫做Consumer(一定是Spring包下的)
Consumer.from("c1", "g1"),
// StreamReadOptions.empty()创建一个空的,count(1)每次读取一条消息
StreamReadOptions.empty().count(1),
// queueName消息队列名字,ReadOffset.from("0"),读取pending-list第一条消息
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// TODO 2.判断消息获取是否成功
if(list ==null || list.isEmpty() ){
// TODO 2.1 如果获取失败,说明pending-list没有异常消息,结束循环
// 如果获取失败,说明没有消息,继续下一次循环
continue;
}
// TODO 3.解析消息中的订单信息
// 明确知道count是1,所以直接获取0就行
MapRecord<String, Object, Object> record = list.get(0);
// 消息id
RecordId id = record.getId();
// 将Map转成VoucherOrder
Map<Object, Object> values = record.getValue();
// true表示出现错误忽略
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// TODO 3.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
// TODO 4.ACK确认,XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",id);
} catch (Exception e) {
log.error("处理订单异常", e);
// 这个地方不用递归,直接下一次循环处理就行
}
}
}
}
/**
* 订单处理方法
* 我们在Redisson脚本中加了一个次锁了,在这里为什么还要再加一次锁呢?
* 做一个兜底方案,万一Redis出现问题,有个保证
*
* @param voucherOrder
*/
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 3.获取锁
boolean isLock = lock.tryLock();
// 4.判断是否获取锁成功
if (!isLock) {
// 获取锁失败
log.error("不允许重复下单");
return;
}
try {
// 之前这种获取代理对象的方式是获取不到的,因为不在同一个线程下面了
// VoucherOrderServiceImpl proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
// return proxy.createVoucherOrder(voucherId);
// 在主线程获取,这个地方使用
proxy.createVoucherOrder(voucherOrder);
} finally {
// 出现异常做锁的释放
lock.unlock();
}
}
private VoucherOrderServiceImpl proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userId = UserHolder.getUser().getId();
// 获取订单id,这不操作移到上面
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
// 我们没有传入key,所以传入一个空集合即可
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2.判断结果是否为0(0有购买资格,不为0没有购买资格)
int r = result.intValue();
if (r != 0) {
// 2.1 不为0没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (VoucherOrderServiceImpl) AopContext.currentProxy();
// 4. 返回订单id
return Result.ok(orderId);
}
// 如果在方法上添加synchronized,说明同步锁是this,当前对象
// 不建议 把synchronized放在方法上,锁住此对象后,不管任何一个用户来了,都是这把锁,也就意味着整个方法被串行化了
// 所谓“一人一单”,只需要对同一个用户加锁即可,如果不是同一个用户,无需加锁
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 新增一人一单的判断
Long userId = voucherOrder.getUserId();
// user.getId().toString()转换成字符串也无法保证线程安全,因为每次的String都不一样
// 我们可以加一个intern,是一个字符串对象规范表示,回去字符串常量池中找一找和此字符串的值一样的字符串地址并返回
// 查询订单
int count = query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId()).count();
// 判断是否存在
if (count > 0) {
// 用户至少下过一单,不能再下了
log.error("用户至少下过一单,不能再下了");
return;
}
// 说明没买过,继续执行代码
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1") //set stock = stock-1
.eq("voucher_id", voucherOrder.getVoucherId()) //where voucher_id= voucherId
.gt("stock", 0)//where stock>0
.update();
if (!success) {
log.error("扣减失败,可能库存不足");
return;
}
// 创建订单
save(voucherOrder);
}
}