1. 目前还存在的问题
- 设置的阻塞队列可能会超出最大长度
- 系统重启会导致阻塞队列中的信息消失,可能会出现问题
2. 消息队列
- 消息队列 (Message Queue)。
- 字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色消息队列:存储和管理消息,也被称为消息代理 (Message Broker)
- 生产者
- 发送消息到消息队列
- 消费者
- 从消息队列获取消息并处理消息
- Redis提供了三种不同的方式来实现消息队列
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息队列
- Stream:比较完善的消息队列模型
2.1 基于List结构模拟消息队列
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失(消息拿出来后没来得及处理就挂了,导致这条消息没有处理)
- 只支持单消费者(一个人获取了这条消息,另一个人就不能获取了)
2.2 PubSub基本的点对点消息队列
优点
- 采用发布订阅模型,支持多生产,多消费
缺点
- 不支持数据持久化(服务一旦关闭就消失了)
- 无法避免消息丢失(没人接收这条消息就丢了)
- 消息堆积有上限,超出时消息丢失(缓存在客户端,有上限)
2.3 基于Stream的消息队列(重点)
Redis 中的 Stream 是一种在 Redis 5.0 版本引入的新数据类型,它专为实现高性能、高可靠性的消息队列和流式数据处理而设计。Stream 结构提供了一种有序、可持久化、可重复消费且支持多路写入与多消费者并行消费的消息存储模型。
写入命令XADD
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
- key:消息队列的名称
- NOMKSTREAM:如果队列不存在,是否自动创建队列。默认是自动创建
- MAXLEN:最大长度,默认不设置上限
- *|ID:消息的唯一id,*代表由Redis自动生成。
- field value [field value …]:称为一个Entry,格式是多个key-value键值对
读取命令XREAD
特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
2.4 基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备以下特点:
- 消息分流
- 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识
- 消费者者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。
- 消息确认
- 消费者获取消息后没消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除
2.4.1 创建消费者组
2.4.2 从消费者组读取消息
2.4.3 确认消息命令
将pending-list中的某个消息,标记为已处理。
XACK KEY GROUP ID
小结
- 消息可回溯
- 可以多消费者争抢消息,加快消息速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消息一次
2.4.4 XPENDING
XPENDING 是 Redis 5.0 引入的一项新命令,用于管理 Redis Streams 中的待处理消息。Redis Streams 是一个用于处理实时数据流的数据结构,而 XPENDING 则允许你查看、管理待处理消息的信息。
XPENDING 命令的一般语法如下:
XPENDING stream_name group_name [start end count] [consumer]
其中:
stream_name 是待处理消息所在的流的名称。
group_name 是消费者组的名称。
start 和 end 是两个可选参数,用于指定待处理消息的范围。
count 是一个可选参数,用于指定要返回的待处理消息的数量。
consumer 是一个可选参数,用于指定特定的消费者。
2.5 Java的伪代码
消费者监听消息的基本思路
while(true){
//尝试监听队列,使用阻塞模式,最长等待2000ms
Obejct msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAM s1 >");
if(msg == null){
// null说明没有消息,继续下一次
continue;
}
try{
//处理消息,完成后需要ACK
handleMessage(msg)
}catch(Exception e){
while(true){
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAM s1 0");
if(msg == null){
//null说明没有异常消息,所有消息都已确认,结束循环
break;
}
try{
//处理消息,完成后需要ACK
handleMessage(msg)
}catch(Exception e){
//再次出现异常,记录日志,继续循环
continue;
}
}
}
}
2.6 对比
3.代码优化目标
3.1 修改Lua脚本
-- 1.参数列表
-- 1.1. 优惠券id
local voucherId = ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3. 订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足
if(tonumber(redis.call('get',stockKey))<=0)then
-- 3.2.库存不足,返回1
return 1
end
-- 3.3.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call("sismember",orderKey,userId) == 1) then
-- 3.4.存在,说明是重复下单
return 2
end
-- 3.5.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.6.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.7.发送消息到队列中, XADD stream.order * K1 v1 K2 v2 ...
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0
3.2 修改Java端代码
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.orders";
@Override
public void run(){
while (true){
try {
// 1.获取消息队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2.判断消息获取是否成功
if ( list==null || list.isEmpty()){
// 2.1.如果获取失败,说明没有消息,继续下一次循环
continue;
}
// 3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);
// 4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
// 5.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
handlePendingList();
log.error("处理订单异常:",e);
}
}
}
private void handlePendingList() {
while (true){
try {
// 1.获取pending-list队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2.判断消息获取是否成功
if ( list==null || list.isEmpty()){
// 2.1.如果获取失败,说明没有消息,结束循环
break;
}
// 3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(record.getValue(), new VoucherOrder(), true);
// 4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
// 5.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
log.error("处理订单异常:",e);
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
4. 总结
目前我们已经能够使用Stream消息队列来实现一个功能较为完全的秒杀功能。