在jvm以外的服务,不受jvm内存的限制
不仅仅做数据的存储,还保证了数据的安全,持久化
1.基于List结构模拟消息队列
优点:
利用Redis存储,不受JVM内存限制
基于Redis的持久化机制,数据安全性有保证
可以满足消息的有序性
缺点:
无法避免消息丢失
只支持单消费者
2.基于PubSub的消息队列
优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
3.基于Stream的消息队列
STREAM类型消息队列的XREAD命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
4.基于Stream的消息队列——消费者组
命令特点:
1.消息可回溯
2.可以多消费者争抢消息,加快消费速度
3.可以阻塞读取
4没有消息漏读的风险
5.有消息确认机制,保证消息至少被消费一次
5.基于Redis的Stream结构作为消息队列,实现异步下单秒杀
@Slf4j
@SuppressWarnings("all")
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorked redisIdWorked;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
//创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct//spring初始化完毕之后执行该方法
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//创建线程任务
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true) {
try {
//1.获取消息队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
//2.判断订单是否为空
if (list==null || list.isEmpty()){
//如果为null,说明没有消息,继续下一次循环
continue;
}
//解析数据
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> value = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.创建订单
handlerVoucherOreder(voucherOrder);
//4.确认消息
stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",entries.getId());
} catch (Exception e) {
log.error("处理订单异常信息", e);
//处理异常
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
//1.获取pending-list中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
//2.没有异常消息
if (list==null || list.isEmpty()){
break;
}
//解析数据
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> value = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.创建订单
handlerVoucherOreder(voucherOrder);
//4.确认消息
stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",entries.getId());
} catch (Exception e) {
log.error("处理订单异常信息", e);
try {
Thread.sleep(20);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
private void handlerVoucherOreder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
//创建锁对象
//SimpleRedisLock lock = new SimpleRedisLock("order:"+id,stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order"+userId);
//尝试获取锁
boolean flag = lock.tryLock();
if(!flag){
//获取锁失败
log.error("不允许重复下单");
return;
}
try {
//如果这个类本身调用是不具备管理事务的,如果使用Spring管理可以控制事务的一致性
//获取一个spring的代理对象 是基于ThreadLocal实现的 所以在子线程中获取不到
//IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
//利用spring代理对象确保事务的一致性
proxy.createVoucherOrder(voucherOrder);
} finally {
//释放锁
lock.unlock();
}
}
//加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
//1.执行lua脚本
//获取用户id
Long userId = UserHolder.getUser().getId();
//获得订单id
long orderId = redisIdWorked.nextId("order");
Long luaRes= stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
//2.判断结果是否为0
//2.1库存不足,返回1
if(luaRes.intValue()!=0){
return Result.fail(luaRes.intValue()==1 ?"库存不足":"一人只能下一单");
}
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){
//5.保证一人一单
//5.1用户id
Long userId = voucherOrder.getUserId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if(count>0){
log.error("该用户已经下单");
return ;
}
//6.扣减库存
seckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id",voucherOrder.getVoucherId()).gt("stock",0)//stock>0
.update();
this.save(voucherOrder);
}
}