【RabbitMQ】SpringBoot整合RabbitMQ、实现RabbitMQ五大工作模式(万字长文)

news2025/1/20 6:02:53

目录

一、准备

1、创建SpringBoot项目

2、添加配置信息

3、创建配置类

二、RabbitMQ的配置类里创建队列

三、RabbitMQ的配置类里创建交换机及绑定队列

四、SpringBoot整合RabbitMQ入门案例

1、生产者

2、消费者

四、SpringBoot里实现RabbitMQ五大工作模式

1、简单模式

2、work queues工作队列模式

3、pub/sub订阅模式

4、routing路由模式

5、topic通配符模式


一、准备

1、创建SpringBoot项目

我们需要创建一个生产者项目与消费者项目

【Java】两张图帮你的社区版IDEA创建SpringBoot项目_1373i的博客-CSDN博客icon-default.png?t=N2N8https://blog.csdn.net/qq_61903414/article/details/130174514?spm=1001.2014.3001.5501

如何创建SpringBoot项目在该文章里,项目创建完成后即可进行后续操作

2、添加配置信息

项目创建完成后,需要在各自的配置文件里添加RabbitMQ的相关配置

在该文件里添加以下配置信息

spring:
  rabbitmq:
    host: 127.0.0.1       # IP
    port: 5672            # 端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /learn  # 虚拟机名

3、创建配置类

完成上述操作后我们还需要创建一个RabbitMQ的配置类,后续会通过这个类来创建队列等,此时我们就可以编写RabbitMQ代码了

二、RabbitMQ的配置类里创建队列

在之前Spring项目里我们是通过xml文件来创建队列,此时我们可以通过@Bean注解在这个配置类来创建队列并注入Spring容器里,有两种方式

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    // 定义队列名
    private static final String QUEUE1_NAME = "queue1";
    private static final String QUEUE2_NAME = "queue2";

    // 注入队列
    @Bean
    public Queue queue1 () {
        // 通过QueueBuiller.durable(队列名).build();
        return QueueBuilder.durable(QUEUE1_NAME).build();
    }

    @Bean
    public Queue queue2() {
        // 直接通过Queue来创建
        return new Queue(QUEUE2_NAME);
    }
   
}

注意此处的队列Queue不是我们之前在Java集合类里学过的Queue而是Rabbit MQ 里的Queue,他在 org.springframework.amqp.core这个包底下

三、RabbitMQ的配置类里创建交换机及绑定队列

同样交换机我们也可以通过注解来创建并注入Spring容器

 创建交换机也有两种方法

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {

    // 定义交换机名称
    private static final String EXCHANGE = "ex";

    @Bean
    public Exchange exchange() {
        // ExchangeBuilder.fanoutExchange(交换机名).durable(是否持久化).build()
        return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
    }
    
    @Bean 
    public Exchange exchange2() {
        return new FanoutExchange("交换机名");
    }

}

 将之前创建的两个队列与该交换机进行绑定

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    // 定义队列名
    private static final String QUEUE1_NAME = "queue1";
    // 定义交换机名称
    private static final String EXCHANGE = "ex";

    // 注入队列
    @Bean
    public Queue queue1 () {
        // QueueBuilder.durable(队列名).build()
        return QueueBuilder.durable(QUEUE1_NAME).build();
    }
    
    // 注入交换机
    @Bean
    public Exchange exchange() {
        // ExchangeBuilder.fanoutExchange(交换机名).durable(是否持久化).build()
        return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
    }
    
    // 绑定
    @Bean
    public Binding binding(@Qualifier("queue1") Queue queue1,@Qualifier("exchange") FanoutExchange exchange) {
        // BindingBuilder.bind(队列).to(交换机).with(队列路由) 此处交换机类型为fanout所以不需要设置路由其他类型需要设置路由
        return BindingBuilder.bind(queue1).to(exchange);
    }

}

四、SpringBoot整合RabbitMQ入门案例

创建了队列与交换机,我们来实现一个简单的案例,生产者生产一条消息,消费者进行消费

1、生产者

我们在生产者项目里需要在配置类里创建队列

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    
    // 创建demo队列
    public static final String QUEUE = "demoQueue";
    
    @Bean
    public Queue demoQueue() {
        return QueueBuilder.durable(QUEUE).build();
    }
}

然后设置消息发送情景为当前端访问/send接口时进行发送,此时我们编写该接口

 

package com.example.demo.controller;

import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@ResponseBody
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void sendMessage() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE,"hello demo");
    }
}

 运行项目,在浏览器访问该接口查看控制台

消息已发送 

2、消费者

我们需要在消费者项目里创建一个监听该队列的类,将他通过类注解注入Spring容器,让他能随着项目的启动而启动去监听

我们在类里定义一个监听方法加上RabbitListener注解进行监听

package com.example.demo.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DemoQueueListener {
    @RabbitListener(queues = "demoQueue")
    public void listener(Message message) {
        System.out.println("消费消息" + new String(message.getBody()));
    }
}

 

运行代码查看控制台  

四、SpringBoot里实现RabbitMQ五大工作模式

1、简单模式

简单模式的实现与上述代码相同

2、work queues工作队列模式

工作队列模式与简单模式相同,只不过多了一个消费者监听队列,在实现时创建两个消费者即可

3、pub/sub订阅模式

实现订阅模式我们需要先在RabbitMQ配置类里创建两个队列以及fanout类型的交换机,并将他们绑定

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {

    /**
     * pub/sub订阅模式
     */
    // 定义队列与交换机
    private static final String F_QUEUE1 = "fQueue1";
    private static final String F_QUEUE2 = "fQueue2";
    private static final String F_EXCHANGE = "fEx";

    @Bean
    public Queue fQueue1() {
        // 创建队列1
        return QueueBuilder.durable(F_QUEUE1).build();
    }

    @Bean
    public Queue fQueue2() {
        // 创建队列2
        return new Queue(F_QUEUE2);
    }

    @Bean
    public Exchange fEx() {
        // 创建交换机
        return ExchangeBuilder.fanoutExchange(F_EXCHANGE).durable(true).build();
    }

    @Bean
    public Binding binding1(@Qualifier("fQueue1")Queue fQueue1,@Qualifier("fEx") FanoutExchange exchange) {
        // 绑定队列1
        return BindingBuilder.bind(fQueue1).to(exchange);
    }

    @Bean
    public Binding binding2(@Qualifier("fQueue2")Queue fQueue2,@Qualifier("fEx") FanoutExchange exchange) {
        // 绑定队列1
        return BindingBuilder.bind(fQueue2).to(exchange);
    }


}

此时访问fEx接口时发送消息

package com.example.demo.controller;

import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@ResponseBody
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void sendMessage() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE,"hello demo");
    }
    
    @RequestMapping("/fEx")
    public void sendByF() {
        rabbitTemplate.convertAndSend("fEx","","hello mq");
    }
}

 运行代码查看MQ控制台

消费者只需监听对应的队列即可 

4、routing路由模式

首先我们需要在RabbitMQ配置类里面创建队列以及direct类型的交换机,并将他们绑定指定队列路由key

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    // 创建队列与交换机
    private static final String D_QUEUE1 = "dQueue1";
    private static final String D_QUEUE2 = "dQueue2";
    private static final String D_EXCHANGE = "dEx";

    @Bean
    public Queue dQueue1() {
        // 创建队列1
        return QueueBuilder.durable(D_QUEUE1).build();
    }
    
    @Bean
    public Queue dQueue2() {
        // 创建队列2
        return new Queue(D_QUEUE2);
    }
    
    @Bean
    public DirectExchange dEx() {
        // 创建交换机
        return ExchangeBuilder.directExchange(D_EXCHANGE).durable(true).build();
    }
    
    @Bean
    public Binding binding1(@Qualifier("dQueue1")Queue dQueue1,DirectExchange dEx) {
        // 绑定交换机通过with指定路由
        return BindingBuilder.bind(dQueue1).to(dEx).with("error");
    }


    @Bean
    public Binding binding2(@Qualifier("dQueue2")Queue dQueue2,DirectExchange dEx) {
        // 绑定交换机通过with指定路由
        return BindingBuilder.bind(dQueue2).to(dEx).with("info");
    }
}

此时访问/dEx接口时发送消息到路由为error队列,不给路由为info队列发送 

package com.example.demo.controller;

import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@ResponseBody
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void sendMessage() {
        rabbitTemplate.convertAndSend("","hello demo");
    }

    @RequestMapping("/fEx")
    public void sendByF() {
        rabbitTemplate.convertAndSend("fEx","","hello mq");
    }
    
    @RequestMapping("/dEx")
    public void sendByD() {
        rabbitTemplate.convertAndSend("dEx","error","hello mq");
    }
}

运行代码访问接口查看MQ控制台 

消费者相同只需修改监听队列名即可 

5、topic通配符模式

首先需要在RabbitMQ配置类里创建队列与topic类型的交换机,并将它们进行绑定设置通配符匹配规则

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    /**
     * topic通配符模式
     */
    private static final String T_QUEUE1 = "tQueue1";
    private static final String T_QUEUE2 = "tQueue2";
    private static final String T_EXCHANGE = "tEx";

    @Bean
    public Queue tQueue1() {
        return QueueBuilder.durable(T_QUEUE1).build();  // 创建队列
    }
    
    @Bean
    public Queue tQueue2() {
        return new Queue(T_QUEUE2);          // 创建队列
    }
    
    @Bean 
    public TopicExchange tEx() {
        return ExchangeBuilder.topicExchange(T_EXCHANGE).durable(true).build();//创建交换机
    }
    
    @Bean
    public Binding binding1(@Qualifier("tQueue1") Queue tQueue1,@Qualifier("tEx") TopicExchange tEx) {
        return BindingBuilder.bind(tQueue1).to(tEx).with("A.*"); //绑定并配置通配符匹配规则
    }
    
    @Bean
    public Binding binding2(@Qualifier("tQueue2") Queue tQueue2,@Qualifier("tEx") TopicExchange tEx) {
        return BindingBuilder.bind(tQueue2).to(tEx).with("#.error");
    }
}

此时访问/tEx接口,给tQueue2发送消息不给tQueue1发送,具体匹配规则可查看之前文章

【RabbitMQ】RabbbitMQ的六种工作模式以及代码实现_1373i的博客-CSDN博客icon-default.png?t=N2N8https://blog.csdn.net/qq_61903414/article/details/130156097?spm=1001.2014.3001.5501

package com.example.demo.controller;

import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@ResponseBody
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
    public void sendMessage() {
        rabbitTemplate.convertAndSend("","hello demo");
    }

    @RequestMapping("/fEx")
    public void sendByF() {
        rabbitTemplate.convertAndSend("fEx","","hello mq");
    }

    @RequestMapping("/dEx")
    public void sendByD() {
        rabbitTemplate.convertAndSend("dEx","error","hello mq");
    }

    @RequestMapping("/tEx")
    public void sendByT() {
        rabbitTemplate.convertAndSend("tEx","B.error","hello mq");
    }
}

运行程序访问接口查看MQ控制台

 同理消费者也只需修改监听队列名即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/414620.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux--进程多线程(上)

前言 精神内耗一方面可能是消极的,人好像一直在跟自己过不去,但其实它也是一种积极的情绪。精神内耗在某种程度上,是在寻找一种出口,寻找他自己人生的出口,寻找我今天的出口,或者寻找我一觉醒来明天的出口。…

【k8s完整实战教程5】网络服务配置(nodeport/loadbalancer/ingress)

系列文章:这个系列已完结,如对您有帮助,求点赞收藏评论。 读者寄语:再小的帆,也能远航! 【k8s完整实战教程0】前言【k8s完整实战教程1】源码管理-Coding【k8s完整实战教程2】腾讯云搭建k8s托管集群【k8s完…

恐怖的ChatGPT!

大家好,我是飞哥!不知道大家那边咋样。反正我最近感觉是快被ChatGPT包围了。打开手机也全是ChatGPT相关的信息,我的好几个老同学都在问我ChatGPT怎么用,部门内也在尝试用ChatGPT做一点新业务出来。那就干脆我就趁清明假期这一天宝…

AB测试基本原理

AB测试基本原理AB测试AB测试的基本步骤1、AB测试的基本步骤①选取指标指标的分类②建立假设③选取实验单位④计算样本量⑤流量分割⑥实验周期计算⑦线上验证⑧数据检验AB测试 所谓的AB测试就是使用实验组和对照组,通过控制变量法保证实验组和对照组基本条件一致&am…

NumPy 数组学习手册:6~7

原文:Learning NumPy Array 协议:CC BY-NC-SA 4.0 译者:飞龙 六、性能分析,调试和测试 分析,调试和测试是开发过程的组成部分。 您可能熟悉单元测试的概念。 单元测试是程序员编写的用于测试其代码的自动测试。 例如&…

AI —— 一看就懂的代码助手Copilot获取教程

背景 随着chatgpt的发布,人工智能领域近期站上了风口浪尖。GitHub Copilot由github与 OpenAI 合作创建,是世界上第一个使用 OpenAI 的 Codex 模型(GPT-3 的后代)制作的大规模生成式 AI 开发工具。GitHub Copilot 作为 AI 结对程序…

【条件判断】

目录知识框架No.0 筑基No.1 条件判断题目来源:PTA-L1-031 到底是不是太胖了题目来源:PTA-L1-063 吃鱼还是吃肉题目来源:PTA-L1-069 胎压监测题目来源:PTA-L1-077 大笨钟的心情题目来源:PTA-L1-083 谁能进图书馆知识框架…

Day15-二维数组字符串

文章目录一 二维数组二 字符串案例1案例2案例3-随堂练习案例4-输入-类型转换案例5案例6案例7一 二维数组 <script>// 书:编号 名称 描述 价格/*** 二维数组* */let arr [ [1001,"HTML","网页设计",100],[1002,"CSS","样式设计"…

Leetcode刷题之环形链表

莫等闲&#xff0c;白了少年头&#xff0c;空悲切。 --岳飞 目录 1.环形链表 2.环形链表Ⅱ 1.环形链表 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next …

Stable Diffusion扩散模型

1 GAN到Stable Diffusion的改朝换代 随着人工智能在图像生成&#xff0c;文本生成以及多模态生成等生成领域的技术不断累积&#xff0c;生成对抗网络&#xff08;GAN&#xff09;、变微分自动编码器&#xff08;VAE&#xff09;、normalizing flow models、自回归模型&#xf…

Android Textview Button 等基础组件学习

一 Textview 1 基本使用 <?xml version"1.0" encoding"utf-8"?><LinearLayout android:layout_height"match_parent"android:layout_width"match_parent"android:orientation"vertical"xmlns:android"http…

vue中的pinia使用和持久化 - 粘贴即用

学习关键语句&#xff1a; pinia怎么用 写在前面 pinia 作为 vuex 的替代品好像变得不得不学习了&#xff0c;学起来一用发现 vuex 是什么麻烦的东西&#xff0c;我不认识 这篇文章一共包含的内容有&#xff1a; 安装 pinia读取数据修改数据数据持久化 其中&#xff0c;修…

代码不熟没关系,让AI替你写

程序员早已不是一个陌生的群体&#xff0c;但程序、代码相对普通人而言&#xff0c;看着还是比较深奥难懂&#xff0c;但自从有了ChatGPT&#xff0c;不少对此有兴趣的外行人士&#xff0c;也能轻松写出代码了&#xff0c;比如让ChatGPT写一个贪吃蛇游戏&#xff0c;按它给出的…

C++入门(2)

C入门1.缺省参数1.1. 缺省参数举例和概念1.2. 函数的传参是从左到右给参数的1.3.缺省参数分类1.4. 缺省参数的函数声明与定义2.函数重载2.1.函数重载的概念2.2. 函数重载的情况2.3.剖析C语言不能函数重载而C却可以的原因2.3.1. 编译链接过程2.3.2. 函数名修饰规则3.引用3.1. 引…

Java并行流:一次解决多线程编程难题,让你的程序飞起来

前言 在日常的工作中&#xff0c;为了提高程序的处理速度&#xff0c;充分利用多核处理器的性能&#xff0c;我们需要手动编写多线程代码。但是多线程编程非常复杂&#xff0c;容易出现死锁、竞态条件等问题&#xff0c;给我们带来了很大的困扰。而 Java 并行流则提供了一种更加…

python机器学习和深度学习在气象中的应用

查看原文>>> Python人工智能在气象中的实践技术应用 Python 是功能强大、免费、开源&#xff0c;实现面向对象的编程语言&#xff0c;在数据处理、科学计算、数学建模、数据挖掘和数据可视化方面具备优异的性能&#xff0c;这些优势使得 Python 在气象、海洋、地理、…

14:24面试,14:32就出来了 ,问的实在是太...

从外包出来&#xff0c;没想到算法死在另一家厂子&#xff0c;自从加入这家公司&#xff0c;每天都在加班&#xff0c;钱倒是给的不少&#xff0c;所以也就忍了。没想到8月一纸通知&#xff0c;所有人不许加班&#xff0c;薪资直降30%&#xff0c;顿时有吃不起饭的赶脚。 好在有…

PythonFlash+MySQL实现简单管理系统的增删改查

今天简单分享一下用Python的flash框架结合MySQL来实现信息管理系统的增删改查&#xff01; ps&#xff1a;该博客只完成了信息的添加和查看&#xff0c;删除和修改按照该方法下推即可&#xff01; 实现功能之前我们先在数据库里设置数据&#xff0c;例如&#xff1a; 我们创…

日常记录:天梯赛练习集L1-046 整除光棍

题目&#xff1a; 这里所谓的“光棍”&#xff0c;并不是指单身汪啦~ 说的是全部由1组成的数字&#xff0c;比如1、11、111、1111等。传说任何一个光棍都能被一个不以5结尾的奇数整除。比如&#xff0c;111111就可以被13整除。 现在&#xff0c;你的程序要读入一个整数x&#x…

Mac环境下nvm的安装与环境配置

目录 1.nvm简介 2.nvm安装 3.配置nvm环境 1.nvm简介 nvm全称 Node Version Manager &#xff0c;意思为node版本控制&#xff1b;它是一个命令行应用&#xff0c;可以快速地更新、安装、使用、卸载本机的全局 node.js 版本。他可以在同一台电脑上进行多个node版本之间的切换…