RabbitMQ初步到精通-第十一章-RabbitMQ之常见问题汇总

news2025/1/11 3:50:49

目录

RabbitMQ之常见问题汇总

1.rabbitmq丢消息场景

1.1 消息未持久化丢失

1.2 消费时消息丢失

1.3 如何阻止消息丢失

2. mq消费消息是pull 还是 push

2.1 pull形式消费

2.2 push形式消费

3. mq重复消费场景

3.1 生产端重复情况

3.2 消费端重复

3.3 如何防止

4.prefetch作用


RabbitMQ之常见问题汇总

1.rabbitmq丢消息场景

在前面的文章中,我们介绍了mq如何防消息丢失,从消息从生产者发送到Broker的Exchange,再到Queue,再Deliver到消费者,各个环节都有可能会丢失消息。

这里我们主要模拟一下两个场景:消息未持久化丢失和消费时消息丢失。

1.1 消息未持久化丢失

生产者发送了一条未持久化消息,此时消息未被消费,那这条消息存储在内存中

此时重启MQ,那这条消息就会丢失掉。 

1.2 消费时消息丢失

若消费时使用自动确认机制,产生了2条消息,第一条消息由于处理时间比较长,第二条消息已经拉取到了本地BlockingQueue中,再第一条消息处理的过程中,系统宕机了,第二条消息还没来的及消费,就丢失了。

1.3 如何阻止消息丢失

参考 RabbitMQ初步到精通-第五章-RabbitMQ之消息防丢失_Mr-昊哥的博客-CSDN博客

2. mq消费消息是pull 还是 push

mq消费既支持pull的形式也支持push的形式。pull形式,是靠客户端一直拉取服务端的消息,而客户端并不知道什么时候会有消息进入到队列中,会一直取消息,造成额外的系统开销,效率也不高。

而push形式,是一般我们采用的方式。将监听函数注册到服务中,mqBroker收到消息进行投递,再调用监听函数方法消费消息。

2.1 pull形式消费

代码中使用:

        for (int i = 0; i < 2; i++) {
            GetResponse response = channel.basicGet(QUEUE_NAME, true);
            System.out.println("消费:" + new String(response.getBody()));
        }

抓包情况:

都是先发出请求,再将结果返回。

2.2 push形式消费

代码:

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Thread.sleep(1000);
                System.out.println("接收到消息:" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, consumer);

抓包情况:

服务端将消息推送过来:

3. mq重复消费场景

3.1 生产端重复情况

从生产端即产生了重复的情况,多由于系统bug,或系统间调用造成重试等原因,推出了重复的消息。

3.2 消费端重复

消费使用手动ACK模式,两个消费者,消费者1,虽然消费到了消息,由于处理时间长,未能及时ACK,这时候系统宕机,mq会将NACK状态的消息变为Ready状态,又推送给了消费者2 ,这样就造成了,同一条消息 同时在消费者1和消费者2都消费到了。

生产者:

public class RepeatProducer {

    public static final String QUEUE_NAME = "work";

    //生产者
    public static void main(String[] args) throws Exception {
        //1、获取connection
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        for (int i = 1; i <= 3; i++) {
            sendMsg(channel, i);
            Thread.sleep(1000);
        }
        //4、关闭管道和连接
        channel.close();
        connection.close();
    }

    private static void sendMsg(Channel channel, int k) throws IOException {
        //3、发送消息到exchange
        String msg = "hello work :" + k;
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("生产者发布消息成功!" + k);
    }

}

消费者1:

public class RepeatConsumer1 {
    public static final String QUEUE_NAME = "work";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);
        //3、创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1 接收到消息:" + new String(body, "UTF-8"));
                try {
                    Thread.sleep(1000 * 60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //5.关闭自动ACK
        channel.basicConsume("work", false, consumer);
        System.out.println("消费者1开始监听队列");

        //6、键盘录入,让程序不结束!
        System.in.read();

        //7、释放资源
        channel.close();
        connection.close();
    }


}

消费者2:

public class RepeatConsumer2 {

    public static final String QUEUE_NAME = "work";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);
        //3、创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 接收到消息:" + new String(body, "UTF-8"));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //5.关闭自动ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
        System.out.println("消费者2开始监听队列");

        //6、键盘录入,让程序不结束!
        System.in.read();

        //7、释放资源
        channel.close();
        connection.close();
    }

}

结果:

第一次运行:

消费者1开始监听队列
Consumer1 接收到消息:hello work :1

消费者2开始监听队列
Consumer2 接收到消息:hello work :2
Consumer2 接收到消息:hello work :3

运行过程中停掉消费者1

消费者2开始监听队列
Consumer2 接收到消息:hello work :2
Consumer2 接收到消息:hello work :3
Consumer2 接收到消息:hello work :1

3.3 如何防止

消费时引入幂等机制,使用分布式锁,数据库唯一索引等控制。

4.prefetch作用

接触到prefetch分别会在java和spring 客户端中出现,只有需要手动ACK的时候才起作用。

设置了此值,会控制mq服务端给客户端推送的消息数量。

例如设置15,mq堆积了大量消息的情况下,会首次推送给 客户端15个消息,若消息消费慢的情况,mq的服务端会始终保证15个unack的消息的前提下,给客户端推送。

1.java amqp client

        channel.basicQos(15);
        //开启手动确认
        channel.basicConsume(QUEUE_NAME, false, consumer);

2. spring 配置中

spring.rabbitmq.listener.simple.acknowledge-mode = manual
spring.rabbitmq.listener.simple.prefetch = 15

spring若没进行配置默认250

private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
public static final int DEFAULT_PREFETCH_COUNT = 250;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/50537.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

行业新趋势!利尔达OpenCPU方案助力水表厂商破局

在“十四五”规划数字化转型的大背景下&#xff0c;水务行业的不断发展对智能水表的需求呈爆发式增长&#xff0c;NB-IOT智能水表凭借其低功耗、低成本、安全、便捷、智能的特点较好解决了传统抄表的痛点&#xff0c;迅速成为行业市场的主角。 但过去两年里&#xff0c;“缺芯…

Rust机器学习之Plotters

Rust机器学习之Plotters 本文将带领大家学习Plotters的基础用法。重点学习Plotters的图表元素和常用图表的使用。 本文是“Rust替代Python进行机器学习”系列文章的第四篇&#xff0c;其他教程请参考下面表格目录&#xff1a; Python库Rust替代方案教程numpyndarrayRust机器…

关于账本数据库:你想知道的这里都有

&#x1f495;前言&#xff1a;十二月份出个openGuass集合专栏&#xff0c;带领大家浅浅的认识一下国产数据库吧&#x1f495; 1. 什么是账本数据库 区块链大家想必都耳熟能详&#xff0c;比特币、以太坊甚至狗狗币等代币&#xff0c;作为区块链的代名词&#xff0c;不仅牵动着…

《歌在飞》在抖音播放7.7亿,歌者苏勒亚其其格用公益让爱心传递

随着短视频的流行&#xff0c;抖音平台也被大家所熟知&#xff0c;很多好听的音乐作品&#xff0c;都是通过抖音平台传唱开来。 曾经有一首《歌在飞》的音乐作品&#xff0c;在抖音平台传唱度很广&#xff0c;截止目前已经有7.7亿的播放量。据悉&#xff0c;《歌在飞》这首歌曲…

基于QPSK的载波同步和定时同步性能仿真,包括Costas环的gardner环

目录 1.算法描述 2.matlab算法仿真效果 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 载波同步是相干解调的基础&#xff0c;不管对于模拟通信还是数字通信来说&#xff0c;只要是相干解调&#xff0c;接收端都必须提供同频同相的载波。当然&#xff0c;若采用基带传输&#…

hadoop 3.x大数据集群搭建系列7-安装Hudi

文章目录编译环境准备一. 下载并解压hudi二. maven的下载和配置2.1 maven的下载和解压2.2 添加环境变量到/etc/profile中2.3 修改为阿里镜像三. 编译hudi3.1 修改pom文件3.2 修改源码兼容hadoop33.3 手动安装Kafka依赖3.4 解决spark模块依赖冲突3.4.1 修改hudi-spark-bundle的p…

pytest + yaml 框架 - 3.全局仅登录一次,在用例中自动在请求头部添加Authentication token认证

前言 我们在使用自动化测试框架的时候&#xff0c;经常会遇到一个需求&#xff0c;希望在全局用例中&#xff0c;仅登录一次&#xff0c;后续所有的用例自动带上请求头部token 或者cookies。 环境准备 Python 3.8版本 Pytest 7.2.0 最新版 pip 安装插件 pip install pytes…

[附源码]Python计算机毕业设计Django的实验填报管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;我…

OSSID: Online Self-Supervised Instance Detection by (And For) Pose Estimation

许多机器人操作算法都需要 实时目标姿态估计。然而&#xff0c;最先进的目标姿态估计方法是针对一组特定的对象进行训练的&#xff1b;因此&#xff0c;这些方法需要 重新训练 以估计每个新对象的姿势。本文提出了 OSSID 框架&#xff0c;利用 慢速零样本 姿态估计器 来 自监督…

OpenGL 图像色调

目录 一.OpenGL 图像色调 1.IOS Object-C 版本1.Windows OpenGL ES 版本2.Windows OpenGL 版本 二.OpenGL 图像色调 GLSL Shader三.猜你喜欢 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录 >> OpenGL ES 基础 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录…

Spring | IOC技术之Bean的配置与实例化

&#x1f451; 博主简介&#xff1a;    &#x1f947; Java领域新星创作者    &#x1f947; 阿里云开发者社区专家博主、星级博主、技术博主 &#x1f91d; 交流社区&#xff1a;BoBooY&#xff08;优质编程学习笔记社区&#xff09; 文章目录Bean的基础配置1、id 与 cla…

家电生产线数控机床上下料长臂机器人组设计

目录 摘 要 I ABSTRCT II 前言 III 1.长臂机器人组概况 1 1.1国内外发展状况 1 1.2研究意义 2 1.2.1长臂机器人组研究现状 2 1.2.2长臂机器人组研究方向 3 1.3本课题意义和目的 4 1.3.1本课题的意义 4 1.3.2本课题的目的 4 2.长臂机器人组的组成分类及设计分析 6 2.1 长臂机器人…

【车载开发系列】UDS诊断---诊断故障清除($0x14)

【车载开发系列】UDS诊断—诊断故障清除&#xff08;$0x14&#xff09; UDS诊断---诊断故障清除&#xff08;$0x14&#xff09;【车载开发系列】UDS诊断---诊断故障清除&#xff08;$0x14&#xff09;一.概念定义二.参数说明三.清除内容方式1&#xff09;清除内容2&#xff09;…

python接口自动化44- requests 库使用 hook 机制

前言 requests 是 Hooks 即钩子方法&#xff0c;用于在某个框架固定的某个流程执行是捎带执行&#xff08;钩上&#xff09;某个自定义的方法。 requests 库只支持一个 response 的钩子&#xff0c;即在响应返回时可以捎带执行我们自定义的某些方法。 可以用于打印一些信息&am…

Unity引擎开发-无人机模拟飞行实现

目 录 摘 要 I Abstract II 一、 绪论 1 &#xff08;一&#xff09; 研究背景和研究意义 1 &#xff08;二&#xff09; 国内外研究现状 1 &#xff08;三&#xff09; 论文组织结构 2 二、 3D游戏技术的知识系统 2 &#xff08;一&#xff09; 3D图形库 2 &#xff08;二&…

UML概述及UML类图详解

一 UML介绍 UML这三个字母的全称是Unified Modeling Language&#xff0c;翻译就是统一建模语言&#xff0c;是一种用于软件系统分析和设计的语言工具&#xff0c;它用于帮助软件开发人员进行思考和记录思路的结果 UML 本身是一套符号的规定&#xff0c;就像数学符号和化学符…

Linux——md5命令

文章目录介绍选项使用案例生成文件md5值文本模式或二进制模式md5值重定向重定向追加md5校验实际开发场景介绍 md5sum命令用于生成和校验文件的md5值。它会逐位对文件的内容进行校验。是文件的内容&#xff0c;与文件名无关&#xff0c;也就是文件内容相同&#xff0c;其md5值相…

CTGU操作系统

CTGU操作系统第一章第二章 Operating-System structures操作系统结构第三章自己看&#xff0c;懒得写了&#xff0c;我也不知道为啥划重点他要把所有PPT过一遍&#xff0c;离谱第一章 第二章 Operating-System structures操作系统结构 第三章自己看&#xff0c;懒得写了&#x…

【能效管理】电力监控系统在某商业数据中心的应用分析

摘要&#xff1a;在电力系统的运行过程中&#xff0c;变电站作为整个电力系统的核心&#xff0c;在保证电力系统可靠的运行方面起着至关重要的作用&#xff0c;基于此需对变电站监控系统的特点进行分析&#xff0c;结合变电站监控系统的功能需求&#xff0c;对变电站电力监控系…

Yoshua Bengio:我的一生

文 | 智商掉了一地2018 年图灵奖获得者、AI 先驱、深度学习三巨头之一、对抗生成网络 GAN、标志性的银灰卷发和浓眉&#xff0c;如果还没猜到的话&#xff0c;当你看到这个封面&#xff0c;一定就会意识到自己在学习的路上&#xff0c;已经或间接或直接地拜读过大佬的著作了。看…