Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)

news2025/1/12 14:35:47

这里写目录标题

  • 一、序言
  • 二、配置文件application.yml
  • 三、RabbitMQ交换机和队列配置
    • 1、定义4个队列
    • 2、定义Fanout交换机和队列绑定关系
    • 2、定义Direct交换机和队列绑定关系
    • 3、定义Topic交换机和队列绑定关系
    • 4、定义Header交换机和队列绑定关系
  • 四、RabbitMQ消费者配置
  • 五、RabbitMQ生产者
  • 六、测试用例
    • 1、发送到FanoutExchage
    • 2、发送到DirectExchage
    • 3、发送到TopicExchange
    • 4、发动到HeadersExchage
  • 七、结语

一、序言

在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


二、配置文件application.yml

我们先上应用启动配置文件application.yml,如下:

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto
        concurrency: 5
        max-concurrency: 20
        prefetch: 5

备注:这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory,并且指定消息确认模式为自动确认

三、RabbitMQ交换机和队列配置

Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

1、定义4个队列

/**
 * 定义4个队列
 */
@Configuration
protected static class QueueConfig {

	@Bean
	public Queue queue1() {
		return QueueBuilder.durable("queue-1").build();
	}

	@Bean
	public Queue queue2() {
		return QueueBuilder.durable("queue-2").build();
	}

	@Bean
	public Queue queue3() {
		return QueueBuilder.durable("queue-3").build();
	}

	@Bean
	public Queue queue4() {
		return QueueBuilder.durable("queue-4").build();
	}
}

2、定义Fanout交换机和队列绑定关系

/**
 * 定义Fanout交换机和对应的绑定关系
 */
@Configuration
protected static class FanoutExchangeBindingConfig {

	@Bean
	public FanoutExchange fanoutExchange() {
		return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
	}

	/**
	 * 定义多个Fanout交换机和队列的绑定关系
	 * @param fanoutExchange
	 * @param queue1
	 * @param queue2
	 * @param queue3
	 * @param queue4
	 * @return
	 */
	@Bean
	public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
		Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
		Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
		Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
		return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
	}

}

备注:这里我们将4个队列绑定到了名为fanout-exchange的交换机上。

2、定义Direct交换机和队列绑定关系

@Configuration
protected static class DirectExchangeBindingConfig {

	@Bean
	public DirectExchange directExchange() {
		return ExchangeBuilder.directExchange("direct-exchange").build();
	}

	@Bean
	public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
		return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
	}
}

备注:这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-keyqueue-3绑定到了该交换机上。


3、定义Topic交换机和队列绑定关系

@Configuration
protected static class TopicExchangeBindingConfig {

	@Bean
	public TopicExchange topicExchange() {
		return ExchangeBuilder.topicExchange("topic-exchange").build();
	}

	@Bean
	public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
		Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
		return new Declarables(queue1Binding, queue2Binding);
	}
}

这里我们定义了名为topic-exchange类型的交换机,该类型交换机支持路由key通配符匹配,*代表一个任意字符,#代表一个或多个任意字符。

备注:

  1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
  2. 通过路由key com.#queue-2也绑定到了该交换机上。

4、定义Header交换机和队列绑定关系

@Configuration
protected static class HeaderExchangeBinding {

	@Bean
	public HeadersExchange headersExchange() {
		return ExchangeBuilder.headersExchange("headers-exchange").build();
	}

	@Bean
	public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
		return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
	}
}

备注:这里我们定义了名为headers-exchange类型的交换机,并通过参数function=loggingqueue-4绑定到了该交换机上。


四、RabbitMQ消费者配置

Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

@Slf4j
@Component
public class RabbitMqConsumer {

	@RabbitListener(queues = "queue-1")
	public void handleMsgFromQueue1(String msg) {
		log.info("Message received from queue-1, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-2")
	public void handleMsgFromQueue2(String msg) {
		log.info("Message received from queue-2, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-3")
	public void handleMsgFromQueue3(String msg) {
		log.info("Message received from queue-3, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-4")
	public void handleMsgFromQueue4(String msg) {
		log.info("Message received from queue-4, message body: {}", msg);
	}
}

备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

五、RabbitMQ生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendMsgToFanoutExchange(String body) {
		log.info("开始发送消息到fanout-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
	}

	public void sendMsgToDirectExchange(String body) {
		log.info("开始发送消息到direct-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
	}

	public void sendMsgToTopicExchange(String routingKey, String body) {
		log.info("开始发送消息到topic-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("topic-exchange", routingKey, message);
	}

	public void sendMsgToHeadersExchange(String body) {
		log.info("开始发送消息到headers-exchange, 消息体:{}", body);

		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
		rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
	}

}


六、测试用例

这里写了个简单的Controller用来测试,如下:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {

	private final RabbitMqProducer rabbitMqProducer;

	@RequestMapping("/exchange/fanout")
	public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
		rabbitMqProducer.sendMsgToFanoutExchange(body);
		return ResponseEntity.ok("广播消息发送成功");
	}

	@RequestMapping("/exchange/direct")
	public ResponseEntity<String> sendMsgToDirectExchange(String body) {
		rabbitMqProducer.sendMsgToDirectExchange(body);
		return ResponseEntity.ok("消息发送到Direct交换成功");
	}

	@RequestMapping("/exchange/topic")
	public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
		rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
		return ResponseEntity.ok("消息发送到Topic交换机成功");
	}

	@RequestMapping("/exchange/headers")
	public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
		rabbitMqProducer.sendMsgToHeadersExchange(body);
		return ResponseEntity.ok("消息发送到Headers交换机成功");
	}

}

1、发送到FanoutExchage

直接访问http://localhost:8080/exchange/fanout?body=hello,可以看到该消息广播到了4个队列上。

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2、发送到DirectExchage

访问http://localhost:8080/exchange/direct?body=hello,可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3、发送到TopicExchange

访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key为 com.order.create的消息分别发送到了queue-1queue-2上。

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4、发动到HeadersExchage

访问http://localhost:8080/exchange/headers?body=hello,消息通过头部信息function=logging发送到了headers-exchange上。

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello

七、结语

下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1182441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

npm ERR! code ERESOLVE,npm ERR! ERESOLVE unable to resolve dependency tree

前言 当你第一次下包&#xff0c;或者删除node_module之后&#xff0c;突然npm i报错&#xff0c;这是因为npm版本导致的 可能是某些包版本跟npm 不兼容导致的&#xff08;peerDependencies&#xff09; npm ERR! code ERESOLVE,npm ERR! ERESOLVE unable to resolve depend…

mysql之高阶语句

1、使用select语句&#xff0c;用order by对表进行排序【尽量用数字列进行排序】 select id,name,score from info order by score desc; ASC升序排列&#xff08;默认&#xff09; DESC降序排列&#xff08;需要添加&#xff09; &#xff08;1&#xff09;order by结合whe…

怎么写好宣传稿件?纯干货

企业宣传稿件撰写并不难&#xff0c;有框架、有模板&#xff0c;再有知识储备就行&#xff0c;今天伯乐网络传媒和大家分享新闻稿写作套路&#xff0c;超实用的新闻稿套路➕模板&#xff0c;教你从0到1学会企业宣传稿写作&#xff01;纯干货&#xff0c;建议收藏起来慢慢看&…

uniapp原生插件之安卓友盟消息推送原生插件

插件介绍 安卓友盟消息推送原生插件&#xff0c;支持自定义响铃&#xff0c;震动&#xff0c;免打扰时间段&#xff0c;厂商离线推送等 插件地址 安卓友盟消息推送原生插件 - DCloud 插件市场 详细使用文档 uniapp 安卓友盟消息推送原生插件使用文档 超级福利 uniapp 插…

深度学习_10_softmax_实战

由于网上代码的画图功能是基于jupyter记事本&#xff0c;而我用的是pycham,这导致画图代码不兼容pycharm,所以删去部分代码&#xff0c;以便能更好的在pycharm上运行 完整代码&#xff1a; import torch from d2l import torch as d2l"创建训练集&创建检测集合"…

STM32 TIM定时器,配置,详解(1)

计数器寄存器(TIMx_CNT)、预分频器寄存器(TIMx_PSC)、自动重载寄存器(TIMx_ARR)。 PSC预分频器&#xff0c;顾名思义&#xff0c;先预备一下分频&#xff0c;有时候频率过高&#xff0c;后面的定时器承受不住&#xff0c;就先用PSC先分频一下。如何分频的&#xff1f;将每接受到…

PTE作文练习(一)

目录 65分备考建议 WE模版 范文 Supporting ideas: SWT 65分备考建议 RA重在多听标准的正确的示范&#xff0c;RS重在抓大放小&#xff0c;WFD重在整理错题&#xff0c;以及反反复复的车轮战&#xff0c;FIBRW重在“以对代记” 就是直接看答案&#xff0c;节约时间&#…

Python编程的四个关键点——都知道吗?快来查漏补缺!

文章目录 前言一、Python 中的类型提示二、Python 虚拟环境和包管理三、新的 Python 语法四、Python 测试关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小…

响应式新闻博客资讯网站模板源码带后台

模板信息&#xff1a; 模板编号&#xff1a;29779 模板编码&#xff1a;UTF8 模板分类&#xff1a;博客、文章、资讯、其他 适合行业&#xff1a;博客类企业 模板介绍&#xff1a; 本模板自带eyoucms内核&#xff0c;无需再下载eyou系统&#xff0c;原创设计、手工书写DIVCSS&a…

uniapp原生插件之安卓腾讯Bugly专业版原生插件

插件介绍 Bugly专业版是TDS腾讯端服务&#xff08;Tencent Device-oriented Service&#xff09;旗下的端质量监控平台&#xff0c;通过采集、监控、定位、告警等核心能力&#xff0c;提供专业的质量监控服务&#xff0c;帮助开发者及时发现并解决质量问题&#xff0c;打造高质…

SQL注入漏洞:CMS布尔盲注python脚本编写

SQL注入漏洞:CMS布尔盲注python脚本编写 文章目录 SQL注入漏洞:CMS布尔盲注python脚本编写库名爆破爆破表名用户名密码爆破 库名爆破 import requests #库名 database"" x0 while requests.get(urlf"http://10.9.47.77/cms/show.php?id33%20and%20length(data…

动态规划实例——01 背包详解

题目描述 有 n 件物品&#xff0c;每件物品有一个重量和一个价值&#xff0c;分别记为 w1&#xff0c;w2&#xff0c;…&#xff0c;wn 和 c1&#xff0c;c2&#xff0c;…&#xff0c;cn。现在有一个背包&#xff0c;其容量为 wk&#xff0c;要从 n 件物品种任取若干件。要求…

Python---capitalize() 方法---把字符串的首字母大写,其他字符全部小写,title()方法--把字符串中的所有单词的首字母大写,组成大驼峰

capitalize 英 /ˈkpɪtəlaɪz/ v. 用大写字母书写&#xff08;或印刷&#xff09;&#xff0c;把……首字母大写&#xff1b;为&#xff08;开办或发展企业&#xff09;提供资金&#xff1b;&#xff08;将资产或股票&#xff09;变现&#xff0c;使资本化&#xff1b;&…

Window10安装Docker

文章目录 Window10安装Docker前提条件Hyper -VWSL 2.0 安装包下载执行安装包更新 Window10安装Docker 前提条件 Hyper -V 如何启用 WSL 2.0 安装包下载 官网地址 下载后&#xff1a; 执行安装包 wsl --update等得有点久 重新打开 拉取一个helloworld镜像 说明已经…

[LeetCode] 4.寻找两个正序数组的中位数

一、题目描述 给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,3], nums2 [2] 输出&#xff1a…

软件测试-根据状态迁移图设计测试用例

测试用例状态迁移图 许多需求用状态机的方式来描述&#xff0c;状态机的测试主要关注状态转移是否正确。对于一个有限状态机&#xff0c;通过测试验证其在给定的条件内是否能够产生需要的状态变化&#xff0c;有没有不可达的状态和非法的状态&#xff0c;是否可能产生非法的状…

【Spring】使用注解装配bean

目录 使用注解的两个必要步骤 正文 Cat Dog Animal beans.xml 测试 Qualifier 使用注解的两个必要步骤 1.导入约束 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:…

RT-DETR 应用 BiFPN 结构 | 加权双向特征金字塔网络

模型效率在计算机视觉中变得越来越重要。在本文中,我们系统地研究了目标检测中的神经网络架构设计选择,并提出了几种关键的优化方法来提高效率。首先,我们提出了一种加权双向特征金字塔网络(BiFPN),它可以实现简单快速的多尺度特征融合;其次,我们提出了一种复合缩放方法…

C盘清理指南(二)——盘符划分操作

今天的内容是C盘清理系列的第二期——盘符划分操作。 1. 点击“我的电脑——左上角的管理” 2.进入后点击磁盘管理 3.右键单击某个想修改盘符&#xff0c;可进行扩展、压缩、删除三种操作 其中压缩卷是进行“分解反应”&#xff0c;即原盘过大要进行拆分。此处注意拆分的上限为…

数据结构与算法—双链表

前言 前面有很详细的讲过线性表(顺序表和链表)&#xff0c;当时讲的链表以单链表为主&#xff0c;但在实际应用中双链表有很多应用场景&#xff0c;例如大家熟知的LinkedList。 双链表与单链表区别 单链表和双链表都是线性表的链式实现&#xff0c;它们的主要区别在于节点结构…