【RocketMQ】RocketMQ应用难点

news2024/12/25 1:40:04

🎯 导读:本文探讨了RocketMQ中消息重复消费的问题及其解决方案,尤其是在CLUSTERING模式下的扩容影响。文章分析了重复消费的原因,如广播模式、负载均衡模式下的多consumerGroup消费、消费者组内的动态变化及网络延迟等,并提出了利用唯一标识进行去重的方法。此外,还讨论了如何选择合适的存储机制以高效处理大量消息标识,如HashMap、MySQL、Redis以及推荐的布隆过滤器等方案。对于防止消息堆积与确保消息不丢失,也给出了相应的策略和技术措施。

文章目录

  • 消息重复消费问题(去重)
    • 为什么出现重复消费
    • 解决方案
      • MySQL去重
      • 布隆过滤器
      • 加锁(适合短时间的去重)
      • 过程
    • 那些操作需要控制幂等
    • 布隆过滤器实现
      • 生产者
      • 添加 hutool 的依赖
      • 消费者
  • 如何解决消息堆积问题?
    • 什么情况下会出现堆积
      • 生产速度 远大于 消费速度
      • 消费者消费出现问题
    • 跳过堆积
    • 重置消费点位
  • 如何确保消息不丢失?
    • 方式一:将mq的刷盘机制设置为同步刷盘
    • 方式二:生产者做日志
    • 方式三:集群主备模式
    • 方式四:开启mq的trace机制,消息跟踪机制

消息重复消费问题(去重)

为什么出现重复消费

官网说明:RocketMQ确保所有消息至少传递一次。在大多数情况下,消息不会重复。

  • BROADCASTING(广播)模式下,所有注册的消费者都会消费,这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费
  • CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费
  • 扩容影响:在 CLUSTERING 模式下,只有一个 consumerGroup ,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之,一个消费者掉线后也一样)。一个队列所对应的新的消费者要获取之前消费的offset,可能此时之前的消费者已经消费了一条消息,但并没有把 offset 提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费
  • 在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次
  • 网络卡顿,生产者多次发一样的消息(例如买一个东西,发送了两次减库存)

【扩容影响说明】

一开始只有一个消费者,4个队列都归C1管,C1已经开始消费a、b、c、d了,但是还没有消费完,没有返回让offset偏移

在这里插入图片描述

突然C2上线了,看到 c、d 还没有被消费完,然后自己又拿去消费一次

在这里插入图片描述

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,该怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgId,也可以是你自定义的唯一的key,

解决方案

官网:RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是 msgld,也可以是消息内容中的唯一标识字段,例如订单 id 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断不存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgld 一定要是全局唯一标识符,但实际使用中,可能会存在相同的消息有两个不同 msgld 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况需要使用业务字段进行重复消费。 使用自己的Key更安全

使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

  • HashMap:单机部署可以使用,无法解决分布式问题
  • MySQL去重表,加唯一索引,插入成功就执行业务:太慢了,数据库压力大
  • Redis(setnx):占用内存大,成本高
  • 布隆过滤器(推荐):内存小,效率高

MySQL去重

@SpringBootTest
class ARocketmqDemoApplicationTests {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    void repeatProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.start();
        String key = UUID.randomUUID().toString();
        System.out.println(key);
        // 测试 发两个key一样的消息
        Message m1 = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
        Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
        producer.send(m1);
        producer.send(m1Repeat);
        System.out.println("发送成功");
        producer.shutdown();
    }

    /**
     * mysql的唯一索引实现消费幂等性
     * ---------------------
     * 我们设计一个去重表 对消息的唯一key添加唯一索引
     * 每次消费消息的时候 先插入数据库 如果成功则执行业务逻辑 [如果业务逻辑执行报错 则删除这个去重表记录]
     * 如果插入失败 则说明消息来过了,直接签收了
     *
     * @throws Exception
     */
    @Test
    void repeatConsumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        consumer.subscribe("repeatTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 先拿key
                MessageExt messageExt = msgs.get(0);
                String keys = messageExt.getKeys();
                // 原生方式操作
                Connection connection = null;
                try {
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                PreparedStatement statement = null;

                try {
                    // 插入数据库 因为我们 key做了唯一索引
                    statement = connection.prepareStatement("insert into order_oper_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");
                } catch (SQLException e) {
                    e.printStackTrace();
                }

                // 新增 要么成功 要么报错   修改 要么成功,要么返回0 要么报错
                try {
                    // 执行新增
                    statement.executeUpdate();
                } catch (SQLException e) {
                    System.out.println("executeUpdate");
                    if (e instanceof SQLIntegrityConstraintViolationException) {
                        // 唯一索引冲突异常
                        // 说明消息来过了
                        System.out.println("该消息来过了");
                        // 签收消息,从队列中删除消息
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    e.printStackTrace();
                }

                // 如果业务报错 则删除掉这个去重表记录 delete order_oper_log where order_sn = keys;
                // 不删除的话,每次重试就会报错,重试失败
                System.out.println(new String(messageExt.getBody()));
                System.out.println(keys);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.in.read();
    }

}

布隆过滤器

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。

在hutool的工具中我们可以直接使用,使用redis的bitmap类型手写一个也可以的,Redisson中也有布隆过滤器的实现 https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0

在这里插入图片描述

  • 布隆过滤器判断key不在,说明消息没有被消费过,执行消费
  • 布隆过滤器判断key存在,消息可能被消费过,需要做进一步判断(可以结合MySQL做进一步判断)

加锁(适合短时间的去重)

  • 添加一个限时锁
  • 如果可以加锁成功,执行消费,消费失败,释放锁;如果加锁失败,说明消息被消费了,或者正在消费中,直接返回即可,后续如果消费失败,消息会被再次投递

过程

  • 生产者发消息带唯一标记
  • 消费者控制消息消费幂等性(多次操作的影响均和第一次操作产生的影响相同)
    • 方式一:查询key是否存在(存在就返回、不存在就新增并执行业务)
    • 方式二:直接插入(插入成功就执行业务,否则返回)

那些操作需要控制幂等

新增:普通的新增操作是非幂等的(字段有唯一约束除外)

修改:看情况(++不是幂等,i=i+1是幂等)

查询:幂等操作

删除:幂等操作

布隆过滤器实现

生产者

@Test
public void testRepeatProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    // 我们可以使用自定义key当做唯一标识
    String keyId = UUID.randomUUID().toString();
    System.out.println(keyId);
    Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

添加 hutool 的依赖

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.11</version>
</dependency>

消费者

/**
 * 在boot项目中可以使用@Bean在整个容器中放置一个单例对象
 */
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);
 
@Test
public void testRepeatConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 拿到消息的key
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();
            // 判断是否存在布隆过滤器中
            if (bloomFilter.contains(keys)) {
                // 执行进一步的精确判断
                boolean isConsumed = MySQL查询(keys);
                if (isConsumed == true) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            执行业务处理(假如执行完业务,宕机了,key没有被添加到布隆过滤器中,还是重复消费)
            bloomFilter.add(keys);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

上面的方案已经可以解决绝大多数的重复消费问题了,像宕机这种极端场景,我还不知道完美的解决方案,有知道的大佬请告诉我,谢谢

如何解决消息堆积问题?

一般认为单条队列消息差值>=10w时算消息堆积

什么情况下会出现堆积

生产速度 远大于 消费速度

  • 生产方可以做业务限流

  • 增加消费者数量,但是消费者数量<=队列数量,适当增加消费线程数量

    在这里插入图片描述

    • @RocketMQMessageListener(topic = "modeTopic",
       consumerGroup = "mode-consumer-group-a", 
       messageModel = MessageModel.CLUSTERING, 
       consumeThreadNumber = 40)
      public class MyConsumer implements RocketMQConsumer {
          // ...
      }
      
    • 核心线程数 < CPU核心数

    • 最大线程数

      • IO密集,读文件、操作数据库,CPU快,磁盘慢,CPU空闲很多,建议设置为2n,n是CPU的最大处理器数量,如下图所示,建议通过Runtime.getRuntime().availableProcessors()来获取,因为不同服务器这个是不一样的,不建议写死
        在这里插入图片描述

      • CPU密集,CPU使用频繁,如果开太多,CPU核数不够用,会频繁切换,效率低,建议设置(n+1)

  • 动态扩容队列数量(一般由运维工程师来决定),从而增加消费者数量。动态扩容之后,程序不是一下就感知到的,刚扩容的时候,新来的队列还是收不到消息,要过一段时间才会收到

    在这里插入图片描述

    在这里插入图片描述

    • 读队列数量不要设置大于写队列数量(读多没有用),直接设置相等就可以
    • perm:设置为2,这个主题的消息只能读不能写;设置为4,只能写,不能读;设置为6,可写可读
    • 当队列的最大位点不是全为0的话(如下图所示),不可以缩容,不然会出问题,有的队列的消息会丢失;全是0的时候,可以缩
      在这里插入图片描述

消费者消费出现问题

  • 排查消费者程序的问题

跳过堆积

跳过堆积,这个组的消息都不要了,偏移量会自动往后面移动,表示已经消费过。跳过之后,无法回滚,要谨慎

在这里插入图片描述

重置消费点位

从这个时间开始至今被消费过的消息,会重新被消费

在这里插入图片描述

如何确保消息不丢失?

硬盘读写方式:

  • 随机读写:将数据不固定存储到不同的扇区,查询数据较慢
  • 顺序读写:提前申请一片空间,将数据顺序存储(MQ使用的是这种)

方式一:将mq的刷盘机制设置为同步刷盘

消息持久化:

  • 同步刷盘:生产者给broker发送消息,broker先把消息持久化到磁盘之后,再返回成功给生产者。安全,性能降低,但其实性能还是不错的
  • 异步刷盘:先把数据存储到内存的buffer中,到达一定的量,再存储到磁盘中。不安全,性能好

方式二:生产者做日志

不用同步刷盘,生产者可以自己做消息日志,写到文件或者数据库中,不占用MQ的性能

  • 生产者使用同步发送模式,收到mq的返回确认以后,顺便往自己的数据库里面写key createTime status=1。消费者消费以后,修改数据这条消息的状态 = 2,记录消息的handleTime
  • 写一个定时任务,间隔两天去查询数据,如果有status = 1 and createTime < day-2,将这个消息补发一次

方式三:集群主备模式

单个硬盘存储,如果硬盘坏了,消息还是会丢失的,因此需要集群模主备模式,将消息持久化在不同的硬件上

方式四:开启mq的trace机制,消息跟踪机制

1、在broker.conf中开启消息追踪traceTopicEnable=true

在这里插入图片描述

2、重启broker

3、生产者配置文件开启消息轨迹enable-msg-trace: true

在这里插入图片描述

如果使用的是原生API,可以这样启动

在这里插入图片描述

4、消费者开启消息轨迹功能,可以给单独的某一个消费者开启enableMsgTrace = true

在这里插入图片描述

在rocketmq的面板中可以查看消息轨迹,默认会将消息轨迹的数据存在RMQ_SYS_TRACE_TOPIC主题里面

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2180812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于单片机的催眠电路控制系统

** 文章目录 前言一 概要功能设计设计思路 软件设计效果图 程序文章目录 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主…

动手学深度学习(李沐)PyTorch 第 3 章 线性神经网络

3.1 线性回归 线性回归是对n维输入的加权&#xff0c;外加偏差 线性回归可以看作是单层神经网络 回归问题中最常用的损失函数是平方误差函数。 平方误差可以定义为以下公式&#xff1a; 常数1/2不会带来本质的差别&#xff0c;但这样在形式上稍微简单一些 &#xff08;因为当…

【C++篇】领略模板编程的进阶之美:参数巧思与编译的智慧

文章目录 C模板进阶编程前言第一章: 非类型模板参数1.1 什么是非类型模板参数&#xff1f;1.1.1 非类型模板参数的定义 1.2 非类型模板参数的注意事项1.3 非类型模板参数的使用场景示例&#xff1a;静态数组的实现 第二章: 模板的特化2.1 什么是模板特化&#xff1f;2.1.1 模板…

YOLO11关键改进与网络结构图

目录 前言&#xff1a;一、YOLO11的优势二、YOLO11网络结构图三、C3k2作用分析四、总结 前言&#xff1a; 对于一个科研人来说&#xff0c;发表论文水平的高低和你所掌握的信息差有着极大的关系&#xff0c;所以趁着YOLO11刚刚发布&#xff0c;趁热了解&#xff0c;先人一步对…

与我免费ai书童拆解《坚持》创作历程

插科打诨的海侃胡闹&#xff0c;调侃舒展《坚持》诗创的灵魂盛宴之旅。 (笔记模板由python脚本于2024年09月30日 19:11:42创建&#xff0c;本篇笔记适合喜欢python和诗歌的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#x…

如何让ollama本地模型使用code-interpreter(代码解释器)?

代码解释器通常都需要在GPU的环境下使用原生的模型通过transformer来实现&#xff0c;且本身还需要模型本身支持&#xff0c;ollama本地蒸馏过的模型占用的资源比较小&#xff0c;也方便本地使用&#xff0c;但是如果想用这些模型的代码解释器&#xff0c;即让大模型写程序并执…

小巧机身,但强劲动力实现千元级净须,未野迷你剃须刀测评

剃须刀是很多朋友每天都要用的工具&#xff0c;在选择上非常丰富&#xff0c;就便捷性和可靠性来说&#xff0c;电动剃须刀还是更方便一些。以前多数人用的都是飞利浦等传统品牌。近几年国产剃须刀也开始崛起&#xff0c;但是也存在很多令人不够满意的产品&#xff0c;比如说&a…

Redis入门第三步:Redis事务处理

欢迎继续跟随《Redis新手指南&#xff1a;从入门到精通》专栏的步伐&#xff01;在本文中&#xff0c;我们将探讨Redis的事务处理机制。了解如何使用事务来保证一系列操作的原子性和一致性&#xff0c;这对于构建可靠的应用程序至关重要 1 什么是Redis事务&#x1f340; ​ R…

高效学习工作SMART原则

S代表Specific&#xff08;明确具体的&#xff09;&#xff0c;意味着你需要清晰地定义你的目标&#xff0c;并确保它是具体而明确的。例如&#xff0c;如果你的目标是“提高销售”&#xff0c;那么这个目标就不是足够具体。更好的表述可能是&#xff1a;“在接下来的三个月内&…

【Python报错已解决】 ModuleNotFoundError: No module named ‘lime‘

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

828华为云征文 | 利用FIO工具测试Flexus云服务器X实例存储性能

目录 一、Flexus云服务器X实例概要 1.1 Flexus云服务器X实例摘要 1.2 产品特点 1.3 存储方面性能 1.4 测评服务器规格 二、FIO工具 2.1 安装部署FIO 2.2 主要性能指标概要 三、进行压测 3.1 测试全盘随机读IO延迟 3.2 测试全盘随机写IO延迟 3.3 测试随机读IOPS 3.4…

《后端程序猿 · Spring事务失效场景》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

如何使用ssm实现钢铁集团公司安全管理系统的构建与实现

TOC ssm748钢铁集团公司安全管理系统的构建与实现jsp 研究背景与现状 时代的进步使人们的生活实现了部分自动化&#xff0c;由最初的全手动办公已转向手动自动相结合的方式。比如各种办公系统、智能电子电器的出现&#xff0c;都为人们生活的享受提供帮助。采用新型的自动化…

SpringBoot教程(三十一) | SpringBoot生成Docker镜像包

SpringBoot教程&#xff08;三十&#xff09; | SpringBoot生成Docker镜像包 前提方式一&#xff1a;spring-boot-maven-plugin 方式方式二&#xff1a;Dockfile 方式&#xff08;推荐&#xff09; 前提 如果你在 Windows 上&#xff0c;确保 Docker Desktop 已经启动并正在运…

Java常用三类定时器快速入手指南

文章目录 Java常用三类定时器快速入手指南一、序言二&#xff0c;Timer相关1、概念2、Timer类3、TimerTask类4、ScheduleExecutorService接口 三&#xff0c;Scheduled相关1、配置1.1 SpringMVC配置1.2 SpringBoot配置&#xff08;1&#xff09;单线程&#xff08;2&#xff09…

python 如何引用变量

在字符串中引入变量有三种方法&#xff1a; 1、 连字符 name zhangsan print(my name is name) 结果为 my name is zhangsan 2、% 字符 name zhangsan age 25 price 4500.225 print(my name is %s%(name)) print(i am %d%(age) years old) print(my price is %f%(pric…

【数字图像处理】小白也能懂,最浅显方式手撕直方图均衡化(附python实现)

文章目录 1 概念2 原理2.1 数学原理 3 python代码实现4 测试效果5 结论 1 概念 直方图均衡化&#xff0c;同伽马变换一样&#xff0c;也是增强图像对比度的一种工具。区别在于&#xff0c;直方图均衡化是一种自适应的工具&#xff0c;即自动工具。也就是说&#xff0c;我们只需…

使用RestTemplate调用EMQX API查询MQTT客户端列表信息

项目中集成mqtt客户端查询功能&#xff0c;使用到了EMQX api-v5&#xff0c;具体步骤&#xff1a; 一、准备工作 首先在EMQX dashboard中添加API 密钥 填写密钥名称&#xff0c;点击确定&#xff0c;会生成API Key和Secret Key&#xff0c;保存起来备用。 二、配置文件 在…

SUP-NeRF-ECCV2024数据集: 单目3D对象重建的新突破

2024-09-25&#xff0c;由Bosch Research North America和Michigan State University联合发布的SUP-NeRF&#xff0c;是一个基于单目图像进行3D对象重建的新型方法。一个无缝集成姿态估计和物体重建的统一网格。 ECCV&#xff1a;欧洲计算机视觉会议的缩写&#xff0c;它是计算…

如何使用ssm实现科技银行业务管理系统+vue

TOC ssm743科技银行业务管理系统vue 第一章 绪论 1.1 研究背景 在现在社会&#xff0c;对于信息处理方面&#xff0c;是有很高的要求的&#xff0c;因为信息的产生是无时无刻的&#xff0c;并且信息产生的数量是呈几何形式的增加&#xff0c;而增加的信息如何存储以及短时间…