目录
利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))
基于pubsub实现消息队列(发布订阅模型)
基于stream的消息队列
方法一:
存放消息
读取消息
编辑 案例
特点:
方法二:消费者组
创建
读取
java实现思路
特点
三种方式的对比:
案例,更改之前的案例
基于基于秒杀-----分布式锁----lua脚本_xzm_的博客-CSDN博客改进
利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))
优缺点:
基于pubsub实现消息队列(发布订阅模型)
优缺点:
基于stream的消息队列
方法一:
存放消息
读取消息
案例
特点:
方法二:消费者组
创建
读取
java实现思路
特点
三种方式的对比:
个人感觉:mq>stream>list>pubsub
案例,更改之前的案例
创建消息队列
修改Lua秒杀脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by Lenovo.
--- DateTime: 2023/5/30 16:55
---
-- 1.参数列表
-- 1.1 . 优惠卷id
local voucherId= ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]
-- 2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3. 脚本业务
-- 3.1. 判断库存是否充足 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0 ) then
-- 3.2. 库存不足,返回1
return 1
end
-- 3.2. 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER',orderKey,userId) == 1) then
-- 3.3. 存在,说明是重复下单,返回2
return 2
end
-- 3.4. 扣库存 incrby stockKey -1
redis.call('incrby',stockKey, -1)
-- 3.5. 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
修改java代码实现发送消息
/**
* 基于stream的实现秒杀
* @param voucherId
* @return
*/
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
//生成订单id
long orderId = redisIdWorker.nextId("order");
//执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,//脚本文件
Collections.emptyList(),//key的集合
voucherId.toString(), userId.toString() ,String.valueOf(orderId)//三个ARGV
);
//将返回值转换为int
int i = result.intValue();
//判断结果是否为零
if (i != 0 ){
//不为零,无法购买 ---1为库存不足,2为用户已经下单
return Result.fail(i == 1 ? "库存不足" : "不能重复下单");
}
//可以购买
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//返回订单id
return Result.ok(orderId);
}
实现消息的消费
//创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
String queueName="stream.orders";
@Override
public void run() {
while (true){
try {
//1.获取消息队列中的订单消息,xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息是否获取成功
if (list ==null || list.isEmpty()){
//获取失败,进行下一次循环
continue;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> value = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//2.创建订单
handleVoucherOrder(voucherOrder);
//ack确认 ack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
handlePendingList();
}
}
}
private void handlePendingList(){
while (true){
try {
//1.获取pending-list中的订单消息,xreadgroup group g1 c1 count 1 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息是否获取成功
if (list ==null || list.isEmpty()){
//获取失败,退出循环
break;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> value = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//2.创建订单
handleVoucherOrder(voucherOrder);
//ack确认 ack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}