02_RabbitMQ消息丢失解决方案及死信队列

news2024/11/15 16:01:48

一、数据丢失

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

解决方案

消费者从队列中获取到数据,应答成功之后,队列认为消费者对消息处理完成,从队列中删除消息。

生产者

mq告知生产者,消息接收成功,没有成功,生产者可以继续重新发送消息

RabbitMQ有两种方式来解决这个问题:

  • 通过AMQP提供的事务机制实现(同步,不推荐);

  • 使用发送者确认模式实现(confirm,异步);

事务使用

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;

  2. channel.txCommit()提交事务;

  3. channel.txRollback()回滚事务;

public class Product {
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        Channel channel = conn.createChannel();
​
        String queueName = "my-que";
        // queueDeclare(String queue队列名, boolean durable持久化, boolean exclusive是否排外, boolean autoDelete自动删除,Map<String, Object> arguments队列其他属性信息)
        channel.queueDeclare(queueName,false,false,false,null);
​
        String message = "hello rabbitmq";
​
        try{
            // 开启事务
            channel.txSelect();
            // 发布消息
            channel.basicPublish("",queueName,null,message.getBytes());
            // 提交事务
            channel.txCommit();
        }catch (Exception e){
            e.printStackTrace();
            // 回滚事务
            channel.txRollback();
        }
        
        channel.close();
        conn.close();
    }
}
Confirm发送方确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

Confirm的三种实现方式:

  • channel.waitForConfirms()普通发送方确认模式,单条数据;

  • channel.waitForConfirmsOrDie()批量确认模式,多条数据;

  • channel.addConfirmListener()异步监听发送方确认模式;

public class Product {
​
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conn = ConnUtils.getConn();
        // 创建信道:处理消息
        Channel channel = conn.createChannel();
​
        // 创建队列:存储消息
        String queueName = "my-que";
        // queueDeclare(String queue队列名, boolean durable持久化, boolean exclusive是否排外, boolean autoDelete自动删除,Map<String, Object> arguments队列其他属性信息)
        channel.queueDeclare(queueName, false, false, false, null);
​
        // 定义消息
        String message = "hello rabbitmq";
​
        // 开启发送方确认模式
        channel.confirmSelect();
        // basicPublish(交换器,序列名,参数信息,消息)
        channel.basicPublish("", queueName, null, message.getBytes());
​
        if(channel.waitForConfirms()){
            System.out.println("发送成功");
        }
​
        channel.close();
        conn.close();
    }
}
// 开启发送方确认模式
channel.confirmSelect();
​
for (int i = 0; i < 10; i++) {
    channel.basicPublish("", queueName, null, message.getBytes());
}
​
// 直到所有信息都发布,只要有一个未确认就会IOException
channel.waitForConfirmsOrDie(); 
System.out.println("全部执行完成");
// 开启发送方确认模式
channel.confirmSelect();
​
for (int i = 0; i < 10; i++) {
    channel.basicPublish("", queueName, null, message.getBytes());
}
​
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
    }
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
    }
});
​
System.out.println("执行操作...");

MQ

rabbitmq做持久化,持久化主要分为:

  • 交换器的持久化

    // 参数1 exchange :交换器名
    // 参数2 type :交换器类型
    // 参数3 durable :是否持久化
    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  • 队列持久化

    // 参数1 queue :队列名
    // 参数2 durable :是否持久化
    // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
    // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
    // 参数5 arguments
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 消息持久化

    // 参数1 exchange :交换器
    // 参数2 routingKey : 路由键
    // 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
    // 参数4 body : 消息体
    channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消费者

关闭自动应答,改为手动应答,处理完成,MQ中再做删除

        // 消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                Thread.sleep(1000);
                System.out.println("获取到的消息内容:" + new String(body));
                // 手动应答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
​
        // false:关闭自动应答
        channel.basicConsume(queueName, false, consumer);

二、死信队列

  • 消息过期会自动删除

  • 消费者立刻接收消息进行处理

消费者立刻接收到数据处理,需求队列得数据不立刻被消费者处理,而是希望等待一段时间才被处理

死信队列:是延迟队列得一种实现方式,不是一种特殊得队列,死信交换机+消息有效期

  • 交换机的类型:fanout,direct,topic,header

  • 队列:不区分类型,也是一个普通得队列

  • 死信交换机:将一个交换机设置为某一个队列的死信交换机,当前队列过期的消息会发送给交换机

  • 消息:有效期

消息传递顺序

生产者发布消息 --> 交换机(转发消息) --> 队列(接收存储消息) --> 消费者(接收处理消息)

队列转发消息:

  1. 正常消息:转给消费者处理;

  2. 过期消息:默认删除,也可以通过配置(死信交换机)转发给交换机;

交换机的消息来源:

  1. 生产者发布的(普通得生产者)

  2. 其他队列到期删除的消息自动进入队列(队列需要绑定交换机,队列发布)

队列指的内容:

  • 消息来源都是从交换机发送得

死信队列:交换机B作为队列A的死信交换机,接收队列A中被删除的消息,实现队列A中的消息延迟处理的效果

死信队列DLX(dead-letter-exchange),利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新投递到另一个exchange上,这个exchange就是DLX,似乎叫死信交换机更加贴切,当死信投递到这个exchange后,我们也可以用一个queue来绑定该exchange,该exchange就可以根据路由规则把这个死信路由到这个queue了,同样可以创建消费端去进行消费,以便对死信进行相应的处理。

延时队列:实际是不存在直接可用的延时队列,可通过死信消息和死信队列来实现延时队列的功能。RabbitMQ中的所有队列,消息存进来,立刻分发给对应的消费者进行处理。

死信交换机: DLX 全称(Dead-Letter-Exchange)。其实它是个普通的交换机,但它是设置在队列上某个参数的值对应的交换机。

死信队列:如果某个队列上存在参数:x-dead-letter-exchange, 当这个队列里的消息变成死信消息(dead message)后会被重新Pushlish到 x-dead-letter-exchange 所对应参数值的交换机上,跟这个交换机所绑定的队列就是死信队列。

为什么使用延迟队列

订单五分钟内支付:未支付订单超时

以前:1.用户做支付时,判断是否超时,修改订单状态;(不调用)

​ 2.定时任务,每一秒刷新一次;(频繁刷新)

延时队列的应用场景很多,在我的项目开发中也涉及到很多,例如:订单五分钟未支付自动取消、订单准备超时30分钟推送提醒给门店、订单完成后两小时推送评价邀请给用户等等,这些间隔指定时间后的操作都可以使用延时队列。

消息变成死信的情况

死信消息

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false(不重回队列)。

  • 消息TTL过期。

  • 队列达到最大长度。

过期消息:RabbitMq 有两种设置消息过期的方式:

  1. 创建队列时通过 x-message-ttl 参数指定该队列消息的过期时间,这种队列里的消息过期时间全部相同。

  2. 生产者Pushlish消息时,通过设置消息的 expiration 参数指定过期时间,每个消息的过期时间都不一样。

  如果两者同时使用,过期时间按照小的一方为准,两种方式设置的时间都是 毫秒。

Queue参数信息

  • name: 队列的名称;

  • actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个;

  • durable: 是否持久化;

  • exclusive: 是否独享、排外的;

  • autoDelete: 是否自动删除;

  • arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:

    • x-message-ttl:消息的过期时间,单位:毫秒;

    • x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;

    • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;

    • x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

    • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;

    • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;

    • x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值

    • x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

    • x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;

    • x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

    • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

交换机参数信息

类型

  • FanoutExchange :扇形 交换机

  • DirectExchange :直连交换机 routing模式用的交换机

  • TopicExchange :主题模式交换机

参数信息

  • name:交换机名称

  • durable:是否把交换机持久化到磁盘上

  • autoDelete :是否自动删除交换机 用法有点类似上面的队列中的autoDelete 只有所有队列都和交换机接触订阅,说白了就是所有绑定到交换机上的队列不再需要改交换机,他就该死了,有点残酷

  • artuments :额外参数

实现

生产者

发布消息,设置消息过期时间

public class Producer {
​
    // default exchange
    private static String exchange = "test_dlx_exchange";
    // default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
    private static String routingKey = "dlx.abc";
​
    public static void main(String[] args) throws IOException, TimeoutException {
​
        // 1 创建Connection
        Connection connection = ConnUtils.getConn();
​
        // 2 创建Channel
        Channel channel = connection.createChannel();
​
        // 3 发送消息
        for (int i = 0; i < 5; i++) {
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    // deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    // expiration:消息的失效时间
                    .expiration("50000")
                    .build();
            String msg = "RabbitMQ: dlx message" + i;
            channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
        }
​
        // 5 关闭连接
        channel.close();
        connection.close();
    }
}

正常消费者

绑定死信队列,当消息变为死信消息时,会自动存入私信队列

public class Consumer {
​
    public static void main(String[] args) throws IOException, TimeoutException {
​
        Connection connection = ConnUtils.getConn();
​
        Channel channel = connection.createChannel();
​
        // 定义死信队的Exchange
        String dlxExchange = "dlx.exchange";
        channel.exchangeDeclare(dlxExchange, "topic");
​
        // 死信队列名
        String dlxQueue = "dlx.queue";
        channel.queueDeclare(dlxQueue, true, false, false, null);
​
        // # 表示所有的key都可以路由到s死信队列
        String dlxRoutingKey = "#";
        // 绑定死信队列和exchange
        channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null);
​
        // 定义正常的消费者j监听队列
        // 申明exchange
        String exchangeName = "test_dlx_exchange";
        channel.exchangeDeclare(exchangeName, "topic");
​
        // 申明队列
        Map<String, Object> arguments = new HashMap<>();
        // 设置死信队列,arguments要设置到申明的队列上
        arguments.put("x-dead-letter-exchange", dlxExchange);
        String queueName = "test_dlx_queue";
        channel.queueDeclare(queueName, true, false, false, arguments);
​
        // 队列绑定到exchange
        String routingKey = "dlx.#";
        channel.queueBind(queueName, exchangeName, routingKey);
​
        channel.basicQos(1);
​
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("---消费者-- " + new String(message.getBody(), "UTF-8"));
​
            }
        };
​
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消费者--:cancelCallback ");
            }
        };
​
        // callback: 消费者会调函数,处理发送过来的消息。
        // cancelCallback: 消费者取消订阅时的回调方法。
        // 消费消息,autoAck一定要设为false,手工ack
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

死信消费者

处理死信队列中得信息

// DLXConusmer,监听消费死信队列中的消息
public class DLXConusmer {
​
    public static void main(String[] args) throws IOException, TimeoutException {
​
        Connection connection = ConnUtils.getConn();
​
        Channel channel = connection.createChannel();
​
        String queueName = "dlx.queue";
        String exchangeName = "dlx.exchange";
        String routingKey = "#";
​
        // 申明exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // 申明队列
        channel.queueDeclare(queueName, true, false, false, null);
        // 队列绑定到exchange
        channel.queueBind(queueName, exchangeName, routingKey, null);
​
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    System.out.println("---死信队列消费者---");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                } finally {
                    // consumer手动 ack 给broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消费者:cancelCallback");
            }
        };
​
        // 消费消息,autoAck一定要设置为false
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2156506.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Attention is All You Need精读

原文开头&#xff0c;不是搬运 dog。All attention is all you need Abstract 摘要 这篇文章的内容源于基于编码器-解码器架构的RNN模型&#xff0c;在其他人的工作中&#xff0c;我们可以发现注意力机制对于提升编码器-解码器架构模型的性能很重要。这篇文章提出了一个比较简…

《代码整洁之道:程序员的职业素养》

作者&#xff1a;【美】Robert C. Martin 第1章 专业主义 专业主义就意味着担当责任&#xff0c;软件开发太复杂了&#xff0c;不可能没什么 bug。但很不幸&#xff0c;这并不能为你开脱。人体太复杂了&#xff0c;不可能尽知其全部&#xff0c;但医生仍要发誓不伤害病人。如…

隐藏excel单元格数据的两个方法

在Excel中&#xff0c;公式是用来计算数据和结果的非常重要的一部分。但是&#xff0c;有时候您可能希望隐藏公式&#xff0c;以保护其不被他人修改或查看。那么今天小编就来给大家分享隐藏excel单元格数据的方法。 一、使用“隐藏”功能 在Excel中&#xff0c;我们还可以使用…

ZYNQ学习--AXI总线协议

一、AXI 总线简介 AXI&#xff08;Advanced Extensible Interface&#xff09;高级拓展总线是AMBA&#xff08;Advanced Microcontroller Bus Architecture&#xff09;高级微控制总线架构中的一个高性能总线协议&#xff0c;由ARM公司开发。AXI总线协议被广泛应用于高带宽、低…

大语言模型超参数调优:开启 AI 潜能的钥匙

前言 在人工智能的广袤领域中&#xff0c;大语言模型&#xff08;LLM&#xff09;凭借其强大的实力&#xff0c;不断重塑着我们对机器理解语言的认知。然而&#xff0c;要使这些模型在特定应用场景中发挥最大效能&#xff0c;关键在于巧妙调整其超参数。本文将引领你深入探究 …

x-cmd pkg | bat: cat 命令现代化替代品,终端用户必备工具

目录 简介快速上手安装使用与第三方工具组合使用 功能特点竞品和相关作品进一步阅读 简介 bat 是由 github.com/sharkdp 用 Rust 开发的 cat 命令现代化替代品。它比 cat 命令扩展了更多的现代化功能&#xff0c;如语法高亮、自动分页、Git集成等&#xff0c;能为用户提供更为…

python如何跨文件调用自己定义的函数

当自己定义函数过多时&#xff0c;只有一个python文件时代码会很长&#xff0c;不易理清代码框架&#xff0c;比如下面这段代码&#xff0c;如何隐藏具体函数细节呢&#xff1f;也就是把def函数放到另外一个python文件里步骤如下&#xff1a; 一个python文件代码篇幅过长 imp…

结构体对齐、函数传参、库移植

结构体字节对齐 按固定位大小匹配地址&#xff0c;a:10b:1020位 <32位4字节 202040位>32位 所以ab20作为一个int型&#xff0c;int c:20 单独做4个字节&#xff08;int&#xff09; 101020 &#xff08;int&#xff09;4个字节 &#xff0c;20&#xff08;int&#x…

算法之逻辑斯蒂回归(Logistic regression)

简介&#xff1a;个人学习分享&#xff0c;如有错误&#xff0c;欢迎批评指正。 逻辑斯蒂回归&#xff08;Logistic Regression&#xff09;是统计学中一种广泛应用于二分类问题的算法。它的主要目标是预测二分类问题中的事件发生的概率。尽管名字里有“回归”&#xff0c;但逻…

wordpress迁移到别的服务器

wordpress论坛网站搭建 于2023/11/16写的该文章 一-配置环境 配置LNMP&#xff08;linuxnginxmysqlphpphpmyadmin&#xff09;环境或者LAMP&#xff08;apache&#xff09; 可以选择集成了这些软件的套件 下载链接&#xff1a;https://www.xp.cn/download.html 手动下载这…

https加密原理

以为http的数据都是以明文传送&#xff0c;会有很大的安全问题&#xff0c;所以出现的https协议。https就是在http协议的基础上增加了一个安全层&#xff0c;可以对数据进行加密和解密(例如SSL、TLS等)。 https加密解密的原理&#xff1a;证书非对称加密对称加密 在讲解原理前…

Python爬虫之urllib模块详解

Python爬虫入门 此专栏为Python爬虫入门到进阶学习。 话不多说&#xff0c;直接开始吧。 urllib模块 Python中自带的一个基于爬虫的模块&#xff0c;其实这个模块都几乎没什么人用了&#xff0c;我就随便写写了。 - 作用&#xff1a;可以使用代码模拟浏览器发起请求。&…

2024 年最新前端ES-Module模块化、webpack打包工具详细教程(更新中)

模块化概述 什么是模块&#xff1f;模块是一个封装了特定功能的代码块&#xff0c;可以独立开发、测试和维护。模块通过导出&#xff08;export&#xff09;和导入&#xff08;import&#xff09;与其他模块通信&#xff0c;保持内部细节的封装。 前端 JavaScript 模块化是指…

【Pytorch】一文快速教你高效使用torch.no_grad()

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 博主简介 博主致力于嵌入式、Python、人工智能、C/C领域和各种前沿技术的优质博客分享&#xff0c;用最优质的内容带来最舒适的…

2024年中国研究生数学建模竞赛A/C/D/E题全析全解

问题一&#xff1a; 针对问题一&#xff0c;可以采用以下低复杂度模型&#xff0c;来计算风机主轴及塔架的疲劳损伤累积程度。 建模思路&#xff1a; 累积疲劳损伤计算&#xff1a; 根据Palmgren-Miner线性累积损伤理论&#xff0c;元件的疲劳损伤可以累积。因此&#xff0c;…

基于SpringBoot+Vue的商城积分系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目源码、Python精…

手机在网状态查询接口如何用C#进行调用?

一、什么是手机在网状态查询接口&#xff1f; 手机在网状态查询接口是利用实时数据来对手机号码在运营商网络中的状态进行查询的工具&#xff0c;包括正常使用状态、停机状态、不在网状态、预销户状态等。 二、手机在网状态查询适用哪些场景&#xff1f; 例如&#xff1a;商…

【线程池】ThreadPoolExecutor应用

ThreadPoolExecutor应用 每一步的坚持与积累,都是铸就高薪和大牛的必经的修炼 哈哈,不吹牛逼了,今天来分享最近在提升中的学习总结,无论是对在职场还是求职,看完,我相信都会有些许的收获和成长 也难得过了一个悠闲点的周末,哈哈哈,一起奥利给!! 本文总纲: 1.为什么要自定义线程…

联合体的用法和用联合体判断大小端存储

像结构体⼀样&#xff0c;联合体也是由⼀个或者多个成员构成&#xff0c;这些成员可以不同的类型。但是编译器只为最⼤的成员分配⾜够的内存空间。联合体的特点是所有成员共⽤同⼀块内存空间。所 以联合体也叫&#xff1a;共⽤体。 给联合体其中⼀个成员赋值&#xff0c;其他成…

Linux 文件系统(下)

目录 一.文件系统 1.文件在磁盘上的存储方式 a.盘面、磁道和扇区 b.分区和分组 2.有关Block group相关字段详解 a.inode编号 b.inode Table&#xff08;节点表&#xff09; c.Data blocks&#xff08;数据区&#xff09; d.小结 二.软硬链接 1.软链接 a.软链接的创建…