【测试开发】Mq消息重复如何测试?

news2024/12/26 22:46:30

本篇文章主要讲述重复消费的原因,以及如何去测试这个场景,最后也会告诉大家,目前互联网项目关于如何避免重复消费的解决方案。

Mq为什么会有重复消费的问题?

Mq 常见的缺点之一就是消息重复消费问题,产生这种问题的原因是什么呢?有以下几点:

工作流程

Mq消息重复如何测试?

1、producer 生成数据,发送到broker集群,当遇到网络抖动超时,可能会重复发送。

为了保证数据的可靠性一般都会配置重试机制如下:

rocketmq:
  producer:
    group: sanyouProducer
    #发送消息超过5秒未接收到broker返回的成功消息
    send-message-timeout: 5000
    #重试最大次数
    retry-times-when-send-failed: 2
    max-message-size: 4194304
  name-server: 172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876
  #发送消息超时时长,意思是超过5秒钟未收到broker返回的发送成功的消息,
  #producer会重复发送,但并不是一直发送,会根据retry-times-when-send-failed次数,
  #最多重试多少次

极端情况下,网络出现抖动,生产者超过设置的时间未收到broker返回的成功消息,会重新发送消息。

2、消费者宕机,未提交offset给broker

由上图可知,broker接收到producer 发送的消息后,会把消息发送给消费者,一般情况下,消费者消费完一条数据,会提交一个offset给到broker,告诉它,这条消息我消费了,但是,极端情况下,消费者消费一条消息成功,提交offset之前,宕机了或者网络抖动超时了,broker未收到offset,就认为这条消息没人消费,当消费者重启服务器或网络恢复,那么broker还会发送这条消息给消费者重新消费。

3、业务上的bug,可能会导致重复消费。

生产者producer的上游系统,突然出现了bug,导致重复调用生产者所在服务的接口,生产者收到请求后,继续发送消息给broker。

当然了,重复消费的原因有很多,以上只是常见的几种原因,那怎么去测试呢?

怎么测试重复消费场景?

假如有这么一个场景,采购员在采购系统的前端页面进行采购单下单操作,下单成功后,采购系统这边会保留一份采购单数据,然后发送一条mq给到wms 仓库系统,那么生产者就是采购系统,消费者就是wms仓库系统,wms消费到采购单的消息,落入数据库wms_purchase表中,为了简化,我只设计了三个字段。

建表ddl:

CREATE TABLE `wms_purchase` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '仓库采购单id',
  `purchase_id` bigint(20) NOT NULL COMMENT '采购单id',
  `purchase_name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=237 DEFAULT CHARSET=utf8;

怎么测试呢?很简单,我们只要编写生产者工具,在工具里加个循环,尽量循环多次,如下:

@RestController
@RequestMapping("/mq")
public class ProducerController {
    // 自动注入 RocketMQTemplate模板类,用于生产消息
    @Autowired
    private RocketMQTemplate mqTemplate;
    // 模拟生产者重复消费问题,前提是数据库没有唯一索引,并且项目未做幂等性校验
    @RequestMapping("/send")
    public String testSend(@RequestBody WmsPurchaseDto params) {
        try {
            for (int i = 0; i <100 ; i++) {
                mqTemplate.convertAndSend("fourbrothertopic", params);
            }
            return "success";
        } catch (Exception e) {
            e.printStackTrace();
            return "fail";
        }
    }

解读:

requestmapping对外暴露一个web接口,地址是localhost:8080/demo/mq/send,
post请求,参数是json格式,类似
{
    "purchaseId": "256465",
    "purchaseName": "测试"
}
这种形式,然后起个for循环,循环调用convertAndSend方法,发送同样的消息,最终结果如下图:

Mq消息重复如何测试?

这里模拟producer重复发送的场景,前提是数据库没有对采购单id做唯一索引,并且项目未做幂等性校验。数据库里出现很多采购单id一样的数据,业务上这是不允许的。

假如说,项目出现了这么一种bug,开发那边是怎么修复的呢?

Mq如何保证幂等性?

分享几种解决方案的具体代码demo:

1、数据库unique key(表里不允许重复列出现)来保证幂等性。

很简单,我们只要在wms_purchase里,对purchaseId添加唯一索引即可,提示:在添加唯一索引之前,需清理完表里的数据。

也可以使用ddl语句:

ALTER TABLE `wms_purchase` ADD UNIQUE ( `purchaseId` ) 

代码不变,调用以下接口:

localhost:8080/demo/mq/send post请求
{
    "purchaseId": "256465",
    "purchaseName": "测试"
}

得到以下结果:

Mq消息重复如何测试?

上图中,循环生产同一条采购单数据,但是右边表中只出现了一条采购单id是256465的数据,说明添加唯一索引确实保证了幂等性,但是代码里却出现大量类似Duplicate entry '256465' for key 'uniqe_key_purchaseid' 日志,是因为触发了数据设置的唯一索引,

由于触发了唯一索引,导致消费者未提交offset给broker,那么broker会认为这条消息未被消费,后续会持续不断地推送消息给消费者,也就意味着会持续不断地报错。

另外这种持续无效的请求数据库会占用数据库的连接资源,在高并发的场景下,会严重拖垮系统响应效率。

虽然保证了幂等性,但是日志里总是报错,太不讲究、也不雅观,那怎么解决呢?

2、数据库unique key+redis 来保证幂等性。

如截图:

Mq消息重复如何测试?

通俗的理解就是,消费者在进行数据库落库操作之前,会判断redis是有这条采购单数据,如果有就直接放过这条消息不做处理,没有这条数据,那就进行落库操作,但在落库之前还要进一步判断数据库是否有这条采购单数据,没有那就进行落库,落库成功,再把采购单的id当做key,采购单数据当做value set 进redis缓存里,设置一定的过期时间。

redis基于内存,操作数据特别快,在进行落库之前查询redis,可以避免很多无效的请求数据库,但是为啥要设置过期时间?因为redis的内存资源有限,并且很宝贵,所以我们希望设置的数据能在一段时间内定期失效,即使失效,也没关系,还有数据库的唯一索引兜底。

这样就很好的保证了幂等性,也避免了大量的日志报错。伪代码如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;

    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        //首先判断redis里面是否有这条采购单数据,通过PurchaseId查询,有数据,则直接放过不做处理
       if (redisTemplate.opsForValue().get(wmsPurchase.getPurchaseId().toString())==null){
           //然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理
           if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
               //数据库没有数据,就进行插入操作,
               if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                   //插入成功就把purchaseid塞进redis里,过期时间是72小时
                   redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);
               }
           }else {
               //能走到这个判断分支,说明缓存里的采购单数据已经失效,如果还有消息重复消费
               //那就再放入缓存一次,72h过期
               redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);
               log.info("数据库已保留该数据");
               // 触发重复消费告警机制
           }
       }else {
           log.info("缓存已保留该数据");
            // 触发重复消费告警机制
       }
    }
}

思路很简单,如代码中注释。当然这种方法也有缺点,就是过于依赖redis,有些系统没有使用redis组件,那么还得维护一套redis组件,并且还得保证redis集群高可用。那项目只有mysql,能不能依靠数据库去维护保证幂等性呢?当然可以!

3、还有一种方法叫去重表+唯一索引,顾名思义就是另外维护一张表,记录已经消费的采购单数据,其实和上述方法差不多,上述方法查询缓存,取重表查询数据库取重表。

伪代码 如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;

    @Autowired
    private UniquePurchaseMapper uniquePurchaseMapper;

    @Autowired
    private RedisTemplate redisTemplate;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        log.info("映射后实体消息"+ JSON.toJSONString(wmsPurchase));
        if (uniquePurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId().intValue())  == null){
            if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
                //数据库没有数据,就进行插入操作,
                if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                    //插入成功就把purchaseid塞进unique_purchase
                    UniquePurchase  uniquePurchase =   new UniquePurchase();
                    uniquePurchase.setPurchaseId(wmsPurchase.getPurchaseId().intValue());
                    log.info("插入取重表消息:"+ JSON.toJSONString(uniquePurchase));
                    uniquePurchaseMapper.insert(uniquePurchase);
                }
            }else {
                log.info("数据库已保留该数据");
                //自动触发告警机制
            }
        }else {
            log.info("取重表已有这条采购单数据");
        }
 }

代码已上传至gitee,感兴趣可以自行阅读。

上述方式在查询取重表时,并发不安全,极端情况下还是会触发唯一索引错误,比如说,消费者要消费大量消息(线程),执行上述代码,A线程执行完23行,挂起了,cpu把执行权给了B线程,B执行到25行并插入成功,那么这时A线程被唤起,也执行到了23行,结果触发了唯一索引错误。那怎么避免呢?

我们可以让所有线程别并发执行,串行执行,那就用到redis的分布式锁技术。

4、分布式锁+uniquekey

伪代码如下

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;
    @Autowired
    private RedissonClient redisson;
    @Autowired
    private UniquePurchaseMapper uniquePurchaseMapper;
    @Autowired
    private RedisTemplate redisTemplate;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
// 注入redisson
// 获取锁对象
        RLock lock = redisson.getLock("lockName");
        try {
            // 1. 最常见的使用方法
            //lock.lock();
            // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
            //lock.lock(10, TimeUnit.SECONDS);
            // 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁
            boolean res = lock.tryLock();
            if (res) { //成功
                    //然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理
                    if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
                        //数据库没有数据,就进行插入操作,
                        if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                            //插入成功就把purchaseid塞进redis里,过期时间是72小时
                            redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);
                        }
                    }else {
                        redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);
                        log.info("数据库已保留该数据");
                        //自动触发告警机制
                    }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            RLock lockName = redisson.getLock("lockName");
            if (lockName.isLocked()) {
                if (lockName.isHeldByCurrentThread()) {
                    lockName.unlock();
                }
            }
        }
}

这种也是比较常见的一种,缺点也很明显,在高并发,大请求量的场景下,所有线程串行执行,处理效率势必会降低。当然了,技术没有好坏,只有合不合适。如果你的项目并发量一般,可以尝试使用上述方法。

具体代码demo已上传至gitee平台,地址如下:

https://gitee.com/lv1792017548/rocketmq-demo.git

总结

本文主要分享了如何测试mq消息队列重复性消费,以及避免重复消费常见的解决方案。

【B站最全最易学】十年大佬终于将测试开发路线整理出来了,小白一学就会,拿走不谢,允许白嫖!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/825840.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从封面开始,打造一个引人注目的视频作品

在如今的互联网时代&#xff0c;短视频已经成为了人们生活中不可或缺的一部分。而一个吸引人的视频封面可以让你的作品更具吸引力&#xff0c;吸引更多观众的点击。那么&#xff0c;如何制作一个令人印象深刻的视频封面呢&#xff1f;下面就让我们揭秘一些实用技巧吧&#xff0…

Chrome 75不支持保存成mhtml的解决方法

在Chrome 75之前&#xff0c;可以设置chrome://flags -> save as mhtml来保存网页为mhtml。 升级新版&#xff0c;发现无法另存为/保存网页为MHTML了。 在网上搜索无果后&#xff0c;只得从chromium项目的commits中查找&#xff0c;原来chrome搞了个"Chrome Flag Owner…

新闻稿发布中,首发来源和转载是什么意思?

一秒推小编告诉您&#xff0c;在新闻稿发布中&#xff0c;首发来源和转载是两个常用的词语&#xff0c;它们有着不同的含义和使用场合。#新闻稿发布# 首发来源指的是原创的、第一次发布该条新闻的媒体或媒体机构。比如&#xff0c;如果一家新闻机构发布了一则新闻稿&#xff0c…

圆圈中最后剩下的数字(约瑟夫环)——剑指 Offer 62

文章目录 题目描述法一 数学递归 题目描述 法一 数学递归 int lastRemaining(int n, int m){return f(n, m);}int f(int n, int m){if(n1){return 0;}int x f(n-1, m);return (mx)%n;}

浅析视频技术与AI智能感知与生鲜供应链的数字化应用

一、行业背景 近年来&#xff0c;我国肉类、水果、蔬菜、水产品、乳品、速冻食品等生鲜市场需求快速增长&#xff0c;营商环境持续改善&#xff0c;推动冷链物流较快发展&#xff0c;但仍面临不少突出瓶颈和痛点难点卡点问题&#xff0c;难以有效满足市场需求。传统生鲜食材供…

【EI/SCOPUS征稿】2023年通信网络与机器学习国际学术会议(CNML 2023)

2023年通信网络与机器学习国际学术会议&#xff08;CNML 2023&#xff09; 2023 International Conference on Communication Networks and Machine Learning 随着数据流量的显著增长&#xff0c;新的通信应用程序不断出现&#xff0c;并产生更多的数据流量&#xff0c;这些数…

单片机外部晶振故障后自动切换内部晶振——以STM32为例

单片机外部晶振故障后自动切换内部晶振——以STM32为例 作者日期版本说明Dog Tao2023.08.02V1.0发布初始版本 文章目录 单片机外部晶振故障后自动切换内部晶振——以STM32为例背景外部晶振与内部振荡器STM32F103时钟系统STM32F407时钟系统 代码实现系统时钟设置流程时钟源检测…

RabbitMQ输出日志配置

参考地址rabbitmq启用日志功能记录消息队列收发情况_rabbitmq开启日志_普通网友的博客-CSDN博客 启用日志插件命令 # 设置用户权限 rabbitmqctl set_user_tags mqtt-user administrator rabbitmqctl set_permissions -p / mqtt-user ".*" ".*" ".*&…

总结动量定理的交易规则

动量定理策略是一种趋势策略&#xff0c;基于周线图中的“三烛台”形态(上涨或下跌)进行交易。Forexclub总结的交易规则如下&#xff1a; 1. 下一个烛台必须比上一个烛台大&#xff0c;以确认趋势存在。 2. 多奇烛台(不带主体的烛台)不考虑在内。 3. 止损设置在序列中第一根蜡…

线性代数 | 机器学习数学基础

前言 线性代数&#xff08;linear algebra&#xff09;是关于向量空间和线性映射的一个数学分支。它包括对线、面和子空间的研究&#xff0c;同时也涉及到所有的向量空间的一般性质。 本文主要介绍机器学习中所用到的线性代数核心基础概念&#xff0c;供读者学习阶段查漏补缺…

AIGC风起,快看能否走出“水逆周期”?

文|琥珀消研社 作者| 石榴 7月28日-31日&#xff0c;终于回归线下的ChinaJoy 2023在上海新国际博览中心举行&#xff0c;不仅吸引了广大二次元爱好者&#xff0c;还有不少互联网大厂的影子&#xff0c;比如腾讯、网易、美团、哔哩哔哩等等。 而在同月快看世界举办的第二届KK…

【BASH】回顾与知识点梳理(六)

【BASH】回顾与知识点梳理 六 六. 管线命令 (pipe)6.1 撷取命令&#xff1a; cut, grepcutgrep 6.2 排序命令&#xff1a; sort, wc, uniqsortuniqwc 6.3 双向重导向&#xff1a; tee6.4 字符转换命令&#xff1a; tr, col, join, paste, expandtrcoljoinpasteexpand 6.5 分区命…

VSCode和QT联合开发

提示&#xff1a;本文为学习记录&#xff0c;若有错误&#xff0c;请联系作者&#xff0c;谦虚受教。 文章目录 前言一、VSCODE下载二、使用步骤1.下载扩展 二、新建工程1.新建文件夹2.新建工程3.UI界面文件操作4.效果 总结 前言 一、VSCODE下载 下载地址 二、使用步骤 1.下…

Docker Dockerfile 语法与指令

一、简介 Docker 镜像原理、容器转成镜像 随便找个案例&#xff0c;进入 https://hub.docker.com/ 搜索 centos&#xff0c;然后随便找个版本&#xff08;例如&#xff1a;centos7&#xff09;点击一下&#xff0c;就会进入 centos7 的 dockerfile 文件&#xff1a; // 空镜像…

用python画满天星花朵,用python绘制漫天雪花

这篇文章主要介绍了用python绘制漫天雪花&#xff0c;具有一定借鉴价值&#xff0c;需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获&#xff0c;下面让小编带着大家一起了解一下。 import turtle as t import random t.pensize(1) t.screensize(800,800,black) def s…

全新升级!腾讯云大数据 ES Serverless 服务开启日志分析新体验

2023年8月1号&#xff0c;腾讯云大数据 ES Serverless服务重磅发布&#xff0c;拥有自动弹性、完全免运维、极致成本、Elastic Stack生态兼容、灵活易用、稳定可靠等优势特性&#xff0c;提供开箱即用的云端Elasticsearch体验&#xff0c;助力企业高效上云&#xff01; 自建El…

【云原生K8s】初识Kubernetes的理论基础

K8S由google的Borg系统(博格系统&#xff0c;google内部使用的大规模容器编排工具)作为原型&#xff0c;后经GO语言延用Borg的思路重写并捐献给CNCF基金会开源。 云原生基金会&#xff08;CNCF&#xff09;于2015年12月成立&#xff0c;隶属于Linux基金会。CNCF孵化的第一个项目…

swing布局详解

1. 布局管理器接口 &#xff08;1&#xff09;说明 布局管理器接口为LayoutManager和LayoutManager2&#xff0c;LayoutManager2是LayoutManager的子类。 &#xff08;2&#xff09;常用方法 方法描述LayoutManageraddLayoutComponent(String name, Component comp) removeL…

使用HBuilderX如何创建一个vue项目?

要在HBuilderX中创建一个Vue项目&#xff0c;可以按照以下步骤进行操作&#xff1a; 1.【创建】打开HBuilderX&#xff0c;点击菜单栏中的"文件"&#xff0c;然后选择"新建"&#xff0c;再选择"项目"。 在弹出的对话框中&#xff0c;选择"…

【简单认识GFS分布式文件系统】

文章目录 一.GlusterFS 概述1.GlusterFS简介2.特点3.GlusterFS 术语4.模块化堆栈式架构5.GlusterFS 的工作流程6.GlusterFS的卷类型1、**分布式卷&#xff08;Distribute volume&#xff09;**2、条带卷&#xff08;Stripe volume&#xff09;3、复制卷&#xff08;Replica vol…