RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)

news2024/11/20 6:22:35

文章目录

  • 前言
  • 一、WorkQueues模型
    • 消息发送
    • 消息接收
    • 能者多劳
  • 二、交换机类型
    • 1.Fanout交换机
      • 消息发送
      • 消息接收
    • 2.Direct交换机
      • 消息接收
      • 消息发送
    • 3.Topic交换机
      • 消息发送
      • 消息接收
  • 三、编程式声明队列和交换机
    • fanout示例
    • direct示例
    • 基于注解
  • 四、消息转换器
  • 总结


前言

WorkQueues模型、Fanout交换机、Direct交换机、Topic交换机、基于SpringBoot注解声明队列和交换机、消息转换器。


一、WorkQueues模型

  • Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
  • 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

在这里插入图片描述
在控制台创建一个work.queue的队列:
在这里插入图片描述

消息发送

循环发送,模拟大量消息堆积现象。

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

模拟多个消费者绑定同一个队列,我们添加2个方法,并且设置不同睡眠时间模拟不同性能读取

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

在这里插入图片描述

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:
在这里插入图片描述

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

二、交换机类型

1.Fanout交换机

发送流程:
在这里插入图片描述

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在控制台创建fanout.queue1、fanoutqueue2队列和dragon.fanout交换机,并将队列绑定到交换机:
在这里插入图片描述

在这里插入图片描述

消息发送

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "dragon.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在控制台声明两个队列direct.queue1和direct.queue2,然后声明一个direct类型的交换机,绑定队列和交换机(使用red和blue作为key,绑定direct.queue1到dragon.direct;使用red和yellow作为key,绑定direct.queue2到dragon.direct):
在这里插入图片描述

在这里插入图片描述

消息接收

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "dragon.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

绑定红色的都会接收到信息:
在这里插入图片描述

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.Topic交换机

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
    只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
  • BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
  • 通配符规则:
    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词
    • 举例:
      dragon.#:能够匹配dragon.stu.insert 或者 dragon.stu
      dragon.*:只能匹配dragon.stu

在这里插入图片描述
假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

在这里插入图片描述

消息发送

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "dragon.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

三、编程式声明队列和交换机

SpringAMQP提供了一个Queue类,用来创建队列:

fanout示例

@Configuration
public class FanoutConfiguration {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
//    还可以是  return  ExchangeBuilder.fanoutExchange("dragon.fanout").build();
        return new FanoutExchange("dragon.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
//   还可以是  return  QueueBuilder.durable("fanout.queue1").build();
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
//绑定队列和交换机的另一方法
//    @Bean
//    public Binding bindingQueue2(){
//        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
//    }
}

direct示例

@Configuration
public class DirectConfiguration {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("dragon.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

基于注解

上面的代码还是很多的,基于注解的方式也能够代替上面的繁杂配置,下面演示direct和topic交换机,队列的代码

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

四、消息转换器

随便创建一个队列,然后发送Map对象,你会发现消息格式很不友好

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

控制台查看:
在这里插入图片描述

  • Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
  • 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
    • 数据体积过大
      有安全漏洞
      可读性差
  • 根据上面测试,显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

j解决:
引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。其转换器配置。
添加配置:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

总结

以上就是所有讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1269600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

借助 DPM 代码扫描的力量解锁医疗设备的可追溯性

在当今的医疗保健系统中&#xff0c;医疗设备的可追溯性变得比以往任何时候都更加重要。为了增强现代医疗保健领域的可追溯性和安全性&#xff0c;UDI 条形码充当唯一设备标识的标准&#xff0c;为医疗设备提供唯一标识符。 DataMatrix 代码&#xff08;或直接零件标记代码&am…

矩阵代数与MATLAB实现(特征值、广义特征值、酋矩阵、)

矩阵代数的相关知识 目录 一、特征值与特征向量 1、特征值与特征向量 2、MATLAB计算 二、广义特征值与广义特征向量 1、广义特征值与广义特征向量 2、MATLAB计算 三、酋矩阵 1、酋矩阵 2、MATLAB计算 四、未完待续 总结 提示&#xff1a;以下是本篇文章正文内容&…

动态规划:解决复杂问题的利器(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Android控件全解手册 - 多语言切换完美解决方案(兼容7.0以上版本)

Unity3D特效百例案例项目实战源码Android-Unity实战问题汇总游戏脚本-辅助自动化Android控件全解手册再战Android系列Scratch编程案例软考全系列Unity3D学习专栏蓝桥系列ChatGPT和AIGC &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分…

深入理解OS--硬件高速缓存,缓存一致性,存储设备

存储技术 1.SRAM&#xff0c;DRAM 静态&#xff1a;更快&#xff0c;用作高速缓存存储器。处理器高速缓存采用SRAM。 动态&#xff1a;用作主存及图形系统的帧缓冲区。内存采用DRAM。 2.内存 2.1.内存数据访问示例 设备控制器存在缓存。设备芯片自身存在缓存。 2.2.采用并行…

Spring不再支持Java8了

在今天新建模块的时候发现了没有java8的选项了&#xff0c;结果一查发现在11月24日&#xff0c;Spring不再支持8了&#xff0c;这可怎么办呢&#xff1f;我们可以设置来源为阿里云https://start.aliyun.com/ 。 java8没了 设置URL为阿里云的地址

【Web】NewStarCTF Week3 个人复现

①Include &#x1f350; ?filephpinfo 提示查下register_argc_argv 发现为on LFI包含 pearcmd命令执行学习 pearcmd.php文件包含妙用 ?file/usr/local/lib/php/pearcmd&config-create/<?eval($_POST[a])?>./ha.php ?file./ha post传&#xff1a; asystem…

赛博朋克-MISC-bugku-解题步骤

——CTF解题专栏—— 题目信息&#xff1a; 题目&#xff1a;赛博朋克 作者&#xff1a;Mooooooof 提示&#xff1a; 解题附件&#xff1a; 解题思路&#xff1a; 赛博朋克是个txt文件&#xff1f;&#xff1f;&#xff1f;&#xff0c;不是个game.exe吗&#xff08;开玩笑…

3D场景建模工具

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1. 什么是3D场景建模&#xff1f; 3D场景建模是一种通过计算机图形学技术&#xff0c;将现实世…

如何跑通跨窗口渲染:multipleWindow3dScene

New 这是一个跨窗口渲染的示例&#xff0c;用 Three.js 和 localStorage 在同一源&#xff08;同产品窗口&#xff09;上跨窗口设置 3D 场景。而这也是本周推特和前端圈的一个热点&#xff0c;有不少人在争相模仿它的实现&#xff0c;如果你对跨窗口的渲染有兴趣&#xff0c;可…

pdf文件编辑,[增删改查]

pdf文件是投标文件中必不可少的格式&#xff0c;传统的方式先编辑word格式&#xff0c;最后生成pdf&#xff0c;但是有时候需要直接编辑pdf文件&#xff0c;编辑pdf的工具无疑 “adobe acrobat dc”是最好用的之一了 1.把图片文件添加到pdf指定位置&#xff0c;例如把一张图片添…

zookeeper集群(很少用)+kafka集群(常用)

一、zookeeper zookeeperkafka&#xff08;2.7.0版本&#xff09; kafka&#xff08;3.4.1版本&#xff09;不依赖于zookeeper 1、定义&#xff1a;zookeeper开源&#xff0c;分布式架构&#xff0c;提供协调服务&#xff08;Apache项目&#xff09;&#xff0c;基于观察者模…

【C语言】把歌词里的播放时间跟歌词提取出来

一&#xff0c;介绍 给到一个字符串&#xff0c;里面包含了时间&#xff08;唱该歌词的时间以及该歌词&#xff09;例如“[02:16.33][04:11.44][05:11.44]我想大声宣布对你依依不舍”&#xff0c;如何把两者都给打印出来呢&#xff1f;下面给出解释 二&#xff0c;代码 #incl…

<蓝桥杯软件赛>零基础备赛20周--第8周第1讲--十大排序

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周&#xff08;读者可以按…

fastadmin 如何引入自己的js

在需要的界面中&#xff1a;如何实例说明&#xff1a; 中<script> function zhuruJs(url) { let temp document.createElement( script ); temp.setAttribute( type, text/javascript" );temp.src urL; document.head . appendChild(temp); zhuruJs(location…

轻松愉悦的验证方式:实现图片旋转验证功能

说在前面 在当今互联网时代&#xff0c;随着技术的不断进步&#xff0c;传统的验证码验证方式已经无法满足对安全性和用户体验的需求。为了应对日益狡猾的机器人和恶意攻击&#xff0c;许多网站和应用程序开始引入图形验证码&#xff0c;其中一种备受欢迎的形式就是图片旋转验证…

springmvc实验(三)——请求映射

【知识要点】 方法映射概念 所谓的方法映射就是将前端发送的请求地址和后端提供的服务方法进行关联。在springMVC框架中主要使用Controller和RequestMapping两个注解符&#xff0c;实现请求和方法精准匹配。注解符Controller Spring中包含了一个Controller接口&#xff0c;但是…

【C/C++笔试练习】this指针的概念、初始化列表、const对象调用、构造和析构函数、继承和组合、重载和多态、虚函数的定义、计算日期到天数转换、幸运的袋子

文章目录 C/C笔试练习选择部分&#xff08;1&#xff09;this指针的概念&#xff08;2&#xff09;初始化列表&#xff08;3&#xff09;const对象调用&#xff08;4&#xff09;构造和析构函数&#xff08;5&#xff09;继承和组合&#xff08;6&#xff09;重载和多态&#x…

七天.NET 8操作SQLite入门到实战 - 第四天EasySQLite前后端项目框架搭建

前言 今天的主要任务是快速下载并安装.NET 8 SDK&#xff0c;搭建EasySQLite的前后端框架。 .NET 8 介绍 .NET 8 是 .NET 7 的后继版本。 它将作为长期支持 (LTS) 版本得到三年的支持。 使用技术栈和开发环境 咱们的.NET 8操作SQLite入门到实战教程主要使用技术栈为如下所示…

Android帝国之进程杀手--lmkd

本文概要 这是Android系统启动的第三篇文章&#xff0c;本文以自述的方式来讲解lmkd进程&#xff0c;通过本文您将了解到lmkd进程在安卓系统中存在的意义&#xff0c;以及它是如何杀进程的。&#xff08;文中的代码是基于android13&#xff09; 我是谁 init&#xff1a;“大…