RabbitMQ高级特性 - 消息分发(限流、负载均衡)

news2024/9/24 3:18:10

文章目录

  • RabbitMQ 消息分发
    • 概述
    • 如何实现消费分发机制(限制每个队列消息数量)
    • 使用场景
      • 限流
        • 背景
        • 实现 demo
      • 非公平发送(负载均衡)
        • 背景
        • 实现 demo

RabbitMQ 消息分发


概述

RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.

例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.

消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.

如何实现消费分发机制(限制每个队列消息数量)

可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.

例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.

具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).

使用场景

限流

背景

假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.

问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.

实现 demo

假设限制未确认消息上限为 5,发送消息数量为 20.

a)配置 prefetch 参数,设置应答方式为手动应答.

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 5

b)配置交换机队列

@Configuration
class MQConfig {

    @Bean
    fun transQueue() = Queue(MQConst.TRANS_QUEUE)

    @Bean
    fun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)
    @Bean
    fun qosQueue() = Queue(MQConst.QOS_QUEUE)
    @Bean
    fun qosBinding(): Binding = BindingBuilder
        .bind(qosQueue())
        .to(qosExchange())
        .with(MQConst.QOS_BINDING_KEY)

}

c)接口(生产者)

@RestController
@RequestMapping("/mq")
class MQApi(
    val rabbitTemplate: RabbitTemplate,
) {
    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }

}

d)消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            // 这里不主动应答,模拟超长业务
            // channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}

e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.
在这里插入图片描述
在这里插入图片描述

Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
在这里插入图片描述

非公平发送(负载均衡)

背景

假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.

*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *

因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.

具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.

实现 demo

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)

b)生产者

    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }

c)两个消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun fastHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(1000)
            println("正式工: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun slowHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(2000)
            println("实习生: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}

d)效果如下:
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1976382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL —— 库,数据类型 与 表

库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行,这里是表示的是有效的数据,不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒,一般情况…

【多线程】线程的五种创建方法

文章目录 线程在 Java 代码中编写多线程程序Thread 标准库 创建线程的写法1 . 继承 Thread 类代码回调函数休眠操作:sleep()抢占式执行观察线程jconsoleIDEA 内置调试器 2 . 实现 Runnable 接口代码 3. 匿名内部类创建 Thread ⼦类对象代码匿名内部类 4.匿名内部类创…

Python数据分析案例57——信贷风控模型预测评估及其可解释性(shap, scorecardpy包应用)

案例背景 在信贷风控场景下,其实模型的可解释性就变得很重要。在平时做一些普通的机器学习的案例的时候,我们根本不关心这些变量是怎么究竟影响到模型最后的决策的,随便直接把数据丢进去,再把要预测的数据丢进去就能出结果。但是…

Rabbitmq的几种工作模式

工具类 public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置Po…

kafka从浅入深

一、什么是kafka? kafka本质上是一个消息队列MQ(Message Queue),用做数据流转。 1.使用消息队列的好处? 1.1、解耦:允许独立扩展或修改队列两头的处理过程; 1.2、可恢复性:即使一个…

Unity 资源之 Break Items - Toon VFX破碎物品与卡通硬币动画分享

Unity 特效资源分享 - 破碎物品与卡通硬币动画 一、前言二,资源包内容三、免费获取资源包 一、前言 今天为大家带来一份超级实用的视觉特效资源分享!我们精心整理了 6 个令人惊叹的破碎物品效果和 1 个萌趣十足的卡通硬币动画视觉特效,让您的…

编译和汇编的区别

一、编译 编译是将高级语言(如C、C、Java等)编写的源代码转换成计算机可以直接执行的低级语言(通常是机器语言或汇编语言)的过程 编译 —— 将人类可读的源代码转换为计算机可执行的指令集 编译过程 通常包括词法分析、语法分…

bootloader开发总结

bootloader开发总结 首先明白了BootLoader和应用程序之间跳转,就明白了大概。上电启动程序,会有一个程序入口,这个入口由0x33fff6(28335的)地址决定。 应用程序也会有一个启动入口,这个用户可以自己决定。 bin文件是高地址在前&a…

【数据结构】非线性表----二叉树详解

二叉树与普通的树的本质上的区别实际上只有一个——子结点的数量。 普通的树:任意数量的子结点 二叉树:只有两个子结点,也称为左孩子和右孩子结点。 二叉树一共有五种形态: 1.空二叉树。 2.只有一个根结点。 3.根结点只有左子树…

【OpenCV C++20 学习笔记】图像缩放-高斯金字塔

图像缩放-高斯金字塔 原理高斯金字塔 代码实现放大缩小形成金字塔 原理 在图像处理中,经常需要将图像转化成不同的尺寸,即放大或缩小。 除了直接用resize()函数重新设置图片尺寸,另一种常用的方法就是“图像金字塔”。 图像金字塔是从底层的…

vector的底层原理剖析及其实现

vector 一、定义二、常用接口及模拟实现三、vector迭代器失效问题四、使用memcpy拷贝会出现的问题五、二维数组vector<vector< T >> vv 一、定义 vector 是 C 标准模板库&#xff08;Standard Template Library, STL&#xff09;中的一个非常有用的容器。它是一个…

23款奔驰GLS450加装原厂电吸门配置,提升车辆舒适性和便利性

今天是一台22款奔驰GLS450&#xff0c;车主是佛山的 以前被不良商家坑了 装了副厂的电吸门 刚开始就很正常 用了半年之后 就开始开不了门&#xff0c;被锁在里面&#xff0c;刚开始车主以为是零件坏了 后来越来越频繁&#xff0c;本来是为了家里老人小孩关门方便而升级的&#…

J031_使用TCP协议支持与多个客户端同时通信

一、需求文档 使用TCP协议支持与多个客户端同时通信。 1.1 Client package com.itheima.tcp2;import java.io.DataOutputStream; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner;public class Client {public static void main(String[] a…

软件设计之Java入门视频(22)

软件设计之Java入门视频(22) 视频教程来自B站尚硅谷&#xff1a; 尚硅谷Java入门视频教程&#xff0c;宋红康java基础视频 相关文件资料&#xff08;百度网盘&#xff09; 提取密码&#xff1a;8op3 idea 下载可以关注 软件管家 公众号 学习内容&#xff1a; 该视频共分为1-7…

Flask 介绍

Flask 介绍 为什么要学 Flask框架对比设计哲学功能特点适用场景学习曲线总结 Flask 的特点Flask 常用扩展包Flask 的基本组件Flask 的应用场景官方文档官方文档链接文档内容概述学习建议 Flask 是一个使用 Python 编写的轻量级 Web 应用框架。它旨在让 Web 开发变得快速、简单且…

ACl访问控制实验

要求&#xff1a;PC1可以telnet登录r1&#xff0c;不能ping通r1&#xff0c;pc1可以ping通r2&#xff0c;但不能telnet登录r2&#xff0c;pc2的所有限制与pc1相反 实验思路&#xff1a;因为华为的ensp默认允许所有&#xff0c;所以只写拒绝规则就行 rule 5 deny icmp source 19…

只需0.5秒 Stability AI新模型超快生成3D图像

生成式人工智能&#xff08;AI&#xff09;明星初创公司Stability AI 8月发布最新突破性3D模型Stable Fast 3D&#xff0c;将单张图片生成3D图像的速度大幅提升。Stability AI今年3月发布的3D模型SV3D需要多达10分钟生成3D资产&#xff0c;基于TripoSR的新模型Stable Fast 3D完…

【面试官:我看你SQL语句掌握的怎么样?面试SQL语句专题3】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

【教程】Python语言的地球科学常见数据—— IMS积雪覆盖数据的处理

将ASCII数据转化为netCDF数据、分析新疆北疆、青藏高原和东北地区气候态积雪分布、分析新疆北疆、青藏高原和东北地区积雪面积变化规律。 美国国家冰雪中心&#xff08;NSIDC&#xff09;从 1997 年 2 月至今的北半球雪盖和海冰的地图。这些数据以 ASCII 文本和 GeoTIFF 格式提…

AIWEB1综合靶场通关教程,从外网打到内网【附靶场环境】

前言 靶场获取后台回【aiweb1】 下载之后设置为nat模式 启动即可&#xff0c;不需要登录 靶机复现 主机发现 访问即可 信息收集robots.txt文件 访问尝试&#xff0c;原来是什么也没有的&#xff0c;404 我们去访问这个上级目录&#xff0c;发现有一个id 注入测试 语法错误&am…