RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)

news2025/1/12 4:10:36

文章目录

  • 前言
  • 一、WorkQueues模型
    • 消息发送
    • 消息接收
    • 能者多劳
  • 二、交换机类型
    • 1.Fanout交换机
      • 消息发送
      • 消息接收
    • 2.Direct交换机
      • 消息接收
      • 消息发送
    • 3.Topic交换机
      • 消息发送
      • 消息接收
  • 三、编程式声明队列和交换机
    • fanout示例
    • direct示例
    • 基于注解
  • 四、消息转换器
  • 总结


前言

WorkQueues模型、Fanout交换机、Direct交换机、Topic交换机、基于SpringBoot注解声明队列和交换机、消息转换器。


一、WorkQueues模型

  • Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
  • 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

在这里插入图片描述
在控制台创建一个work.queue的队列:
在这里插入图片描述

消息发送

循环发送,模拟大量消息堆积现象。

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

模拟多个消费者绑定同一个队列,我们添加2个方法,并且设置不同睡眠时间模拟不同性能读取

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

在这里插入图片描述

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:
在这里插入图片描述

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

二、交换机类型

1.Fanout交换机

发送流程:
在这里插入图片描述

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在控制台创建fanout.queue1、fanoutqueue2队列和dragon.fanout交换机,并将队列绑定到交换机:
在这里插入图片描述

在这里插入图片描述

消息发送

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "dragon.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在控制台声明两个队列direct.queue1和direct.queue2,然后声明一个direct类型的交换机,绑定队列和交换机(使用red和blue作为key,绑定direct.queue1到dragon.direct;使用red和yellow作为key,绑定direct.queue2到dragon.direct):
在这里插入图片描述

在这里插入图片描述

消息接收

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "dragon.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

绑定红色的都会接收到信息:
在这里插入图片描述

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.Topic交换机

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
    只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
  • BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
  • 通配符规则:
    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词
    • 举例:
      dragon.#:能够匹配dragon.stu.insert 或者 dragon.stu
      dragon.*:只能匹配dragon.stu

在这里插入图片描述
假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

在这里插入图片描述

消息发送

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "dragon.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

三、编程式声明队列和交换机

SpringAMQP提供了一个Queue类,用来创建队列:

fanout示例

@Configuration
public class FanoutConfiguration {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
//    还可以是  return  ExchangeBuilder.fanoutExchange("dragon.fanout").build();
        return new FanoutExchange("dragon.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
//   还可以是  return  QueueBuilder.durable("fanout.queue1").build();
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
//绑定队列和交换机的另一方法
//    @Bean
//    public Binding bindingQueue2(){
//        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
//    }
}

direct示例

@Configuration
public class DirectConfiguration {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("dragon.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

基于注解

上面的代码还是很多的,基于注解的方式也能够代替上面的繁杂配置,下面演示direct和topic交换机,队列的代码

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

四、消息转换器

随便创建一个队列,然后发送Map对象,你会发现消息格式很不友好

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

控制台查看:
在这里插入图片描述

  • Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
  • 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
    • 数据体积过大
      有安全漏洞
      可读性差
  • 根据上面测试,显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

j解决:
引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。其转换器配置。
添加配置:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

总结

以上就是所有讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1244651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【2023-11-23】生成A~Z编号

生成A~Z编号 需要生成 A~Z的编号&#xff0c;当新的编号超过Z时&#xff0c;从A1开始&#xff0c;依次为B1 C1一直至Z1,如此循环。 最大支持字母为Z&#xff0c;超过后以添加数字后缀的形式标记 简单代码 默认从A开始循环 function getLimitNumber(_total) {var num 0var …

轻松管理文件名:文件批量重命名的技巧与操作

在日常工作中&#xff0c;文件管理是一项至关重要的任务。其中&#xff0c;文件名的管理更是关键。文件名是在查找文件时最直观的线索。一个好的文件名简短而准确地反映文件的内容或用途。然而&#xff0c;随着时间的推移&#xff0c;可能会发现文件名变得冗长、混乱甚至无法反…

壳牌——利用人工智能应对新能源转型

荷兰皇家壳牌(Shell)最初是一家卖贝壳的商店&#xff0c;截至 2018 年&#xff0c;它是全球收入排名第五的公司。它的业务范围涵盖从勘探和钻探到提炼和零售的整个燃料供应链。壳牌在石油、天然气、生物燃料、风能和太阳能等端到端燃料生产领域处于世界领先地位。 当前&#x…

【算法】缓存淘汰算法

目录 1.概述2.代码实现2.1.FIFO2.2.LRU2.3.LFU2.4.Clock2.5.Random 3.应用 1.概述 缓存淘汰策略是指在缓存容量有限的情况下&#xff0c;当缓存空间不足时决定哪些缓存项应当被移除的策略。缓存淘汰策略的目标是尽可能地保持缓存命中率高&#xff0c;同时合理地利用有限的缓存…

【Linux系统编程二十】:(进程通信2)--命名管道/共享内存

【Linux系统编程二十】&#xff1a;命名管道/共享内存 一.命名管道1.创建管道2.打开管道3.进行通信(server/client) 二.共享内存1.实现原理2.申请内存3.挂接4.通信5.去关联6.释放共享内存7.特性&#xff1a; 一.命名管道 上一篇介绍的一个管道是没有名字的 因为你打开那个文件…

前端工程、静态代码、Html页面 打包成nginx 的 docker镜像

1. 创建一个 mynginx的目录 2. 将前端代码文件夹&#xff08;比如叫 front &#xff09;复制到 mynginx 目录下 3. 在mynginx 目录下创建一个名为Dockerfile 的文件&#xff08;文件名不要改&#xff09;&#xff0c;文件内容如下&#xff1a; # 使用官方的 Nginx 镜像作为基…

【C++】泛型编程 ⑭ ( 类模板示例 - 数组类模板 | 容器思想 | 自定义类可拷贝 - 深拷贝与浅拷贝 | 自定义类可打印 - 左移运算符重载 )

文章目录 一、容器思想1、自定义类可拷贝 - 深拷贝与浅拷贝2、自定义类可拷贝 - 代码示例3、自定义类可打印 - 左移运算符重载 二、代码示例1、Array.h 头文件2、Array.cpp 代码文件3、Test.cpp 主函数代码文件4、执行结果 一、容器思想 1、自定义类可拷贝 - 深拷贝与浅拷贝 上…

企业软件定制开发的优势|app小程序网站搭建

企业软件定制开发的优势|app小程序网站搭建 企业软件定制开发是一种根据企业特定需求开发定制化软件的服务。相比于购买现成的软件产品&#xff0c;企业软件定制开发具有许多优势。 1.企业软件定制开发可以满足企业独特需求。每个企业都有自己独特的业务流程和需求&#xff0c;…

为什么vue中数组和对象的props默认值要写成函数形式?

多个组件数据不相互干涉 假如在一个地方引用了同一个组件&#xff0c;并给他们都绑定了单独的值。如果只声明为一个对象或数组&#xff0c;可能会导致在某一个实例中修改数据&#xff0c;影响到其他实例中的数据&#xff0c;因为数组和对象是引用类型的数据。为了在多次引用组件…

揭秘:如何精准定位性能瓶颈,优化系统性能?

你好&#xff0c;我是静姐&#xff0c;目前在一家准一线互联网大厂做测试开发工程师。对于一般公司普通测试工程师来说&#xff0c;可能性能测试做的并不是很复杂&#xff0c;可能只是编写下脚本&#xff0c;做个压测&#xff0c;然后输出报告结果&#xff0c;瓶颈分析和调优的…

ubuntu安装cuda驱动报错及解决,屡试不爽

机器重启输入nvidia-smi提示如下错误,字面意思就是驱动和库不匹配 Failed to initialize NVML: Driver/library version mismatch 查看一下nvidia相关库 sudo dpkg --list | grep nvidia-* 将所有已安装库卸载 sudo apt purge nvidia-* 重新安装驱动 sudo ./NVIDIA-Linux-…

Java实现王者荣耀小游戏

主要功能 键盘W,A,S,D键&#xff1a;控制玩家上下左右移动。按钮一&#xff1a;控制英雄发射一个矩形攻击红方小兵。按钮控制英雄发射魅惑技能&#xff0c;伤害小兵并让小兵停止移动。技能三&#xff1a;攻击多个敌人并让小兵停止移动。普攻&#xff1a;对小兵造成基础伤害。小…

redis的集群,主从复制,哨兵

redis的高可用 在Redis中&#xff0c;实现高可用的技术主要包括持久化、主从复制、哨兵和集群&#xff0c;下面分别说明它们的作用&#xff0c;以及解决了什么样的问题。 持久化&#xff1a; 持久化是最简单的高可用方法&#xff08;有时甚至不被归为高可用的手段&#xff09;…

京东大数据分析:2023年10月手机行业销量同比增长249%

今年双11&#xff0c;手机仍是竞争极为激烈的产品品类&#xff0c;各大手机厂商均在双11之前做足了准备&#xff0c;10月下旬&#xff0c;各电商平台双十一预售正式开启。而在双11大促节的参与下&#xff0c;10月份手机市场的整体销售也呈现增长趋势。 根据鲸参谋平台的数据显示…

mybatis 基本操作 删除 插入 更新 查询

根据主键删除数据 插入数据 -- 插入 insert into emp(username, name, gender, image, job, entrydate, dept_id, create_time, update_time) values (tom,塔姆,tom,1 , 1.png ,now(),1,now(),now() Options(keyProperty "id",useGeneratedKeys true) Insert(&quo…

[译]JavaScript中Base64编码字符串的细节

本文作者为 360 奇舞团前端开发工程师 本文为翻译 原文标题&#xff1a;The nuances of base64 encoding strings in JavaScript 原文作者&#xff1a;Matt Joseph 原文链接&#xff1a;https://web.dev/articles/base64-encoding Base64编码和解码是一种常见的将二进制内容转…

将对象转成URL参数

背景 有的时候前端跳转到其他平台的页面需要携带额外的参数&#xff0c;需要将对象转成用 & 连接的字符串拼接在路径后面。 实现方法

SquareCTF-2023 Web Writeups

官方wp&#xff1a;CTFtime.org / Square CTF 2023 tasks and writeups sandbox Description&#xff1a; I “made” “a” “python” “sandbox” “”“” nc 184.72.87.9 8008 先nc连上看看&#xff0c;只允许一个单词&#xff0c;空格之后的直接无效了。 flag就在当…

河北专升本(微机原理)

目录 第一章&#xff1a;计算机基础与数制转化 1. 进制运算基础 2. 常用编码形式 3. 计算机系统的组成及其工作原理 4. 微机系统主要技术指标 第二章&#xff1a;8086微处理器及其系统 1. 8086微处理器&#xff08;CPU&#xff09; 2. 8086的存储器及I/O组织 3. 8086系…

TikTok历史探秘:短视频中的时间之旅

在数字时代的浪潮中&#xff0c;TikTok崭露头角&#xff0c;成为社交媒体领域的一颗耀眼新星。这款短视频应用以其独特的创意、时尚和娱乐性质&#xff0c;吸引了全球数以亿计的用户。 然而&#xff0c;TikTok并非一夜之间的奇迹&#xff0c;它背后蕴藏着丰富而有趣的历史故事…