消息队列 RocketMQ 消息重复消费问题(原因及解决)

news2024/11/18 5:51:13

目录

1.出现重复消费的原因

2.解决

2.1 数据库插入法

2.2 使用布隆过滤器

2.2.1 添加hutool的依赖

2.2.2 测试生产者

2.2.2 测试消费者


1.出现重复消费的原因

  1. BROADCASTING(广播) 模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
  2. CLUSTERING(负载均衡)模式下,如果一个 topic 被多个 consumerGroup 消费,也会重复消费。
  3. 即使是在 CLUSTERING 模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡重平衡 reBalance(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。
  4. 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

简单的说:

  1. Consumer 消费完消息并不是实时同步到 Broker 的,而是将 offset 先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致 offset 没提交,下次没提交 offset 的这部分消息会被再次消费
  2. 即使 offset 被提交到了 Broker,在还没来得及持久化的时候 Broker 宕机了,当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息,这就会导致没持久化 offset 的这部分消息会被再次消费

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了

2.解决

我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。

幂等性:多次操作产生的影响均和第一次操作产生的影响相同

例如:判断 crud 的幂等性

a. 新增:普通的新增是非幂等,设置了唯一索引的新增是幂等操作

b. 修改:update goods set stock = 10 where id = 1 幂等

               update goods set stock = stock - 1 where id = 1 非幂等

c. 查询:幂等

d. 删除:幂等

那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证,因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

2.1 数据库插入法

发送方需要给消息带一个唯一标记(根据业务标识)

模拟业务 数据库的订单操作日志表结构(去重表)

给订单号添加唯一索引(订单号存的是 key)

模拟业务,生产者发送了重复的消息

@Test
public void repeatTest() throws Exception {
String key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder.withPayload("扣减库存 -1").setHeader(RocketMQHeaders.KEYS, key).build();
rocketMQTemplate.syncSend("repeatTopic", msg);
rocketMQTemplate.syncSend("repeatTopic", msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "repeatTopic",consumerGroup = "repeat-consumer-group")
public class RepeatListener implements RocketMQListener<MessageExt> {
    @Autowired
    private LogMapper logMapper;
    @Override
    public void onMessage(MessageExt messageExt) {
        // 先拿key
        String keys = messageExt.getKeys();
        // 插入数据库 因为key做了唯一索引
        OrderOperLog orderOperLog = new OrderOperLog();
        orderOperLog.setType(1l);
        orderOperLog.setOrderSn(keys);
        orderOperLog.setUserId("1003");
        int insert = logMapper.insert(orderOperLog);
        System.out.println(keys);
        System.out.println(new String(messageExt.getBody()));
    }
}

在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException

数据库只插入一条这样的记录

优化,捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了,不再进行业务处理,因为之前已经消费了一条同样的消息,这样便可以解决重复消费问题

2.2 使用布隆过滤器

  • 使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
  • 想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

我们可以选择布隆过滤器(BloomFilter)

介绍:

布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

2.2.1 添加hutool的依赖

<dependency>
  <groupId>cn.hutool</groupId>
  <artifactId>hutool-all</artifactId>
  <version>5.7.11</version>
</dependency>

2.2.2 测试生产者

public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId = UUID.randomUUID().toString();
System.out.println(keyId);
Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

发送了两条相同的消息

55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]

2.2.2 测试消费者

/**
 * 在boot项目中可以使用@Bean在整个容器中放置一个单利对象
 */
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度

@Test
public void testRepeatConsumer() throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTestTopic", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        // 拿到消息的key
        MessageExt messageExt = msgs.get(0);
        String keys = messageExt.getKeys();
        // 判断是否存在布隆过滤器中
        if (bloomFilter.contains(keys)) {
            // 直接返回了 不往下处理业务
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // 这个处理业务,然后放入过滤器中
        // do sth...
        bloomFilter.add(keys);
        System.out.println("keys:" + keys);
        System.out.println(new String(messageExt.getBody()));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.in.read();
}

业务只处理了一条

keys:55d397c9-814f-4931-b0fd-7e142c04759b
库存-1

延迟过了后 重复消息被签收

解决重复消费问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1121058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

hdlbits系列verilog解答(内部wire)-09

文章目录 wire线网类型介绍一、问题描述二、verilog源码三、仿真结果wire线网类型介绍 wire线网类型是verilog的一种数据类型,它是一种单向的物理连线。它可以是输入也可以是输出,它与reg寄存器数据类型不同,它不能存储数据,只能用于组合逻辑建模。常用于assign连续赋值语…

测试饱和了? 大数据测试就业薪资和前景究竟怎么样?

随着不断有转行人员及毕业的大学生进入IT行业&#xff0c;在很多外界人眼里&#xff0c;这个行业的“缺口”已满&#xff0c;人员趋于饱和&#xff0c;但事实真的这样吗&#xff1f;还真没有。只是最基础的岗位需求在慢慢变少了&#xff0c;但行业中比较深的细分岗位&#xff0…

一、软件工程概述+练习题

文章目录 软件工程复习一、 概述1.常见考点1.1 什么是软件&#xff1f;软件的特点 1.2 什么是软件危机&#xff1f;它的具体表现是什么&#xff1f;软件危机的概念软件危机的内容具体表现软件危机的原因消除软件危机的途径 2.软件工程的三要素软件工程的定义 3. 软件生存周期4.…

在软件测试行业这种情况下,凭什么他能拿25k?我却约面试都难?

在当今竞争激烈的软件测试行业中&#xff0c;近期的招聘市场确实面临一些挑战。大量的求职者争相涌入岗位&#xff0c;许多热衷于功能测试的人士甚至难以找到理想的工作机会。更不幸的是&#xff0c;连自动化测试和性能测试这些专业领域也受到了测试开发人员的竞争压力。然而&a…

HashMap遍历之EntrySet————小练习

public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("jack",650);hashMap.put("tom",1200);hashMap.put("smith",2900);System.out.println(hashMap);//将jack的工资更改为2600hashMap.put("jack",…

基于ElasticSearch+Vue实现简易搜索

基于ElasticSearchVue实现简易搜索 一、模拟数据 产品名称描述价格库存数量品牌名称智能手表智能手表&#xff0c;具有健康跟踪和通知功能。199.991000TechWatch4K智能电视4K分辨率智能电视&#xff0c;提供出色的画质。699.99500VisionTech无线耳机降噪无线耳机&#xff0c;…

《低代码指南》——维格云和Airtable的比较

Airtable​ 什么是Airtable​ Airtable 是一个任务管理应用程序,它合并了电子表格、数据存储和模板,以帮助组织构建他们的工作流程。 适用于哪些企业/组织/人群​ 根据 Airtable 网站,该工具被超过 200,000 个组织的团队使用。 维格表与Airtable相比如何​ Airtable作为…

【C语言】善于利用指针(三)

&#x1f497;个人主页&#x1f497; ⭐个人专栏——C语言初步学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 导读&#xff1a;1. 函数指针1.1 什么使函数指针1.2 用函数指针变量调用函数 2. 返回指针值的函数3. 函数指针数组3.1 实…

asp.net社区医疗辅助诊断网站系统VS开发sqlserver数据库web结构c#编程

一、源码特点 asp.net社区医疗辅助诊断网站系统 是一套完善的web设计管理系统&#xff0c;系统采用mvc模式&#xff08;BLLDALENTITY&#xff09;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver200…

音乐制作软件 Ableton Live 11 Suite mac中文版功能介绍

Ableton Live 11 Suite mac是一款专业级别的音乐制作软件&#xff0c;它提供了多种音乐制作和编辑功能&#xff0c;可以帮助用户创建各种音乐作品。界面简单直观&#xff0c;可以方便地进行各种音乐制作操作。它提供了丰富的音乐制作工具和功能&#xff0c;如录音、采样、编曲、…

C语言实现模拟 strcmp 字符串比较函数,实现字符串大小的比较

完整代码&#xff1a; // 模拟 strcmp 字符串比较函数&#xff0c;实现字符串大小的比较 #include<stdio.h> //strcmp函数是两个字符串自左向右逐个字符相比&#xff08;按 ASCII 值大小相比较&#xff09;&#xff0c;直到出现不同的字符或遇 \0 为止&#xff0c;如果字…

常用Web安全扫描工具合集

漏洞扫描是一种安全检测行为,更是一类重要的网络安全技术,它能够有效提高网络的安全性,而且漏洞扫描属于主动的防范措施,可以很好地避免黑客攻击行为,做到防患于未然。那么好用的漏洞扫描工具有哪些? 1、AWVS Acunetix Web Vulnerability Scanner(简称AWVS)是一款知名…

网站、小程序常见布局样式记录

文章目录 &#x1f380;前言&#xff1a;&#x1f415;网页样式展示小程序&#xff1a;《携程网》&#x1f380;持续更新... &#x1f380;前言&#xff1a; 本篇博客会收藏一些作者见到的网页、小程序页面&#xff0c;目的是用来寻找制作项目网页页面的灵感&#xff0c;有需要…

Java高级编程---Java多线程

Java多线程 线程概述进程线程比较 线程的创建继承Thread类创建多线程实现Runnable接口创建多线程两种实现多线程方式的对比 线程的生命周期及状态转换线程的调度线程的优先级线程休眠线程让步线程插队多线程同步死锁问题 线程概述 计算机能够同时完成多项任务&#xff0c;例如…

[MIT 6.1810]Lab7-networking

Lab7 networking https://pdos.csail.mit.edu/6.828/2023/labs/net.html 目录 Lab7 networking背景驱动程序E1000手册接收描述符发送描述符寄存器约定环形队列 代码实现发送接收坑 背景 为E1000实现驱动&#xff0c;补全kernel/e1000.c中的两个空函数。 为了达成目的&#xff…

【RuoYi移动端】HbuilderX实现底部弹窗示例

一、单选样式弹窗选择 1、View页面代码 <uni-popup ref"textBox" type"bottom"><view class"select_box"><view class"select_row" v-for"(item,index) in status" click"selectClick(item.id)"&g…

FPGA时序分析与约束(6)——综合的基础知识

在使用时序约束的设计过程中&#xff0c;综合&#xff08;synthesis&#xff09;是第一步。 一、综合的解释 在电子设计中&#xff0c;综合是指完成特定功能的门级网表的实现。除了特定功能&#xff0c;综合的过程可能还要满足某种其他要求&#xff0c;如功率、操作频率等。 有…

Fiber Golang:Golang中的强大Web框架

揭示Fiber在Go Web开发中的特点和优势 在不断发展的Web开发领域中&#xff0c;选择正确的框架可以极大地影响项目的效率和成功。介绍一下Fiber&#xff0c;这是一款令人印象深刻的Golang&#xff08;Go语言&#xff09;Web框架。以其飞快的性能和强大的特性而闻名&#xff0c;…

2023-mac brew安装python最新版本,遇见的问题和处理方式

#### 创建Python3.11.6符号链接我现在遇见这个问题了&#xff1a;python --version -bash: python: command not found 192:bin wangyang$ python3 --version Python 3.9.6 192:bin wangyang$ /usr/local/bin/python3 --version Python 3.11.6我要怎么做&#xff0c;我才可以直…

Spring-AOP 讲解

1、为什么会出现AOP思维 我们知道&#xff0c;在我们的项目中&#xff0c;会出现核心代码和非核心代码&#xff0c;对于非核心代码&#xff0c;在各个方法中可能是冗余的&#xff0c;此时为了解决这种非核心代码的冗余以及不方便管理的问题&#xff0c;就出现了AOP思维。 2、…