Redis框架(十二):大众点评项目 阻塞队列+异步处理 实现秒杀优化

news2025/1/24 8:51:34

大众点评项目 阻塞队列+异步处理 实现秒杀优化

  • 需求:阻塞队列+异步处理 实现秒杀优化
    • 为什么使用异步处理?
    • 为什么使用阻塞队列?
    • 为什么使用Lua?
  • 业务逻辑及其实现
  • 原有逻辑代码 / 优化后逻辑代码
    • 完整优化业务代码
    • 原有优化业务代码
  • 总结

SpringCloud章节复习已经过去,新的章节Redis开始了,这个章节中将会回顾Redis实战项目 大众点评
主要依照以下几个原则

  1. 基础+实战的Demo和Coding上传到我的代码仓库
  2. 在原有基础上加入一些设计模式,stream+lamdba等新的糖
  3. 通过DeBug调试,进入组件源码去分析底层运行的规则和设计模式

代码会同步在我的gitee中去,觉得不错的同学记得一键三连求关注,感谢:
Redis优化-链接: RedisBlockQueueMethodProject

在这里插入图片描述

需求:阻塞队列+异步处理 实现秒杀优化

我们通过@PostConstruct开启线程池,一旦系统开启,直接进行处理,当有订单添加到阻塞队列,就可以异步处理响应,
首先,用户接收到 是否成功的信息;后面,数据库的操作将从子线程中得到执行

为什么使用异步处理?

请求异步处理,对用户更友好,响应更方便,可以定制流程化我们的操作;
这里 我们开启子线程处理了 数据的CRUD, 不需要得到返回, 从而使压测数据更好看

为什么使用阻塞队列?

  1. 一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前的任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务。
  2. 阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
  3. 阻塞队列自带阻塞和唤醒功能,不需要做额外处理,无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。

为什么使用Lua?

  1. Lua脚本在Redis中是原子执行的,执行过程中间不会插入其他命令
  2. Lua脚本可以帮助开发和运维人员创造出自己定制的命令,并可以将这些命令常驻在Redis内存中,实现复用的效果
  3. Lua脚本可以将多条命令一次性打包,有效地减少网络开销

总结下,就是Redis本身的事务不符合我们的要求,在很多场景下, 原生的指令并不友好;
Redis本身提供了multi关键字用来开启事务,exec用来关闭事务。

业务逻辑及其实现

在这里插入图片描述

业务需求:

我们想要每个 秒杀商品 的用户ID和 库存数量放到Redis,
这里每次查找就可以通过Redis直接进行处理,而非DB,这样是很友好的,能极大提升吞吐量

具体实现:

我们通过@PostConstruct开启线程池,一旦系统开启,直接进行处理,当有订单添加到阻塞队列,就可以异步处理响应,
首先,用户接收到 是否成功的信息;后面,数据库的操作将从子线程中得到执行

  1. 先将商品的库存保存到Redis中去
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+ voucher.getId(),
                        voucher.getStock().toString() );
    }
  1. 通过Lua实现Redis事务,将原有的库存判断,打包成Lua脚本,处理更方便
-- 使用lua是为了实现redis的事务,实现可重用性
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

  1. 调用Lua脚本服务
    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static{
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
  1. 判断Lua的返回值,进一步处理 用户 服务
    @Override
    public Result seckillVoucher(Long voucherId) {

        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString()
        );

        int r = result.intValue();

        if (r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
}
  1. 开始进行异步处理
  • 阻塞队列处理
  • 封装订单
    @Override
    public Result seckillVoucher(Long voucherId) {

        //阻塞队列
        //封装订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIDProductor.nextId("order");
        voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
//        save(voucherOrder);


        currentProxy = (IVoucherOrderService) AopContext.currentProxy();
        //异步执行减库存
        orderTask.add(voucherOrder);

        return Result.ok(orderId);
    }
  1. (1)阻塞队列处理
    现在 执行 orderTask.add(voucherOrder);
    通过线程池ExecutorService 来提交任务,进行处理
    这里的 @PostConstruct是让Bean一初始化就执行,BlockingQueue无任务,就会阻塞,使得线程进入wait状态,释放cpu资源。
    无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    private BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024*1024);

    @PostConstruct
    private void init() {
        log.debug("SECKILL_ORDER_EXECUTOR: work...");
        //通过线程池`ExecutorService `来提交任务,进行处理
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //异步处理阻塞队列中的任务
    private class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
            while(true){
                try {
                //无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。
                    VoucherOrder voucherOrder = orderTask.take();
                    handlerVoucherOrder(voucherOrder);
                } catch (Exception e) {
//                    e.printStackTrace();
                    log.error("订单异常" + e);
                }

            }
        }
    }

  1. (2)异步处理CRUD操作



    //需要在主线程中获得实际代理
    private IVoucherOrderService currentProxy;


    private void handlerVoucherOrder(VoucherOrder voucherOrder){
        log.debug("voucherOrder: work..." + voucherOrder.toString());
        Long userId = voucherOrder.getUserId();

        //这里还有个分布式锁Redission,后面实现 暂时用Sy来代替

        //intern()是去常量池中去找userId,处理锁失效,事务失效
        synchronized(userId.toString().intern()) {
            try {
                /**
                 * 这里不能IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
                 * 原因 private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal<>("Current AOP proxy");
                 * 子线程无法从ThreadLocal中拿到想要的数据的
                 */

                currentProxy.createVoucherOrder(voucherOrder);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
        }

    }
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        log.debug("createVoucherOrder: work..." + voucherOrder.toString());
        /**
         * 实现一人一单 Long userId = UserHolder.getUser().getId();
         * //由于是异步,createVoucherOrder是通过Proxy进行调用的,
         * 子线程无法从ThreadLocal中拿到想要的数据的,所以必须通过Order来获得
         */

        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();


        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

        if (count > 0) {
            log.error("用户已经购买过一次");
            return ;
        }

        //验证结束,扣减库存
        boolean flag = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
                .update();

        if (!flag) {
            log.error("库存不足");
            return ;
        }
        save(voucherOrder);

    }
}
  1. 执行命令操作
    在这里插入图片描述

查看缓存
在这里插入图片描述
在这里插入图片描述

查看数据库
在这里插入图片描述

原有逻辑代码 / 优化后逻辑代码

完整优化业务代码

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIDProductor redisIDProductor;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    private BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024*1024);

    private static DefaultRedisScript<Long> SECKILL_SCRIPT;
    static{
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //需要在主线程中获得实际代理
    private IVoucherOrderService currentProxy;

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        log.debug("SECKILL_ORDER_EXECUTOR: work...");
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //异步处理阻塞队列中的任务
    private class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
            while(true){
                try {
                    VoucherOrder voucherOrder = orderTask.take();
                    handlerVoucherOrder(voucherOrder);
                } catch (Exception e) {
//                    e.printStackTrace();
                    log.error("订单异常" + e);
                }

            }
        }
    }

    private void handlerVoucherOrder(VoucherOrder voucherOrder){
        log.debug("voucherOrder: work..." + voucherOrder.toString());
        Long userId = voucherOrder.getUserId();

        //这里还有个分布式锁Redission,后面实现 暂时用Sy来代替

        //intern()是去常量池中去找userId,处理锁失效,事务失效
        synchronized(userId.toString().intern()) {
            try {
                /**
                 * 这里不能IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
                 * 原因 private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal<>("Current AOP proxy");
                 * 子线程无法从ThreadLocal中拿到想要的数据的
                 */

                currentProxy.createVoucherOrder(voucherOrder);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
        }


    }

    @Override
    public Result seckillVoucher(Long voucherId) {

        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString()
        );

        int r = result.intValue();

        if (r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }

        //阻塞队列
        //封装订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIDProductor.nextId("order");
        voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
//        save(voucherOrder);


        currentProxy = (IVoucherOrderService) AopContext.currentProxy();
        //异步执行减库存
        orderTask.add(voucherOrder);

        return Result.ok(orderId);
    }

    //异步开启事务
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        log.debug("createVoucherOrder: work..." + voucherOrder.toString());
        /**
         * 实现一人一单 Long userId = UserHolder.getUser().getId();
         * //由于是异步,createVoucherOrder是通过Proxy进行调用的,
         * 子线程无法从ThreadLocal中拿到想要的数据的,所以必须通过Order来获得
         */

        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();


        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

        if (count > 0) {
            log.error("用户已经购买过一次");
            return ;
        }

        //验证结束,扣减库存
        boolean flag = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
                .update();

        if (!flag) {
            log.error("库存不足");
            return ;
        }
        save(voucherOrder);

    }
}

原有优化业务代码

   /**
     * 这里是通过乐观锁/悲观锁实现的一人一单,秒杀功能
     * @param voucherId
     * @return
     */
   public Result seckillVoucher1(Long voucherId) {

        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("秒杀尚未开始");
        }

        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            return Result.fail("秒杀已经结束");
        }

        if (voucher.getStock() < 1) {
            return Result.fail("库存不足");
        }


        //实现一人一单
        Long userId = UserHolder.getUser().getId();

        //intern()是去常量池中去找userId,处理锁失效,事务失效
        synchronized(userId.toString().intern()) {
            IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
            return currentProxy.createVoucherOrder(voucherId);
            *//*调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
            return createVoucherOrder(voucherId);*//*
        }
    }

    @Transactional
    public  Result createVoucherOrder1(Long voucherId) {
        //实现一人一单
        Long userId = UserHolder.getUser().getId();

        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

        if (count > 0) {
            return Result.fail("已经购买");
        }

        //验证结束,扣减库存
        boolean flag = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
                .update();

        if (!flag) {
            return Result.fail("库存不足");
        }

        //封装订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIDProductor.nextId("order");
        //订单Id、用户Id、优惠券Id
        voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
        save(voucherOrder);


        return Result.ok(orderId);

    }

总结

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Traefik整理

entryPoints配置 defaultEntryPoints ["oneway"][entryPoints]# 代表traefik的监听端口为90[entryPoints.oneway]address ":90"#90端口接收到的请求先进行鉴权&#xff0c;traefik会先访问跳转到http://127.0.0.1:51001/t/test1进行权限验证[entryPoint…

如何使用加密sqlite数据库

如何使用加密sqlite数据库&#xff0c;起始有开源的sqlcipher 可以去开源网站搜索一下&#xff0c;如码云&#xff0c;github等&#xff0c;那么如何编译呢&#xff0c; 这是我的虚拟机版本和 config参数 然后就会生成Makefile 直接make即可生成 sqlcipher可执行程序&#xff…

有关Monaco的使用疑惑

前言 学习monaco editor已经有三个多月了。阅读了大部分的的文档&#xff0c;也看了很多相关文章&#xff0c;也调研了一些使用它做的大型开源项目。 一开始的疑惑已经逐渐解开&#xff0c;但随着学习的深入&#xff0c;也遇到了比较奇怪的问题。这篇文件就来聊一聊&#xff0…

软考考试多少分通过?

当然是45&#xff01;45&#xff01;45&#xff01;而且是各科45&#xff01; 初级和中级考两科 综合知识&#xff1a;考试时间为 150 分钟&#xff0c;笔试&#xff0c;选择题&#xff08;上午 9:00-11:30&#xff09;案例分析&#xff1a;考试时间为 90 分钟&#xff0c;笔…

Divide by Zero 2021 and Codeforces Round #714 (Div. 2) C. Add One

Problem - C - Codeforces 翻译&#xff1a; 给你一个整数&#x1d45b;。您必须对其应用&#x1d45a;操作。 在单个操作中&#xff0c;必须将该数字的每个数字&#x1d451;替换为整数&#x1d451;1的十进制表示形式。例如&#xff0c;在应用一次操作后&#xff0c;1912变…

安利一个Python大数据分析神器!

对于Pandas运行速度的提升方法&#xff0c;之前已经介绍过很多回了&#xff0c;里面经常提及Dask&#xff0c;很多朋友没接触过可能不太了解&#xff0c;今天就推荐一下这个神器。 1、什么是Dask&#xff1f; Pandas和Numpy大家都不陌生了&#xff0c;代码运行后数据都加载到…

项目里接入了MQ消息中间件以后,我摸鱼的时间更长了~

V-xin&#xff1a;ruyuanhadeng获得600页原创精品文章汇总PDF 一、前情回顾 之前给大家聊了一下&#xff0c;面试时如果遇到消息中间件这个话题&#xff0c;面试官上来可能问的两个问题&#xff1a; 你们的系统架构中为什么要引入消息中间件&#xff1f;系统架构中引入消息中…

零跑汽车股价终于盼来了期望

近期零跑股价上涨&#xff0c;给其它汽车产业带来了危机。9月29日&#xff0c;零跑汽车作为第四位登陆港股的新势力车企&#xff0c;终于胜利敲钟。但是&#xff0c;紧接着等待零跑的却是刷新新势力纪录的开盘破发。 继开盘首日暴跌33%之后&#xff0c;9月30日收盘&#xff0c;…

Linux权限---用户权限切换与文件权限更改(附目录,哪里不懂点哪里)--- 第一期

目录 1. Linux权限的概念 1.1 什么是权限 1.2 所以权限的操作一共分为两类 1.3 Linux中&#xff0c;默认有两类用户 2. root与普通用户切换指令 2.1 如果想从普通用户转为超级用户可以进行下面操作 2.2 如果想从超级用户转为普通用户可以进行下面操作 2.3 禁止来回切换用…

阿里技术官耗时半年总结出“满分”架构笔记,拿捏分布式到微服务

第 1 章&#xff1a;深入理解网络 讲解分布式的基础一-网络&#xff0c; 对国际互联网、NIO、AIO、网络传输中的对象序列化问题、HTTP 的前世今生、TCP/IP、从 CDN 到 SD-WAN 等知识进行深入讲解。 详细章节介绍&#xff1a; 从国际互联网开始 NIO, 一本难念的经 AIO,大道至…

Redis 非关系型数据库

关系型数据库与非关系型数据库 Redis支持的键值数据类型 Redis中文网 2. 哈希类型 hash 删除&#xff1a;hdel key field 3. 列表类型 list:可以添加一个元素到列表的头部&#xff08;左边&#xff09;或者尾部&#xff08;右边&#xff09; 列表类型相当于队列 4. 集合…

Vue--》vue-router的导航守卫使用讲解

目录 前言 vue-router中编程式导航API 导航守卫 全局守卫 独享路由守卫 组件路由守卫 前言 在浏览器中点击链接实现导航的方式&#xff0c;叫做声明式导航。例如&#xff1a;普通网页中点击<a>链接、vue项目中点击<router-link>都属于声明式导航。 在浏览器…

0基础转软件测试该学些什么?

有很多人员会不断问自己&#xff0c;自己到底要不要学测试&#xff0c;或者要不要转行做测试&#xff0c;测试的职业发展到底怎么样&#xff1f;如果你还在迷茫&#xff0c;在到处找各种大牛问类似的问题&#xff0c;我希望这篇文章&#xff0c;你看完能够结束你的这个烦恼&…

从 0 到 1 搞一个 Compose Desktop 版本的玩天气之踩坑

从 0 到 1 搞一个 Compose Desktop 版本的玩天气之踩坑 大家好&#xff0c;好久不见&#xff0c;接下来一段时间我会系统性地写一套关于 Compose Desktop 的文章&#xff0c;带大家从头到尾写一个桌面版的天气应用&#xff0c;并且打好包让别人也可以进行使用&#xff0c;接下…

java判断选择的日期是否在某个时间区间

效果展示&#xff1a; 具体代码&#xff1a; String startTimeZoom "08:00";//时间区间-开始时间 String endTimeZoom "22:00";//时间区间-结束时间String startTimeChoice "08:00";//时间区间-选择的开始时间 String endTimeChoice "2…

Java中的异常(Exception)

目录 一、什么是异常(Exception)&#xff1f; 二、解决方案&#xff1a;try-catch 三、异常的概念 四、异常的体系图(重点) 小结&#xff1a; 五、常见的五大运行时异常 1、NullPointerException空指针异常 2、ArithmeticException数学运算异常 3、ArrayIndexOutOfBounds…

【MySQL进阶】浅谈InnoDB中的BufferPool

【MySQL进阶】浅谈InnoDB中的BufferPool 文章目录【MySQL进阶】浅谈InnoDB中的BufferPool一、前言——缓存的重要性二、InnoDB的Buffer Pool1&#xff1a;BufferPool 简介2&#xff1a;BufferPool内部组成3&#xff1a;free链表的管理4&#xff1a;flush链表的管理5&#xff1a…

PCA与PCoA

通过分析坐标轴中样本和样本间的距离可看到 2 个样本或 2 组样本间的差异性。若2个样本或2组样本之间的直线距离较近&#xff0c;则表示这2个样本或2组样本差异性较小&#xff1b;相反则表示差异性较大。因此PCA和PCoA 具有直观性(直接看两点之间的距离)和完整性(呈现所有样本)…

spring中i18n国际化处理多语言

前言 在项目中&#xff0c;往往用户会存在多语言的述求&#xff0c;比如说一个系统既有中文的用户&#xff0c;又有英文的用户。怎么来实现多语言呢&#xff1f; 首先前后端分离的项目&#xff0c;前端会有自己的多语言实现方案&#xff0c;大致效果就是&#xff0c;用户切换…

rocketmq源码-pull模式拉取消息、同步拉取消息

前言 上一篇博客&#xff0c;记录的是push模式&#xff0c;异步发送netty请求拉取消息的代码&#xff0c;这篇博客主要记录consumer发送同步netty请求&#xff0c;去拉取消息的逻辑&#xff0c;但是对于同步发送请求&#xff0c;需要结合LitePullConsumer来看 在Lite PullCon…