本文是上一篇文章的后续,上一篇文章链接 马点评Redis实战(短信登录;商户查询缓存)
锁
一、优惠卷秒杀
id是一个订单必备的属性,而订单的id属性是必须唯一的,首先我们会想到使用数据库主键id,并设置为自增。这样似乎就能满足唯一性。
但是,这样会存在一些问题:
- id的规律太过明显,因为id自增每次都+1,这样的id很容易被人猜出来一些信息,不安全。
- 受单表数据量限制,数据库的一张表不能无限的存储数据,那么就有人说了,我们可以用多张表存储,但是问题又来了,用多张表存储就会存在id重复的问题。
1.全局id生成器
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下特性:
- 唯一性,订单id一定是不重复唯一的
- 高可用,任何时候有服务调用整个id生成器都要成功,否则影响业务
- 高性能,生成id的速度一定要快,否则会拖慢调用它的业务,导致整个业务变慢
- 递增性,订单一般作为主键存在数据库,所以有递增性更便于数据库构建索引
- 安全性,不能规律太明显让人轻松的猜出订单的信息
这些特性是不是让你想起来一个数据网格,没错就是Redis,redis的集群和主从方案可以解决高可用,并且redis非常高效满足高性能,当然为了增加id的安全性,我们要手动拼接一些信息。
下面是全局id生成器的代码:
使用位运算和或运算来拼接id,非常巧妙,可以看看视频理解redis实现全局唯一ID
我们使用了冒号:来分割redis的key,这样可以让我们更清晰的看到redis中缓存key的结构
package com.hmdp.utils;
/**
* @author Watching
* * @date 2023/4/11
* * Describe:
*/
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
/**
* 秒杀券 订单唯一ID:时间戳 + 序列号
*/
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1640995200L;//开始时间戳
private static final long COUNT_BITS = 32;//序列号位数
@Resource
StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix) {
//1.时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2.序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//increment会自动创建不存在的key,所以不存在空指针异常
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);//加一个date就可以保证每天都可以有2^32个序列
//拼接生成
return timestamp << COUNT_BITS | count;//这里使用位运算和或运算来拼接id,非常巧妙,可以看看视频理解
}
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println("second= " + second);
}
}
我们用单元测试类来测试一下这段代码
使用CountDownLunch来测试异步任务的耗时
@Test
@Test
public void test4() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(300);//用于测试多线程耗时,使用300个标识
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id:" + id);
}
countDownLatch.countDown();//标识符减1
}
};
long start = System.currentTimeMillis();
for (int j = 0; j < 300; j++) {
es.submit(runnable);
}
countDownLatch.await();//等待标识符减为0
long end = System.currentTimeMillis();
System.out.println(end - start);
}
这是部分测试结果,生成30000个唯一id花费了1000多毫秒(还包括打印的时间),效率算高了。
当然除了自定义的全局ID生成器,还有一些现成的全局唯一ID生成策略
- uuid
- redis自增
- snowflake,雪花算法,很出名
- 数据库自增,这里是指专门拿一张表来存储唯一id(我也不是很懂这里
2.实现优惠卷秒杀下单
2.1 添加优惠卷
使用postman或者其它工具访问http://localhost:8081/voucher/seckill接口,添加秒杀优惠卷,记得先启动项目,具体的优惠卷和秒杀优惠卷的表结构自行查看
注意,mysql的TimeStamp类型目前只能支持到2038.1.1,超过这个时间就会报错。
添加成功之后就可以在页面看到这个优惠卷了
2.2 优惠卷秒杀
下面的代码没有特别需要注意的,只需要跟着上面的流程图编写。
唯一需要注意的点就是并发的安全性问题,后面可能会使用redisson客户端解决,或者其它方法加锁。
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Autowired
private IVoucherOrderService iVoucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return iVoucherOrderService.seckillVoucher(voucherId);
}
}
/**
* 优惠卷秒杀
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("优惠卷不存在");
}
//2.判断秒杀是否开始,结束
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("活动尚未开始");
}
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("活动已经结束");
}
//3.判断库存是否充足
Integer stock = voucher.getStock();
if (stock < 1) {
return Result.fail("库存不足");
}
//4.扣减库存
LambdaUpdateWrapper<SeckillVoucher> wrapper = new LambdaUpdateWrapper<>();
wrapper.eq(SeckillVoucher::getVoucherId, voucherId).set(SeckillVoucher::getStock, voucher.getStock() - 1);
iSeckillVoucherService.update(voucher, wrapper);
//5.创建订单
//5.1订单id
long id = redisIdWorker.nextId("order");
//5.2用户id
Long userId = UserHolder.getUser().getId();
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(id);
int insert = baseMapper.insert(voucherOrder);
//6.返回订单id
return Result.ok(id);
}
2.3 高并发情况下会出现的超卖问题
如下图所示,两个线程同时执行,都查询到了库存还有1份,那么两个线程此时都具有了购买资格,于是他们同时对库存减1。则库存执行了两次减1操作。这就是超卖。
上面的情况肯定是不能发生的,那么我们如何解决呢?
2.4 加锁解决超卖问题
超卖问题归根到底就是并发太高,导致公用资源被同时获取。所以我们可以加锁来解决。
有两种加锁方式:
- 悲观锁,悲观锁会阻塞,影响业务执行效率,不适用于秒杀业务
- 乐观锁
2.5 乐观锁解决超卖问题
乐观锁解决超卖问题有两种方式:
- 版本号法
线程1和线程2同时获取到了库存(库存为1)和版本号(版本号为1),他们都获得了扣减库存的资格。此时线程1先对库存做了扣减(库存减1为0),版本号加1为2。然后线程2也对库存进行扣减,但是它做扣减之前对比版本号,发现版本号和自己之前查出来的不同,说明库存已经被修改,所以线程2停止对库存的修改并返回错误信息。这样就不会发生数据不安全问题了。
- CAS法(compare and set)
从上面的业务流程来看,我们发现库存stock字段本身就可以作为一个版本号,所以我们可以对上面的业务流程就行优化。
线程1和线程2都查出库存为1,它们两个线程都获得了扣减库存的资格,首先线程1先扣减了库存,然后线程2也准备扣减库存,线程2在扣减库存之前对比当前库存和之前自己查出来的库存是否有变化,如果有则拒绝执行这条语句。
使用乐观锁的注意事项在注释中表明了。
=====存在一个疑问,为什么在我TODO那段代码在高并发的情况下会出现库存更新失败的问题?不是超卖,是库存更新失败,比如100个线程都执行成功了,但是数据库数据只减少了20个
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("优惠卷不存在");
}
//2.判断秒杀是否开始,结束
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("活动尚未开始");
}
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("活动已经结束");
}
//3.判断库存是否充足
Integer stock = voucher.getStock();
if (stock < 1) {
return Result.fail("库存不足");
}
//4.扣减库存
//TODO 为什么这样更新库存会在高并发的情况下出现库存扣减失败的问题
// LambdaUpdateWrapper<SeckillVoucher> wrapper = new LambdaUpdateWrapper<>();
// wrapper.set(SeckillVoucher::getStock, stock - 1).eq(SeckillVoucher::getVoucherId, voucherId);
// boolean flag = iSeckillVoucherService.update(wrapper);
boolean flag = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
//.eq("stock",stock)这里直接和查出来的库存对比,会导致错误率过高。
// 100个库存,同时有100个线程都获取库存成功获得了扣减库存的资格,但是只有一个会真正扣减库存成功,其它99个都会失败,此时库存从100减到了99,明显是可以继续扣减的,这样不符合业务逻辑
.gt("stock",0)//这里设置为只要库存大于0就可以继续扣减,解决了上面的问题
.update();
if (!flag) {
return Result.fail("库存不足");
}
//5.创建订单
//5.1订单id
long id = redisIdWorker.nextId("order");
//5.2用户id
Long userId = UserHolder.getUser().getId();
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(id);
int insert = baseMapper.insert(voucherOrder);
//6.返回订单id
return Result.ok(id);
}
2.6 悲观锁和乐观锁的对比
2.7 一人一单实现
这一块涉及的知识点很多,包括spring事务失效、加锁范围和事务的关系、锁的对象要使用选择,这里只简单说一下,建议看视频理解更佳。
单节点 一人一单实现
- spring事务失效
如果一个类中的非事务方法调用了同一个类中的事务方法,那么这个事务方法上的事务注解也会失效。
这是因为内部调用不会经过 Spring 代理(spring就是通过代理对象完成事务功能的。)而是默认使用this.xxx()调用,因此无法触发事务。
所以我们需要获取CreateVoucherOrder方法的代理对象进行调用。
并且我们需要引入aspectj依赖,并在启动类上添加注解暴露代理对象 @EnableAspectJAutoProxy(exposeProxy = true)//开启代理对象暴露 - 加锁的范围影响数据库事务
比如在下面这个方法,方法上面加了@Transactional注解开启了事务,同时方法内部加了锁,那么这个事务是会出问题的,因为当在java中对数据库执行了操作之后,数据库事务会在test方法结束后提交,在flag标记处并未提交,但此时锁已经释放了,那么其它线程进入又会操作数据库,导致在事务提交之前数据库数据又被修改了,这样会对数据安全产生影响。
@Transactional
public int test(){
synchronized (this){
//xxxxxx操作数据库增删改
//xxxxxx操作数据库增删改
}
//flag标记
return 0;
}
- 锁的对象选择
如果使用userId.toString()方法作为锁,这样是锁不住的,因为每次调用这个toString方法都会产生一个新的对象,这样的话锁对象每次都不一样,就不谈能锁住的事了。所以我们为了保证同一个用户锁对象是同一个,在toString方法后面调用intern方法,intern方法是在常量池中获取一个唯一的字符串对象,所以这就能保证锁对象的唯一性了。(字符串常量池自行了解)
synchronized (userId.toString().intern()){
//xxxxxx
//xxxxxx
}
一人一单实现代码
/**
* 优惠卷秒杀
*
* @param voucherId
* @return
*/
public static int a = 0;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if (voucher == null) {
return Result.fail("优惠卷不存在");
}
//2.判断秒杀是否开始,结束
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
if (LocalDateTime.now().isBefore(beginTime)) {
return Result.fail("活动尚未开始");
}
if (LocalDateTime.now().isAfter(endTime)) {
return Result.fail("活动已经结束");
}
//3.判断库存是否充足
Integer stock = voucher.getStock();
if (stock < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
//必须使用intern方法,否则每次toString都会创建一个新的对象,就算是同一个userID也无法锁住,而intern方法会在常量池中去获取字符串,这个字符串是唯一的。
synchronized (userId.toString().intern()) {
/**
* 如果一个类中的非事务方法调用了同一个类中的事务方法,那么这个事务方法上的事务注解也会失效。
* 这是因为内部调用不会经过 Spring 代理,而是默认使用this.xxx(),因此无法触发事务。
* 所以我们需要获取CreateVoucherOrder方法的代理对象进行调用,spring就是通过代理对象完成事务功能的。
* 并且我们需要引入aspectj依赖,并在启动类上暴露代理对象 @EnableAspectJAutoProxy(exposeProxy = true)//开启代理对象暴露
*/
//获取代理对象(事务)
IVoucherOrderService iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();
return iVoucherOrderService.CreateVoucherOrder(voucherId);
}
}
/**
* 将一人一单,扣减库存,创建订单抽取出来,并使用Synchronized锁住,保证一人一单,具体的看视频吧,这块难得说清,配合视频可以理解的清除一点
*
* @param voucherId
* @return
*/
@Transactional
public Result CreateVoucherOrder(Long voucherId) {
//4.一人一单
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("同一用户仅限购买一次哦~");
}
//5.扣减库存
//TODO 为什么这样更新库存会在高并发的情况下出现库存扣减失败的问题
// LambdaUpdateWrapper<SeckillVoucher> wrapper = new LambdaUpdateWrapper<>();
// wrapper.set(SeckillVoucher::getStock, stock - 1).eq(SeckillVoucher::getVoucherId, voucherId);
// boolean flag = iSeckillVoucherService.update(wrapper);
boolean flag = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
//.eq("stock",stock)这里直接和查出来的库存对比,会导致错误率过高。
// 100个库存,同时有100个线程都获取库存成功获得了扣减库存的资格,但是只有一个会真正扣减库存成功,其它99个都会失败,此时库存从100减到了99,明显是可以继续扣减的,这样不符合业务逻辑
.gt("stock", 0)//这里设置为只要库存大于0就可以继续扣减,解决了上面的问题
.update();
if (!flag) {
return Result.fail("库存不足");
}
//6.创建订单
//6.1订单id
long id = redisIdWorker.nextId("order");
//6.2用户id
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(id);
int insert = baseMapper.insert(voucherOrder);
//7.返回订单id
return Result.ok(id);
}
在上面的代码中,我们使用的是Synchronized对一人一单操作进行加锁,但是synchronized是基于jvm进行加锁操作的,所以在集群环境或者分布式项目中Synchronized的锁就会失效。
效果演示
所以我们不能使用jvm的锁,要自己实现一个在集群环境下的分布式锁,后文会继续讲解。
二、分布式锁
分布式锁就是满足集群和分布式环境下多进程可见且互斥的锁。
mysql、redis、zookeeper实现分布式锁的比较
自己使用redis实现分布式锁坑较多,这里就不详细描述了,建议看视频使用redis ifAbsent实现分布式锁
三、秒杀优化
1.异步秒杀思路
原本的业务逻辑图:
通过nginx将请求分发到tomcat服务器,然后再tomcat服务器中分别串行的执行每一个请求,而每个请求的耗时=每个模块的耗时之和。
类似于饭店,客人下单,服务员接单,然后根据客人信息去后厨做菜,做好了之后再去接待下一个客人
优化后的逻辑图:
将判断秒杀库存和一人一单拿出来到redis中做,并将结果存储为一个订单,将订单存在消息队列中,将订单id返回给用户。tomcat再到消息队列中取任务执行。
类似于饭店,客人在前台服务员处下单,获取了一个小票,前台服务员会存储这些小票的信息,后厨会根据前台存储的信息来做菜;这样能提高效率。
1.2 如何在redis中实现下单资格判断
首先在redis中需要两个数据结构存储优惠卷的id和库存,对这款优惠卷下单的用户的id
String
Set
然后根据下面的流程图进行判断
库存不足返回false,用户已经下单返回false,只有当库存充足且用户未下单同时满足才执行后续流程。
实现
需求:
1 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//添加优惠卷库存到redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY,voucher.getStock().toString());
}
2 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1 参数列表
-- 1.1 优惠卷id
local voucherId = ARGV[1];
-- 1.2 用户id
local userId = ARGV[2];
-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3 业务脚本
-- 3.1
if(tonumber(redis.call('get',stockKey)) <= 0) then
-- 3.2 库存不足 返回1
return 1
end
-- 3.3 判断用户是否已经下过单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then
-- 3.4 存在,说明已经下过单,返回2
return 2
end
-- 3.5 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.6 下单 保存用户 sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
2.1 执行lua脚本,如果执行成功则将订单任务存入阻塞队列,注意,这里要获取一个事务代理对象,用于在子线程中访问数据库时事务不会失效(使用proxy代理对象访问)
//静态代码块加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private IVoucherOrderService proxy;
/**
* 优惠卷秒杀
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//1.执行lua脚本,判断是否有秒杀资格
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString());
if (result == null) {
return Result.fail("lua脚本返回值为null");
}
//2.判断返回值是否为0
//2.1 不为0,代表没有购买资格
if (result != 0) {
return Result.fail(result == 1 ? "库存不足" : "重复下单");
}
//2.2 为0,代表有购买资格,将用户id,优惠卷id等下单信息保存在阻塞队列
//将用户id,优惠卷id等下单信息保存在阻塞队列
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setId(orderId);
orderTask.add(voucherOrder);
//获取事务代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//在这里就代表用户已经下单成功了,就可以返回用户订单号,然后把阻塞队列orderTask中的订单信息异步处理
return Result.ok(orderId);
}
2.2 异步处理阻塞队列中的订单信息
//创建阻塞队列用于存放订单信息
private static BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024 * 1024);
//创建线程池,用于提交订单任务
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在类初始化完成之后就要一直从阻塞队列中取,不需要等到有数据了再取
@PostConstruct//该注解会让此方法在类初始化完成之后(依赖注入完成之后)立刻执行
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//创建内部类实现Runnable接口
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder order = orderTask.take();
handleVouchOrder(order);
} catch (Exception e) {
log.error("处理订单出现异常", e);
}
}
}
}
//处理订单
private void handleVouchOrder(VoucherOrder order) {
//创建锁对象
Long userId = order.getUserId();//这里不用UserHolder获取userId是因为这里是线程池新开的线程,没办法在这里获取到ThreadLocal中的信息
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean b = lock.tryLock();
if (!b) {
log.error("不允许重复下单");
return;
}
proxy.CreateVoucherOrderByOrder(order);
}
/**
* 异步提交订单需要用到的方法,直接传入订单信息,不需要包装
* @param voucherOrder
*/
@Transactional
@Override
public void CreateVoucherOrderByOrder(VoucherOrder voucherOrder) {
//4.一人一单
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("重复下单");
return;
}
//5.扣减库存
//TODO 为什么这样更新库存会在并发的情况下出现库存扣减失败的问题
// LambdaUpdateWrapper<SeckillVoucher> wrapper = new LambdaUpdateWrapper<>();
// wrapper.set(SeckillVoucher::getStock, stock - 1).eq(SeckillVoucher::getVoucherId, voucherId);
// boolean flag = iSeckillVoucherService.update(wrapper);
boolean flag = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
//.eq("stock",stock)这里直接和查出来的库存对比,会导致错误率过高。
// 100个库存,同时有100个线程都获取库存成功获得了扣减库存的资格,但是只有一个会真正扣减库存成功,其它99个都会失败,此时库存从100减到了99,明显是可以继续扣减的,这样不符合业务逻辑
.gt("stock", 0)//这里设置为只要库存大于0就可以继续扣减,解决了上面的问题
.update();
if (!flag) {
log.error("库存不足");
return;
}
//6.创建订单
//6.1订单id
//6.2用户id
int insert = baseMapper.insert(voucherOrder);
}
阻塞队列是存在jvm内存中的,存在内存限制问题,而且如果服务出现问题,导致阻塞队列中的数据消失,这样就无法从阻塞队列中获取到用户的订单信息了。
2 使用消息队列替代阻塞队列
2.1 基于list结构模拟消息队列
2.2 pubsub
2.3 stream
2.3.1 基于Stream的消息队列-单消费模式
2.3.2 基于Stream的消息队列-消费者组