RabbitMQ防止消息丢失

news2024/11/18 23:48:28

生产者没有成功把消息发送到MQ
丢失的原因 :因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

解决办法 : 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制(发布确认机制)。

注意:

          RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。

          confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。

两个机制说明如下:

confirm(发布确认)机制
解释:RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以重新发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。

代码

yml配置

----------------------------------------------------------------------------------------------------

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* 交换机回滚
*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{
    /* correlationData 内含消息内容
     * ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败
     * cause 表示失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("hello world");
        String id = correlationData.getId();
        String message = new String(correlationData.getReturnedMessage().getBody());
        if (ack){
            log.info("交换机收到消息id为{}, 消息内容为{}", id, message);
        }else {
            log.info("交换机未收到消息id为{}, 消息内容为{}, 原因为{}", id, message, cause);
        }
    }
}

----------------------------------------队列防止消息丢失----------------------------------------------------------------

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 队列防止消息丢失
 */
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{
    @Override
    public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",
                new String(message.getBody()), exchange, routingKey, replyCode, replyText);
    }
}

--------------------------引用->controller-----------------------------------------------

//交换机回滚
@Autowired
private ExchangeCallback exchangeCallback;
//队列回滚
@Autowired
private QueueCallback queueCallback;
/**
 * 初始化交换机监听
 */
@PostConstruct
public void init(){  
//交换机
rabbitTemplate.setConfirmCallback(exchangeCallback);
/**
 * true:交换机无法将消息进行路由时,会将该消息返回给生产者
 * false:如果发现消息无法进行路由,则直接丢弃
 */
rabbitTemplate.setMandatory(true);
//队列
rabbitTemplate.setReturnCallback(queueCallback);
}
/**
     * 发送消息
     * 结果:"这是一条消息"
     */
    @GetMapping("/sendMessageTest")
    public String sendMessageTest(){
        // 消息类型为object 发送对象也是可以的
        String msg = "这是一条消息";
        // 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
//CommonUtils.dirExchange--自己的交换机名称
//CommonUtils.routingKey --路由Key值 
        rabbitTemplate.convertAndSend("1235",CommonUtils.routingKey,msg);
        System.out.println("消息发送成功:"+msg);
        return "发送成功;发送内容为:"+msg;
    }

运行结果:

消息接收确认

消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认(默认值)
  • AcknowledgeMode.MANUAL:手动确认

手动确认消息

1、basicAck

表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)

  • deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

  • multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

  • 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。

3、basicReject

拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/451410.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

直播授课在线课堂学习平台的设计与实现nodejs+vue

在各学校的教学过程中,直播授课管理是一项非常重要的事情。随着计算机多媒体技术的发展和网络的普及,“基于网络的学习模式”正悄无声息的改变着传统的直播学习模式,“基于网络的直播教学平台”的研究和设计也成为教育技术领域的热点课题。1、…

金融数据获取:获取网站交互式图表背后的数据,Headers模拟浏览器请求,防盗链破解及Cookie验证

有时候写爬虫难免会遇上只提供一张图表却没有背后结构化数据的情况,尤其是金融市场数据,例如股票K线,基金净值等。笔者看了市面上主流的行情网站,基本都是图一这样的交互式图表,但这样的图表是没有办法直接拿到原始数据…

前端技术学习第八讲:VUE基础语法---初识VUE

VUE基础语法—初识VUE 一、初识VUE Vue (读音 /vjuː/,类似于 view) 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层,不仅易于上手,还便于与第三方库…

如何快速白嫖一个SSL证书

首先呢,还是要非常感谢各大云厂商能够为白嫖党免费提供申请SSL证书的这个一个平台。 SSL证书作用不在描述,自行百度? 本篇主要从百度云、腾讯云、阿里云讨论下 阿里云 地址:阿里云-计算,为了无法计算的价值 首先需…

手写axios源码系列三:dispatchRequest发送请求

文章目录 一、dispatchRequest 发送请求代码设计思路1、创建 dispatchRequest.js 文件2、创建 adapters.js 文件3、创建 xhr.js 文件4、总结 上篇文章中介绍了创建 axios 函数对象的思路,在 Axios 的原型对象上声明了一个 request 方法,在 request 方法中…

基于Java+Springboot+vue在线版权登记管理系统设计实现

基于JavaSpringbootvue在线版权登记管理系统设计实现 博主介绍:5年java开发经验,专注Java开发、定制、远程、指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联系方式 文…

亚马逊云科技在上云旅程中不断调整和优化成本管理计划

亚马逊云科技的云财务管理旨在帮助企业建立一个成功的CFM战略:通过4个云财务管理CFM原则或步骤作为路线图:SEE-查看、SAVE-保存、PLAN-计划和RUN-运行。 对现有工作负载的预测和规划 1、 优化计算资源与架构: 与技术业务相关部门合作&…

Notion打不开

如果Notion打不开,可以尝试以下方法:1. 尝试Ping一下Notion的服务器,如果是正常的,但访问502了,那么很可能是DNS污染了。建议将DNS修改为114.114.114.114,再加个8.8.8.8,修改完成后再度访问Noti…

doccano使用记录

参考文章:https://github.com/PaddlePaddle/PaddleNLP/blob/develop/model_zoo/uie/doccano.md 参考文章:https://github.com/doccano/doccano 参考文章:https://doccano.github.io/doccano/ 参考文章:https://zhuanlan.zhihu.com…

06 | 立迈胜电机使用问题汇总

1 前提 使用STM2832B-485-MA-0FS等 2 常见问题 2.1 操作相关 问题1:怎么识别到电机设备 解决方法: 1、电机上电,在通讯处,点击【打开】 2、设备类型选择【串口】 3、选择串口选择【对应的COM】 4、选择对应的波特率 问题2…

python 的 object 与type的关系

python 的 object 与type的关系 是并列关系&#xff0c;两种是相互依赖的 查询父类 type.__bases__ object.__bases__(<class ‘object’>,) () 查询类型 type(type) type(object)<class ‘type’> <class ‘type’> 在python中&#xff0c;type用于描述…

Docker之Docker网络

Docker网络 1. 理解Docker01.1 测试1.2 原理1.3 小结 2. -link3. 自定义网络3.1 网络模式3.2 测试3.3 自定义网络 4. 网络连通5. 实战&#xff1a;部署Redis集群6. 总结 1. 理解Docker0 清空所有环境 docker rm -f $(docker ps -aq) docker rmi -f $(docker images -aq)1.1 测…

51.现有移动端开源框架及其特点—PocketFlow-1

51.1 简介 全球首个自动模型压缩框架一款面向移动端AI开发者的自动模型压缩框架,集成了当前主流的模型压缩与训练算法,结合自研超参数优化组件实现了全程自动化托管式的模型压缩与加速。 开发者无需了解具体算法细节,即可快速地将AI技术部署到移动端产品上,实现了自动托管式…

Java项目打包exe运行文件

Java项目打包exe运行文件 JavaSE打包成exe运行文件的方法有很多种&#xff0c;此处我们主要讲解我常用的一种exe4j&#xff0c;打包前我们需要先安装exe4j这个工具。 注意&#xff1a;exe4j仅支持最低JDK1.8最高JDK11&#xff0c;所以在安装之前一定要查看自己的JDK版本&#…

银行数字化转型导师坚鹏:数字化时代普惠金融模式和产品创新

数字化时代普惠金融模式和产品创新 课程背景&#xff1a; 很多银行存在以下问题&#xff1a; 不清楚普惠金融的机遇与挑战&#xff1f; 不知道普惠金融模式和产品如何创新&#xff1f; 不知道普惠金融产品创新的成功案例&#xff1f; 课程特色&#xff1a; 用实战案例…

使用Docker安装Zookpeer集群

1&#xff09;需要提前安装python和docker-compose 注&#xff1a;sudo权限看自己机器的权限 安装python-pip&#xff1a;sudo yum -y install epel-releasesudo yum -y install python-pip安装docker-compose&#xff1a;sudo pip install docker-compose 注意在安装过程中很…

FileZilla密钥登录

使用密码登录非常的方便&#xff0c;但是有的客户的云服务器上是限定只能通过密钥登录。我一般使用命令行的scp命令就可以正常上传&#xff0c;但是对于我一些同事来说&#xff0c;就很不方便。 生成密钥 这个不难&#xff0c;可以参考我之前的文章。 《Mac使用ssh连接远程服…

实验07:子集和问题

1.实验目的&#xff1a; 深刻理解回溯法的基本思想&#xff0c;掌握回溯法解决问题的一般步骤&#xff0c;学会使用回溯法解决实际问题.运用所熟悉的编程工具&#xff0c;借助回溯法的思想求解子集和数的问题。 2.实验内容&#xff1a; 给定 n n n 个正整数 { x 1 , x 2 ,…

springboot 接口防刷(根据IP与路径限制)

接口防刷 一、全局接口防刷&#xff08;通过拦截器方式&#xff09;1、原理 代码示例 二、个别接口防刷&#xff08;接口注解方式)1、代码示例 一、全局接口防刷&#xff08;通过拦截器方式&#xff09; 1、原理 代码示例 通过ip地址uri拼接用以作为访问者访问接口区分通过…

NX状态检测

输入 sudo -H pip install jetson-stats 如果提示没有pip&#xff0c;那么就输入 sudo apt-get install python-pip 之后输入 sudo jtop进行监测 用这个方法可以看到当前Jetpack的版本