从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ

news2025/1/10 6:14:04

前言

最近有个想法想整理一个内容比较完整springboot项目初始化Demo。

SpringBoot集成RabbitMQ

 RabbitMQ中的一些角色:

  • publisher:生产者

  • consumer:消费者

  • exchange个:交换机,负责消息路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 

junit用于测试。

一、pom引入依赖amqp

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <!--rabbitmq-->

二、application-dev.yaml 增加RabbitMQ相关配置

spring:
  #RabbitMQ服务器配置,地址账号密码,virtualhost等配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: murg
    password: 123456
    virtual-host: murg-host
    #队列中没有消息,阻塞等待时间
    template:
      receive-timeout: 2000
logging:
  level:
    org.springframework.security: debug


三、发布/订阅

RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。

此模式下根据交换机类型又分为三种。

1.Fanout类型:   广播模式    把消息交给所有绑定到交换机的队列

2.Direct类型:    路由模式     把消息交给符合指定routing key 的队列

3Topic类型:     主题模式      把消息交给符合主题通配符的队列

3.1 Fanout类型

在广播模式下,消息发送流程是这样的:

1) 可以有多个队列

2) 每个队列都要绑定到Exchange(交换机)

3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

4) 交换机把消息发送给绑定过的所有队列

5) 订阅队列的消费者都能拿到消

3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起

创建配置类FanoutConfig 

声明一个Fanout类型交换机命名为murg.fanout

声明两个Queue队列分别为fanout.queue1和fanout.queue2

分别将两个队列和交换机绑定,后续用于消费消息。

package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout 广播模式下
 *声明交换机和队列
 */
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机和队列 将交换机和队列绑定在一起
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("murg.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2创建消息生产服务

创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。

 注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列

新增测试广播模式下发送消息方法

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Rabbitmq消息生产者
 */
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    private final RabbitTemplate rabbitTemplate;

    /**
     * 测试广播模式下发送消息
     * @param msg
     */
    public void testFanoutExchange(String msg) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.fanout","",msg);
    }

   
}

3.13创建消息消费服务

创建消息生产服务MessageConsumerService 

@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener 注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 消息消费者
 *
 */
@Service
@RequiredArgsConstructor
public class MessageConsumerService {

    /**
     * 下面是Fanout,广播模式的监听
     * 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2
     * @param msg
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }


  
}

3.14修改测试类进行测试

修改创建项目时生成的 BootdemoApplicationTests.class

增加以下注解

@ActiveProfiles("dev") 

指定运行环境为开发环境
@RunWith(SpringRunner.class)

指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。

@SpringBootTest(classes={BootdemoApplication.class})

指定启动类

package com.murg.bootdemo;


import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {
    @Autowired
    MessageProducerService messageProducerService;
    @Test
    public void contextLoads() {
        System.out.printf("aaaaaaaaaaaaa");
    }
    //测试
    @Test
    public void testFanoutExchange(){
        String msg = "遍身罗绮者,不是养蚕人";
        for (int i=0;i<10;i++){
            messageProducerService.testFanoutExchange(msg);
        }
    }

}

3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果

两个队列都可消费。 

3.2 Direct类型

3.2.1MessageProducerService生产服务增加方法testDirectExchange

在Direct模型下:
 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
 Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

/**
     * 测试Direct模式下发送消息
     * 在Direct模型下:
     *
     * 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
     *
     * 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
     *
     * Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
     * murg.Direct
     * @param msg
     */
    public void testDirectExchange(String msg,String rountingkey) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);
    }

3.2.2MessageConsumerService消费服务增加方法testDirectExchange

Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息

direct.queue1 消费 rountingkey为“蚕妇”的消息

direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息

    /**
     *
     * 下面是rountingKey 路由key 模式的消费监听
     * 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
     *
     * 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"蚕妇"} //指定消费rountingkey
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

3.2.3测试类增加测试方法

    @Test
    public void testDirectExchange() throws InterruptedException {
        String msg1 = "遍身罗绮者,不是养蚕人";
        String key1 ="蚕妇";
        String msg2 = "朱门酒肉臭,路有冻死骨";
        String key2 ="自京赴奉先县咏怀五百字";

        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testDirectExchange(msg2,key2);
                Thread.sleep(1000);
            }else {
                messageProducerService.testDirectExchange(msg1,key1);
                Thread.sleep(1000);

            }

        }
    }

调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果

3.3 Topic类型

3.3.1MessageProducerService生产服务增加方法testTopicExchange

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu    #.demo写法也可以
demo.*:只能匹配demo.spu

  public void testTopicExchange(String msg, String key) {
        rabbitTemplate.convertAndSend("murg.topic",key,msg);
    }

3.3.2MessageConsumerService消费服务增加方法testDirectExchange

@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息

topic.queue1 消费 rountingkey为“罗隐.*”的消息

topic.queue2 消费 rountingkey为“#.贫女”的消息

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"罗隐.*"} //指定消费rountingkey
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"#.贫女"}//指定消费rountingkey
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

3.3.3测试类增加测试方法 

    @Test
    public void testTopicExchange() throws InterruptedException {
        String msg = "采得百花成蜜后,为谁辛苦为谁甜";
        String key ="罗隐.蜂";
        String msg2 = "苦恨年年压金线,为他人作嫁衣裳";
        String key2 ="秦韬玉.贫女";
        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testTopicExchange(msg,key);
                Thread.sleep(1000);
            }else {
                messageProducerService.testTopicExchange(msg2,key2);
                Thread.sleep(1000);

            }
        }
    }

调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237359.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

比流计算资源效率最高提升 1000 倍,“增量计算”新模式能否颠覆数据分析?

作者 | 关涛 云器科技CTO 数据平台领域发展 20 年&#xff0c;逐渐成为每个企业的基础设施。作为一个进入“普惠期”的领域&#xff0c;当下的架构已经完美了吗&#xff0c;主要问题和挑战是什么&#xff1f;在 2023 年 AI 跃变式爆发的大背景下&#xff0c;数据平台又该如何演…

MySQL性能测试方案设计

在现代互联网系统中&#xff0c;数据库性能直接影响到整体应用的速度和用户体验。而MySQL作为广泛使用的关系型数据库&#xff0c;随着数据量和并发请求的增长&#xff0c;其性能问题也日益突出。今天我们将深入探讨如何设计一套高效的MySQL性能测试方案&#xff0c;帮助你精准…

彻底解决单片机BootLoader升级程序失败问题

文章目录 1、引言2、MicroBoot&#xff1a;优雅的解决升级问题问题1&#xff1a;bootloader 在跳转到app前没有清理干净存在的痕迹问题2&#xff1a; 需要 APP 传递信息给 Bootloader问题3&#xff1a; APP单独运行没有问题&#xff0c;通过Bootloader跳转到APP运行莫名死机问题…

Oracle OCP认证考试考点详解082系列17

题记&#xff1a; 本系列主要讲解Oracle OCP认证考试考点&#xff08;题目&#xff09;&#xff0c;适用于19C/21C,跟着学OCP考试必过。 81. 第81题&#xff1a; 题目 81.Examine these SOL statements which execute successfully Which two statements are true after exec…

【EFK】Linux集群部署Elasticsearch最新版本8.x

【EFK】Linux集群部署Elasticsearch最新版本8.x 摘要环境准备环境信息系统初始化启动先决条件 下载&安装修改elasticsearch.yml控制台启动Linux服务启动访问验证查看集群信息查看es健康状态查看集群节点查询集群状态 生成service token验证service tokenIK分词器下载 摘要 …

基于python的天气数据采集与可视化分析,对20个城市的天气适宜出行度分析

摘要 本项目旨在基于Python对20个城市的天气数据进行采集与可视化分析&#xff0c;以评估天气的适宜出行度。该分析通过四个主要指标进行量化&#xff0c;这些指标分别是天气状况良好率、空气质量优良率、气温适宜率和安全天气率。通过这些指标&#xff0c;我们能够有效地判断…

外贸管理利器7选,助力高效办公

推荐7款外贸管理软件&#xff0c;包括ZohoBooks、ZohoCRM、富通天下等&#xff0c;各具特色&#xff0c;满足外贸企业不同需求&#xff0c;提高管理效率&#xff0c;助力企业全球化竞争。、 一、Zoho Books Zoho Books是一款外贸财务管理软件&#xff0c;不仅为用户提供了一个…

【JWT】Asp.Net Core中JWT刷新Token解决方案

Asp.Net Core中JWT刷新Token解决方案 前言方案一:当我们操作某个需要token作为请求头的接口时,返回的数据错误error.response.status === 401,说明我们的token已经过期了。方案二:实现用户无感知的刷新token值,我们希望当响应返回的数据是401身份过期时,响应阻拦器自动帮我…

当AI遇上时尚:未来的衣橱会由机器人来打理吗?

内容概要 在当今这个快速发展的时代&#xff0c;人工智能与时尚的结合正在逐渐改写我们对衣橱管理的认知。传统的衣橱管理常常面临着空间不足、穿搭单调及库存过多等挑战&#xff0c;许多人在挑选服饰时难以做出决策。然而&#xff0c;随着技术的进步&#xff0c;智能推荐和自…

编写虚拟的GPIO控制器的驱动程序:和pinctrl的交互使用

往期内容 本专栏往期内容&#xff1a; Pinctrl子系统和其主要结构体引入Pinctrl子系统pinctrl_desc结构体进一步介绍Pinctrl子系统中client端设备树相关数据结构介绍和解析inctrl子系统中Pincontroller构造过程驱动分析&#xff1a;imx_pinctrl_soc_info结构体Pinctrl子系统中c…

【MySQL】数据库整合攻略 :表操作技巧与详解

前言&#xff1a;本节内容讲述表的操作&#xff0c; 对表结构的操作。 是对表结构中的字段的增删查改以及表本身的创建以及删除。 ps&#xff1a;本节内容本节内容适合安装了MySQL的友友们进行观看&#xff0c; 实操更有利于记住哦。 目录 创建表 查看表结构 修改表结构 …

CocoaPods安装步骤详解 - 2024

引言 CocoaPods的安装&#xff0c;如果有VPN就一直开启&#xff0c;会让整个流程非常顺畅。 在现代 iOS 开发中&#xff0c;依赖管理变得越来越重要&#xff0c;CocoaPods 成为开发者们首选的依赖管理工具。它不仅可以简化库的安装与更新&#xff0c;还能帮助开发者更高效地管…

二叉树-堆

树的几个重要定义 1.树根子树根亲缘关系 2.节点的度:有几个子树或根有几个孩子 3.叶子节点:没有孩子的终端节点 度为0 4.分支节点:度不为0的节点 5.树叶子分支节点 6.父亲节点/双亲节点 7.子节点 8.树的度:最大节点的度就是树的度 9.树的层:一般从第一层开始数,也有从0层开始数…

内置RTK北斗高精度定位的4G执法记录仪、国网供电服务器记录仪

内置RTK北斗高精度定位的4G执法记录仪、国网供电服务器记录仪BD311R 发布时间: 2024-10-23 11:28:42 一、 产品图片&#xff1a; 二、 产品特性&#xff1a; 4G性能&#xff1a;支持2K超高清图传&#xff0c;数据传输不掉帧&#xff0c;更稳定。 独立北…

浮动路由:实现出口线路的负载均衡冗余备份。

浮动路由 Tip&#xff1a;浮动路由指在多条默认路由基础上加入优先级参数&#xff0c;实现出口线路冗余备份。 ip routing-table //查看路由表命令 路由优先级参数&#xff1a;越小越优 本次实验测试两条默认路由&#xff0c;其中一条默认路由添加优先级参数&#xff0c;设置…

ssm077铁岭河医院医患管理系统+vue(论文+源码)_kaic

毕业设计&#xff08;论文) 题 目&#xff1a; 医院医患管理系统 姓 名&#xff1a; 学 号&#xff1a; 所属学院&#xff1a; 专业班级&#xff1a; 指导&#xff1a; 职 称&#xff1a; 完成日期 2021年 月 摘 要 21世纪的今天&#xf…

关于在VS中使用Qt不同版本报错的问题

最开始需要配置的地方 首先看一下我的Qt有关的环境变量&#xff1a; Path环境变量里&#xff1a; 这里就是把对应Qt编译器环境下的bin目录放进来&#xff1a;比如你使用的是msvc2017_64或者MinGW QMAKESPEC环境变量&#xff1a; 这个就选择Qt对应的编译器目录下的\mkspecs\w…

Redis 权限控制(ACL)|ACL 命令详解、ACL 持久化

官网文档地址&#xff1a;https://redis.io/docs/latest/operate/oss_and_stack/management/security/acl/ 使用版本&#xff1a;Redis7.4.1 什么是 ACL&#xff1f; ACL&#xff08;Access Control List&#xff09;&#xff0c;权限控制列表&#xff0c;是 Redis 提供的一种…

任务中心全新升级,新增分享接口文档功能,MeterSphere开源持续测试工具v3.4版本发布

2024年11月5日&#xff0c;MeterSphere开源持续测试工具正式发布v3.4版本。 在这一版本中&#xff0c;系统设置方面&#xff0c;任务中心支持实时查看系统即时任务与系统后台任务&#xff1b;接口测试方面&#xff0c;新增接口文档分享功能、接口场景导入导出功能&#xff0c;…

GEE 数据集——美国gNATSGO(网格化国家土壤调查地理数据库)完整覆盖了美国所有地区和岛屿领土的最佳可用土壤信息

目录 简介 代码 引用 网址推荐 知识星球 机器学习 gNATSGO&#xff08;网格化国家土壤调查地理数据库&#xff09; 简介 gNATSGO&#xff08;网格化国家土壤调查地理数据库&#xff09;数据库是一个综合数据库&#xff0c;完整覆盖了美国所有地区和岛屿领土的最佳可用土…