RabbitMQ(任务模型,交换机(广播,订阅,通配符订阅))

news2025/1/23 2:08:08

一.WorkQueues模型

WorkQueues(任务模式):让多个消费者绑定到一个队列,共同消费队列中的消息

架构:

所需场景:

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

1.新建队work.queue

2.生产者模块循环发送消息
    @Test
    void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        for (int i = 1; i <= 50; i++) {
            String msg = "hello, worker, message_" + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }
3.消费者模块模拟多个消费者绑定该队列
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMsg1(String msg) throws InterruptedException {
        System.out.println("消费者一接收到work.queue的信息" + msg+ LocalDateTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMsg2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到work.queue的信息" + msg+LocalDateTime.now());
        Thread.sleep(200);
    }

消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

4.测试

测试结果中:尽管给消费者二设置了比消费者一十倍长的休眠时间。但是两个消费者的处理信息个数是相同的。且消费者一很快就处理完消息,而剩下的消息只有消费者二去消费。并没有充分利用每一个消费者的能力。

可以通过配置消费者模块的yml文件解决该问题

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
5.二次测试

6.总结:

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

二.Exchange(交换机)

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

三.Fanout交换机(广播)

架构:

广播模式发送流程:

  • 1)  可以有多个队列

  • 2)  每个队列都要绑定到Exchange(交换机)

  • 3)  生产者发送的消息,只能发送到交换机

  • 4)  交换机把消息发送给绑定过的所有队列

  • 5)  订阅队列的消费者都能拿到消息

一.创建两个队列:fanout.queue1,fanout.queue2

二.创建一个交互机:fanout

三.将队列绑定至交互机

四.编写测试代码

生产者端:

    @Test
    void testSendFanout() {
        String exchangeName = "hmall.fanout";//交换机
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, null, msg);
    }

消费端:

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
    }

测试结果:

五.总结

交换机的作用

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

四.Direct交换机(订阅)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

结构:

在Direct模型下

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

一.测试逻辑:
  1. 声明一个名为hmall.direct的交换机

  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向hmall.direct发送消息

二.创建两个队列direct.queue1和direct.queue2

三.创建一个direct类型交换机 hmall.direct并绑定队列

四.编写测试类

消费模块:

    @RabbitListener(queues = "direct.queue1")
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
    }

    @RabbitListener(queues = "direct.queue2")
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
    }

生产模块:

    @Test
    void testSendDirect() {
        String exchangeName = "hmall.direct";
        String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }

测试结果:

rabbitTemplate.convertAndSend(exchangeName, "blue", msg)改为rabbitTemplate.convertAndSend(exchangeName, "red", msg)

测试结果:

五.总结:

Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

五.Topic交换机

架构:

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

一.测试逻辑

假如此时生产者发送的消息使用的RoutingKey共有四种:

  • china.news代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • USA.news 则代表美国新闻

  • USA.weather 代表美国的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • USA.news

二.绑定交互机与队列

三.编写测试代码

消费者:

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

生产者:

@Test
void testSendTopic() {
    String exchangeName = "hmall.topic";
    String msg = "今天天气挺不错,我的心情的挺好的";
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    rabbitTemplate.convertAndSend(exchangeName,"USA.news","美国新闻");
    rabbitTemplate.convertAndSend(exchangeName,"china.news","中国新闻");
}

测试结果

四.总结

Direct交换机与Topic交换机的差异

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1495510.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据开发-Hadoop之MapReduce

文章目录 MapReduce原理剖析MapReduce之Map阶段MapReduce之Reduce阶段WordCount分析多文件WordCount分析 实战wordCount案例开发 MapReduce原理剖析 MapReduce是一种分布式计算模型,主要用于搜索领域&#xff0c;解决海量数据的计算问题MapReduce由两个阶段组成&#xff1a;Ma…

热红外图像直方图修正显示

热红外图像的直方图修正是一种用于增强图像对比度和可视化细节的技术。下面是一个使用Python和OpenCV库实现直方图均衡化的示例代码&#xff1a; import cv2 import numpy as np# 读取热红外图像 image cv2.imread(thermal_image.png, cv2.IMREAD_GRAYSCALE)# 对图像进行直方…

植被生长动态与多时间尺度干旱事件的关联性研究

随着全球气候变暖的趋势愈发明显&#xff0c;干旱事件不仅发生的频率增加&#xff0c;其持续时间和影响范围也在不断扩大。干旱对生态环境造成了严重破坏&#xff0c;导致生物多样性减少、土地退化和水资源短缺&#xff1b;对农业生产而言&#xff0c;干旱会导致作物减产甚至绝…

Java精品项目--第5期基于SpringBoot的高速收费系统的设计分析与实现

项目使用技术栈 SpringBootMavenShiroMySQLMybatis-PlusJavaJDK1.8HTML 系统介绍 项目截图

【详识C语言】程序环境和预处理

本章重点&#xff1a; 程序的翻译环境 程序的执行环境 详解&#xff1a;C语言程序的编译链接 预定义符号介绍 预处理指令 #define 宏和函数的对比 预处理操作符#和##的介绍 命令定义 预处理指令 #include 预处理指令 #undef 条件编译 程序的翻译环境和执行环境 在ANSI C的任何…

2024.3.6每日一题

LeetCode 找出数组中的 K -or 值 题目链接&#xff1a;2917. 找出数组中的 K-or 值 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你一个下标从 0 开始的整数数组 nums 和一个整数 k 。 nums 中的 K-or 是一个满足以下条件的非负整数&#xff1a; 只有在 nums 中&…

Jenkins-Android源码编译【架构设计】(适用鸿蒙/自动化/多产品/持续迭代)

文章目录 Jenkins-Android源码编译【架构设计】&#xff08;适用鸿蒙/自动化/多产品/持续迭代&#xff09;通俗介绍Jenkins框架设计Jenkins部署系统/插件配置JOB配置 源码编译环境准备AOSP编译基本框架编译脚本aosp_build_sciptsjenkins_build_sciptsStage1Stage2Stage3Stage4P…

灵魂指针,教给(一)

欢迎来到白刘的领域 Miracle_86.-CSDN博客 系列专栏 C语言知识 先赞后看&#xff0c;已成习惯 创作不易&#xff0c;多多支持&#xff01; 一、内存和地址 1.1 内存 在介绍知识之前&#xff0c;先来想一个生活中的小栗子&#xff1a; 假如把你放在一个有100间屋子的酒店…

上海亚商投顾:沪指震荡微涨 AI手机、军工板块集体走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日低开后震荡回升&#xff0c;黄白二线分化明显&#xff0c;银行等权重板块走势较强。AI手机概念股持续…

pytorch_retinaface训练Resnet50_Final.pth过程+无图版安装Nvidia+CUDA驱动GPU

背景 当前处于人脸检测分支&#xff0c;项目就是retinaface官方的代码加上数据集目录结构&#xff0c;目的是训练出最后的模型文件Resnet50_Final.pth 代码 https://gitee.com/congminglst/pytorch_-retinaface.git 项目结构与设计 图片数据集采用widerface&#xff0c; 前…

STM32CubeIDE基础学习-STM32CubeIDE软件快捷键介绍

STM32CubeIDE基础学习-STM32CubeIDE软件快捷键介绍 文章目录 STM32CubeIDE基础学习-STM32CubeIDE软件快捷键介绍前言第1章 查看快捷键方法第2章 设置快捷键方法第3章 常用快捷键示例总结 前言 这个STM32CubeIDE软件使用的是Eclipse框架的开发环境&#xff0c;所以所使用的快捷…

AntV L7初体验

本案例使用L7库和Mapbox GL JS创建的简单地图可视化示例&#xff0c;加载点数据。 文章目录 1. 引入 CDN 链接2. 导出模块3. 创建地图3.1. 注册 token3.2. 创建地图实例 4. 创建场景5.创建点图层6. 演示效果7. 代码实现 1. 引入 CDN 链接 <!-- 1.引入CDN链接 --> <!--…

泰迪智能科技-2024年高校大数据人才培养探索模式

随着数字经济的高速发展&#xff0c;对于大数据人才的需求日益增长。产业数字化和数字产业化之间的关系&#xff0c;已经成为推动社会发展的关键。为此&#xff0c;高校及产业界需要紧密配合&#xff0c;以培养出符合时代需求的大数据人才。 数字产业化与产业数字化高速发…

HarmonyOS NEXT应用开发案例集

概述 随着应用代码的复杂度提升&#xff0c;为了使应用有更好的可维护性和可扩展性&#xff0c;良好的应用架构设计变得尤为重要。本篇文章将介绍一个应用通用架构的设计思路&#xff0c;以减少模块间的耦合、提升团队开发效率&#xff0c;为开发者呈现一个清晰且结构化的开发…

windows11配置电脑IP

windows11配置电脑IP 选择"开始>设置>“网络&Internet >以太网”。在 "属性"下&#xff0c;编辑IP地址&#xff0c;子网掩码&#xff0c;网关以及DNS。

Qt ini配置文件

ini文件用于保存用户的设置操作&#xff0c;下列以背景颜色设置为例子 暂时默认设置为白色背景 这段代码放置在主窗口的构造函数中&#xff0c;用于初始化读取ini文件 QString color;QSettings *set new QSettings("color.ini",QSettings::IniFormat);set->begi…

《探索虚拟与现实的边界:VR与AR谁更能引领未来?》

引言 在当今数字时代&#xff0c;虚拟现实&#xff08;VR&#xff09;和增强现实&#xff08;AR&#xff09;技术正以惊人的速度发展&#xff0c;并逐渐渗透到我们的日常生活中。它们正在重新定义人与技术、人与环境之间的关系&#xff0c;同时也为各行各业带来了全新的可能性。…

学术论文GPT的源码解读与二次开发:从ChatPaper到gpt_academic

前言 本文的前两个部分最早是属于此旧文的《学术论文GPT的源码解读与微调&#xff1a;从ChatPaper到七月论文审稿GPT第1版》&#xff0c;但为了每一篇文章各自的内容更好的呈现&#xff0c;于是我今天做了以下三个改动 原来属于mamba第五部分的「Mamba近似工作之线性Transfor…

【YOLO v5 v7 v8 v9小目标改进】RevCol:解决深度学习信息从低层(输入)传递至高层(输出)的过程中,信息会逐层丢失问题

RevCol&#xff1a;解决深度学习信息从低层&#xff08;输入&#xff09;传递至高层&#xff08;输出&#xff09;的过程中&#xff0c;信息会逐层丢失问题 学习解耦表示可逆列网络&#xff08;RevCol&#xff09;子特征1&#xff1a;多级可逆单元子特征2&#xff1a;可逆列架构…

实践航拍小目标检测,基于YOLOv7【tiny/l/x】不同系列参数模型开发构建无人机航拍场景下的小目标检测识别分析系统

关于无人机相关的场景在我们之前的博文也有一些比较早期的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《deepLabV3Plus实现无人机航拍目标分割识别系统》 《基于目标检测的无人机航拍场景下小目标检测实践》 《助力环保河道水质监测&#xff0c;基于yolov…