Spring Boot | Spring Boot 整合 “RabbitMQ“ ( 消息中间件 ) 实现

news2025/1/12 9:48:07

目录:

  • Spring Boot 整合 "RabbitMQ" ( 消息中间件 )实现 :
    • 一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )
      • 1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 AmqpAdmin "定制消息发送组件"
        • (3) 查看 "RabbitMQ消息组件" 的定制效果
        • (4) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (5) "消息消费者" 接收消息
      • 1.2 基于 "配置类" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "配置类"的方式 "定制消息发送组件"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) "消息消费者" 接收消息
      • 1.3 基于 "注解" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式定制 "消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台查看 "消费者" 消费信息情况
    • 二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式
      • 2.1 基于 "注解" 的方式 ( 实现 Routing "路由模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况
    • 三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式
      • 3.1 基于 "注解" 的方式 ( 实现 Topics"通配符模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况

Spring Boot 整合 “RabbitMQ” ( 消息中间件 )实现 :

在这里插入图片描述

作者简介 :一只大皮卡丘,计算机专业学生,正在努力学习、努力敲代码中! 让我们一起继续努力学习!

该文章参考学习教材为:
《Spring Boot企业级开发教程》 黑马程序员 / 编著
文章以课本知识点 + 代码为主线,结合自己看书学习过程中的理解和感悟 ,最终成就了该文章

文章用于本人学习使用 , 同时希望能帮助大家。
欢迎大家点赞👍 收藏⭐ 关注💖哦!!!

(侵权可联系我,进行删除,如果雷同,纯属巧合)


  • Spring Boot 可对 RabbitMQ工作模式 ( 6种工作模式 )进行了 整合,并支持 多种整合方式,包括 : 基于API 的方式 基于配置类的方式 基于注解的方式
  • 下面将 选取常用 Publish/Subscribe ( 发布订阅 )工作模式 Routing ( 路由 )工作模式Topics ( 通配符模式 ) 工作模式 3种工作模式完成在 Spring Boot 项目 中的 消息服务整合实现 ( 整合实现"消息中间件" )。

一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )

  • Spring Boot 整合 RabbitMQ 中间件实现消息服务,主要围绕3个部分的工作进行展开 :
    定制中间件消息发送者发送消息消息消费者接收消息。其中,定制中间件比较麻烦的工作,且必须预先定制
  • 下面我们以 用户注册成功同时 “发送邮件通知”发送短信通知 这一场景为例,分别使用 : 基于API 的方式 基于配置类的方式 基于注解的方式3种 方式实现 Publish/Subscribe ( 发布订阅 )工作模式 整合

1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用

  • 基于API 的方式主要讲的是使用 Spring 框架提供的 API管理类 : AmqpAdmin 定制 消息发送组件,并进行消息发送
  • 这种 定制消息发送组件方式与在 RabbitMQ 可视化界面上通过 对应面板 进行 组件操作实现基本一样,都是通过管理员的身份预先手动声明交换器队列路由键等,然后 “组装消息队列” 供应用程序调用,从而 实现消息服务。下面我们就对这种 基于 API的方式进行 讲解和演示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 AmqpAdmin “定制消息发送组件”
  • 在项目的测试类 : Chapter22ApplicationTests , 在该测试类先引入 AmqpAdmin 管理类 定制 Publish/Subscribe 工作模式 所需的 消息组件代码如下所示 :

    Chapter22ApplicationTests.Java ( 测试类 ) :

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        //自动注入 RabbitMQ消息中间件的API操作类: AmqpAdmin
        @Autowired
        private AmqpAdmin amqpAdmin;
    
        @Test
        public void amqpAdmin() {
            //1.定义"fanout"类型的交换器
            amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
            //2.定义两个"默认持久化队列" , 分别处理email 和 sms
            amqpAdmin.declareQueue(new Queue("fanout_queue_email")); //该队列处理"邮件信息"
            amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));//该队列处理"短信信息"
            //3.将两个队列分别与"交换器"进行绑定
            amqpAdmin.declareBinding(new Binding("fanout_queue_email",
                    Binding.DestinationType.QUEUE,"fanout_exchange","",null));
            amqpAdmin.declareBinding(new Binding("fanout_queue_sms",
                    Binding.DestinationType.QUEUE,"fanout_exchange","",null));
        }
    
    }
    

    上面的代码中,使用 Spring 框架提供的消息管理组件 AmqpAdmin定制了消息组件。其中定义了一个 fanout 类型交换器 : fanout_exchange ;
    还定义两个消息队列 : fanout_queue_emailfanout_queue_sms,分别用来处理邮件信息短信信息 ;最后 将定义的两个队列分别交换器绑定

(3) 查看 “RabbitMQ消息组件” 的定制效果
  • 执行上述单元测试方法 : amgpAdmin()验证 RabbitMQ 消息组件的定制效果单元测试方法执行成功,可通过 RabbitMQ 可视化管理页面具体操作为: 打开RabbitMQ安装目录中的 sbin目录,双击 rabbitmq-server.bat ,后在浏览器访问访问 : http://localhost:15672 ,输入密码和密码 : guest , 此时 可在可视化界面查询 “RabbitMQ消息组件定制效果 ,如下图所示

    在这里插入图片描述


    在这里插入图片描述

    通过上面图片看出,在 Queues 队列面板页面中,展示有定制消息队列信息与程序中 "定制"消息队列一致。可以单击消息队列名称查看每个队列详情

    通过上述操作可以发现,在管理页面中提供了 消息组件交换器队列的定制功能。在程序中使用 Spring 框架提供的管理员 API组件 AmqpAdmin 定制消息组件和在管理页面上手动定制消息组件本质是一样 的。

(4) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
  • 再次执行 psubPublisher( ) 方法,该方法 执行成功后,查看 RabbitMQ可视化管理页面 中的 Queues面板信息,具体如下图所示

    在这里插入图片描述

  • 进入 队列详情页面查看消息 , 如下图所示

    在这里插入图片描述

    上图看出,在 消息队列存储有指定发送消息详情其他参数信息,这与程序指定发送信息完全一致

(5) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * Publish/Subscribe 工作模式接收、处理 "邮件业务"
         *
         * @RabbitListener(queues = "fanout_queue_email") :
         *  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。
         *  
         */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
        public void psubConsumerEmail(Message message) {
            byte[] body = message.getBody();
            //转化为字符串
            String str = new String(body);
            System.out.println("邮件业务接收到的信息为: "+str);
        }
    
       /**
       * Publish/Subscribe 工作模式接收、处理 "短信业务"
       */
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
      public void psubConsumerSms(Message message) {
          byte[] body = message.getBody();
          //转化为字符串
          String str = new String(body);
          System.out.println("短信业务接收到的信息为: "+str);
         }
      }
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听 的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法立即接收并消费队列消息 ( 即 注解对应方法会被调用 )。另外,在接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果使用与消息类型对应参数接收消息的话,只能够得到 具体的消息体信息如果使用 Object或者 Message 类型参数接收消息的话,还可以获得除了消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.2 基于 “配置类” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用

  • 基于 配置类方式 主要讲的是使用 Spring Boot 框架提供的 @Configuration 注解 配置类 定制消息发送组件并进行消息发送例子代码如下所示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 "配置类"的方式 “定制消息发送组件”
  • 使用 "配置类"的方式 “定制消息发送组件(Publish/Subscribe 工作模式) 代码如下所示 :

    RabbitMQConfig.Java ( 配置类 ) :

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        /**
         * 使用基于"配置类"的方式定义"消息发送组件"
         */
        //1.定义定义fanout类型的交换器
        @Bean
        public Exchange fanout_exchange() {
            //创建一个名为"fanout_exchange" 的 "交换器"
            return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
        }
    
        //2.定义两个不同名称的消息队列
        @Bean
        public Queue fanout_queue_email() { //创建消息队列
            //创建一个名为 "fanout_queue_email" 的 "消息队列"
            return new Queue("fanout_queue_email");
        }
        @Bean
        public Queue fanout_queue_sms() { //创建消息队列
            //创建一个名为 "fanout_queue_sms" 的 "消息队列"
            return new Queue("fanout_queue_sms");
        }
    
        //3.将两个不同名称的"消息队列"
        @Bean
        public Binding bindingEmail() { //将"fanout_queue_email"消息队列 和 "fanout_exchange"交换器 进行"绑定"
            return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
        }
        @Bean
        public Binding bindingSms() { //将"fanout_queue_sms"消息队列 和 "fanout_exchange"交换器 进行"绑定"
            return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
        }
    
    }
    
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * Publish/Subscribe 工作模式接收、处理 "邮件业务"
         *
         * @RabbitListener(queues = "fanout_queue_email") :
         *  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。
         *  
         */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
        public void psubConsumerEmail(Message message) {
            byte[] body = message.getBody();
            //转化为字符串
            String str = new String(body);
            System.out.println("邮件业务接收到的信息为: "+str);
        }
    
       /**
       * Publish/Subscribe 工作模式接收、处理 "短信业务"
       */
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
      public void psubConsumerSms(Message message) {
          byte[] body = message.getBody();
          //转化为字符串
          String str = new String(body);
          System.out.println("短信业务接收到的信息为: "+str);
         }
      }
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法会**立即接收并消费队列消息 ( 即注解对应方法会被调用 )。另外,在 接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果 使用与消息类型对应参数**接收消息的话,只能够得到 具体的消息体信息如果 使用 Object 或者 Message 类型参数 接收消息 的话,还可以获得 除了 消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.3 基于 “注解” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用

  • 基于注解的方式指的是使用 Spring框架@RabbitListener注解 定制消息发送组件 "消息消费者" 接收信息 , 当然还要结合之前的代码,才能完整实现个功能需求
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.5</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.myh</groupId>
    <artifactId>chapter_22</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chapter_22</name>
    <description>chapter_22</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit-test</artifactId>
          <scope>test</scope>
       </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

<!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>-->

</project>
(2) 使用 “注解” 的方式定制 “消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import com.myh.chapter_24.domain.User;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * 使用基于注解的方式实现消息服务 ( 使用"注解"的方式 ①定制消息组件 ②接收信息)
         * 1.1 Publish/Subscribe工作模式接收、处理"邮件业务"
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("fanout_queue_email"),exchange =
                                   @Exchange(value = "fanout_exchange",type = "fanout")))
        public void psubConsumerEmailAno(User user) {
            System.out.println("邮件业务接收到消息: "+user);
        }
    
         /**
       *
       * 1.2 Publish/Subscribe工作模式接收、处理"短信业务"
       */
      @RabbitListener(bindings = @QueueBinding(value =
                                 @Queue("fanout_queue_sms"),exchange =
                                 @Exchange(value = "fanout_exchange",type = "fanout")))
      public void psubConsumerSmsAno(User user) {
          System.out.println("短信业务接收到消息: "+user);
        }
     }
    

    上面代码中,使用 @RabbitListener 注解及 其相关属性定制了两个消息组件消费者,这两个消费者都接收实体类 User 并消费。在 @RabbitListener 注解中,bindings 属性用于创建并绑定交换器消息队列组件,需要注意的是,为了能使两个消息组件的消费者接受实体类User,需要我们在 定制交换器将交换器类型 type 设置为 fanout。另外,bindings 属性@QueueBinding 注解除了有 valuetype 属性外,还有key 属性用于定制路由键 : routingKey (当前发布订阅模式不需要)。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行psubPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述

  • 至此,在 Spring Boot 中完成了 使用基于 API基于配置类基于注解3 种方式来实现 Publish/Subscribe 工作模式整合讲解 在这 3 种实现消息服务的方式中,

  • 上面讲述过三种配置方式区别
    基于 API 的方式
    相对简单、直观
    ,但容易与业务代码产生 耦合

    基于 配置类方式相对隔离 容易统一管理、符合Spring Boot 框架思想;

    基于 注解的方式 清晰明了方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用 基于配置类的方式 和 基于注解的方式定制组件 实现消息服务较为常见。使用 基于 API的方式偶尔使用,具体还需要根据实际情况进行选择

二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式

2.1 基于 “注解” 的方式 ( 实现 Routing "路由模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"
    
        /**
         * 2.1路由模式消息接收、处理error级别日志信息
         * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("routing_queue_error"), exchange =
                                   @Exchange(value = "routing_exchange", type = "direct"),
                       key = "error_routing_key"))
        public void routingConsumerError(String message) {
            System.out.println("接收到error级别日志信息: "+message);
        }
    
    
        /**
         * 2.2路由模式消息接收、处理info、error、warning级别日志信息
         * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("routing_queue_all"), exchange =
                                   @Exchange(value = "routing_exchange", type = "direct"),
                        key = {"error_routing_key","info_routing_key","warning_routing_key"}))
        public void routingConsumerAll(String message) {
            System.out.println("接收到info、error、warning级别日志信息: "+message);
        }
    
    }
    
    

    上述代码中,在消息业务处理类 : RabbitMQService 中编写了两个用来 处理 Routing 路由模式消息消费者方法,在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制路由模式消息服务组件

    Routing路由模式下的交换器类型 : type属性direct,而且还必须 指定 key 属性
    ( 每个消息队列可以
    映射多个路由键
    但在 Spring Boot 1.X版本中@QueueBinding 中的 key 属性只接收 Spring 类型不接收 Spring [ ] 类型 )。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
        //Routing工作模式消息发送端 ( "消息发布者" "发送信息" )
        @Test
        public void routingPublisher() {
            //发送信息
            //给指定的"交换器" "发送信息"
            rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send 错误信息...");
        }
    
    }
    

    上述代码中,通过调用 RabbitTemplateconverAndSend ( String exchange , String routingKey , Object object )方法来 "发送消息"。在 Routing 工作模式下发送消息时,必须指定路由键参数该参数要与消息队列映射路由键保持一致,否则发送的消息将会丢失

    本次示例中使用的是error_routing_key 路由键,根据定制规则,编写的两个消息消费者方法应该都可以正常接收并消费发送端发送消息

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行routingPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述


    此时,修改 routingPublisher( )方法中的消息发送参数调整发送info 级别日志信息( 注意同修改 info_routing_key 路由键 ),再次启动 routingPublisher( )方法控制台效果如下图所示

    在这里插入图片描述

    上图所示,控制台打印出使用 info_routing_key 路由键发送 info 级别日志信息,说明只有配置映射 info_routing_key 路由键消息消费者 的方法 消费了消息

三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式

3.1 基于 “注解” 的方式 ( 实现 Topics"通配符模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.5</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.myh</groupId>
    <artifactId>chapter_22</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chapter_22</name>
    <description>chapter_22</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit-test</artifactId>
          <scope>test</scope>
       </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

<!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>-->

</project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"
    
        /**
         * 3.1使用"通配符模式"消息接收、进行"邮件业务"订阅处理
         *   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("topic_queue_email"),exchange =
                                   @Exchange(value = "topic_exchange",type = "topic"),
                                   key = "info.#.email.#"))
        public void topicConsumerEmail(String message) {
            System.out.println("接收到邮件订阅需求处理信息: "+message);
        }
    
          /**
       * 3.2使用"通配符模式"消息接收、进行"短信业务"订阅处理
       *   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )
       */
      @RabbitListener(bindings = @QueueBinding(value =
                                 @Queue("topic_queue_sms"),exchange =
                                 @Exchange(value = "topic_exchange",type = "topic"),
                                 key = "info.#.sms.#"))
      public void topicConsumerSms(String message) {
          System.out.println("接收到短信订阅需求处理信息: "+message);
        }
     }
    

    上述代码中,在消息业务处理类RabbitMQService 中编写了两个处理 Topics 通配符模式消息消费者方法 ( 这两个方法即创建了"消息组件",又创建了"消息消费者接收且消费信息 "),在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了通配符模式下的 消息组件

    从上述示例可以看出Topics 通配符模式下注解使用方式与 Routing 路由模式使用基本一样,主要是将交换器类型 type 修改为了 topic,还分别使用通配符的样式指定路由键 routingKey

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    

    针对不同的用户订阅需求,使用 RabbitTemplate 模板工具类的 convertAndSend ( Stringexchange,String routingKey,Object object )方法发送不同的消息发送消息时,必须 根据具体需求已经定制路由键通配符 设置 具体的路由键

② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
        //3.Topcis工作模式下的"消息发布者" "发送消息"
        @Test
        public void topicPublisher() {
               //(1)只发送"邮件"订阅用户信息 ---(给指定的"交换器" "发送信息")
            rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message...");
    
              //(2)只发送"短信"订阅用户信息
            rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send  sms message...");
    
              //(3)"邮件"订阅用户 和 "短信"订阅用户 都发送消息
            rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms  message...");
        }
    
      }
    
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 执行topicPublisher( ) 方法 中的 步骤(1)邮件订阅用户的消息发送控制台效果下图所示 :

    在这里插入图片描述


    在这里插入图片描述

  • topicPublisher( )方法步骤(1) 进行 注释打开步骤(2) 中只进行 短信订阅用户消息发送方法,并 再次启动该测试方法控制台效果如下图所示 : 在这里插入图片描述


在这里插入图片描述

  • 为了查看 topicPublisher( )方法步骤(3)的效果,我们需要把 步骤(2) 的代码注释步骤(3) 的代码主要进行 邮件短信订阅用户消息发送方法项目重新启动 后的效果如下图所示 :

    在这里插入图片描述


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1654417.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【智能楼宇秘籍】一网关多协议无缝对接BACnet+OPC+MQTT

在繁华的都市中心&#xff0c;一座崭新的大型商业综合体拔地而起&#xff0c;集购物、餐饮、娱乐、办公于一体&#xff0c;是现代城市生活的缩影。然而&#xff0c;这座综合体的幕后英雄——一套高度集成的楼宇自动化系统&#xff0c;正是依靠多功能协议网关&#xff0c;实现了…

VALSE 2024 Workshop报告分享┆ 大规模自动驾驶仿真系统研究

视觉与学习青年学者研讨会&#xff08;VALSE&#xff09;旨在为从事计算机视觉、图像处理、模式识别与机器学习研究的中国青年学者提供一个广泛而深入的学术交流平台。该平台旨在促进国内青年学者的思想交流和学术合作&#xff0c;以期在相关领域做出显著的学术贡献&#xff0c…

精通GDBus:Linux IPC的现代C接口

目录标题 1. GDBus介绍2. GDBus的优点3. 安装GDBus4. 使用GDBus连接到D-Bus总线实现D-Bus服务调用D-Bus方法发送和接收信号 5. 总结 在Linux环境下&#xff0c;不同的程序需要通过某种方式进行通信和协同工作。GDBus是GLib库的一部分&#xff0c;提供了一个基于GObject系统的、…

英语新概念2-回译法-lesson13

The Greenwood Boys 绿林少年是一组流行歌手们。现在他们正在参观城市里的所有公园&#xff0c;他们明天就要到这。他们将坐火车到并且大多数小镇上的年轻人将要欢迎他们&#xff0c;明天晚上他们将要在工人俱乐部唱歌。绿林少年将在这待五天&#xff0c;在这期间&#xff0c;…

Redis集群分片

什么是集群 集群是由多个复制集组成的,能提供在多个redis节点间共享数据的程序集 简而言之就是将原来的单master主机拆分为多个master主机,将整个数据集分配到各主机上 集群的作用 集群中可以存在多个master,而每个master可以挂载多个slave自带哨兵的故障转移机制,不需要再去…

华为车BU迈入新阶段,新任CEO对智能车的3个预判

作者 |张马也 编辑 |德新 4月24日&#xff0c;北京车展前夕&#xff0c;华为召开了新一年的智能汽车解决方案新品发布会。 这次发布会&#xff0c;也是华为智能汽车解决方案BU&#xff08;简称「车BU」&#xff09;CEO 靳玉志的公开首秀。 一开场&#xff0c;靳玉志即抛出了…

刷题训练之模拟

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;熟练掌握模拟算法。 > 毒鸡汤&#xff1a;学习&#xff0c;学习&#xff0c;再学习 ! 学&#xff0c;然后知不足。 > 专栏选自&#xff1a;刷题训…

ICode国际青少年编程竞赛- Python-2级训练场-列表入门

ICode国际青少年编程竞赛- Python-2级训练场-列表入门 1、 Dev.step(3)2、 Flyer.step(1) Dev.step(-2)3、 Flyer.step(1) Spaceship.step(7)4、 Flyer.step(5) Dev.turnRight() Dev.step(5) Dev.turnLeft() Dev.step(3) Dev.turnLeft() Dev.step(7) Dev.turnLeft() Dev.…

TMS320F2812DSP最小系统原理图及PCB文件

目录 1、原理图 2、PCB 资料下载地址&#xff1a;TMS320F2812DSP最小系统原理图及PCB文件 1、原理图 2、PCB

Unity数据持久化之Json

Json概述 Json是什么? 全称:JavaScript对象简谱(JavaScript Object Notation) Json是国际通用的一种轻量级的数据交换格式 主要在网络通讯中用于传输数据,或本地数据存储和读取 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率 我们一般使用Json文件来…

2024蓝桥杯CTF writeUP--Theorem

密码方向的签到题&#xff0c;根据题目已知n、e和c&#xff0c;并且p和q是相邻的素数&#xff0c;可以考虑分解。 通过prevprime函数分解n&#xff0c;然后 RSA解密即可&#xff1a; from Crypto.Util.number import long_to_bytes import gmpy2 import libnumfrom sympy im…

【linux】进程概念|task_struct|getpid|getppid

目录 ​编辑 1.进程的概念 进程的基本概念 进程与程序的主要区别 进程的特征 进程的状态 描述进程—PCB task_struct中的内容 查看进程 1.创建一个进程&#xff0c;运行以下代码 通过系统调用获取进程标示符 getpid()以及getppid() 1.进程的概念 进程的基本概念…

最常用的AI工具

在日常工作生活中&#xff0c;我试用了几十种AI人工智能工具&#xff0c;下面我来推荐下我最常使用&#xff0c;也是最方便快捷的AI工具。 1百度文心一言 文心一言是一个综合性的大语言模型&#xff0c;整合了很多优秀的提示词&#xff0c;尤其是文心4.0大模型&#xff0c;在中…

《手把手教你怎么上手做一个小程序》

准备工作&#xff1a; 硬件准备&#xff1a; 装有微信的手机一台。 账号注册&#xff1a; 进入https://mp.weixin.qq.com/cgi-bin/registermidpage?actionindex&langzh_CN&token注册一个微信小程序账号。 然后输入邮箱注册账号。一个邮箱只能注册一个微信公众平台…

《Python编程从入门到实践》day22

# 昨日知识点回顾 方法重构、驾驶飞船左右移动、全屏显示 飞船不移动解决&#xff0c;问题出在移动变量x更新 # Ship.pysnipdef update(self):"""根据移动标志调整飞船的位置"""# 更新飞船而不是rect对象的x值# 如果飞船右移的标志和飞船外接…

重定向_缓冲区

目录 重定向 文件属性操作 浅谈重定向​编辑 深入重定向 dup2 缓冲区 缓冲区的理论理解 代码分析 重定向 文件属性操作 #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> int stat(const char *path, struct stat *buf); int fstat(i…

如何购买阿里云99计划的ECS云服务器?99元购买阿里云2核2G3M服务器教程

阿里云助力中小企业和开发者无忧上云的“99计划”中有两款性价比超高的ECS云服务器&#xff0c;2026年3月31日活动结束前新购和续费价格一样。 其中个人和企业新老用户同享的2核2G3M服务器仅需99元/年&#xff08;续费同价&#xff09;&#xff0c;企业新老用户同学的2核4G5M仅…

鸿蒙OpenHarmony开发板:【子系统配置规则】

子系统 子系统配置规则 通过build仓下的subsystem_config.json可以查看所有子系统的配置规则。 {"arkui": {"path": "foundation/arkui", # 路径"name": "arkui" # 子系统名},"ai": {&q…

单片机-点亮第一盏灯

原理图 需求&#xff1a;点亮或是熄灭LED 通过控制 P5.3引脚输出高电平时&#xff0c;LED灯就点亮&#xff0c;输出低电平时LED灯就熄灭 1.项目创建 新建项目 配置开发板信息 当前位STC芯片的开发板&#xff0c;选择STC MCU Database 搜素具体芯片型号&#xff0c;进行配置…

C# Web控件与数据感应之 TreeView 类

目录 关于 TreeView 一些区别 准备数据源 范例运行环境 一些实用方法 获取数据进行呈现 ​根据ID设置节点 获取所有结点的索引 小结 关于 TreeView 数据感应也即数据捆绑&#xff0c;是一种动态的&#xff0c;Web控件与数据源之间的交互&#xff0c;本文将继续介绍与…