RabbitMQ 高级特性——消息分发

news2025/1/16 3:50:39

在这里插入图片描述

文章目录

  • 前言
  • 消息分发
    • RabbitMQ 分发机制的应用场景
      • 1. 限流
      • 2. 负载均衡

前言

当 RabbitMQ 的队列绑定了多个消费者的时候,队列会把消息分发给不同的消费者,每条消息只会发送给订阅列表的一个消费者,但是呢,RabbitMQ 默认是以轮询的方式进行分发的,而不会管消费者是否已经消费并且已经确认了消息,这种方式其实是不合理的,因为每个消费者的消费能力是不同的,如果某个消费者的消费能力很低,那么就会导致其他的消费者已经消费完成所有消息了,但是这个消费者还有很多消息需要消费,这样就会导致消息积压。那么该任何解决这个问题呢?就是我们这篇文章需要讲到的消息分发。

消息分发

RabbitMQ 的消息分发机制主要分为两种:1. 轮询分发 2. 非公平分发

  1. 轮询分发
  • 在默认情况下,RabbitMQ采用轮询的方式将队列中的消息分发给消费者。这意味着如果有多个消费者订阅了同一个队列,RabbitMQ会尝试公平地将消息依次分发给每个消费者。
  • 轮询分发机制确保了消息在多个消费者之间的均衡分配,避免了某个消费者过载而其他消费者空闲的情况。
  1. 非公平分发
  • 为了更好地控制消息的分发过程,RabbitMQ提供了非公平分发的机制。在这种机制下,消费者可以通过设置basic.qos方法并指定prefetch_count参数来限制RabbitMQ一次性发送给它的消息数量。
  • 通过调整prefetch_count的值,消费者可以根据自己的处理能力来控制消息的分发速度,从而避免因为处理速度不同而导致的消息堆积或空闲。

RabbitMQ 分发机制的应用场景

消息分发的常见应用场景有两个:

  1. 限流
  2. 负载均衡

1. 限流

每逢双十一或者其他节日的时候,某些购物平台的订单量会激增,这样就会导致单个服务器接收的订单数量超过了能够承受的范围,所以为了保证我们的订单服务器能够正常运行不发生宕机故障,就需要对服务器接收的消息数量做出限制。

那么如何实现限流的功能呢?我们通过设置 prefetchCount 参数并且设置确认方式为手动确认,prefetchCount 就是控制消费者从队列中预取消息的数量,以此来实现限流和负载均衡。通过设置这个配置,就可以保证消费者中最多只能存在 prefetchCount 个未确认的消息。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 设置确认方式为手动确认
        prefetch: 5 # 限制消费者只能接收5条消息
public static final String QOS_EXCHANGE = "qos.exchange";
public static final String QOS_QUEUE = "qos.queue";

声明交换机、队列和绑定关系:

@Configuration
public class QosConfig {
    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
    }

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosBinding")
    public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }
}

生产者:

@RequestMapping("/qos")
public String qos() {
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","rabbitmq qos" + i);
    }
    return "消息发送成功";
}

消费者:

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener1(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("消费者1接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        //channel.basicAck(deliveryTag,false); 手动确认消息,我们这里不确认,看看在没有确认的情况下,队列会向消费者投递多少条消息
    }

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener2(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("消费者2接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        //channel.basicAck(deliveryTag,false); 手动确认消息,我们这里不确认,看看在没有确认的情况下,队列会向消费者投递多少条消息
    }
}

在这里插入图片描述

在这里插入图片描述

可以看到通过设置 prefetchCount 可以实现限流的效果,而如果我们讲这个配置给注掉的话,那么队列中的消息会全部打给消费者:

在这里插入图片描述

2. 负载均衡

因为每个服务器的处理业务能力不同,有的服务器处理业务速度很快,而有的服务器处理业务的速度则很慢,如果按照轮询的方式分发消息的话,就会出现某些服务器很忙,有些服务器处理完成业务之后很闲的情况,对于这种情况,我们可以通过设置 prefetchCount 的值为 1 来实现负载均衡。

只有当消费者处理完成消息并且手动确认之后,队列才会继续向其发送下一条消息。

我们修改 prefetchCount 的值为 1,然后其他的代码不需要修改,只是通过 Thread.sleep() 方法来模拟出消费者消费速度不同的情况:

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener1(Message message, Channel channel) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(1000);
        System.out.println("消费者1接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,false); 
    }

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void listener2(Message message, Channel channel) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(2000);
        System.out.println("消费者2接收到消息:" + new String(message.getBody()) + ".deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,false); 
    }
}

在这里插入图片描述
通过设置 prefetchCount 的值为 1,就可以实现出负载均衡的效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2236301.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习:bert模型

multi-headed机制 1、通过不同的head得到多个特征表达&#xff0c;一般8个head 2、将所有特征拼接在一起 3、降维&#xff0c;将Z0~Z7连接一个FC全连接实现降维 多层堆叠 位置编码 如何实现位置编码&#xff1f; &#xff08;1&#xff09;为每个时间步添加一个0-1范围内的数…

Vue实战学习(2)(Vue快速入门(快速构建一个局部Vue项目))

目录 一、Vue快速入门。 &#xff08;1&#xff09;快速入门的案例需求。 &#xff08;2&#xff09;原生js解决。 &#xff08;3&#xff09;使用Vue解决。 1、准备一个html页面。且该页面需要引入Vue模块。 2、创建Vue程序的应用实例。 3、准备html元素&#xff08;如div&…

SpringMVC学习记录(三)之响应数据

SpringMVC学习记录&#xff08;三&#xff09;之响应数据 一、页面跳转控制1、快速返回模板视图2、转发和重定向 二、返回JSON数据1、前置准备2、ResponseBody 三、返回静态资源1、静态资源概念2、访问静态资源 /*** TODO: 一个controller的方法是控制层的一个处理器,我们称为h…

Spring WebFlux 核心原理(2-3)

1、Project Reactor 高级 1.1、响应式流的生命周期 要理解多线程的工作原理以及 Reactor 中实现的各种内部优化&#xff0c;首先必须了解 Reactor 中响应式类型的生命周期。 1.1.1、组装时 流生命周期的第一部分是组装时&#xff08;assembly-time&#xff09;。 Reactor 提供…

Python爬虫与Web渗透测试入门指南——初学者防踩雷

目录 Python爬虫与Web渗透测试入门指南一、学习方向和基础知识Python爬虫学习方向Web渗透学习方向 二、具体知识点总结三、学习流程和典型案例案例1&#xff1a;Python爬虫 - 简单网页数据爬取案例2&#xff1a;Web渗透 - SQL注入漏洞检测与利用案例3&#xff1a;Python爬虫 - …

apache-seata-2.1.0 AT模式使用篇(配置简单)

最近在研究seata的AT模式&#xff0c;先在本地搭建了一个演示demo&#xff0c;看看seata是如何使用的。在网上搜的demo&#xff0c;配置相对来说都比较多。我最终搭建的版本&#xff0c;配置较少&#xff0c;所以写篇文章分享下&#xff0c;希望能帮到对seata感兴趣的小伙伴。先…

Java代码与数据库纽带——JDBC

ok&#xff0c;看了题目&#xff0c;就可以知道今天要分享的是JDBC 讲这个这之前&#xff0c;想讲讲之前的。 之前我们操作数据库基本都是通过MySQL客户端&#xff0c;进行编写sql语句来操作的。 但是我们在开发中一般都是通过代码来操控数据库的。 而且在我们日常开发中&a…

navicat pg库安装mysql fdw 外表扩展

在Windows上手动安装mysql_fdw&#xff08;MySQL Foreign Data Wrapper&#xff09;通常涉及一系列步骤&#xff0c;包括下载源码、编译、配置和测试。以下是一个详细的指南&#xff1a; 一、下载mysql_fdw源码 访问mysql_fdw的GitHub发布页面&#xff0c;选择最新版本的源码…

智能提醒助理系列-jdk8升级到21,springboot2.3升级到3.3

本系列文章记录“智能提醒助理”产品建设历程&#xff0c;记录实践经验、巩固知识点、锻炼总结能力。 本篇介绍技术栈升级的过程&#xff0c;遇到的问题和解决方案。 一、需求出发点 智能提醒小程序 当前使用的是jdk8&#xff0c;springboot2.3,升级到jdk21和springboot3.3 学…

雷军-2022.8小米创业思考-11-新零售:用电商思维做新零售,极致的效率+极致的体验。也有弯路,重回极致效率的轨道上。

第十一章 新零售 当我们说到小米模式的时候&#xff0c;其实我们说的是两件东西&#xff1a; 一是小米模式的本质&#xff0c;即高效率的商业模式&#xff1b; 另一件是小米这家公司具象的商业模式&#xff0c;这是小米在实践中摸索、建立的一整套业务模型。 从2015年到202…

人工智能——小白学习指南

知孤云出岫 目录 1. **智能评测系统**2. **个性化学习路径推荐**3. **虚拟学习助手**4. **学习行为分析**5. **数据驱动的教学决策**6. **自动化课程推荐**7. **数据隐私与安全保护** 人工智能知识点的总结和学习路线&#xff0c;以数据表格形式呈现&#xff0c;并附带在教育行…

【深度学习基础】常用图像卷积核类型

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 常…

SpringCloud框架学习(第一部分:初始项目搭建)

目录 一、SpringBoot和SpringCloud版本选型 1.Springcloud版本选择 2.Springcloud版本选择 3.Springcloud Alibaba版本选择 4.SpringCloud VS SpringBoot VS SpringCloud Alibaba版本三者制约对应关系 二、SpringCloud介绍 1.单体架构 2.微服务架构 3.springcloud 4.S…

【动手学运动规划】 4.1 图搜的基础

&#x1f3f0;代码及环境配置&#xff1a;请参考 环境配置和代码运行! 4.1.1 基础概念 4.1.1.1 Configuration Space(配置空间) configuration: 机器人上每一点位置的完整说明degrees of freedom: 机器人能够独立移动或旋转的关节数量&#xff08;下图所示有4个自由度&#x…

如何用彩屏显示精美的动画

1什么样的动画是精美的&#xff1f; 1&#xff09;视觉暂留 视频播放的原理基于人眼的视觉暂留现象。‌视频是由一系列静态图像&#xff08;帧&#xff09;组成的&#xff0c;这些图像以特定的频率&#xff08;帧率&#xff09;连续播放&#xff0c;使得人眼无法区分单帧图像&…

信息安全工程师(81)网络安全测评质量管理与标准

一、网络安全测评质量管理 遵循标准和流程 网络安全测评应严格遵循国家相关标准和流程&#xff0c;确保测评工作的规范性和一致性。这些标准和流程通常包括测评方法、测评步骤、测评指标等&#xff0c;为测评工作提供明确的指导和依据。 选择合格的测评团队 测评团队应具备相关…

【CTFN】基于耦合翻译融合网络的多模态情感分析的层次学习

同样用了翻译模块的论文->MTMSA 代码地址->github地址 abstract 多模态情感分析是一个具有挑战性的研究领域&#xff0c;涉及多个异构模态的融合。主要的挑战是在多模式融合过程中出现一些缺失的模式。然而&#xff0c;现有的技术需要所有的模态作为输入&#xff0c;因…

1.每日SQL----2024/11/7

题目&#xff1a; 计算用户次日留存率,即用户第二天继续登录的概率 表&#xff1a; iddevice_iddate121382024-05-03232142024-05-09332142024-06-15465432024-08-13523152024-08-13623152024-08-14723152024-08-15832142024-05-09932142024-08-151065432024-08-131123152024-…

安利一款开源企业级的报表系统SpringReport

SpringReport是一款企业级的报表系统&#xff0c;支持在线设计报表&#xff0c;并绑定动态数据源&#xff0c;无需写代码即可快速生成想要的报表&#xff0c;可以支持excel报表和word报表两种格式&#xff0c;同时还可以支持excel多人协同编辑&#xff0c;后续考虑实现大屏设计…

使用ookii-dialogs-wpf在WPF选择文件夹时能输入路径

在进行WPF开发时&#xff0c;System.Windows.Forms.FolderBrowserDialog的选择文件夹功能不支持输入路径&#xff1a; 希望能够获得下图所示的选择文件夹功能&#xff1a; 于是&#xff0c;通过NuGet中安装Ookii.Dialogs.Wpf包&#xff0c;并创建一个简单的工具类&#xff1a; …