SpringBoot整合RabbitMQ的快速使用教程

news2025/1/11 23:48:58

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {
    public static void main( String[] args ) {
        SpringApplication.run(ConsumerApp.class, args);
    }

    //使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化
    @Bean
    public MessageConverter jacksonMessageConvertor(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能
        return jackson2JsonMessageConverter;
    }
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {

    private static String EXCHANGE_NAME="amq.topic";
    private static String QUEUE_NAME="alarm.data.topic.queue";
    private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";

    /**
     * 声明交换机
     */

    @Bean
    public TopicExchange exchange(){
         // durable:是否持久化,默认是false
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。
        return new TopicExchange(EXCHANGE_NAME,true,false);
    }

    /**
     * 声明告警队列
     * @return
     */
    @Bean("alarmQueue")
    public Queue alarmQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue(QUEUE_NAME,true,false,false);
    }

    /**
     * 声明确认告警队列
     * @return
     */
    @Bean("confirmAlarmQueue")
    public Queue confirmAlarmQueue(){
        return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);
    }

    /**
     * 声明告警队列绑定关系
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");
    }

    /**
     * 声明确认告警队列绑定关系
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");
    }
2、生产消息代码
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static String EXCHANGE_NAME="amq.topic";
    private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";

    @Test
    void producerAlarmMsg() {
        String msg = "发送一条告警消息";
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);
        System.out.println("msg = " + msg);
    }

    @Test
    void producerConfirmAlarmMsg() {
        String msg = "发送一条确认告警消息";
        rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);
        System.out.println("msg = " + msg);
    }

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {

        @Autowired
        private IAlarmService alarmService;


        @RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")
        public void getAlarmInfo(String data){
            alarmService.dealAlarmData(data);
        }

        @RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")
        public void getConfirmAlarmInfo(String data){
            alarmService.dealConfirmAlarmData(data);
        }
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {

    @Override
    public void dealAlarmData(String data) {

        EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);
        List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();
        DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);
        dceEquipAlarmDto.setCreateTime(new Date());
        dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);
        //查询出需要新增或者更新的数据
        Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();
        //开启事务,保证新增、更新、删除的原子性
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
        List<DceEquipAlarmDto> list=new ArrayList<>();
        list.add(dceEquipAlarmDto);
        try {
            //新增
            if (!flag) {
                dceEquipAlarmMapper.insertBatch(list);
            }
            //更新
            if (flag) {
                dceEquipAlarmMapper.updateBatch(list);
            }
            //提交事务
            transactionManager.commit(transaction);
        } catch (Exception e) {
            //回滚
            transactionManager.rollback(transaction);
            log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);
        }
    }

    @Override
    public void dealConfirmAlarmData(String data) {

        EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);
        Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));
        alarmResp.setConfirmTime(confirmTime);
        dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());

    }

}

注:以上代码为对接告警信息和对接告警确认消息的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1708878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RTOS(6)任务

重点&#xff1a; 一、FreeRtos任务的API调用 1.创建任务&#xff08;静态、动态创建&#xff09; 动态创建: ①先写任务函数 ②定义函数的handle指针 ③调用动态创建任务的API&#xff1a;xTaskCreate&#xff08;任务函数&#xff0c;任务名称&#xff0c;栈深度&#x…

【css3】01-css3新特性样式篇

目录 1 背景 1.1 设置背景图片的定位 1.2 背景裁切-规定背景的绘制区域 1.3 设置背景图片尺寸 2 边框 2.1 盒子阴影box-shadow 2.2 边框图片border-image 3 文本 -文字阴影text-shadow 1 背景 1.1 设置背景图片的定位 background-origin&#xff1a;规定背景图片的定位…

遇到了导师放养,该怎么坚持?

最近收到学生读者的留言&#xff0c;抱怨科研的困难。导师忙碌且学生众多&#xff0c;自己只是众多学生之一&#xff0c;常常处于放养状态。除了每周的组会外&#xff0c;几乎无法接触到导师。在这种状态下&#xff0c;缺乏方向和动力&#xff0c;非常担心无法顺利毕业&#xf…

Llama模型家族之使用 Supervised Fine-Tuning(SFT)微调预训练Llama 3 语言模型(三)通过web页面方式微调

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

【Linux】自己实现一个bash进程

bash就是命令行解释器&#xff0c;就是Linux操作系统让我们看到的&#xff0c;与用户进行交互的一种外壳&#xff08;shell&#xff09;&#xff0c;当然了bash也是一个进程&#xff0c;它有时候就是通过创建子进程来执行我们输入的命令的。这无疑就离不开我们上篇博客所说的进…

如何解决链游中可能出现的延迟或网络拥堵问题?

随着区块链技术的不断发展和普及&#xff0c;链游&#xff08;基于区块链的游戏&#xff09;作为新兴的娱乐形式&#xff0c;正逐渐走进大众的视野。然而&#xff0c;与传统游戏相比&#xff0c;链游在运行过程中可能会遇到一些特有的问题&#xff0c;其中最为突出的就是延迟和…

Windows hook介绍与代码演示

Windows Hook 是一种机制&#xff0c;允许应用程序监视系统或处理特定事件。它可以拦截和更改消息&#xff0c;甚至可以插入到其他应用程序的消息处理机制中。Windows 提供了多种挂钩类型&#xff0c;例如键盘挂钩、鼠标挂钩、消息挂钩等。 hook代码实现 下面是一个使用 Wind…

微服务架构下的‘黑带’安全大师:Spring Cloud Security全攻略!

深入探讨了微服务间的安全通信、安全策略设计以及面对经典安全问题的应对策略。无论你是微服务的新手还是资深开发者&#xff0c;都能在本文中找到提升安全功力的秘籍。让我们一起成为微服务架构下的‘黑带’安全大师&#xff01; 文章目录 1. 引言微服务安全挑战与重要性Sprin…

【软件工程】【23.04】p1

关键字&#xff1a; 软件模型、提炼、加工表达工具、通信内聚、访问依赖、边界类交互分析、RUP核心工作流、首先测试数据流、软件验证过程、CMMI过程域分类工程类&#xff1b; 软件工程目的、功能需求是需求的主体、结构化方法、耦合、详细设计工具、类、类图、RUP采用用例技…

rk3568_mutex

文章目录 前言1、什么是mutex?1.1mutex互斥体API函数二、实验2.1实验目的2.2源码2.3结果图前言 本文记录的是rk3568开发板基础上做的mutex实验 1、什么是mutex? mutex是互斥体,它是比信号量semaphore更加专业的机制。 在我们编写Linux驱动的时候遇到需要互斥的地方建议使用…

Nginx企业级负载均衡:技术详解系列(12)—— 深入解析root、alias及location

你好&#xff0c;我是赵兴晨&#xff0c;97年文科程序员。 在生产服务器的Nginx配置中&#xff0c;我们总会遇到形形色色的配置方案。你是否曾注意到root和alias指令的巧妙应用&#xff1f;是否对那些五花八门的location匹配规则感到好奇&#xff1f; 今天&#xff0c;咱们来聊…

微服务架构-分支微服务设计模式

微服务架构-分支微服务设计模式 这种模式是聚合器模式的扩展&#xff0c;允许同时调用两个微服务链 分支微服务设计模式是一种用于构建大型系统的微服务架构模式&#xff0c;其核心思想是 将复杂的业务逻辑拆解为多个小的、相互独立的子系统&#xff0c;每个子系统由一个或多…

家政项目day2 需求分析(模拟入职后熟悉业务流程)

目录 1 项目主体介绍1.1 项目背景1.2 运营模式1.3 项目业务流程 2 运营端需求2.1 服务类型管理2.2 服务项目&#xff08;服务&#xff09;管理2.3 区域管理2.4 区域服务管理2.5 相关数据库表的管理2.6 设计工程结构2.7 测试接口&#xff08;接口断点查看业务代码&#xff09; 1…

SQL学习小记(三)

SQL学习小记&#xff08;三&#xff09; 功能实现思路代码部分名词解释 代码打包为可执行文件 功能说明&#xff1a;使用python代码&#xff0c;将数据库database1中的表格table1同步到数据库database2中 功能实现 思路 #mermaid-svg-R1pWrEWA799M299a {font-family:"tre…

Redis 中 List 数据结构详解

目录 List 用法 1. 增 2. 删 3. 查 内部编码 应用场景 前言 Redis 中的 List 和 Set 数据结构各有特点&#xff0c;适用于不同的应用场景。List 提供了有序的列表结构&#xff0c;适合用于消息队列和任务列表等场景&#xff1b;Set 提供了无序且不重复的集合结构&#…

【全开源】旅游系统源码(Uniapp+FastAdmin+ThinkPHP)

一款基于UniappFastAdminThinkPHP开发的旅游系统&#xff0c;包含消费者端&#xff08;手机端&#xff09;、机构工作人员&#xff08;手机端&#xff09;、机构端&#xff08;PC&#xff09;、平台管理端&#xff08;PC&#xff09;。机构可以发布旅游线路、景点项目&#xff…

Wpf 使用 Prism 实战开发Day27

首页汇总和数据动态显示 一.创建首页数据汇总数据接口 汇总&#xff1a;待办事项的总数已完成&#xff1a;待办事项里面有多少条完成的待办完成比例&#xff1a;已完成和汇总之间的比例备忘录&#xff1a;显示备忘录的总数待办事项&#xff1a;显示待办事项未完成的集合备忘录&…

Java实现对PDF、纵向、横向页面添加自定义水印功能

Java实现对PDF、纵向、横向页面添加自定义水印 效果图 -- 纵向 页面PDF使用到JAR Maven依赖版本效果图 -- 横向页面PDF 效果图 – 纵向 页面PDF 代码如下&#xff1a; 使用到JAR Maven依赖版本 <dependency><groupId>org.apache.pdfbox</groupId><artifa…

视频监控平台AS-V1000 的场景管理,一键查看多画面视频的场景配置、调用、管理(一键浏览多路视频)

目录 一、场景管理的定义 二、场景管理的功能和特点 1、功能 &#xff08;1&#xff09;场景配置 &#xff08;2&#xff09;实时监控 &#xff08;3&#xff09;权限管理 2、特点 三、AS-V1000的场景配置和调用 1、场景配置 &#xff08;1&#xff09;实时视频预览 …

【Linux】Linux的权限_2 + Linux环境基础开发工具_1

文章目录 三、权限3. Linux权限管理修改文件的拥有者和所属组 4. 文件的类型5. 权限掩码 四、Linux环境基础开发工具1. yumyum 工具的使用 未完待续 三、权限 3. Linux权限管理 修改文件的拥有者和所属组 在上一节我们讲到如何更改文件的访问权限&#xff0c;那我们需要更改…