RabbitMQ入门指南

news2025/1/11 5:58:16

在这里插入图片描述

人生永没有终点。只有等到你瞑目的那一刻,才能说你走完了人生路,在此之前,新的第一次始终有,新的挑战依然在,新的感悟不断涌现。

文章目录

  • 一、MQ与RabbitMQ概述
    • 1. MQ简述
    • 2. MQ的优势
    • 3. MQ的劣势
    • 4. 常见的MQ产品
    • 5. RabbitMQ(兔子MQ😀)
  • 二、RabbitMQ安装与配置
    • 1. 基于docker快速安装RabbitMQ
    • 2. 创建用户和虚拟机
  • 三、RabbitMQ快速入门
    • 1. 基础环境搭建
    • 2. publisher消息发布者实现
    • 3. consumer消费者实现
  • 四、SpringAMQP与RabbitMQ工作模型
    • 1. SpringAMQP概述
    • 2. BasicQueue 基本模型(简单模型)
    • 3. WorkQueue 工作模型
    • 4. Publish、Subscribe 发布订阅模型
      • 4.1 Fanout 广播模型
      • 4.2 Direct 路由模型
      • 4.3 Topic 主题模型
    • 5. 消息转换器
      • 5.1 使用默认消息转换器发送Object类型消息
      • 5.2 使用Jackson消息转换器收发JSON消息
      • 5.3 使用默认消息转换器收发JSON消息


一、MQ与RabbitMQ概述


1. MQ简述


MQ(全称:Message Queue)直译是消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。

一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)

在这里插入图片描述

总结:

  • 消息队列(MQ),是一种中间件,用于存储和传递消息。

  • 分布式系统有两种通信方式:直接远程调用(如OpenFeign) 和 借助第三方完成间接通信(如RabbitMQ)。

  • 发送方称为生产者,接收方称为消费者。


2. MQ的优势


MQ的优势:(应用解耦、异步、削峰)

  • 应用解耦:提高系统容错性和可维护性;
  • 异步提速:提升用户体验和系统吞吐量;
  • 削峰填谷:提高系统稳定性。

1、应用解耦

在这里插入图片描述

在这里插入图片描述


2、异步提速

在这里插入图片描述

在这里插入图片描述


3、削峰填谷(秒杀)

在这里插入图片描述

在这里插入图片描述

使用MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。

但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。


3. MQ的劣势


引入MQ会遇到下列问题:

  • 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
  • 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
  • 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
  • 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)

4. 常见的MQ产品


市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ、EMQ(物联网) 等,也有直接使用Redis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特点来综合考虑。

在这里插入图片描述

  • Kafka: 是一个高可用性、高吞吐量的分布式消息系统。它具有持久化、持续性、可扩展性和副本机制,并支持多分区和多消费者组。Kafka适用于大规模的数据流处理,如日志聚合、流处理和实时数据流。在追求可用性和高吞吐能力方面,Kafka是一个不错的选择。
  • RocketMQ: 是一个低延迟、高吞吐量的分布式消息队列系统。它提供了可靠的消息传递机制,支持高并发和高可用性的消息发布和订阅。RocketMQ适用于大规模的消息处理和异步通信场景。在追求可用性、可靠性和吞吐能力方面,RocketMQ是一个较好的选择。
  • RabbitMQ: 是一个可靠性较高、低延迟的开源消息队列系统。它采用AMQP协议,支持多种消息模式和消息确认机制。RabbitMQ适用于可靠性要求较高的任务和通信场景。在追求可用性、可靠性和低延迟方面,RabbitMQ是一个合适的选择。

追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;

追求可靠性:RabbitMQ、RocketMQ;

追求吞吐能力:RocketMQ、Kafka;

追求消息低延迟:RabbitMQ、Kafka。


5. RabbitMQ(兔子MQ😀)


RabbitMQ官网:http://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。

AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标、准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)


在RabbitMQ中,有以下一些角色:

  • Producer(生产者):生产者是指发送消息到RabbitMQ的应用程序。它创建消息并将其发送到交换器。

  • Consumer(消费者):消费者是指从RabbitMQ接收消息的应用程序。它基于订阅的方式从队列中获取消息并进行处理。

  • Exchange(交换器):交换器是消息的路由中心。当生产者发送消息时,通过交换器将消息路由到一个或多个队列。

  • Queue(队列):队列是RabbitMQ中存储消息的地方。消费者从队列中接收消息,并进行处理。

  • Binding(绑定):绑定将交换器与队列相关联。它定义了消息从交换器到队列的路由规则。

  • Broker(代理服务器):代理服务器是RabbitMQ的核心组件,负责接收和传递消息。它负责处理交换器、队列、消息的路由和转发。

  • Channel(信道):信道是RabbitMQ使用的通信通道,生产者和消费者通过信道与代理服务器进行交互。

  • Virtual Host(虚拟主机):虚拟主机在RabbitMQ中用于将不同的应用隔离开来。每个虚拟主机具有自己的交换器、队列和绑定。

这些角色共同组成了RabbitMQ的基本架构。生产者发送消息到交换器,通过绑定将消息路由到队列,消费者从队列中接收消息并进行处理。代理服务器负责消息的接收、传递和路由。信道用于生产者和消费者与代理服务器的通信,而虚拟主机提供了应用隔离的环境。
在这里插入图片描述

RabbitMQ工作模式:

文档地址:https://www.rabbitmq.com/getstarted.html

RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第6种RPC远程调用不属于MQ)

在这里插入图片描述

JMS (Sun公司提供一套Java操做消息队列的接口)

  • JMS(JavaMessage Service),Java消息服务应用程序接口,即Java操作消息中间件的API;
  • JMS是JavaEE规范的一种,类比JDBC;
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供。

二、RabbitMQ安装与配置


1. 基于docker快速安装RabbitMQ


扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml

1、拉取镜像

docker pull rabbitmq:3.8-management

在这里插入图片描述

2、运行容器

在这里插入图片描述

 docker run -di \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name rabbitmq \
 --hostname my-rabbit \
 -p 15672:15672 \
 -p 5672:5672 \
 --restart=always \
 rabbitmq:3.8-management
  • \ 代表换行
  • -e 指定环境变量
  • -e RABBITMQ_DEFAULT_USER=admin 用户名
  • -e RABBITMQ_DEFAULT_PASS=123456 密码
  • -v 挂载数据卷
  • -p 15672:15672 用于web管理页面使用的端口 (管理员页面,端口15672)
  • -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里要使用的)
  • -di ,d后台运行,i打开控制台交互
  • –name mq 容器名字
  • –hostname mq (这个参数在单机版mq配不配置都可以,用来设置主机名,搭建集群会用到);

扩展:启动xxx插件(后面会用到这个命令)

# 进入容器
docker exec -it rabbitmq /bin/bash

# 启动xxx插件
rabbitmq-plugins enable xxx

RabbitMQ管理端:

管理端访问地址:http://192.168.150.103:15672/

在这里插入图片描述

在这里插入图片描述


2. 创建用户和虚拟机


1、添加一个新用户:

在这里插入图片描述

添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:

在这里插入图片描述


2、创建虚拟机

在这里插入图片描述

为指定用户授权:

在这里插入图片描述

最后该用户就可以操作这个虚拟机了:

在这里插入图片描述


三、RabbitMQ快速入门


使用传统写法完成简单模的消息传递:(特点:一条消息只能被一个消费者消费)

在这里插入图片描述

官方的HelloWorld示例是基于简单消息队列模来实现的,其中包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue;
  • queue:消息队列,负责接受并缓存消息;
  • consumer:订阅队列,处理队列中的消息。

1. 基础环境搭建


1、创建maven工程,并在pom文件中导入如下依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--SpringAMQP依赖,可以操作RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、创建子模块publisher(生产者)、consumer(消费者),并编写启动类和yml配置文件:

# 日志输出格式配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

在这里插入图片描述


2. publisher消息发布者实现


消息收发流程:Connection连接、Channel通道、queue队列和exchange 交换机。

publisher消息发布者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

1、编写publisher测试代码:

package cn.aopmin.mq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者(传统写法)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
public class PublisherTest {

    /**
     * 发送消息
     *
     * @throws IOException
     * @throws TimeoutException
     */
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost虚拟主机、用户名、密码
        factory.setHost("192.168.150.103");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue"; // 队列名称
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

在这里插入图片描述


2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)

在这里插入图片描述

查看连接信息:

在这里插入图片描述


回到IDEA继续按F8,查看通道信息:

在这里插入图片描述

在这里插入图片描述


继续按F8,查看队列信息:
在这里插入图片描述
在这里插入图片描述


最后直接放行程序,查看队列中的消息:

在这里插入图片描述
在这里插入图片描述


3. consumer消费者实现


consumer消费者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

1、编写消费者代码

package cn.aopmin.mq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者(传统写法)
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.103");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            // 接收消息的回调函数
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

在这里插入图片描述

2、测试(消费者启动程序后会一直执行,不用的时候将程序结束即可)

在这里插入图片描述
在这里插入图片描述


四、SpringAMQP与RabbitMQ工作模型


1. SpringAMQP概述


AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址:https://spring.io/projects/spring-amqp

在这里插入图片描述

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。

RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。

在这里插入图片描述


2. BasicQueue 基本模型(简单模型)


使用SpringAMQP实现简单模型的消息收发:

在这里插入图片描述

1、在父工程中引入spring-amqp起步依赖:

<!--SpringAMQP:可以操作RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

完整的pom.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.aopmin</groupId>
    <artifactId>rabbitmq02-BasicQueue</artifactId>
    <version>1.0.0</version>
    <packaging>pom</packaging>
    <description>springAMQP实现简单模型消息传递</description>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- SpringAMQP:可以操作RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>


    <!-- 打包插件 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2、消息发送

2.1、在publisher服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package cn.aopmin.mq.test;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 使用SpringAMQP实现简单模型的消息发送
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@SpringBootTest
// @RequiredArgsConstructor // 生成构造方法(构造器注入,要求注入的字段必须final修饰)
public class SpringAmqpTest {

    /**
     * RabbitTemplate是SpringAMQP中的核心类,用于实现消息的发送和接收
     */
    @Autowired
    private  RabbitTemplate rabbitTemplate;

    /**
     * 测试简单模型的消息发送
     */
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在这里插入图片描述


3、消息接收

3.1、在consumer服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

3.2、在consumer服务的com.baidou.mq.listener包中创建SpringRabbitListener类:

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /**
     * 订阅消息
     *
     * @param msg 消息
     * @throws InterruptedException
     */
    @RabbitListener(queues = "simple.queue")  // 配置要监听的队列: simple.queue
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("消费者接收到消息:【" + msg + "】");
    }
}

4、测试

先启动consumer服务(启动类),然后再运行publisher服务中发送消息的测试代码。
在这里插入图片描述
在这里插入图片描述


3. WorkQueue 工作模型


工作队列模型(Work Queue Mode):消息按照一定的策略分配给多个消费者来解决消息堆积问题,适用于任务分发和负载均衡场景。

角色:生产者、队列、消费者

在这里插入图片描述

使用SpringAMQP实现工作队列模型的消息收发:

1、在消费者监听类中编写两个方法,监听同一个队列,模拟多个消费者。

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /*
       编写两个方法监听同一个队列,可以实现多个消费者同时消费一个队列的消息
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

在这里插入图片描述

2、模拟生产者发多条消息:

/**
* 测试工作模型发消息
*/
@Test
public void testWork() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello,rabbitmq";

    // 模拟发送100条消息
    for (int i = 1; i <= 100; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
    }
    System.out.println("消息发送完毕!");
}

在这里插入图片描述

3、测试:先启动消费者服务,再执行生产者发送消息的代码。

在这里插入图片描述


消费者预取消息限制:

工作模型默认一人一半消息,可以通过修改消费者application.yml文件,配置prefetch属性,控制消费者预取消息的上限:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码
    listener:
      simple:
        prefetch: 1  # 消息预取策略(每次获取一条消息,处理完后再获取下一条)

prefetch属性用于指定消费者一次从RabbitMQ服务器预取的消息数量。通过限制预取消息的数量,你可以控制每个消费者同时处理的消息数量,从而实现负载均衡和资源控制。


4. Publish、Subscribe 发布订阅模型


发布订阅模型特点: 可以通过交换机(exchange)将一条消息发给多个队列(消费者)进行处理。

常见的exchange类型包括:Fanout广播、Direct路由、Topic主题。

交换机的主要作用:

  • 接收生产者发送的消息
  • 将消息按照规则路由到绑定过的队列中
  • 它不能缓存消息,路由失败,消息丢失
    在这里插入图片描述

SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:

在这里插入图片描述


4.1 Fanout 广播模型


Fanout Exchange,交换机会把收到的消息发送给绑定过的所有队列。(队列需要与交换机建立关系,然后才能收到对应消息)

在这里插入图片描述


接下来使用SpringAMQP演示Fanout Exchange收发消息:

1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定

package cn.aopmin.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 在消费端声明队列、交换机、绑定关系,这样就不用在rabbitmq管理页面手动创建了,这样在服务启动后springAMQP会自动创建
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Configuration
public class MqConfig {

    /**
     * 声明Fanout交换机
     * 交换机名: exchange.fanout
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange.fanout");
    }


    /**
     * 声明队列
     * 队列名: fanout.queue1
     */
    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }


    /**
     * 声明队列
     * 队列名: fanout.queue2
     */
    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }


    /**
     * 绑定关系
     * 将队列1绑定到Fanout交换机上
     */
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    /**
     * 绑定关系
     * 将队列2绑定到Fanout交换机上
     */
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2) {//参数注入,即参数名就是bean的名字
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

消费者application.yml配置:

# RabbitMQ配置
spring:
  rabbitmq:
    host: 192.168.150.103 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 日志配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

package cn.aopmin.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class FanoutListener {

    /*
       编写两个方法,分别监听队列1和队列2
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenerFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenerFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

编写监听类后,启动消费者服务会自动创建交换机和队列组件:

在这里插入图片描述

3、在publisher的测试类中编写向exchange.fanout发消息的代码:

/**
 * 测试广播模式发送消息
 */
@Test
public void testFanout() {
    // 交换机名称
    String exchangeName = "exchange.fanout";
    // 消息
    String message = "hello,rabbitmq";

    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key),在广播模式下不需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    System.out.println("消息发送完毕!");
}

执行测试方法,查看运行结果:

在这里插入图片描述


4.2 Direct 路由模型


Direct exchange,会将接收到的消息按照规则(Routing key)转发到指定的队列,因此称为路由模式(routes)

在交换机上做了一层规则判断操作。

Fanou模型要求:

  • 每一个Queue都与Exchange设置一个BindingKey;
  • 发布者发送消息时,指定消息的RoutingKey;
  • Exchange会将消息路由到BindingKey与消息RoutingKey一致的队列上。

在这里插入图片描述

基于AMQP演示Direct模型::

1、在consumer服务的监听类中,编写两个消费者方法,并在方法上通过@RabbitListener组合注解声明Exchange、Queue、RoutingKey,然后分别监听direct.queue1和direct.queue2队列中的消息:

package cn.aopmin.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.xml.ws.BindingType;

/**
 * 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class DirectListener {

    /**
     * 在监听方法上通过注解的方式声明交换机和队列、及绑定关系
     * 队列: 通过@Queue注解创建队列
     * 交换机: 通过@Exchange注解创建交换机
     * 绑定关系: 通过bindingkey绑定{"blue", "red"}
     *
     * @param msg
     * @throws InterruptedException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),//创建队列
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
            key = {"blue", "red"} // bindingkey
    ))
    public void listenDirect1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),//创建队列
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机
            key = {"yellow", "red"} // bindingkey
    ))
    public void listenDirect2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

在这里插入图片描述


2、在publisher中编写测试方法,向exchange.direct发送消息

/**
* 测试路由模式发送消息
*/
@Test
public void testDirect() {
    // 交换机名称
    String exchangeName = "exchange.direct";
    // 消息
    String message = "helloworld!";
    
    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key,发消息时候用的),在路由模式下需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", "routingKey:blue ---" + message);
    rabbitTemplate.convertAndSend(exchangeName, "red", "routingKey:red ---" + message);
    System.out.println("消息发送完毕!");
}

在这里插入图片描述


3、测试:启动消费者服务创建交换机和队列,然后执行生产者发消息方法

在这里插入图片描述
在这里插入图片描述


小节:

1、Direct与Fanout交换机的区别?

  • Fanout相对于Direct更灵活些。

  • Fanout交换机不做判断,收到消息就会广播给绑定的队列。

  • Direct交换机会根据RouthingKey判断,然后路由给满足规则的队列。

  • 在Direct模型中,如果多个队列都有相同的RouthingKey,则与Fanout功能类似。

2、基于@RabbitListeneri注解声明队列和交换机有哪些常见注解?

  • @QueueBinding 绑定关系
  • @Queue 队列
  • @Exchange 交换机

4.3 Topic 主题模型


Topic Exchange 与 Direct Exchange类似,区别在与RoutingKey必须是多个单词组成,并且以==.== 分割。(用的最多)

队列与交换机指定BindingKey时可以使用通配符:

  • #:表示匹配0或多个单词;例如 china.# 、#.new
  • *:表示匹配1个单词;

在这里插入图片描述

  • Queue1:绑定的是java.# ,因此凡是以 java.开头的routing key 都会被匹配到,例如 java.news、java.blog;
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到,例如java.news、weather.news、heihe.weather.news。

基于AMQP演示Topic 模型:

1、在消费端的监听方法上声明交换机和队列、及绑定关系

package cn.aopmin.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.xml.ws.BindingType;

/**
 * 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)
 *
 * @author 白豆五
 * @version 2023/07/2
 * @since JDK8
 */
@Component
public class TopicListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//创建队列
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
            key = "java.#" // bindingkey
    ))
    public void listenTopic1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//创建队列
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机
            key = "#.news" // bindingkey
    ))
    public void listenTopic2(String msg) throws InterruptedException {
        System.out.println("消费者2接收到消息:【" + msg + "】");
    }
}

2、在生产端,编写发消息方法

/**
 * 测试主题模式发送消息
 */
@Test
public void testTopic() {
    // 交换机名称
    String exchangeName = "exchange.topic";
    // 消息
    String message = "helloworld!";

    // 发送消息
    // 第一个参数是交换机名称
    // 第二个参数是routingKey(路由key),在路由模式下需要指定
    // 第三个参数是消息
    rabbitTemplate.convertAndSend(exchangeName, "java.blog", message);
    rabbitTemplate.convertAndSend(exchangeName, "java.news", message);
    System.out.println("消息发送完毕!");
}

3、测试

在这里插入图片描述


小节

1、Direct和Topic交换机的区别?

  • 相同点:两个交换机都会key进行判断。(即消息的路由key与队列的绑定key进行比较)
  • 不同点:Topic队列的绑定key支持通配符更加灵活。Direct队列的绑定key不支持通配符,只能匹配具体key的消息。

5. 消息转换器


默认情况下,Spring会帮我们把发送的任意对象类型消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
在这里插入图片描述

但是,Spring默认使用的是JDK序列化,JDK序列化会存在一些问题:

  • 数据体积过大;
  • 有安全隐患;
  • 可读性差。

5.1 使用默认消息转换器发送Object类型消息


1、声明队列、编写监听方法

package cn.aopmin.mq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 声明队列
 *
 * @author 白豆五
 * @version 2023/07/3
 * @since JDK8
 */
@Configuration
public class MqConfig {

    @Bean
    public Queue ObjectQueue() {
        return new Queue("object.queue");
    }
}
@Component
public class ObjectListener {
    @RabbitListener(queues = "object.queue")
    public void listenObj(Object obj) {
        System.out.println("收到消息:" + obj.toString());
    }
}

2、编写发消息代码

/**
 * 测试默认消息转换器收发消息(JDK序列化)
 */
@Test
public void testDefault() {
    // 队列名称
    String queueName = "object.queue";
    // 对象消息
    Map<String, Object> message = new HashMap<>();
    message.put("name", "张三");
    message.put("age", 18);

    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
    System.out.println("消息发送完毕!");
}

测试,查看队列数据:(默认情况下JDK序列化的结果不直观,可以把消息转成json格式发送)

在这里插入图片描述


5.2 使用Jackson消息转换器收发JSON消息


Spring提供了org.springframework.amqp.support.converter.MessageConverter接口来处理对象消息的转换。在AMQP中默认实现是SimpleMessageConverter,而SimpleMessageConverter它基于JDK的ObjectOutputStream完成序列化。

如果我们不想使用默认的消息转换器,只需在生产端和消费端配置MessageConverter类型的Bean即可。

1、生产端和消费端都引入jackson依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2、生产端和消费的都配置消息转换器

package cn.aopmin.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

//生产端
@SpringBootApplication
public class PublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApp.class, args);
    }

    // 使用json序列化机制,进行消息转换
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
package cn.aopmin.mq;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

//消费端
@SpringBootApplication
public class ConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
    // 使用json序列化机制,进行消息转换
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

3、修改消费端的监听方法

@Component
public class ObjectListener {
    @RabbitListener(queues = "object.queue")
    public void listenObj(Map<String,Object> msg) {
        System.out.println("收到消息:" + msg);
    }
}

4、测试

在这里插入图片描述


5.3 使用默认消息转换器收发JSON消息


上一种方案,配来配去非常麻烦,而且一旦消息转换器不一样,就不能达到想要的结果。默认情况下,对于字符串类型的消息,默认的JDK消息转换器会使用UTF-8编码将字符串转换为字节数组,并将其作为消息体进行发送。

这样我们在发消息的时候手工将对象序列化为json字符串,在接收消息时再序列化为Java对象即可。

JSON工具类:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.76</version>
</dependency>

常用方法:

  • 序列化:JSON.toJSONString(xxx);
  • 反序列化:JSONObject(str,Xxx.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/713880.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】Redis高可用

目录 主从复制1. 全量复制2. 增量复制3. 主从复制的问题&#xff08;1&#xff09;. 主从复制延迟&#xff08;2&#xff09;. 读到过期数据&#xff08;3&#xff09;. 主从配置不一致导致数据丢失&#xff08;4&#xff09;. 全量复制性能损耗大&#xff08;5&#xff09;. 主…

机器人动力学与控制学习笔记(十五)——机器人路径规划

十五、机器人路径规划 15.1 机器人运动规划 机器人运动规划包含三个层次的内容&#xff1a;即路径规划、轨迹规划、轨迹跟踪或轨迹控制。路径规划是确定不含时间信息的几何路径。一般的工业机器人中都含有点到点&#xff0c;直线&#xff0c;圆弧及样条曲线等常用轨迹的路径…

【进程】进程概念及相关函数实现

目录 0. 进程概述 1. 创建进程 1.1 进程的创建&#xff1a;fork函数 1.2 进程的等待&#xff1a;wait()、waitpid() 1.3 特殊进程&#xff1a;僵尸进程、孤儿进程、守护进程 1.4 进程的终止&#xff1a;exit和_exit函数 1.5 进程退出清理&#xff1a;atexit函数 1.6 进…

【我的创作纪念日】关于某站的音频爬虫+GUI

文章目录 一、前言&机遇二、爬虫代码三、爬虫GUI四、文件打包五、结果展示未来可期 一、前言&机遇 许久没看私信内容&#xff0c;一上线就看到了官方的私信&#xff0c;我已经来到CSDN1024天啦&#xff01; 想到注册这个号的初衷是学习记录爬虫&#xff0c;后面渐渐变…

抖音产业带服务商哪些类目在招募?开通需要什么条件?

5月&#xff0c;刚刚结束的抖音电商生态大会上&#xff0c;抖音电商总裁魏雯雯披露&#xff0c;近一年平台GMV增幅超80%。其中&#xff0c;商城GMV同比增长277%&#xff0c;电商搜索GMV同比增长159%&#xff0c;货架场景GMV在平台GMV占比超30%。过去一年&#xff0c;抖音电商直…

数据结构与算法——树与二叉树

&#x1f60a;数据结构与算法——树与二叉树 &#x1f680;前言&#x1f680;树&#x1f6a2;树的定义&#x1f6a2;树的基本术语&#x1f6a2;有序树和无序树&#x1f6a2;森林 &#x1f680;二叉树&#x1f6a2;二叉树的定义&#x1f6a2;二叉树的性质&#x1f6a2;满二叉树&…

【VUE】Element UI 表单校验出现async-validator: [‘discipline is required‘]报错

问题:async-validator: [discipline is required] 选择器已经获取到数值&#xff0c;却显示获取到 解决办法如下

线性规划算法

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;点击跳转 本文部分内容来自网友博客 一&#xff0c;线性规划 例如&#xff0c;一个企业“生产计划”的线性规划模型如下&#xff1a; 是subjec…

​如何优雅的卸载Edge浏览器

如何优雅的卸载Edge浏览器 由于Edge浏览器越来越复杂&#xff0c;功能越来越繁琐我是真的一刻也用不下去了。虽然我主力是火狐浏览器&#xff0c;Edge用来访问一些只能使用Chromium内核的网页作为备用。 但是我现在一打开Edge浏览器我就窝火&#xff0c;也懒得再去调整优化&a…

【表格树状】jqgrid表格树状折叠效果实现(附代码,留邮箱发demo)

【写在前面】有段时间没好好的整理一篇前端文章了&#xff0c;之前的6月城市活动也结束了&#xff0c;期待下周的榜单公布&#xff0c;其实这个月还有一个东西也让我牵肠挂肚的&#xff0c;就是软考的成绩也会在这个月的中旬公布&#xff0c;也是感觉很悬。既成定局&#xff0c…

【中间件-Openjob】高性能任务调度框架Openjob简介及快速搭建

介绍基础基础信息任务调度框架对比 特性高可靠高性能定时调度分布式计算延迟任务工作流程权限管理告警监控跨语言 安装访问docker-compose安装在线访问 总结 介绍 一款分布式高性能任务调度框架&#xff0c;支持多种定时任务、延时任务、工作流设计、轻量级分布式计算、无限水平…

el-form动态嵌套表单验证

v-for 遍历的表单校验 根据官网的介绍&#xff0c;是在 el-form-item 中使用 :rules 属性&#xff0c;同时 prop 属性直接定位到具体循环元素。这个用法的前提是在循环外面包裹一个 el-form 元素&#xff0c;v-for 位于 el-form-item 中。 <template><el-form:model…

深入浅出对话系统——对话管理与对话生成

引言 对话管理 我们知道对话管理主要包括状态追踪(DST)和策略优化(DPO)。 对话管理模块包含两个子任务&#xff1a; 对话状态追踪(Dialogue State Tracking) 根据用户输入和对话历史识别对话状态&#xff1b;策略学习(Policy Learning) 根据识别到的对话状态选择合适的下一步…

BC SU21 对象 ZJHD_LGO 已交付;只能进行有限更改

ECC 升级S4 库位从ECC的唯一库位 → S4 工厂 库位才能唯一。 那原先的 依据库存地 控制库位的zjhd打印机的权限需要新增工厂字段。 但是su21的修改的时候 提示 &#xff1a;对象 ZJHD_LGO 已交付&#xff1b;只能进行有限更改 查了一下&#xff0c;SAP官方说只能 把该权限…

HCIP第二次作业

要求&#xff1a;R1-R2-R3-R4-R5 RIP 100运行版本2 R6-R7 RIP 200 运行版本1 1.使用合理IP地址规划网络&#xff0c;各自创建环回接口 2.R1创建环回 172.16.1.1/24 172.16.2.1/24 172.16.3.1/24 3.要求R3使用R2访问R1环回 4.减少路由条目数量&#xff0c;R1-R2之间增加路由传递…

部署springboot项目读取外部配置文件

我们在部署springboot项目的时候&#xff0c;经常会遇到这样的情况&#xff1a;测试环境与生产环境的配置不一样&#xff0c;这就导致每次部署的时候都要修改配置文件再打包&#xff0c;即使用了nacos进行配置管理&#xff0c;但测试环境与生产环境的nacos部署的地方肯定不一样…

MyBatis简单入门

文章目录 快速入Mapper代理开发具体步骤使用mapper代理中的包扫描 查询字段名称不一致问题方法一&#xff1a;对SQL语句起别名方法二&#xff1a; 采用resultMap映射 条件查询单条件查询SQL中特殊字符的处理 多条件查询方式一&#xff1a;散装参数方式二&#xff1a;对象参数方…

ABAP:ABAP解析xml文件的方法

目前我在ECC的系统找到两种实现XML解析的办法&#xff0c;第一种是通过strans创建转化例程&#xff0c;然后在程序中调用转化例程来转化xml&#xff0c;第二种是调用方法按照node解析xml。 要转化的xml文件demo如下 <?xml version"1.0" encoding"Windows-…

1.5 编写自定位ShellCode弹窗

在笔者上一篇文章中简单的介绍了如何运用汇编语言编写一段弹窗代码&#xff0c;虽然简易ShellCode可以被正常执行&#xff0c;但却存在很多问题&#xff0c;由于采用了硬编址的方式来调用相应API函数的&#xff0c;那么就会存在一个很大的缺陷&#xff0c;如果操作系统的版本不…

提取图像中的文本信息(Tesseract OCR 和 pytesseract)

环境准备 安装Tesseract&#xff1a;点这里参考本人博客 下载第三方库 pip install Pytesseract这个库只自带了一个英语的语言包&#xff0c;这个时候如果我们图片中有对中文或者其他语言的识别需求&#xff0c;就需要去下载其他语言包 下载其他语言包 进入官网以后进入Tra…