RabbitMQ的WorkQueues模型

news2024/11/24 10:40:45

WorkQueues模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

接下来,我们就来模拟这样的场景。

在这里插入图片描述

首先,我们在控制台创建一个新的队列,命名为work.queue
image.png

3.3.1.消息发送

这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.3.2.消息接收

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

3.3.3.测试

启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。
最终结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

3.3.4.能者多劳(prefetch)

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.3.5.总结(如何解决消息堆积的问题?)

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1861552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

80、443端口不能开放也能为IP地址申请SSL证书!

IP地址证书作为一种特定的证书&#xff0c;不同于传统的域名验证证书&#xff0c;IP地址证书是通过验证IP地址来确保安全连接。在证书申请过程中&#xff0c;往往要求短暂开放80或者443端口&#xff0c;如果不能开放&#xff0c;IP地址证书则不能签发。 JoySSL提供的IP地址证书…

如何采集拼多多的商品或店铺数据

怎么使用简数采集器批量采集拼多多的商品或店铺相关信息呢&#xff1f; 简数采集器暂时不支持采集拼多多的商品或店铺相关数据&#xff0c;只能采集页面公开显示的信息&#xff0c;谢谢。 简数采集器采集网站文章资讯等数据特别简单高效&#xff1a;只需输入网站网址&#xf…

文心一言使用笔记

目录 让文心一言提炼已有的内容&#xff0c;模仿给出的案例写一段宣传稿方法例子 发现写出的内容有瑕疵&#xff0c;如何微调&#xff1f;比如文心一言介绍的领导不全如何让文心一言检查语法和表达问题&#xff1f; 如何让文心一言将每个片段用一两句话总结&#xff1f;为了防止…

Open3D 点云的ISS关键点提取

目录 一、概述 1.1原理 1.2应用场景 1.3算法实现步骤 二、代码实现 2.1 完整代码 2.2关键函数 2.3关键点可视化 三、实现效果 3.1原始点云 3.2提取后点云 一、概述 1.1原理 ISS&#xff08;Intrinsic Shape Signatures&#xff09;关键点提取是一种常用于三维点云的…

清九野小红盾舒敏牙刷,我的口腔护理新体验

我最近发现一款很舒服的牙刷&#xff0c;叫做清九野小红盾舒敏牙刷&#xff0c;很适合牙龈敏感的朋友&#xff0c;如果刷牙时常感到不适&#xff0c;不妨试试这款既能有效清洁牙齿&#xff0c;又不会刺激牙龈的牙刷。 ## &#x1f31f; 独特设计&#xff0c;双重植毛 首先&…

PR素材库,这里应有尽有!

Premiere简称PR&#xff0c;操作简单&#xff0c;容易上手&#xff0c;功能丰富&#xff0c;因此做为后期必备软件之一。如果进行视频剪辑&#xff0c;去哪里找合适的素材呢&#xff0c;其实国内外都有不少专业的渠道。 一、视频素材 01.摄图视频 传送门&#xff1a;https://…

面向遥感图像的小目标检测最新方法 FFCA-YOLO

论文简介 在遥感图像中&#xff0c;小目标检测面临着特征表示不足和背景混淆等挑战&#xff0c;特别是当算法需要在有限计算资源的约束下进行实时处理时&#xff0c;对准确性和速度的优化要求尤为严格。为解决这些问题&#xff0c;本文提出了一种高效的目标检测器——特征增强、…

用vite 打包之后项目不能在本地查看

安装插件 npm i vitejs/plugin-legacy 在vite.config.js 中配置 重新打包就可以正常访问了&#xff08;注意vite 和 vitejs/plugin-legacy的版本要匹配&#xff0c;否则会打包失败&#xff09;

什么是CDN?CDN的工作原理是怎样的?

1.什么是CDN&#xff1f; CDN的全称是Content Delivery Network&#xff0c;即内容分发网络。CDN是构建在网络之上的内容分发网络&#xff0c;依靠部署在各地的边缘服务器&#xff0c;通过中心平台的负载均衡、内容分发、调度等功能模块&#xff0c;使用户就近获取所需内容&am…

React hydrateRoot如何实现

React 服务器渲染中&#xff0c;hydrateRoot 是核心&#xff0c;它将服务器段的渲染与客户端的交互绑定在一起&#xff0c;我们知道 React 中 Fiber Tree 是渲染的的核心&#xff0c;那么 React 是怎么实现 hydrateRoot 的呢&#xff1f;首先我们验证一下&#xff0c;hydrateRo…

2024广东省职业技能大赛云计算赛项实战——集群部署GitLab Runner

集群部署GitLab Runner 前言 题目如下: 部署GitLab Runner 将GitLab Runner部署到gitlab-ci命名空间下&#xff0c;Release名称为gitlab-runner&#xff0c;为GitLab Runner创建持久化构建缓存目录/home/gitlab-runner/ci-build-cache以加速构建速度&#xff0c;并将其注册到…

Spark SQL 血缘解析方案

背景 项目背景建设数据中台,往往数据开发人员首先需要能够通过有效的途径检索到所需要的数据,然后根据检索的数据模型进行业务加工然后得到一些中间模型,最后再通过数据抽取工具或者OLAP分析工具直接将数据仓库中加工好的公共模型输出到应用层。这里我不在去介绍数据仓库为…

在操作系统中,background通常指的是运行于后台的进程或任务

在计算机中&#xff0c;"background"一词具有多种含义&#xff0c;以下是一些主要的解释和相关信息&#xff1a; 计算机视觉中的背景&#xff08;Background&#xff09;&#xff1a; 在计算机视觉中&#xff0c;background指的是图像或视频中的背景部分&#xff0c;…

【TOOL】ceres学习笔记(一) —— 教程练习

文章目录 一、Ceres Solver 介绍二、Ceres 使用基本步骤1. 构建最小二乘问题2. 求解最小二乘问题 三、使用案例1. Ceres Helloworld2. Powell’s Function3. Curve Fitting4. Robust Curve Fitting 一、Ceres Solver 介绍 Ceres-solver 是由Google开发的开源C库&#xff0c;用…

接口测试代码和工具

通过python的requests给接口发送请求进行测试 #coding:utf-8 import requests class TestApi(): url_login "https://legend-sit.omodaglobal.com/api/auth/oauth2/token" url_topic_b "https://legend-sit.omodaglobal.com/api/community/topic_b/page?…

12 学习总结:操作符

目录 一、操作符的分类 二、二进制和进制转换 &#xff08;一&#xff09;概念 &#xff08;二&#xff09;二进制 &#xff08;三&#xff09;进制转换 1、2进制与10进制的互换 &#xff08;1&#xff09;2进制转化10进制 &#xff08;2&#xff09;10进制转化2进制 2…

React useId Hook

React 中有一个 useId hook&#xff0c;可以生成一个唯一 ID&#xff0c;这个有什么用处呢&#xff0c;用个 UUID 是不是可以替代呢&#xff1f;如果我们只考虑客户端&#xff0c;那么生成唯一 Id 的方法比较简单&#xff0c;我们在 State 中保存一个计数器就好&#xff0c;但是…

第二天的课根本跟不上啊 难难难啊

编程实现三个数求最大 编程实现求解一元二次方程 传参问题 直接使用返回值 复制控制 复制控制是指在C中控制对象复制行为的机制&#xff0c; 包括拷贝构造函数&#xff08;copy constructor&#xff09;、 赋值操作符&#xff08;copy assignment operator&#xff09;、 …

Win10可用的VC6.0绿色版及辅助插件assist_X

VC6.0&#xff0c;作为微软的经典开发工具&#xff0c;承载着无数开发者的青春与回忆。它曾是Windows平台上软件开发的重要基石&#xff0c;为开发者们提供了稳定且强大的编程环境&#xff0c;尤其是其MFC&#xff08;Microsoft Foundation Classes&#xff09;库&#xff0c;为…

STM32HAL库--DMA实验(速记版)

本章利用 DMA 来实现串口数据传送&#xff0c;并在LCD 模块上显示当前的传送进度。 DMA 简介 DMA&#xff0c;全称为&#xff1a;Direct Memory Access&#xff0c;即直接存储器访问。DMA 传输方式无需 CPU 直接控制传输&#xff0c;也没有中断处理方式那样保留现场和恢复现场…