RabbitMQ消息丢失的情况,以及如何通过代码解决

news2025/1/9 15:18:42

目录

RabbitMQ消息丢失问题:

代码部分:

完整代码:

RabitMQConfig:

CourseMQListener:

生产者跟交换机通信的消息丢失解决 :

交换机跟消息队列的消息丢失:

消息队列跟消费者的消息丢失:

消费者服务器宕机:

重复消息的问题:


RabbitMQ消息丢失问题:

 

代码部分:

完整代码:

RabitMQConfig:

package com.dmdd.educourseservice.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.pool.ConnFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ的配置
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //设置RabbitTemplate支持事务
//        rabbitTemplate.setChannelTransacted(true);
//        return rabbitTemplate;
        //设置发布确认回调接口 参数1 消息相关信息(id) 参数2 是否成功 参数3 失败原因    生产者与交换机之间
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack){
                log.info("消息投递成功{}",correlationData);
            }
            else {
                //消息补偿的逻辑,如:保存失败的信息,通过定时任务重新投递
                log.info("消息投递失败{}",cause);
            }
        });
        //设置消息发送失败返回回调,参数:消息内容,响应代码,响应文本,交换机,路由键  交换机与队列之间
        rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
            //消息补偿的逻辑,如:保存失败消息,通过定时任务重新投递
            log.error("消息发送失败,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);
        }));
        return  rabbitTemplate;
    }


}

CourseMQListener:

package com.dmdd.edusearchservice.listener;

import com.alibaba.fastjson.JSON;
import com.dmdd.common.entity.Course;
import com.dmdd.edusearchservice.service.CourseIndexService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class CourseMQListener {

    //对课程进行添加或更新的队列名
    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    //对课程进行删除的队列名
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    //对课程进行添加或更新的路由键
    public static final String KEY_COURSE_SAVE = "key.course.save";
    //对课程进行删除的路由键
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    //课程交换机名
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Autowired
    private CourseIndexService courseIndexService;
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 监听课程添加操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String data, Channel channel, Message message) {
        try {
            log.info("课程保存:{}", data);
            //获得消息的id
            String id = message.getMessageProperties().getHeader("spring_returned_message_correlation");
            //将id保存到redis中,如果id存在就不执行业务逻辑
            ValueOperations<String, String> ops = redisTemplate.opsForValue();
            //如果id不存在就,设置id为键,执行业务逻辑
            if (ops.setIfAbsent(id, "0", 1000, TimeUnit.SECONDS)) {
                //将json转换为课程对象
                Course course = JSON.parseObject(data, Course.class);
                courseIndexService.saveCourse(course);
                log.info("上传到elasticsearch成功");
                //逻辑执行完将消息状态设置为1
                ops.set(id, "1", 1000, TimeUnit.SECONDS);
                //手动确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                //id存在,重复消息
                log.info("消息已经存在,重复消息 {}", id);
                if ("1".equals(ops.get(id))) {
                    log.info("该消息已经执行完毕 {}", id);
                    //手动确认
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        } catch (Exception ex) {
            log.error("接收消息出现异常", ex);
        }
    }

    /**
     * 监听课程删除操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"),
                    key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        try {
            log.info("课程删除完成:{}", id);
            courseIndexService.removeCourse(String.valueOf(id));
        } catch (Exception ex) {
            log.error("接收消息出现异常", ex);
        }
    }
}

生产者跟交换机通信的消息丢失解决 :

//设置发布确认回调接口 参数1 消息相关信息(id) 参数2 是否成功 参数3 失败原因    生产者与交换机之间
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack){
        log.info("消息投递成功{}",correlationData);
    }
    else {
        //消息补偿的逻辑,如:保存失败的信息,通过定时任务重新投递
        log.info("消息投递失败{}",cause);
    }

通过ack响应确认交换机是否收到消息。可以方便排查

交换机跟消息队列的消息丢失:

//设置消息发送失败返回回调,参数:消息内容,响应代码,响应文本,交换机,路由键  交换机与队列之间
rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
    //消息补偿的逻辑,如:保存失败消息,通过定时任务重新投递
    log.error("消息发送失败,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);
}));

查询交换机与消息队列可能丢失消息的问题

消息队列跟消费者的消息丢失:

消费者服务器宕机:

//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 消息队列发送消息给消费者时,消费者服务器宕机了,可以通过该代码解决

重复消息的问题:

@Test
void contextLoadas(){
    Course course = new Course();
    course.setId(222);
    course.setCourseName("测试mq+redis完全版");
    String json = JSON.toJSONString(course);
    rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
            RabbitMQConfig.KEY_COURSE_SAVE,
            json,
            new CorrelationData(UUID.randomUUID().toString()));
}
if (ops.setIfAbsent(id, "0", 1000, TimeUnit.SECONDS)) {
    //将json转换为课程对象
    Course course = JSON.parseObject(data, Course.class);
    courseIndexService.saveCourse(course);
    log.info("上传到elasticsearch成功");
    //逻辑执行完将消息状态设置为1
    ops.set(id, "1", 1000, TimeUnit.SECONDS);
    //手动确认
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
    //id存在,重复消息
    log.info("消息已经存在,重复消息 {}", id);
    if ("1".equals(ops.get(id))) {
        log.info("该消息已经执行完毕 {}", id);
        //手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

 消息发出去之后,可能出现网络抖动或延迟问题,导致消息发送超时,重复发送相同消息,因此我们可以发送消息时,传一个加密过的uuid,跟随消息内容一起过去,将uuid作为redis的键来储存信息,并设定状态码为0,当消费者服务将该消息执行完后,状态码为1,如果在执行服务期间有重复的消息发送过来,就会直接打回。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/417715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自动处理【支付宝交易支付投诉管理系统】配置指南

大家好&#xff0c;我是小悟 已经有小伙伴开始使用自动处理【支付宝交易支付投诉管理系统】&#xff0c;所以详细介绍一下如何配置。 阅读这篇文章之前&#xff0c;结合这篇【连夜干出来一个自动处理【支付宝交易支付投诉管理系统】&#xff0c;支持多商户】干货食用更佳。 连…

Filter 过滤器 Listener 监听器

Filter web中的过滤器当用户访问服务器资源时&#xff0c;过滤器将请求拦截下来&#xff0c;完成一些通用的操作应用场景如&#xff1a;登录验证、统一编码处理、敏感字符过滤 编写filter对目标资源servlet进行拦截 1. 编写java类&#xff0c;实现filter接口 public class Qu…

智慧医院人员定位系统解决方案,助力医院安全管理智能化

随着经济的发展与生活质量的提升&#xff0c;人们对医疗健康的重视度越来越高&#xff0c;医疗行业也因此蓬勃发展起来。然而&#xff0c;不断扩大的经营规模也给医院安全管理带来挑战和难题。 医院安全管理痛点 1、医疗事件信息获取不及时甚至存在瞒报现象&#xff0c;管理者…

yc博客项目创建-白手起家

初始化项目 1、码云创建代码库 2、下载码云项目到本地 3、IDEA直接生成springboot项目 接入mysql 1、配置文件 2、代码配置 启动项目 访问项目 访问连接&#xff1a; http://localhost:8089/yc-blog/index/listlistContent 注意点&#xff1a;server.servlet.context-path…

Redis用于全局ID生成器、分布式锁的解决方案

全局ID生成器 每个店铺都可以发布优惠卷 当用户抢购时&#xff0c;就会生成订单并保存到tb_voucher_order这张表中&#xff0c;而订单表如果使用数据库自增id就存在一些问题&#xff1a; 1.id的规律性太明显 2.受单表数据量的限制 全局ID生成器&#xff0c;是一种在分布式系…

极光笔记 | 如何在Shopify中使用EngageLab (下)

Sendgird发布的《2022 Global Messaging Engagement Report》中揭示了世界各地的用户更喜欢用哪种方式与品牌互动&#xff0c;结论是&#xff1a;“电子邮件仍然是第一名&#xff08;短信紧随其后&#xff09;”。4800多名受访者中&#xff0c;有18%的人将电子邮件列为他们最常…

普通人是否能从ChatGPT中分一杯羹?

ChatGPT3.0刚刚推出&#xff0c;最开始的时候&#xff0c;人们只是将ChatGPT看作一个很会聊天的机器人&#xff0c;无论问题多么天马行空&#xff0c;它的答案看上去都有理有据。后来&#xff0c;像打开潘多拉魔盒一样&#xff0c;很多人开始拿它编大纲、撰写文案、编代码、创作…

Docker本地推送到hub,以及上传时遇到的问题解决

1.在本地创建一个 Dockerfile FROM ubuntu:latest RUN apt-get update && apt-get install -y curl CMD ["curl", "https://www.baidu.com"]2.在本地构建 Docker 镜像 在创建本地docker镜像的时候[TAG] .和[TAG] /PATH/TO 需要注意dockerfile文件…

ATTCK v12版本战术介绍——防御规避(二)

一、引言 在前几期文章中我们介绍了ATT&CK中侦察、资源开发、初始访问、执行、持久化、提权战术、防御规避&#xff08;一&#xff09;理论知识及实战研究&#xff0c;本期我们为大家介绍ATT&CK 14项战术中防御规避战术&#xff08;二&#xff09;&#xff0c;包括防御…

【数据结构】顺序栈和链栈的基本操作(定义,初始化, 入栈,出栈,取栈顶元素,遍历,置空)

&#x1f38a;专栏【数据结构】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【勋章】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 目录 ⭐栈的分类 ✨顺序栈 &#x1f388;优点&#xff1a; &…

离线安装k8sv1.20.5版本并部署服务

注意&#xff1a;我这里的离线安装包是V1.20.5的&#xff0c;单安装一个master节点并部署服务&#xff0c;保证可以使用。如果安装集群也是可以的&#xff0c;但是需要把离线包上传到所有的node节点&#xff0c;导入&#xff0c;最后把node节点接入到K8S集群即可&#xff0c;本…

js flyout 2: VScroll

目录版权描述测试页面showFlyout问题1 - scroll 实现可能不准?问题2 - 容器内容重排可导致浮层错位关于重排小结附录 - 完整代码版权 本文为原创, 遵循 CC 4.0 BY-SA 版权协议, 转载需注明出处: https://blog.csdn.net/big_cheng/article/details/130101031. 文中代码属于 pu…

【致敬未来的攻城狮计划】学习总结

文章目录【致敬未来的攻城狮计划】学习总结前言学习总结一、RT-Thread二、RA2E1开发板三、学习移植RT-Thread四、学习RT-Thread设备五、其他收获六、总结【致敬未来的攻城狮计划】学习总结 &#x1f680;&#x1f680;开启攻城狮的成长之旅&#xff01;这是我参与的由 CSDN博客…

【RabbitMQ学习日记】——死信队列与延迟队列

一、死信队列 1.1 相关概念 死信&#xff0c;顾名思义就是无法被消费的消息&#xff0c;字面意思可以这样理解&#xff0c;一般来说&#xff0c;producer 将消息投递到 broker 或者直接到 queue 里了&#xff0c;consumer 从 queue 取出消息进行消费&#xff0c;但某些时候由…

云擎未来,智信天下 | 2023移动云大会来了!

新三年&#xff0c;新征程 2023年作为新三年开局之年 移动云又将以怎样的 全新品牌形象、全新战略规划 向“一流云服务商”战略目标勇毅前行&#xff1f; 答案就在这里&#xff1a; 2023移动云大会&#xff0c;官宣定档&#xff01; 2023.4.25 - 4.26 苏州金鸡湖国际会…

MATLAB配置C/C++库(Visual Studio,MinGW-w64 C/C++ 编译器)问题(包括低版本matlab配置高版本VS)

问题描述 使用matlab加载C语言的库函数时&#xff0c;需要提前配置好C/C编译器&#xff0c;否则在matlab中使用 loadlibrary 加载C /C库中的函数时候&#xff0c;会报错&#xff1a; “未找到支持的编译器或 SDK。您可以安装免费提供的 MinGW-w64 C/C 编译器&#xff1b;请参…

软考第三章 广域通信网

广域通信网 1.公共交换电话网 公共交换电话网PSTN&#xff1a;是为了话音通信而建立的网络&#xff0c;在有些地方用户仍然通过电话线拨号上网 1.1 电话系统的结构 电话系统是一个高度冗余的分级网络。用户电话通过一对铜线连接到最近的端局。 公共电话网由本地网和长途网组…

一文速学数模-最优化算法(二)梯度下降算法一文详解+Python代码

目录 前言 一、梯度下降法简述 二、梯度下降算法原理理解 1.梯度 2.梯度定义 3.梯度下降 4.损失函数(loss function) 5.学习率(步长) 三、梯度下降算法代码展示 消失和爆炸梯度 前言 最近会不断更新深度学习系列文章(全实战性可运行代码)加入到我的一文速学-数学建模…

Git项目同时推送到GitHub和Gitee详细操作

文章目录前言一、创建仓库【Create a new repository】二、初始化三、配置公钥四、密钥验证五、代码推送总结前言 将Git项目同时推送到GitHub和Gitee的好处如下&#xff1a; 提高代码可见性和协作性&#xff1a;GitHub和Gitee都是知名的代码托管平台&#xff0c;推送代码到这两…

大数据能力提升项目|学生成果展系列之五

导读为了发挥清华大学多学科优势&#xff0c;搭建跨学科交叉融合平台&#xff0c;创新跨学科交叉培养模式&#xff0c;培养具有大数据思维和应用创新的“π”型人才&#xff0c;由清华大学研究生院、清华大学大数据研究中心及相关院系共同设计组织的“清华大学大数据能力提升项…