Spring boot 实践Rabbitmq消息防丢失

news2025/1/13 11:38:46

之前看很多网上大佬的防丢失的文章,文章中理论知识偏多,所以自己想着实践一下,实践过程中也踩了一些坑,因此写出了这篇文章。如果文章有误人子弟的地方,望在评论区指出。

导致消息出现丢失的原因

  1. 发送时失败,指发送端发送完消息准备到达消息队列的过程中,因网络波动、消息队列服务宕机等,消息队列服务无法接收消息,所以导致了丢失。

  2. 到达时宕机,消息队列服务接收到消息之后,如果没有开启持久化,消息会存储在内存中(当然内存吃紧的话,也会转入磁盘,缓解内存),如果这个时候服务挂了,那么内存中的消息就会丢失。

  3. 发送到消费端失败,消费端接收到了消息的时候,消费端服务挂了,而rabbitmq默认自动ack,也就是说rabbitmq发送到消费端,一旦认定了消费端接收了,无论有无消费成功,rabbitmq都认为是发送成功。

下面我们以这三种情况进行实践。

环境

jdk1.8 Spring boot 2.3.7.RELEASE Spring-boot-starter-amqp 2.3.7.RELEASE Rabbitmq 3.7.7

准备工作

我事先准备了好了交换机以及队列

  • 交换机:message.log.test.exchange和message.log.test2.exchange

  • 队列:message.loss.test.queue

其中message.loss.test.queue和message.log.test.exchange是绑定关系,而 message.log.test2.exchange没有绑定队列

1.发送时失败

发送时失败,rabbitmq有两种情况是属于发送时失败。

  1. 消息未到rabbitmq的交换机(exchange)

  2. 消息到达了rabbitmq的交换机(exchange),但是没有到达队列(queue)

第一种的解决方式是使用confirm机制。第二种解决方式则是使用return机制。

使用confirm机制

模拟场景

confirm机制是当发送端的消息没有到达rabbitmq的交换机(exchange)时,会触发confirm方法,告诉发送端该消息没有到达rabbitmq,需要做业务处理。 这里我们发送消息到rabbitmq不存在的交换机上,就可以模拟上述场景。

实现RabbitTemplate.ConfirmCallback接口

/**
 * 当消息没有到达Rabbitmq的交换机时触发该方法(当然到达了也会触发,)
 */
@Component
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *
     * @param correlationData 消息属性体
     * @param ack 是否成功,成功到达true,没有到达,false
     * @param cause rabbitmq自身给的信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         
        //第一个坑,如果发送端发送消息时没有对correlationData进行处理,conirm方法接收到的对象都会是null
        //当接收失败并且correlationData对象为null,证明目前已经无法追溯回业务,可以做业务日志处理
        if(!ack&&correlationData==null){
            System.out.println(cause);
            //日志处理。。。

            return;
        }

        //如果接收失败
        if(!ack){
            System.out.println("消息Id:"+correlationData.getId());
            Message message=correlationData.getReturnedMessage();
            System.out.println("消息体:"+new String(message.getBody()));
            //这里可以持久化业务消息体到数据库,然后定时去进行补偿处理或者重试等等
            return;
        }

        //处理完成

    }
}

发送端代码

/**
 * 消息的推送
 * @return
 */
@PostMapping("push")
public boolean push(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名称");
    testMessage.setBusinessId("业务Id");

    //定义CorrelationData对象以及消息属性。不然comfirm方法无论失败还是成功,CorrelationData参数永远是null
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //传递业务数据
    correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties()));

  //发送消息(这里发送给了message.log.test.exchange11交换机,但实际rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData);

    return true;
}

这里是我踩的第一个坑,如果发送端不定义correlationData,那么confirm接收到的correlationData对象参数 都会是null

实现效果

使用return机制

模拟场景

当消息到达了rabbitmq的交换机的时候,但是又没有到达队列,那么就会触发return方法。 下面我们定义一个没有绑定队列的交换机,然后发送消息到交换机,就可以模拟上述场景

实现RabbitTemplate.ReturnCallback

/**
 * 当消息没有到达Rabbitmq的队列时就会触发该方法
 */
@Component
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }


    /**
     * @param message    消息体
     * @param replyCode  返回代码
     * @param replyText  返回文本
     * @param exchange   交换机
     * @param routingKey 发送方定义的路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息标识:" + message.getMessageProperties().getDeliveryTag());
        String messageBody = null;
        try {
            messageBody = new String(message.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println("消息:" + messageBody);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);

    }
}

发送端代码

/**
 * 消息的推送
 * @return
 */
@PostMapping("push2")
public boolean push2(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名称2");
    testMessage.setBusinessId("业务Id");

    template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString());

    return true;
}

这里需注意消息体需要JSON序列化,不然returnedMessage方法接收的消息body会是乱码

实现效果

2.rabbitmq服务挂了,造成内存的消息丢失。

这个开启rabbitmq的持久化机制就好了,开启之后消息到达rabbitmq服务,会实时转入磁盘。这里怎么设置就不多说了,网上挺多文章可以解答。

不过即使开启了还是会有一种情况会造成消息丢失,那就是消息即将要持久化到磁盘的那一刻,服务挂了,就会造成丢失,不过这种情况我也不知道怎么模拟,所以就暂不实践了。

3.发送到消费端消费失败

上面提到默认情况下rabbitmq使用的是自动ack的方式,我们将它改成手动ack的方式,就可以解决这个问题。

修改application.yml配置文件

rabbitmq:
 listener:
  simple:
    #开启手动确认
    acknowledge-mode: manual
    #开启失败后的重试机制
    retry:
      enabled: true
      #最多重试3次
      max-attempts: 3

效果

​效果流程:

  1. 第一次用Postman请求之后,控制台显示了消息被消费的信号。

  2. 然后去查看rabbitmq后台管理刚刚被消费的消息以及变为Unacked

  3. 停止程序后(关闭消费端),过一阵子,后台管理显示消息变回了Ready,也就是说重新回到了队列。

  4. 重新启动程序(开启消费段),消息被重新消费。

总而言之,如果消费端没有做手动确认的操作,那么在消费端还没关闭之前,消息会变成Unacked,不会再次被消费,但一旦消费端关闭了,消息会重新回到队列,让消费端消费。

2.消费过程中,触发了未知异常,代码没有try catch

/**
 * 消费
 * @param testmessage 消息体
 * @param message 消息属性
 * @param channel mq通道对象
 */
@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
    System.out.println("消费testmessage消息:"+testmessage.getName());
    //故意触发异常
    if(!StringUtils.isEmpty(testmessage.getName())){

        throw new RuntimeException("11211");
    }
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

效果1

​上面的效果图显示,我在触发了异常之后,消息重试了三次,也就是我在application.yml 配置的重试三次

如果我去掉重试机制会是什么效果。

效果2

​效果和忘记做ack操作的效果一样,消息没有ack后,消息会变成Unacked状态,消费端关闭后消息会重新回到队列,然后重新链接的时候,就会再消费一次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

295348-87-7,AF 594 Succinimidyl Ester可用于成像和流式细胞分析

理论分析: 中文名:AF 594活性酯 英文名:AF 594 Succinimidyl Ester,Alexa Fluor 594 NHS Ester,AF 594 NHS Ester CAS号:295348-87-7 化学式:C39H37N3O13S2 分子量:819.85 ex/em : 5…

招投标业务总结

最近接了一个招标投标的项目,开发完成后,整个招投标的流程也就理清楚了,简介一下业务过程。业务主流程可以分为3个阶段: 招标方建立招标项目发布招标公告投标人参与竞标招标方开标评选公示 系统可以划分为两套,一个是给招标方使…

若依框架解读(微服务版)——2.模块间的调用逻辑(ruoyi-api模块)(OpenFeign)

模块之间的关系 我们可以了解到一共有这么多服务,我们先启动这三个服务 其中rouyi–api模块是远程调用也就是提取出来的openfeign的接口 ruoyi–commom是通用工具模块 其他几个都是独立的服务 ruoyi-api模块 api模块当中有几个提取出来的OpenFeign的接口 分别为文件…

华为机试 - ABR 车路协同场景

目录 题目描述 输入描述 输出描述 用例 题目解析 算法源码 题目描述 数轴有两个点的序列 A{A1, A2, …, Am}和 B{B1, B2, ..., Bn}, Ai 和 Bj 均为正整数, A、 B 已经从小到大排好序, A、 B 均肯定不为空, 给定…

大数据培训课程Reduce Join案例实操

Reduce Join案例实操 1.需求 表4-4 订单数据表t_order idpidamount100101110020221003033100401410050251006036 表4-5 商品信息表t_product pidpname01小米02华为03格力将商品信息表中数据根据商品pid合并到订单数据表中。 表4-6 最终数据形式 idpnameamount…

2022我的前端面试总结

Webpack Proxy工作原理?为什么能解决跨域 1. 是什么 webpack proxy,即webpack提供的代理服务 基本行为就是接收客户端发送的请求后转发给其他服务器 其目的是为了便于开发者在开发模式下解决跨域问题(浏览器安全策略限制) 想…

盘点 | 跨平台桌面应用开发的5大主流框架

受益于开源技术的发展,以及响应快速开发的实际业务需求,跨平台开发不仅限于移动端跨平台,桌面端虽然在市场应用方面场景不像移动端那么丰富,但也有市场的需求。 相对于个人开发者而言,跨平台框架的使用,主…

Vue开发 提交后台,二维码,自定义

1. 修改title和图标 资源可以放在static下面,给一个小的: 直接再index里面改: 不生效,需要在 vue.config.js 中增加: module.exports {pwa: {iconPaths: {favicon32: logo.png,favicon16: logo.png,appleTouchIcon:…

阿里巴巴全新SpringCloud实战笔记(全彩版)GitHub狂揽70000标星

最近小编淘到一份宝贝! 先看看目录: 这份手册真的非常全面,涵盖了所有SpringCloud所有的内容,限于文章篇幅原因,只能以截图的形式展示出来,有需要的小伙伴可以文末获取↓↓↓ 直接展示内容: …

react redux 状态管理

1.store store是一个状态管理容器,它通过createStore创建,createStore接收initialState和reducer两个参数。它暴露了4个api分别是: getState() dispatch(action) subscribe(listener) replaceReducer 前三个是比较常用的api,之…

葡萄糖-聚乙二醇-二茂铁Ferrocene-PEG-Glucose

葡萄糖-聚乙二醇-二茂铁Ferrocene-PEG-Glucose,二茂铁,是一种具有芳香族性质的有机过渡金属化合物,化学式为Fe(C5H5)2,常温下为橙黄色粉末,有樟脑气味。熔点172℃-174℃,沸点249℃,100℃以上能升…

腾讯云服务器+宝塔+后端+前端发布

1、申请云服务器。登陆。 https://cloud.tencent.com/ 创建实例 最好重置密码,并记住。 配置安全组,当我们是学习的时候,全部开放好了。 有些版本是去“防火墙”那里配置。 轻量应用服务器(试用的) 2、安装Docker。在…

Oracle LiveLabs实验:Load and Analyze Your Data with Autonomous Database

概述 本研讨会中的实验将引导您完成开始使用 Oracle 自治数据库的所有步骤。 首先,您将创建一个 Oracle 自治数据库实例。 然后,您将练习使用自治数据库工具和 API 从不同位置以不同格式加载数据的几种方法。 您将使用 SQL 分析数据并使用 Oracle Analy…

ShardingSphere笔记(二):自定义分片算法 — 按月分表

ShardingSphere笔记(二):自定义分片算法 — 按月分表 文章目录ShardingSphere笔记(二):自定义分片算法 — 按月分表一、准备二、分表逻辑三、自定义分片算法步骤(以按月分表为例)1. …

【AI工程】08-MLOps工具-在Charmed Kubeflow上运行MindSpore

作者:王磊 更多精彩分享,欢迎访问和关注:https://www.zhihu.com/people/wldandan 在【AI工程】02-AI工程(AI Engineering)面面观中,提到Gartner把AI工程化作为未来重要战略技术趋势,Gartner认为…

关于webpack(v5.74.0)的模块联邦原理

在webpack中模块联邦的实现主要依赖于两个插件ContainerReferencePlugin和ContainerPlugin,ContainerPlugin是用来添加入口依赖并给当前依赖添加异步依赖,ContainerReferencePlugin用来添加解析用户的请求并分析是否是远程模块,然后加载远程模…

使用 JPA、Hibernate 和 Spring Data JPA 进行审计

1. 概述 在ORM的上下文中,数据库审计意味着跟踪和记录与持久实体相关的事件,或者只是实体版本控制。受 SQL 触发器的启发,这些事件是对实体的插入、更新和删除操作。数据库审核的好处类似于源版本控制提供的好处。 在本教程中,我…

Shelby American 汽车 NFT 系列来袭!

我们在 The Sandbox 上推出 Shelby NFT 作品集,加入我们吧!该系列包含 Carroll Shelby 制造的一些最稀有和最抢手的汽车,也是现实生活中最具收藏价值的汽车。这些汽车构成了最伟大的汽车历史,也是传奇人物 Carroll Shelby 的伟大代…

为什么开源在线表单工具能做好数据管理?

在数字化时代,数据的有效应用和管理可以说是企业的无形资产,做好数据管理既能提升办公效率,又能帮助企业从规律的数字化管理中获取高效的管理策略。那么,什么样的开源在线表单工具可以实现这一目的?对于企业而言&#…

Axure药企内部管理平台+企业内部管理系统平台

这是一款根据药企的需求设计的内部管理系统,此系统主要是针对市场部和销售部的管理,此作品选择了管理员和地区经理两个角色进行了设计, 设计软件:Axure8.1(兼容9和10) 作品类型:实战原型 其主要…