RabbitMQ 介绍与 SpringBootAMQP使用

news2025/1/12 0:58:23

一、MQ概述

异步通信的优点:

  • 耦合度低
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

异步通信的缺点:

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂,业务么有明显的流程线,不方便追踪管理

什么是的MQ
MQ(Message Queue),消息队列,就是放消息的队列。也是事件驱动架构中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、
SMTP、STOMP
OpenWire、STOMP、
REST、XMPP、AMQP
自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒级
消息可靠性一般一般

二、RabbitMQ概述

1. RabbitMQ的结构和概念

  • Channel:操作MQ的工具
  • Exchange:路由消息到队列中
  • Queue:缓存消息
  • Virtual Host:虚拟主机,是对Queue、Exchange等资源的逻辑分组

在这里插入图片描述
2. 常见消息模型

  • 基本消息队列(BasicQueue)
    在这里插入图片描述

    • Publisher:消息发布者,将消息发送到队列Queue
    • Queue:消息队列,负责接受并缓存消息
    • Consumer:订阅队列,处理队列中的消息

    在这里插入图片描述

  • 工作消息队列(WorkQueue)
    在这里插入图片描述
    Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

    当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同进行消息处理,提高消费速度。

    在这里插入图片描述

  • 发布订阅(Publish、Subscribe),根据交换机类型不同分为三种:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化

  • Queue:消息队列也与以前一样,接收消息、缓存消息。

    在这里插入图片描述
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

  • Fanout Exchange: 广播
    在这里插入图片描述
    在广播模式下,消息发送流程:

    - 1)  可以有多个队列
    - 2)  每个队列都要绑定到Exchange(交换机)
    - 3)  生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
    - 4)  交换机把消息发送给绑定过的所有队列
    - 5)  订阅队列的消费者都能拿到消息
    

在这里插入图片描述

  • Direct Exchange:路由
    在这里插入图片描述

    在Fanout模式中,一条消息会被所有订阅的队列都消费。但是,在某些场景下,希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
    在这里插入图片描述

    在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
  • Topic Exchange:主题
    在这里插入图片描述

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

如下图:

  • Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
    在这里插入图片描述

3. RabbitMQ的安装

1、安装Erlang:RabbitMQ是用Erlang编写的,因此首先需要安装Erlang运行环境(注意Erlang与RabbitMQ的对应版本)。运行以下命令进行安装:sudo apt install erlang

2、在线拉取镜像:docker pull rabbitmq:3-management

3、运行以下命令来下载并启动RabbitMQ Docker镜像:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

4、浏览器访问RabbitMQ管理页面:http://IP:15672/(注意:若网页无法访问,可能是rabbitmq_management插件未启用)

5、进入sbin目录下,查看插件,命令:rabbitmq-plugins list
在这里插入图片描述

6、 若 rabbitmq_management 插件未启用(状态无 * ),通过命令启用该插件:rabbitmq-plugins enable rabbitmq_management
在这里插入图片描述

7、启用后,重新访问地址,用户名/密码默认:guest/guest
在这里插入图片描述

三、SpringAMQP

AMQP,Adanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开发标准,与语言和平台无关。

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1、使用SpringBootAMQP- SimpleQueue的步骤

  • 引入AMQP的Starter依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置RabbitMQ地址
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 3 # 每次只能获取一条消息,处理完成才能获取下一个消息
  • 利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        //queueName
        String queueName = "ty.simple.queue";

        //message
        String message = "hello world ";

        //send Message
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

2、使用SpringBootAMQP- FanoutExchange的步骤

  • 创建Spring配置类,绑定交换机 - 队列
package com.example.rabbitmq_demo.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("ty.fanout");
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定交换机与队列
     */
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

  • 利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendFanoutExchange() {
        //exchangeName
        String exchangeName = "ty.fanout";

        //message
        String message = "hello world fanout";

        //send Message
        rabbitTemplate.convertAndSend(exchangeName, "", message);

    }

}

2、使用SpringBootAMQP- Direct的步骤

  • 基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerDemo {

    /**
     * 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明
     * 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "ty.direct.queue1"),
            exchange = @Exchange(name = "ty.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "green"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("listener ty.direct.queue1 Get message : " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "ty.direct.queue2"),
            exchange = @Exchange(name = "ty.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("listener ty.direct.queue2 Get message : " + msg);
    }
}
  • 通过convertAndSend发送消息,会根据的RoutingKey,将消息发送至指定队列。
package com.example.rabbitmq_demo;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void testSendDirectExchange(){
        String exchangeName = "ty.direct";
        String message = "hello ty";
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
}

3、使用SpringBootAMQP- Tpic的步骤

  • 基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerDemo {
    /**
     * Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "ty.topic.queue1"),
            exchange = @Exchange(name = "ty.topic", type = ExchangeTypes.TOPIC),
            key = "ty.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("listener ty.topic.queue1 Get message : " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "ty.topic.queue2"),
            exchange = @Exchange(name = "ty.topic", type = ExchangeTypes.TOPIC),
            key = "#.tyty"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("listener ty.topic.queue2 Get message : " + msg);
    }
}
  • 根据RoutingKey通配符,发送到对应Queue
package com.example.rabbitmq_demo;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendTopicExchange(){
        String exchangeName = "ty.topic";
        String message = "hello ty";
        rabbitTemplate.convertAndSend(exchangeName, "ty.tyty", message);
    }

}

4、SpringBootAMQP对象序列化

SpringBootAMQP默认使用的是 x-java-serialized-object,JDK序列化数据体积过大、有安全漏洞,且可读性差。
在这里插入图片描述
可通过配置JSON转换器,使用Json的方式做序列化和反序列化。

  • 引入jar
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
  • 配置类中增加Bean
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1071439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

选择适合建筑公司的企业网盘平台

随着城市化进程的加速&#xff0c;越来越多的人开始关注乡村生活品质。Z公司以其标准化产品和优质资源整合&#xff0c;为回乡建房人群提供了一种全新的、高品质的整体解决方案。 Z公司深入调研了10W的回乡建房人群需求&#xff0c;组建了设计、工艺、供应链方面的专家团队&…

KUKA机器人通过直接输入法设定负载数据和附加负载数据的具体操作

KUKA机器人通过直接输入法设定负载数据和附加负载数据的具体操作 设置背景色: 工具负载数据 工具负载的定义: 工具负载数据是指所有装在机器人法兰上的负载。它是另外装在机器人上并由机器人一起移动的质量。需要输入的值有质量、重心位置、质量转动惯量以及所属的主惯性轴。…

边坡安全监测系统:守护边坡稳定的重要工具

在工程建设中&#xff0c;边坡安全监测系统一直被认为是掌握边坡安全及其支护结构维护决策系统的关键支撑条件。这一系统的主要目的在于确定边坡结构的稳定性&#xff0c;监控支护结构的承载能力、运营状态和耐久性能&#xff0c;并对边坡稳定性进行实时监控。 一、边坡安全监测…

CTF学习笔记——PWN(入门)

文章目录 [toc] CTF学习笔记——PWN&#xff08;入门&#xff09;PWN基础概念NC题[HGAME 2023 week1]test_nc 栈溢出[HNCTF 2022 Week1]easyoverflow 伪随机数[SWPUCTF 2022 新生赛]Darling 待补充待补充 CTF学习笔记——PWN&#xff08;入门&#xff09; &#x1f680;&#x…

python常用库之数据库orm框架之SQLAlchemy

文章目录 python常用库之数据库orm框架之SQLAlchemy一、什么是SQLAlchemySQLAlchemy 使用场景 二、SQLAlchemy使用SQLAlchemy根据模型查询SQLAlchemy SQL 格式化的方式db_session.query和 db_session.execute区别实测demo 总结&#xff1a;让我们留意一下SQLAlchemy 的 lazy lo…

电流,功率监控芯片INA226应用(基于STM32工程)

一芯片介绍 INA226是具有I2C™或SMBUS兼容接口的电流分流器和功率监控器。该设备同时监视并联电压降和总线电源电压。可编程的校准值&#xff0c;转换时间和平均值与内部乘法器结合使用&#xff0c;可以直接读取以安培为单位的电流和以瓦特为单位的功率。INA226感应共模总线电…

Spring【@Resource、@Autowired+lombook+Bean的生命周期】

Resource 和 Autowired 的区别 在Spring中找Bean的两种方式&#xff1a;①先根据类型查找②再根据名称查找 Autowired先根据类型查找&#xff0c;再根据名称查找【根据上述查找结果不唯一&#xff0c;再添加一个 Qualifier(value“”)&#xff0c;就可以查找】 Resource先根据名…

Spring Cloud Gateway2之断言Predicate详解

文章目录 1. 前言2. Spring Cloud Gateway断言的种类及各自功能2.1. Path断言 PathRoutePredicateFactory2.2.Method断言 MethodRoutePredicateFactory2.3.Header断言 HeaderRoutePredicateFactory2.4.Host断言 HostRoutePredicateFactory2.5.Query断言 QueryRoutePredicateFac…

【C++】unordered_set和unordered_map介绍及使用【附OJ题】

目录 一、unordered_set和unordered_map的介绍和使用 1、介绍 2、使用及与set和map的区别 3、O&#xff08;logN&#xff09;和 O&#xff08;1&#xff09;的效率对比 二、力扣OJ题 1、重复N次的元素 2、两个数组的交集 一、unordered_set和unordered_map的介绍和使用…

AI+Social Power,开创营销新纪元 | 2023数说故事年度社媒营销盛会,10月13日邀您共同见证

尊敬的嘉宾&#xff1a; AIGC成为2023年最热门的关键词之一&#xff0c;且以惊人的速度赢得了“圈层共识”&#xff0c;各行业都在探索如何利用AI技术创造更多可能性。尤其在社媒营销领域&#xff0c;AIGC的应用已成为势不可挡的趋势&#xff1a;品牌们用AI造新品&#xff0c;…

OpenHarmony嵌套类对象属性变化:@Observed装饰器和@ObjectLink装饰器

上文所述的装饰器仅能观察到第一层的变化&#xff0c;但是在实际应用开发中&#xff0c;应用会根据开发需要&#xff0c;封装自己的数据模型。对于多层嵌套的情况&#xff0c;比如二维数组&#xff0c;或者数组项class&#xff0c;或者class的属性是class&#xff0c;他们的第二…

子组件跳转父组件

描述&#xff1a;父组件Form.vue 点击关联&#xff0c;弹出子组件importForm.vue 选中一条数据之后&#xff0c;点击确定按钮&#xff0c;关闭子组件importForm.vue&#xff0c;将子组件的内容显示在父组件Form.vue中 选中第一条数据&#xff0c;点击确定 父组件对应的工作内容…

Java源码分析(三)ArrayList

ArrayList是我们经常用到的一个集合类&#xff0c;那么本篇我们一起学习下ArrayList的源码。 一、创建ArrayList 首先&#xff0c;我们从创建ArrayList开始。如下代码&#xff0c;创建一个空的ArrayList&#xff1a; List<String> list new ArrayList<>(); 看下…

Java中的锁与锁优化技术

文章目录 自旋锁与自适应自旋锁消除锁粗化轻量级锁偏向锁重量级锁 自旋锁与自适应自旋 自旋锁是一种锁的实现机制&#xff0c;其核心思想是当一个线程尝试获取锁时&#xff0c;如果锁已经被其他线程持有&#xff0c;那么这个线程会在一个循环中不断地检查锁是否被释放&#xf…

长效和短效HTTP:哪个适合爬虫的代理类型?

在进行网络爬虫任务时&#xff0c;选择适合的代理类型对爬虫的效率和稳定性至关重要。长效和短效HTTP代理是两种常见的代理类型&#xff0c;它们各具特点和适用场景。本文将为您分享长效和短效HTTP代理的区别以及选择适合爬虫的代理类型的实用技巧&#xff0c;帮助您提升爬虫效…

Linux Ftrace介绍

文章目录 一、简介二、内核函数调用跟踪参考链接&#xff1a; 一、简介 Ftrace 是 Linux 官方提供的跟踪工具&#xff0c;在 Linux 2.6.27 版本中引入。Ftrace 可在不引入任何前端工具的情况下使用&#xff0c;让其可以适合在任何系统环境中使用。 Ftrace 可用来快速排查以下相…

一个tomcat下如何部署多个项目?

1、不修改端口&#xff0c;部署多个项目 清楚tomcat目录结构的应该都知道&#xff0c;项目包是放在webapps目录下的&#xff0c;那能否在同一个tomcat的webapps目录下运行多个不同项目呢&#xff1f; 答案是可以的。 1、将多个项目包放入webapps文件夹下 2、修改conf下的serv…

10.8作业

自己封装一个矩形类(Rect)&#xff0c;拥有私有属性:宽度(width)、高度(height)&#xff0c; 定义公有成员函数: 初始化函数:void init(int w, int h) 更改宽度的函数:set_w(int w) 更改高度的函数:set_h(int h) 输出该矩形的周长和面积函数:void show() #include <io…

策略模式优雅实践

1 什么是策略模式 策略模式&#xff08;Strategy Pattern&#xff09;是一种常用的面向对象设计模式&#xff0c;它定义了一系列可互相替换的算法或策略&#xff0c;并将每个算法封装成独立的对象&#xff0c;使得它们可以在运行时动态地替换。具体来说&#xff0c;策略模式定义…

【uniapp】小程序开发6:自定义状态栏

一、自定义状态栏 可以设置某个页面的状态栏自定义或者全局状态栏自定义。 这里以首页状态栏为例。 1&#xff09;pages.json 中配置"navigationStyle": "custom"&#xff0c;代码如下&#xff1a; {"pages": [ {"path": "pa…