RabbitMQ发布订阅模式Publish/Subscribe详解

news2024/9/20 23:03:19

订阅模式Publish/Subscribe

  • 基于API的方式
      • 1.使用AmqpAdmin定制消息发送组件
      • 2.消息发送者发送消息
      • 3.消息消费者接收消息
  • 基于配置类的方式
  • 基于注解的方式
    • 总结

SpringBoot整合RabbitMQ中间件实现消息服务,主要围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中,定制中间件是比较麻烦的工作,且必须预先定制。

下面以用户注册成功后,同时发送邮件通知和短信通知这一场景为例, 分别使用基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合。

基于API的方式

基于API的方式,是指使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息的发送。这种定制消息发送组件的方式,与在RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。下面我们就对这种基于API的方式进行讲解和演示。

1.使用AmqpAdmin定制消息发送组件

我们先打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。

package com.ytx;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Chapter08ApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    void contextLoads() {
    }

    /** 使用AmqpAdmin管理员API定制消息组件 */
    @Test
    public void amqpAdmin() {
        // 1.定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        // 2.定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        // 3.将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
    }
}

执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。单元测试方法执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。

在这里插入图片描述

从上图可以看出,在RabbitMQ可视化管理页面的Exchanges面板中,新出现了一个名称为fanout_exchange的交换器(其他7个交换器是RabbitMQ自带的),且其类型是我们设置的fanout类型。我们可以单击fanout_exchange交换器进入查看。

在这里插入图片描述
从上图可以看出,在fanout_exchange交换器详情页面中展示有该交换器的具体信息,还有与之绑定的两个消息队列fanout_queue_email和fanout_queue_sms,并且与程序中设置的绑定规则一致。切换到Queues面板页面,查看定制生成的消息队列信息。
在这里插入图片描述

从上图可以看出,在Queues队列面板页面中,展示有定制的消息队列信息,这与程序中定制的消息队列一致,我们可以单击消息队列名称查看每个队列的详情。

通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin,定制消息组件和在管理页面上手动定制消息组件的本质是一样的。

2.消息发送者发送消息

完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,我们可以借助一个实体类传递消息,需要预先创建一个实体类对象。

首先,在chapter08项目中创建名为com.cy.domain的包,并在该包下创建一个实体类User。

package com.ytx.domain;

/** 发布消息的实体类可以通过实现Serializable序列化接口进行发布 */
public class User {
    private Integer id;
    private String username;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                '}';
    }
}

其次,我们在项目测试类Chapter08ApplicationTests中,使用Spring框架提供的RabbitTemplate模板类实现消息发送。

@Autowired
private RabbitTemplate rabbitTemplate;

/** 1.Publish/Subscribe工作模式消息发送端 */
@Test
public void subPublisher() {
    User user = new User();
    user.setId(1);
    user.setUsername("小明");
    rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}

上述代码中,我们先使用@Autowired注解,引入消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。此方法中的第1个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第2个参数表示路由键,因为实现的是Publish/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。

然后,执行上述消息发送的测试方法subPublisher(),控制台执行效果见下图所示。
在这里插入图片描述

从上图可以看出,发送实体类对象消息时程序发生异常,从异常信息“SimpleMessageConverter only supports String, byte[] and Serializable payloads”可以看出,消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体类对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。

如果要解决上述消息中间件发送实体类消息出现的异常,我们通常可以采用两种解决方案:第一种是执行JDK自带的Serializable序列化接口;第二种是定制其他类型的消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般使用第二种方式。

接着我们在chapter08项目中创建名为com.ytx.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。

package com.ytx.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
    /** 定制JSON格式的消息转换器 */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

代码中创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。

再次执行subPublisher()方法,该方法执行成功后,查看RabbitMQ可视化管理页面Queues面板信息。
在这里插入图片描述
从上图可以看出,消息发送完成后,Publish/Subscribe工作模式下绑定的两个消息队列中各自拥有一条待接收的消息, 由于目前尚未提供消息。
在这里插入图片描述

3.消息消费者接收消息

在chapter08项目中创建名为com.ytx.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。

package com.ytx.chapter08.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
    /** Publish/Subscribe工作模式接收,处理邮件业务 */
    @RabbitListener(queues = "fanout_queue_email")
    public void subConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("邮件业务接收到消息:" + msg);
    }

    /** Publish/Subscribe工作模式接收,处理短信业务 */
    @RabbitListener(queues = "fanout_queue_sms")
    public void subConsumerSms(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("短信业务接收到消息:" + msg);
    }
}

上述代码中,创建了一个接收处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解,我们可以监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听的这两个队列是前面指定发送并存储消息的消息队列。

需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列中有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法就会立即接收并消费队列中的消息。另外,在接收消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用与消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的消息参数信息MessageProperties。

启动chapter08项目,控制台显示的消息消费效果如下图所示。
在这里插入图片描述
从上图可以看出,项目启动成功后,消息消费者监听到消息队列中存在的两条消息,并进行了各自的消费。与此同时,通过RabbitMQ可视化管理页面的Queues面板查看队列消息情况,会发现两个队列中存储的消息已经被消费。至此,一条完整的消息发送、 消息中间件存储、消息消费的Publish/Subscribe(发布订阅模式)工作模式的业务案例已经实现。

注意,如果没有引入Spring Web模块的依赖,启动chapter08项目时,消息消费者接收消息会报以下错误。

Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.amqp.support.converter.MessageConverter]: Factory method 'messageConverter' threw exception; nested exception is java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
  at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.3.25.jar:5.3.25]
  at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653) ~[spring-beans-5.3.25.jar:5.3.25]
  ... 18 common frames omitted

小提示:

上述代码中,使用的是开发中常用的@RabbitListener注解,来监听指定名称队列的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,我们还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。

基于配置类的方式

基于配置类的方式,主要讲的是使用SpringBoot框架提供的@Configuration注解,配置定制消息发送组件,并进行消息发送。下面我们来对这种基于配置类的方式进行讲解和演示。

打开RabbitMQ消息配置类RabbitMQConfig,在该配置类中使用基于配置类的方式定制消息发送相关组件。

package com.ytx.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
    /** 定制JSON格式的消息转换器 */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** 使用基于配置类的方式定制消息中间件 */
    // 1.定义fanout类型的交换器
    @Bean
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
    }
    // 2.定义两个不同名称的消息队列
    @Bean
    public Queue fanoutQueueEmail() {
        return new Queue("fanout_queue_email");
    }
    @Bean
    public Queue fanoutQueueSms() {
        return new Queue("fanout_queue_sms");
    }
    // 3.将两个不同名称的消息队列与交换器进行绑定
    @Bean
    public Binding bindingEmail() {
        return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs();
    }
    @Bean
    public Binding bindingSms() {
        return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs();
    }
}

上述代码中,使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与交换器的绑定。这种基于配置类方式定制的消息组件,其实现和基于API方式定制的消息组件完全一样,只不过是实现方式不同而已。

按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现了消息发送者和消息消费者,并且基于配置类方式定制的消息组件名称,和之前测试用的消息发送和消息消费组件名称都是一致的,所以这里我们可以直接重复使用。

重新运行消息发送者测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

基于注解的方式

基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。

在消息接收和处理的业务类RabbitMQService中,将针对邮件业务和短信业务处理的消息消费者方法进行注释,使用@RabbitListener注解及其相关属性定制消息发送组件。

package com.ytx.chapter08.service;
import com.ytx.chapter08.domain.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
    /** Publish/Subscribe工作模式接收,处理邮件业务 */
    /*
    @RabbitListener(queues = "fanout_queue_email")
    public void subConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("邮件业务接收到消息:" + msg);
    }
    */

    /** Publish/Subscribe工作模式接收,处理短信业务 */
    /*
    @RabbitListener(queues = "fanout_queue_sms")
    public void subConsumerSms(Message message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println("短信业务接收到消息:" + msg);
    }
    */

    /** 使用基于注解的方式实现消息服务 */
    // 1.1 Publish/Subscribe工作模式接收,处理邮件业务
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_email"),
            exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void subConsumerEmailAno(User user) {
        System.out.println("邮件业务接收到消息:" + user);
    }
    // 1.2 Publish/Subscribe工作模式接收,处理短信业务
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout_queue_sms"),
            exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void subConsumerSmsAno(User user) {
        System.out.println("短信业务接收到消息:" + user);
    }
}

上述代码中,使用@RabbitListener注解及其相关属性定制了两个消息组件的消费者,这两个消费者都接收实体类User并消费。在@RabbitListener注解中,bindings属性用于创建并绑定交换器和消息队列组件,需要注意的是,为了能使两个消息组件的消费者接收到实体类User,需要我们在定制交换器时将交换器类型type设置为fanout。另外,bindings属性的@QueueBinding注解除了有value、exchange属性外,还有key属性用于定制路由键routingKey(当前发布订阅模式不需要)。

重启测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

至此,我们就在SpringBoot中完成了基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。

在实际开发中,使用基于配置类的方式和基于注解的方式较为常见,基于API的方式则偶尔使用,当然大家要根据实际情况进行具体选择。

总结

今天介绍了基于API方式、配置类方式和注解的3种消息队列,并展示了实现发布订阅Publish/Subscribe模式的整合及代码实现,基于注解方式的实现需要重点掌握。有关RabbitMQ的其他内容,袁老后续更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2059014.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络硬盘录像机NVR程序源码NVR全套运用方案

在当今社会,随着科技的飞速发展和人们对安全需求的日益增长,安防监控系统已成为保障公共安全、维护社会稳定的重要手段。其中,网络视频录像机(NVR)作为安防监控系统的核心设备,其智能化升级运用方案对于提高…

OpenLayers3, 加载鹰眼控件

文章目录 一、前言二、代码实现三、总结 一、前言 鹰眼图即地图显示范围的缩略图,可显示当前地图窗口在整幅地图中的位置。通过拖动鹰眼图对话框中的矩形框可改变当前地图的显示区域范围,是地图浏览中常用的功能之一。 本案例使用OpenLayers3框架&…

数字虚拟人原理

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…

振兴杯全国青年职业技能大赛信息通信网络线务员解决方案

一、引言 随着数字化时代的到来,信息技术的飞速发展正深刻改变着人们的生活与工作方式。智能楼宇作为这一时代的产物,以其提升生活和工作效率、改善居住和办公环境的特点,受到了广泛关注。智能安防作为智能楼宇的重要组成部分,其…

解决flutter运行项目后报错 java.util.zip.ZipException: zip END header not found

全新项目运行后直接报错 java.util.zip.ZipException: zip END header not found网上找了其他案例试了没有效果 根据官网说法,针对不同机型处理 我的是windows,然后按照图片说明的目录删除了文件夹之后重新运行项目了 注意.gradle文件夹是隐藏的&…

分析 Runtime.getRuntime() 执行阻塞原因

1、起因 线上系统通过 git 命令执行的方式获取远程仓库分支,一直运行正常的接口,突然出现超时,接口无法响应,分析验证发现只有个别仓库获取分支会出现这种情况,其他都还是可以正常获取到分支结果信息。 2、分析异常原…

音频分割怎么弄?手把手教会你实用的音频分割技巧

在巴黎的浪漫街头,打卡地标的方式已经达到了next level!而今,想让这份记忆更加生动,不再只有照片与视频,更有音频的加入~ 想象一下,倘若用音频分割免费版工具来为这份旅行日志添上独一无二的音符&#xff…

Coze开发工作流

工作流可以理解是工作流程,就像流程审批的节点,它允许用户处理逻辑复杂且有较高稳定性要求的任务流。通过使用扣子提供的大量灵活可组合的节点,比如大语言模型 LLM、自定义代码、判断逻辑等,用户可以快速搭建工作流,无…

儿童乘坐火车高铁,忘带户口本了该怎么办?儿童临时身份证怎么办理?12306可申请儿童临时身份证明

儿童乘火车,没有携带户口本怎么办? 儿童乘坐火车/高铁时,家长需要携带儿童的有效身份证件才能乘车,如果到车站后发现忘记带户口本了,无法乘车怎么办?此时不要慌张,铁路部门已经为我们解决了这个…

界面控件DevExpress WinForms中文教程:Data Grid(数据网格)简介(二)

DevExpress WinForms Data Grid是一个高性能的UI组件,由DirectX渲染引擎提供支持。数据网格(GridControl)提供了一个灵活的基于视图的体系结构,包括许多数据塑造和UI自定义特性,数据网格可以显示和编辑来自任何大小和复杂数据源的数据。 P.S&…

详细扒一扒css的背景渐变(通俗易懂)

前言: CSS 渐变使您可以显示两种或多种指定颜色之间的平滑过渡。 CSS 定义了两种渐变类型: 线性渐变(向下/向上/向左/向右/对角线)径向渐变(由其中心定义) 下面来详细看看吧~ 🌈🌈文…

【2.10】回溯算法-解黄金矿工问题

一、题目 你要开发一座金矿,地质勘测学家已经探明了这座金矿中的资源分布,并用大小为 m * n 的网格grid 进行了标注。每个单元格中的整数就表示这一单元格中的黄金数量;如果该单元格是空的,那么就是 0。 为了使收益最大化&#x…

代码随想录 刷题记录-12 回溯(1) 基本理论

什么是回溯法 回溯法也可以叫做回溯搜索法,它是一种搜索的方式。 回溯法的效率 虽然回溯法很难,很不好理解,但是回溯法并不是什么高效的算法。 因为回溯的本质是穷举,穷举所有可能,然后选出我们想要的答案&#xff…

【图文并茂】ant design pro 如何对接后端个人信息接口

上一节我们有讲到如何对接登录接口的 【图文并茂】ant design pro 如何对接登录接口 仅仅能登录是最基本的,但是我们要进入后台还是需要另一个接口。 这个接口有两个作用: 来获取当前登录账号的信息,比如头像,用户名&#xff0…

SAP Lock Object锁机制

一、锁机制 SAP LUW要求数据库对象的锁定在SAP LUW结束释放,并且该数据库锁要求对所有SAP程序可见。SAP提供了一个逻辑数据锁定机制,该机制基于系统特定的锁定服务应用服务器中的中心锁定表(即将加锁的信息记入数据库表)。一个AB…

文档翻译软件哪个好用?后悔没早发现这5款

在学术研究的道路上,英文文献翻译无疑是一项挑战重重的任务! 作为一名经常与英文文献打交道的学者,我一直在寻找能够简化这一过程的工具。最近,我发现了一些英文文献翻译在线免费工具,它们提供了文档翻译的功能&#…

IC rankIC

IC IC衡量的是预测值和实际值之间的相关系数 计算公式为:IC Pearson(R(predicted),R(actual)) 取值范围:[-1, 1],其中1表示完全相关,也就是预测值和实际值完全一样。0表示完全不相关,-1表示,反向相关 ra…

Catf1ag CTF Web(八)

前言 Catf1agCTF 是一个面向所有CTF(Capture The Flag)爱好者的综合训练平台,尤其适合新手学习和提升技能 。该平台由catf1ag团队打造,拥有超过200个原创题目,题目设计注重知识点的掌握,旨在帮助新手掌握C…

「Qt Widget中文示例指南」如何实现一个旋转框(二)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写,所有平台无差别运行,更提供了几乎所有开发过程中需要用到的工具。如今,Qt已被运用于超过70个行业、数千家企业,支持数百万设备及应用。 旋转框示例展示了如…

别选错了!一篇文章讲清Midjourney 和 Stable Diffusion的区别

24年无疑标志着AI时代的崭新篇章,各类AI软件如同春日里迅速生长的竹笋,层出不穷,其功能之丰富令人目不暇接,竞相在各自的领域内抢占前沿阵地。 在众多 AI 绘画工具中,Midjourney 和 Stable Diffusion 是最为人熟知的两…