RabbitMQ(黑马spring cloud笔记)

news2025/1/11 7:37:30

MQ

目录

  • MQ
    • 一、同步通讯和异步通讯
      • 1. 同步通讯
      • 2. 异步通讯
    • 二、RabbitMQ
      • 1. 部署
      • 2. 架构
      • 3. 常见消息模型
        • 3.1 基本消息队列(Basic Queue)
      • 3.2 工作消息队列(Work Queue)
      • 3.3 发布订阅(Publish、Subscribe)
    • 4. 消息转换器

一、同步通讯和异步通讯

1. 同步通讯

优点

  • 时效性强,立即获取结果

缺点

  • 耦合度高
  • 性能和吞吐能力不如异步
  • 额外资源消耗
  • 级联失败问题

2. 异步通讯

在这里插入图片描述

优点

  • 服务解耦
  • 性能提升,吞吐量提高
  • 服务没有强依赖,不担心级联问题
  • 流量削峰

缺点

  • 依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂的情况下,业务没有明显的流程线,不好追踪管理

MQ即是事件驱动架构中的Broker。

二、RabbitMQ

1. 部署

直接docker拉一个:

# 拉取镜像
docker pull rabbitmq:3-management
#启动容器
docker run \
 -e RABBITMQ_DEFAULT_USER=root \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
 # 15672是管理口

2. 架构

在这里插入图片描述

几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,对queue、exchange等资源的逻辑分组

3. 常见消息模型

3.1 基本消息队列(Basic Queue)

在这里插入图片描述

  1. 依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    amqp是高级消息队列协议,springAMQP则是一种实现。

  2. 配置

    spring:
      rabbitmq:
        host: 190.92.246.107 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: root
        password: 123456
    
  3. 实现

    • 发布者

      public class PublisherTest {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @Test
          public void testSimpleQueue() {
              String queueName = "simple.queue";
              String message = "hello, spring amqp";
              rabbitTemplate.convertAndSend(queueName, message);
          }
      }
      
    • 消费者

      配置都是一样的

      @Component
      public class SpringRabbitListener {
          @RabbitListener(queues = {"simple.queue"})
          public void listenSimpleQueue(String msg) {
              System.out.println(msg);
          }
      }
      

      启动main函数,成功:

      在这里插入图片描述

3.2 工作消息队列(Work Queue)

在这里插入图片描述

两个消费者合作处理消息,避免消息堆积。

AMQP有一个消息预取机制,预取多少条消息是可以配置的。

spring:
  rabbitmq:
    host: 190.92.246.107 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: root
    password: 123456
    listener:
      simple:
        prefetch: 1
  • 发布者:

    @Test
    public void testSimpleQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, spring amqp";
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
    
  • 消费者

    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = {"simple.queue"})
        public void listenSimpleQueue1(String msg) throws InterruptedException {
            System.out.println("消费者1" + "【" + msg + "】" + LocalTime.now());
            Thread.sleep(20);
        }
    
        @RabbitListener(queues = {"simple.queue"})
        public void listenSimpleQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2" + "【" + msg + "】" + LocalTime.now());
            Thread.sleep(200);
        }
    }
    

如果消息预取机制不设置,意味着不设限,那么在这个例子中每个消费者无论处理能力如何,都会处理25条消息,设置为1后,则按照能力分配。

3.3 发布订阅(Publish、Subscribe)

和之前不同的是,可以将一条消息发送给多个消费者,实现方式是加入了交换机。

根据交换机类型不同分为三种:广播、路由和主题

  • Fanout Exchange 广播

    这个交换机会将消息路由到每一个和它绑定的队列

    在这里插入图片描述

    • 发布者

      不同的是,我们发送消息到交换机

      @Test
      public void testSendFanoutExchange() {
          String exchangeName = "root.fanout";
          String message = "hello everyone";
          rabbitTemplate.convertAndSend(exchangeName, "", message);
      }
      
    • 订阅者

      首先创建交换机和队列,并将队列绑定到交换机上(有注解的写法,像后文路由模式那样)

      @Configuration
      public class FanoutConfig {
          @Bean
          public FanoutExchange fanoutExchange() {
              return new FanoutExchange("root.fanout");
          }
      
          @Bean
          public Queue fanoutQueue1() {
              return new Queue("fanout.queue1");
          }
      
          @Bean
          public Queue fanoutQueue2() {
              return new Queue("fanout.queue2");
          }
      
          @Bean
          public Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
          }
      
          @Bean
          public Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
          }
      }
      

      然后监听队列:

      @RabbitListener(queues = {"fanout.queue1"})
      public void listenFanoutQueue1(String msg) throws InterruptedException {
          System.out.println("fanout.queue1消费者" + "【" + msg + "】" + LocalTime.now());
      }
      
      @RabbitListener(queues = {"fanout.queue2"})
      public void listenFanoutQueue2(String msg) throws InterruptedException {
          System.err.println("fanout.queue2消费者" + "【" + msg + "】" + LocalTime.now());
      }
      

      启动测试:

      在这里插入图片描述

  • Direct Exchange 路由

    在这里插入图片描述

    特点:

    • 每个Queue都与Exchange设置一个BindingKey
    • 发布者发送消息时,指定消息的RoutingKey
    • Exchange将消息路由到BindingKey与消息Routingkey一致的队列

    接下来就可以测试一下:

    有一个交换机,两个队列,两个消费者分别有两个BindingKey。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
            key = {
                    "blue",
                    "red"
            }
    ))
    public void listenDirectQueue1(String msg) {
        System.err.println("direct.queue1消费者" + "【" + msg + "】" + LocalTime.now());
    
    }
    
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),
            key = {
                    "yellow",
                    "red"
            }
    ))
    public void listenDirectQueue2(String msg) {
        System.err.println("direct.queue2消费者" + "【" + msg + "】" + LocalTime.now());
    
    }
    

    发布者:

    @Test
    public void testSendDirectExchange() {
        String exchangeName = "root.direct";
        String message = "hello red";
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
    

    不断更换routingKey,观察订阅者日志。

  • Topic Exchange 主题

    在这里插入图片描述

    和路由模式类似,区别是这个模式的key是多个单词的列表,以 “ . ” 分割。

    在指定BIndingKey时可以使用通配符。例如:#代表0个或多个单词,*代表一个单词。

    • 订阅

      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(name = "topic.queue1"),
              exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
              key = {
                      "china.#"
              }
      ))
      public void listenTopicQueue1(String msg) {
          System.err.println("topic.queue1消费者" + "【" + msg + "】" + LocalTime.now());
      }
      @RabbitListener(bindings = @QueueBinding(
              value = @Queue(name = "topic.queue2"),
              exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),
              key = {
                      "#.news"
              }
      ))
      public void listenTopicQueue2(String msg) {
          System.err.println("topic.queue2消费者" + "【" + msg + "】" + LocalTime.now());
      }
      
    • 发布

      @Test
      public void testSendTopicExchange() {
          String exchangeName = "root.topic";
          String message = "hello world";
          rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
      }
      

      改变routingkey,观察日志。

4. 消息转换器

我们不仅仅可以发送字符串消息,还可以发送对象,默认情况下,需要传统的序列化方式,对象需要实现Serializable接口,不太方便,我们使用json。

  1. 引入依赖

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  2. 自定义MessageConverter

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    

    这个时候发送的消息就会经过json序列化了。

  3. 测试

    创建队列

    @Bean
    public Queue fanoutExchange() {
        return new Queue("object.queue");
    }
    

    消费者(需要像发布者一样的,引入jackson,然后定义messageConverter)

    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg) {
        System.err.println("object.queue消费者" + "【" + msg.get("name") + "】" + LocalTime.now());
        System.err.println("object.queue消费者" + "【" + msg.get("date") + "】" + LocalTime.now());
    }
    

    发布消息

    @Test
    public void testSendObj() {
        String queue = "object.queue";
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "root");
        msg.put("date", new Date());
        rabbitTemplate.convertAndSend(queue, msg);
    }
    

    成功:

    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/341135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TPAMI 2022 | RC-Explainer:图神经网络的强化因果解释器

文章目录 一、论文关键信息二、基础概念三、主要内容1. Motivations2. Insights3. 解决方案的关键四、总结与讨论CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 一、论文关键信息 论文标题:Reinforced Causal Explainer for Graph Neural Networks 期刊信息:IEEE Transact…

【C++】内存管理

&#x1f345;不同的数据放在不同的地方&#xff0c;需要内存管理 目录 ☃️1.C/C中的内存分布 ☃️2.C语言中动态内存管理方式 ☃️3.C内存管理方式 &#x1f41d;3.1 new/delete操作内置类型 &#x1f41d;3.2 new和delete操作自定义类型 &#x1f41d;3.3 operator n…

FISCO BCOS节点扩容和使用console进行群组扩容

一、安装并启动FISCO BCOS 搭建单机单群组4节点的教程查看&#xff1a;https://blog.csdn.net/yueyue763184/article/details/128924144?spm1001.2014.3001.5501 二、下载扩容脚本 在fisco目录下输入以下命令&#xff1a; curl -#LO https://raw.githubusercontent.com/FI…

155、【动态规划】leetcode ——474. 一和零:三维数组+二维滚动数组(C++版本)

题目描述 原题链接&#xff1a;474. 一和零 解题思路 &#xff08;1&#xff09;三维数组 本题是要在已有的字符串中&#xff0c;找到给定的m个0和n个1&#xff0c;组出最大的子集。将字符串集合中的各个字符串看作物品&#xff0c;m个0和n个1看作背包的重量&#xff0c;则该…

jenkins +docker+python接口自动化之jenkins容器安装python3(二)

jenkins dockerpython接口自动化之jenkins容器安装python3&#xff08;二&#xff09; 目录&#xff1a;导读 前提是在docker下已经配置好jenkins容器了&#xff0c;是将python安装在jenkins容器下的 1、先看你的jenkins是否安装好 2、以root权限进入jenkins容器&#xff1…

NLP方向的论文可投的核心期刊

目录1、《计算机仿真》北大核心、科技核心2、《通信学报》北大核心、科技核心、CSCD核心3、《计算机科学》北大核心、EI来源期刊、CSCD核心4、《计算机工程》北大核心、科技核心5、《计算机应用》北大核心、科技核心、CSCD核心6、《计算机工程与应用》北大核心、科技核心、CSCD…

Python - 数据容器dict(字典)

目录 字典的定义 字典数据的获取 字典的嵌套 字典的各种操作 新增与更新元素 [Key] Value 删除元素 pop和del 清空字典 clear 获取全部的键 keys 遍历字典 容器通用功能总览 字典的定义 使用{}&#xff0c;不过存储的元素是一个个的&#xff1a;键值对&#…

golang的web框架Gin(一)---Gin的Resutful风格

Restful风格是什么&#xff1f; REST与技术无关&#xff0c;代表的是一种软件架构风格&#xff0c;REST是Representational State Transfer的简称&#xff0c;中文翻译为“表征状态转移”或“表现层状态转化”。 RESTFUL特点包括&#xff1a; 每一个URI代表1种资源&#xff…

STM32F103C8T6—库函数应用I2C/SPI驱动OLED显示中文、字符串

文章目录1. I2C与SPI通信协议对比2. 四脚OLED与六脚OLED3. I2C驱动OLED显示oled.h & oled.c&#xff1a;汉字取模 & oledfont.h&#xff1a;main.c 显示示例&#xff1a;连线方法&#xff1a;4. SPI驱动OLED显示1. I2C与SPI通信协议对比 I2C&#xff08;Inter-Integra…

基于springboot的毕业设计管理系统

摘要随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&…

【开源】祁启云网络验证系统V1.11

简介 祁启云免费验证系统 一个使用golang语言、Web框架beego、前端Naive-Ui-Admin开发的免费网络验证系统 版本 当前版本1.11 更新方法 请直接将本目录中的verification.exe/verification直接覆盖到你服务器部署的目录&#xff0c;更新前&#xff0c;请先关闭正在运行的验…

搭建云端vscode-server,使用web ide进行远程开发

使用乌班图系统&#xff0c;搭建自己的网页vs code开发环境github地址&#xff1a;GitHub - coder/code-server: VS Code in the browser安装脚本curl -fsSL https://code-server.dev/install.sh | sh出现deb package has been installed.表示已经正确安装。测试启动2.1修改配置…

Idea打包springboot项目war包,测试通过

pom.xml文件 <!--包名以及版本号&#xff0c;这个是打包时候使用&#xff0c;版本可写可不写&#xff0c;建议写有利于维护系统--> <artifactId>tsgdemo</artifactId> <version>0.0.1-SNAPSHOT</version> <!--打包形式--> <packaging&…

在项目中遇到的关于form表单的问题

前言 以下内容都是基于element Plus 和 vue3 一个form-item校验两个下拉框 有时候不可避免会遇到需要一个form-item校验两个下拉框的情况&#xff0c;比如&#xff1a; 这种情况下传统的校验已经无法实现&#xff0c;需要通过form表单提供的自定义校验来实现。以上面的必填…

6年软件测试经验,从我自己的角度理解自动化测试

接触了不少同行&#xff0c;由于他们之前一直做手工测试&#xff0c;现在很迫切希望做自动化测试&#xff0c;其中不乏工作5年以上的人。 本人从事软件自动化测试已经近6年&#xff0c;从server端到web端&#xff0c;从API到mobile&#xff0c;切身体会到自动化带来的好处与痛楚…

Ansible自动化运维工具---安装及命令模块

目录 引言 一、Ansible概述 1.1、Ansible 自动运维工具特点 1.2、Ansible 运维工具原理 1.3、Ansible系统架构 1.4、Ansible的特性 二、安装Ansible 三、Ansible命令模块 command模块 shell模块 cron模块 user模块 group模块 copy模块 file模块 hostname 模块 p…

[acwing周赛复盘] 第 90 场周赛20230211 补

[acwing周赛复盘] 第 90 场周赛20230211 补 一、本周周赛总结二、 4806. 首字母大写1. 题目描述2. 思路分析3. 代码实现三、4807. 找数字1. 题目描述2. 思路分析3. 代码实现四、4808. 构造字符串1. 题目描述2. 思路分析3. 代码实现六、参考链接一、本周周赛总结 T1 模拟T2 模拟…

ThingsBoard-实现定时任务调度器批量RPC

1、概述 ThingsBoard-CE版是不支持调度器的,只有PE版才支持,但是系统中很多时候需要使用调度器来实现功能,例如:定时给设备下发rpc查询数据,我们如何来实现呢?下面我将教你使用巧妙的方法来实现。 2、使用什么实现 我们可以使用规则链提供的一个节点来实现,这个节点可…

Linux系统服务:Apache安装及配置应用

目录 一、Apache安装 1、Apache简介 2、Yum安装 3、编译安装 4、服务管理 5、编译安装实现systemctl服务管理 二、Apache配置应用 1、基础应用 2、隐藏版本号 3、更改监听端口 一、Apache安装 1、Apache简介 Apache即阿帕奇是一款开源的、世界使用排名第一的Web服务…

2023 年腾讯云服务器CVM快速配置购买教程,新手上云必备!

腾讯云服务器快速配置购买教程是新手必备的上云教程。主机教程网在本文中以腾讯云服务器为例&#xff0c;给大家带来一个完整的、手把手教学的服务器购买流程。助力快速完成服务器的购买、配置、以及网站的搭建&#xff0c;给新手节省宝贵的时间&#xff0c;避免采坑&#xff0…