RabbitMQ不公平分发问题分析及问题解决

news2024/11/22 16:49:34

1.不公平分发

1.1 不公平分发策略是什么?

在 RabbitMQ 中,不公平分发(Unfair Dispatch)是指当多个消费者(Consumers)同时订阅同一个队列(Queue)时,消息的分发机制是不公平的,可能导致负载不均衡等问题。

1.2 不公平分发产生的原因?

默认情况下,RabbitMQ 采用的是轮询(Round Robin)的方式将消息(工作线程)平均分发给各个消费者,也就是理论上每一个消费者消费的消息数量都是一样的。但是实际上,可能由于消费者处理消息的速度不同(可能由于网络因素、配置因素等不同),可能就会导致有些消费者长时间处于空闲状态,有些消费者消息处理不过来,导致消息积压,导致负载不均衡等情况,会严重影响到整个系统的性能。

1.3 怎么解决?

1.3.1 公平分发

公平分发:在公平预取模式下,每个消费者一次只能从队列中预先获取一条消息。当消费者处理完当前的消息并确认后,RabbitMQ 才会将下一条消息发送给该消费者,没有处理完消息的消费者将继续处理消息。这样可以确保每个消费者都能够公平地接收和处理消息,避免某个消费者长时间占用资源导致负载不均衡。

代码演示公平分发策略,在消费者中消费消息之前,设置参数 channel.basicQos(1);

public class Work05 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接受消息处理时间较短");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //沉睡1S
            SleepUtils.sleep(1);
            System.out.println("接受到的消息:" + new String(message.getBody(), "UTF-8"));
            //手动应答
            /**
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");

        });
        //设置不公平分发
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

开启成功,会看到如下结果:

image-20230711004411432

演示结果如下

image-20230711005222486

C2消费者处理消息时间较长,在处理消息完成之前,不会收到新的消息。C1消费者处理消息效率较高,因为处理完一条消息后将继续处理收到的新消息。

1.3.2 预取值分发

预取值分发策略:带权的消息分发,公平分发策略虽然解决了负载不均衡的问题,但是在高并发场景下会存在性能问题。

试想一下,每个消费者消息消息时都会从一个缓存区里面读取消息,这个缓存区有大小限制,既解决了消息积压的问题,又优化了消费者处理完消息之后才向队列中读取一条数据这种损耗性能的操作。

因此可以通过一个未确认的消息缓冲区,开发人员能限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息问题。这个时候也是可以通过使用 basic.qos 方法设置「预取计数」值来完成的。

该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。

通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。

预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。

public class Work03 {

    //队列名称
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接受消息处理时间较短");

        DeliverCallback deliverCallback =(consumerTag,message) ->{

            //沉睡1S
            SleepUtils.sleep(1);
            System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");

        });
        //设置不公平分发
        //int prefetchCount = 1;
        //值不等于 1,则代表预取值,预取值为4
        int prefetchCount = 4;
        channel.basicQos(prefetchCount);
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

注意:不公平分发和预取值分发都用到 basic.qos 方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发。

image-20231217154934867

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1317740.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

把文化注入品牌,五粮液荣获“全国企业文化优秀成果特等奖”

执笔 | 萧 萧 编辑 | 扬 灵 12月15日,以“塑造优秀企业文化,凝聚企业发展力量”为主题的全国企业文化年会(2023)首次在长江首城、中国酒都、中国动力电池之都宜宾盛大举行。 凭借“弘扬和美文化,谱写高质量发展新篇章”成果…

2018年AMC8数学竞赛真题的典型考点和详细解析

从战争中学习战争最有效。前几天,六分成长分析了2023年、2022年、2020、2019年的AMC8的典型考题、考点和详细答案解析。今天继续为大家分享2018年的AMC8的五道典型考题。 欢迎您查看历史文章了解之前各年的真题解析,本系列会持续更新,直到大家…

Linux:进程地址空间

目录 1.程序地址空间 2.进程地址空间 1.程序地址空间 我们在讲C/C语言的时候,32位平台下,我们见过这样的空间布局图 我们来验证一下这张图的正确性: int un_gval;int init_gval100;int main(int argc, char* argv[],char* env[]){//代码…

Nat. Mach. Intell. | 通过深度神经网络联合建模多个切片来构建一个三维全生物体空间图谱

今天为大家介绍的是来自Angela Ruohao Wu 和Can Yang团队的一篇论文。空间转录组学(ST)技术正在革新探索组织空间结构的方式。目前,ST数据分析通常局限于单个二维(2D)组织切片,这限制了我们理解在三维&…

【ArkTS】入门

代码结构分析 struct Index{ } 「自定义组件:可复用的UI单元」 xxx 「装饰器:用来装饰类结构、方法、变量」 Entry 标记当前组件是入口组件(该组件可被独立访问,通俗来讲:它自己就是一个页面)Component 用…

【后端学前端】第四天 css动画 垂直轮播效果(css变量、位移缩放动画、动画延迟)

1、学习信息 视频地址&#xff1a;css动画 垂直轮播效果&#xff08;css变量、位移缩放动画、动画延迟&#xff09;_哔哩哔哩_bilibili 2、源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title…

QT for Android安卓编译环境搭建+首次编译3个大坑

1、安装 编译环境能否搭建成功&#xff0c;主要是看各个依赖软件的版本是否匹配。依赖的软件有3个&#xff1a;JDK、安卓SDK、安卓NDK。 我的qt版本是5.14.1&#xff0c;我亲测以下版本可以成功让编译安卓&#xff1a; QT5.14 JDK1.8.0 安卓SDK26.1 安卓NDK20.1 在QT-&g…

爬虫 scrapy ——scrapy shell调试及下载当当网数据(十一)

目录 一、scrapy shell 1.什么是scrapy shell&#xff1f; 2.安装 ipython 3.使用scrapy shell 二、当当网案例 1.在items.py中定义数据结构 2.在dang.py中解析数据 3.使用pipeline保存 4.多条管道的使用 5.多页下载 参考 一、scrapy shell 1.什么是scrapy shell&am…

VBA即用型代码手册之工作薄的创建及保存

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。可以大大提高自己的劳动效率&#xff0c;而且可以提高数据的准确性。我这里专注VBA,将我多年的经验汇集在VBA系列九套教程中。 作为我的学员要利用我的积木编程思想&#xff0c;积木编程最重要的是积木如何搭建…

python封装执行cmd命令的方法

一、前置说明 在自动化时&#xff0c;经常需要使用命令行工具与系统进行交互&#xff0c;因此可以使用python封装一个执行cmd命令的方法。 二、代码实现 import subprocess import timefrom common.exception import RunCMDError from common.logger import loggerclass Cmd…

Nat Med | 白血病患儿持续存在CD19 CAR-T细胞的转录特征

今天给同学们分享一篇实验文章“Transcriptional signatures associated with persisting CD19 CAR-T cells in children with leukemia ”&#xff0c;这篇文章发表在Nat Med期刊上&#xff0c;影响因子为82.9。 结果解读&#xff1a; 研究队列和实验概述 作者研究了15例高…

网络编程day2作业

1.tcp实现通信 服务器&#xff1a; //tcp服务端#include <head.h>#define SERPORT 8888 #define IP "192.168.125.6"int main(int argc, const char *argv[]) { //1.创建套接字int sfdsocket(AF_INET,SOCK_STREAM,0);//2.绑定struct sockaddr_in ser;ser.sin…

java --- 集合进阶

目录 一、单列集合顶层接口 Collection 1.1 基本方法 1.2 Collection 的遍历方式 二、list集合 1.2 ArrayList Vector 底层结构 1.3 LinkedList ArrayList 和 LinkedList 比较 三、set接口 3.1、Set 接口和常用方法 3.2 HashSet HashSet 底层机制&#xff08;HashMap…

虚拟机Linux(Centos7)安装Docker

如果没有安装虚拟机的&#xff0c;可以参考这篇VMware虚拟机安装Linux操作系统&#xff08;CentOS7&#xff09; 文章目录 0.安装Docker1.CentOS安装Docker1.1.卸载&#xff08;可选&#xff09;如何看自己的虚拟机上是否安装过docker&#xff1f; 1.2.安装docker1.3.启动docke…

pytest之allure测试报告02:allure具体使用方法

一、allure包含的方法 二、allure使用教程 &#xff08;1&#xff09;用例中写入allure方法 allure.epic("数据进制项目epic") allure.feature("手机号模块feature") class TestMobile:allure.story("杭州的手机号story")allure.title("测…

多层记忆增强外观-运动对齐框架用于视频异常检测 论文阅读

MULTI-LEVEL MEMORY-AUGMENTED APPEARANCE-MOTION CORRESPONDENCE FRAMEWORK FOR VIDEO ANOMALY DETECTION 论文阅读 摘要1.介绍2.方法2.1外观和运动对其建模2.2.记忆引导抑制模块2.3. Training Loss2.4. Anomaly Detection 3.实验与结果4.结论 论文标题&#xff1a;MULTI-LEVE…

重磅!大模型(LLMs)排行榜清单发布!

目前&#xff0c;人工智能领域呈现出一片蓬勃发展的景象&#xff0c;大型模型成为了激发这一繁荣的关键引擎。 国内不仅涌现了众多大模型&#xff0c;而且它们的发展速度之快令人瞩目。这种全面拥抱大型模型的态势为整个人工智能生态系统赋予了新的活力&#xff0c;让我们对国…

栈——OJ题

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、最小栈1、题目讲解2、思路讲解3、代码实现 二、栈的压入、弹出序列1、题目讲解2、思路讲解…

CCD相机为什么需要积分球均匀光源

积分球内腔是一个具备高漫反射特性的收光球&#xff0c;其内部中空、内球面均匀地涂有漫反射材料&#xff0c;具有匀光与混光的作用&#xff0c;因此常常被用来做收光的均光球。由于光源性能等因素的影响&#xff0c;可能导致出射光线带偏振方向、出光不均匀&#xff0c;使用积…

Windows11环境下配置深度学习环境(Pytorch)

目录 1. 下载安装Miniconda2. 新建Python3.9虚拟环境3. 下载英伟达驱动4. 安装CUDA版Pytorch5. CPU版本pytorch安装 1. 下载安装Miniconda 下载安装包&#xff1a;镜像文件地址 将Miniconda相关路径添加至系统变量的路径中。 打开Anaconda Powershell Prompt&#xff0c;输入…