Spring Boot 整合 RabbitMQ 实现延迟消息

news2025/1/16 22:06:23

关于 RabbitMQ

消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。

随着 AMQP 草案的发布,两个月后,RabbitMQ 1.0 就发布了。

RabbitMQ 的架构模型可以分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。

整体的流程非常简单,生产者将消息发送到服务端,消费者从服务端获取对应的消息。

生产者在发送消息前需要先确认发送给哪个虚拟主机的哪个交换器,再由交换器通过路由键将消息转发给与之绑定的队列。

最后,消费者到指定的队列中获取自己的消息进行消费。

客户端

生产者和消费者都属于客户端。

生产者:消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消息包括消息体和标签。

消费者:消息的接收方,负责消费消息体。

服务端

虚拟主机、交换机、队列都属于服务端。

虚拟主机:用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。有点类似 Java 中的包,同一个包下,不能有相同名称的类或者接口。

交换器:负责接收生产者发来的消息,并根据规则分配给对应的队列,不生产消息,只是消息的搬运工。

队列:负责存储消息,生产者发送的消息会放在这里,消费者从这里取。

连接和信道

连接和信道是两个不同的概念,连接的英文叫 connection,信道叫 channel。

连接里包含了多条信道,连接用的是 TCP 连接,因为 AMQP 就是用 TCP 实现的。

为什么不直接使用连接,而要在连接的基础上新建信道呢?

因为 TCP 连接是比较昂贵的,新建需要三次握手,销毁需要四次挥手,所以如果每个线程在想 RabbitMQ 服务端发送/接收消息的时候都新建一个 TCP 连接,就会非常的消耗资源,于是就有了信道。

信道是线程私有的,连接是线程共享的。

信道+连接的模式,既保证了线程之间的私密性,又减少了系统开销。

业务场景

消息队列的主要功能有三种:

  • 异步处理,比如说在做电商业务的时候,提交订单的动作可能涉及到创建订单、扣除库存、增加用户积分、发送订单邮件等。它们并不是一个串行的操作,可以把发送订单邮件和增加用户积分交给消息队列去做。
  • 系统解耦,消息队列可以作为不同系统之间的桥梁,且不受系统技术栈的约束。
  • 缓冲削峰,消息队列可以将大量的请求放到队列中,然后再按照一定的顺序规则交给业务服务器处理。

工作模式

RabbitMQ 支持 7 种工作模式:

  • 简单模式
  • 工作队列模式
  • 广播模式
  • 路由模式
  • 动态路由模式
  • 远程模式
  • 生产者确认模式

我们这里只演示前三种,

简单模式

简单模式真的超级简单,生产者将消息发送给队列,消费者从队列中获取消息队列即可。

生活中就类似于 快递员将包裹放到快递柜,然后给取件人发一个取件码,取件人通过取件码去快递柜里取包裹📦。

工作队列模式

工作队列模式在本质上只比简单模式对了一个队列,消费者从一个变成了多个。生产者将消息放入到队列中,多个消费者会一次进行消费。

比如说有 3 个消费者,生产者向队列发送 3 条消息,3 个消费者会没人消费一条消息,有点雨露均沾的意味。

当然了,也可以通过配置,将其改成能者多劳的模式。

广播模式

与工作队列模式不同,广播模式就有交换器参与了。在广播模式下,即使生产者只生产了一条消息,它对应的所有消费者都能全部接收,真正做到了公平公正公开。

安装配置 RabbitMQ

RabbitMQ 的安装方式可以参考官方:

Installing RabbitMQ | RabbitMQ

  • 服务器数据统计——消息投递情况,以及连接、信道、交换器、队列、消费者的数量
  • RabbitMQ 节点信息——erlang 进程、内存、磁盘空间等
  • 端口和 Web 信息
  • 。。。

启动RabbitMQ服务。

rabbitmq-server.bat

我们点击 admin 面板 点击虚拟主机新建一个 codingmore 的虚拟主机。

并新建一个用户 admin:

并设置它的权限。

整合 RabbitMQ

第一步,在 pom.xml 文件中添加 RabbitMQ 的 starter 依赖。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步,在 application.yml 文件中添加 RabbitMQ 的配置。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: codingmore

简单模式

第三步,新建 RabbitMQController 控制器类,添加 sendSimple 生产者接口。

@RestController
@Api(tags = "整合 RabbitMQ")
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendSimple")
    @ApiOperation("简单模式")
    public ResultObject sendSimple(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
        return ResultObject.success("ok");
    }
}

RabbitTemplate 是 Spring Boot 为我们封装好的操作 RabbitMQ 的工具类。

第四步,新建 SimpleConsumer 类,添加简单模式的消费者。

@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("simple"))
public class SimpleConsumer {
    @RabbitHandler
    public void receive(String message) {
        log.info("简单模式:{}", message);
    }
}

启动服务,在浏览器地址栏访问 http://localhost:8080/doc.html 打开 Swagger。

输入参数,点击发送。

在Intellij IDEA 中可以看到输出信息。

这就表示我们完成了 RabbitMQ 的简单模式。

工作队列模式

在 RabbitMQController 控制器中添加 sendWork 工作队列接口:

@PostMapping("/sendWork")
@ApiOperation("工作队列模式")
public ResultObject sendWork(String routingKey, String message) {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend(routingKey, "第" + i + "消息:" + message);
    }
    return ResultObject.success("ok");
}

新建 WorkConsumer 类,添加工作队列模式的消费者。

@Slf4j
@Component
public class WorkConsumer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveOne(String message) {
        log.info("工作队列模式 receiveOne:{}", message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveTwo(String message) {
        log.info("工作队列模式 receiveTwo:{}", message);
    }
}

build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。

在Intellij IDEA 中可以看到输出信息。

这就表示我们完成了 RabbitMQ 的工作队列模式。

广播模式

在 RabbitMQController 控制器中添加 sendBroadcast 广播接口:

@PostMapping("/sendBroadcast")
@ApiOperation("广播模式")
public ResultObject sendBroadcast(String exchange, String message) {
    rabbitTemplate.convertAndSend(exchange, "",message);
    return ResultObject.success("ok");
}

新建 BroadcastConsumer 类,添加广播模式的消费者。

@Slf4j
@Component
public class BroadcastConsumer {
    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveOne(String message) {
        log.info("广播模式 receiveOne:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveTwo(String message) {
        log.info("广播模式 receiveTwo:{}", message);
    }
}

注意这里的 Exchange(交换器)名字要是 fanout,它是 RabbitMQ 默认的一种交换器。

Fanout模式不需要处理路由键(所以我们在 sendBroadcast 接口中,convertAndSend 方法中传递的 routingKey 是空的),我们只需要简单的将队列绑定到exchange上,发送到exchange的每一个消息都会被转发到与该exchange绑定的所有队列上。

Fanout类型的Exchange转发消息是最快的。除此之外,还有 Direct Exchange、Topic Exchange,大家可以去了解一下。

build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。

在Intellij IDEA 中可以看到输出信息。

可以看到两个消费者都消费了消息,这就表示我们完成了 RabbitMQ 的广播模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1569380.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能改变教育:理解和在课堂上使用 ChatGPT 的指南

原文&#xff1a;Talking to Machines: The Fascinating Story of ChatGPT and AI Language Models 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 这本直言不讳、幽默风趣的指南充满了可操作的提示、技巧和策略&#xff0c;帮助你在业务中充分利用 ChatGPT 的优势。在…

【氮化镓】GaN SP-HEMT的栅极可靠性

概括总结&#xff1a; 本文研究了氮化镓&#xff08;GaN&#xff09;肖特基型p-栅高电子迁移率晶体管&#xff08;GaN SP-HEMT&#xff09;的栅极鲁棒性和可靠性&#xff0c;通过一种新的电路方法评估了在实际转换器中栅极电压&#xff08;VGS&#xff09;过冲波形的栅极电压应…

网络基础二——TCP可靠性实现机制补充

11.3.4确认应答机制 ​ 1.双方通信时要返回确认应答报文&#xff0c;保证对方发送的报文是有效的&#xff1b;尽管整个通信过程中无法保证数据全部可靠&#xff0c;但是可以保证单个方向发送的数据是可靠的&#xff1b; ​ 发送的报文要设置序号&#xff0c;如果是应答报文要…

一、持续集成介绍

持续集成介绍 一、什么是持续集成二、持续集成的流程三、持续集成的组成要素四、持续集成的好处 一、什么是持续集成 持续集成&#xff08;CI&#xff09;指的是&#xff0c;频繁地&#xff08;一天多次&#xff09;将代码集成到主干。持续集成的目的&#xff0c;就是让产品可…

《Java面试自救指南》(专题二)计算机网络

文章目录 力推的计网神课get请求和post请求的区别在浏览器网址输入一个url后直到浏览器显示页面的过程常用状态码session 和 cookie的区别TCP的三次握手和四次挥手七层OSI模型&#xff08;TCP/IP协议模型&#xff09;各种io模型的知识http协议和tcp协议的区别https和http的区别…

理解pytorch的广播语义

目录 什么是广播运算 广播的条件 示例 示例1 示例2 示例3 补1 示例4 原位运算 示例5 参与广播运算的两个tensor&#xff0c;必须是从右向左对齐 总结规律 两个tensor可以做广播运算的条件&#xff1a; 两个可以互相广播的tensor运算的步骤&#xff1a; 例子&#x…

Java | Leetcode Java题解之第8题字符串转换整数atoi

题目&#xff1a; 题解&#xff1a; class Solution {public int myAtoi(String str) {Automaton automaton new Automaton();int length str.length();for (int i 0; i < length; i) {automaton.get(str.charAt(i));}return (int) (automaton.sign * automaton.ans);} …

Scala第二十章节(Akka并发编程框架、Akka入门案例、Akka定时任务代码实现、两个进程间通信的案例以及简易版spark通信框架案例)

Scala第二十章节 章节目标 理解Akka并发编程框架简介掌握Akka入门案例掌握Akka定时任务代码实现掌握两个进程间通信的案例掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是…

MySQL 导入库/建表时/出现乱码

问题描述&#xff1a; 新建不久的项目在使用Navicat for MySQL进行查看数据&#xff0c;发现表中注释的部分乱码&#xff0c;但是项目中获取的数据使用不会。 猜测因为是数据库编码和项目中使用的不一样&#xff0c;又因为项目的连接语句定义了需要编码&#xff0c;故项目运行…

Golang实现一个聊天工具

简介 聊天工具作为实时通讯的必要工具&#xff0c;在现代互联网世界中扮演着重要的角色。本博客将指导如何使用 Golang 构建一个简单但功能完善的聊天工具&#xff0c;利用 WebSocket 技术实现即时通讯的功能。 项目源码 点击下载 为什么选择 Golang Golang 是一种高效、简…

win10+Intel显卡安装配置stable-diffusion-webui绘画网页

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目…

【opencv】教程代码 —video(3) 视频背景剔除

bg_sub.cpp 这段代码的功能是把视频中的背景和前景分离&#xff0c;提取出前景的运动物体。根据用户选择的不同的模式&#xff0c;可以选择基于MOG2或者基于KNN的方法来进行背景减除。在处理每一帧图像的过程中&#xff0c;首先使用背景减除模型对图像帧进行处理&#xff0c;得…

ChatGPT 与 OpenAI 的现代生成式 AI(下)

原文&#xff1a;Modern Generative AI with ChatGPT and OpenAI Models 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 七、通过 ChatGPT 掌握营销技巧 在本章中&#xff0c;我们将重点介绍营销人员如何利用 ChatGPT&#xff0c;在这一领域中查看 ChatGPT 的主要用例…

[RK3128_LINUX5.1] 关于 RetroArch 使用

问题描述 查看文档 docs\cn\Linux\ApplicationNote\Rockchip_Use_Guide_Linux_RetroArch_CN.pdf&#xff0c;描述为实验 make menuconfig 后勾选选项 Libretro cores and retroarch -> retroarch 但是SDK中并没有这个选项 解决方案&#xff1a; 目前发布的buildroot SDK…

4核8G服务器配置性能怎么样?4核8G12M配置服务器能干啥?

腾讯云4核8G服务器多少钱&#xff1f;腾讯云4核8G轻量应用服务器12M带宽租用价格646元15个月&#xff0c;活动页面 txybk.com/go/txy 活动链接打开如下图所示&#xff1a; 腾讯云4核8G服务器优惠价格 这台4核8G服务器是轻量应用服务器&#xff0c;详细配置为&#xff1a;轻量4核…

flex:1是干嘛的

直接上图&#xff1a; flex:1实际代表的是三个属性的简写&#xff0c;如上图所示。 其中flex-grow是用来增大盒子的&#xff0c;比如&#xff0c;当子盒子的宽度小于父盒子的宽度&#xff0c;父盒子的剩余空间可以 利用flex-grow来设置子盒子增大的占比&#xff1b; flex-shri…

每日五道java面试题之ZooKeeper篇(二)

目录&#xff1a; 第一题. 客户端注册 Watcher 实现第二题. 服务端处理 Watcher 实现第三题. ACL 权限控制机制第四题. Chroot 特性第五题. 客户端回调 Watcher 第一题. 客户端注册 Watcher 实现 &#xff08;1&#xff09;调用 getData()/getChildren()/exist()三个 API&…

腾讯云4核8g服务器价格,CVM和轻量哪个优惠?

2024年腾讯云4核8G服务器租用优惠价格&#xff1a;轻量应用服务器4核8G12M带宽646元15个月&#xff0c;CVM云服务器S5实例优惠价格1437.24元买一年送3个月&#xff0c;腾讯云4核8G服务器活动页面 txybk.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云4核8G服务器优惠价格 轻…

竞赛 Yolov安全帽佩戴检测 危险区域进入检测 - 深度学习 opencv

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; Yolov安全帽佩戴检测 危险区域进入检测 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&am…

PTA L2-046 天梯赛的赛场安排

天梯赛使用 OMS 监考系统&#xff0c;需要将参赛队员安排到系统中的虚拟赛场里&#xff0c;并为每个赛场分配一位监考老师。每位监考老师需要联系自己赛场内队员对应的教练们&#xff0c;以便发放比赛账号。为了尽可能减少教练和监考的沟通负担&#xff0c;我们要求赛场的安排满…