第十七章 : Spring Boot 集成RabbitMQ(一)

news2025/1/14 18:11:24

第十七章 : Spring Boot 集成RabbitMQ(一)

前言

本章介绍RabbitMQ的核心概念和消息中间件中非常重要的协议——AMQP协议,然后介绍Direct、Topic、Headers、Fanout等交换机的作用和特点;RabbitMQ的五种消息发送模式-简单队列、工作队列、发布订阅、路由、广播;以及RabbitMQ的消息确认机制。

Springboot 版本 2.3.2.RELEASE ,RabbitMQ 3.9.11,Erlang 24.2

RabbitMQ是什么 ?

RabbitMQ基于开源的AMQP协议实现,服务器端用Erlang语言编写,支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP、AJAX等。

RabbitMQ优势

可靠性(Reliability):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。

灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,既可以将多个Exchange绑定在一起,又可以通过插件机制实现自己的Exchange。

消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。

高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。

多种协议(Multi-Protocol):支持多种消息队列协议,如STOMP、MQTT等。

多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。

管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。

插件机制(Plugin System):提供了许多插件进行扩展,也可以编辑自己的插件。

RabbitMQ作为流行的消息中间件,实现了应用程序的异步和解耦,同时也能起到消息缓冲、消息分发的作用,在易用性、扩展性、高可用性等方面表现不俗。

AMQP

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是应用层协议的开放标准,是为面向消息的中间件设计。基于此协议的客户端可与消息中间件传递消息,从而不受产品、开发语言等条件限制。

消息中间件主要用于组件之间的解耦,消息发送者无须知道消息使用者的存在,反之亦然。与其他消息队列协议不同的是,AMQP中增加了Exchange和Binging角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收;而Binding决定Exchange的消息应该发送到哪个队列。

与其他消息队列协议不同的是,AMQP中增加了ExchangeBinging角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收;而Binding决定Exchange的消息应该发送到哪个队列。

在这里插入图片描述

消息路由传递的过程:生产者首先将消息发送到Exchange,通过Exchange转发到绑定的各个消息队列上,然后消费者从队列中读取消息。

RabbitMQ如何保证数据可靠性?

  1. 持久化机制:RabbitMQ支持消息持久化,可以将消息保存在磁盘中,确保即使在RabbitMQ服务重启后,消息也不会丢失。持久化队列在RabbitMQ中被管理,而非持久化队列不会被保存在磁盘中,Rabbit服务重启后队列就会消失。

  2. 事务机制:RabbitMQ支持事务,当生产者将一个持久化消息发送给服务器时,如果服务器崩溃,没有持久化该消息,生产者无法获知该消息已经丢失。

  3. 发送端的消息确认机制:生产者可以通过设置消息的确认模式(confirm mode),在消息发送到RabbitMQ后等待RabbitMQ的确认响应,以确保消息成功发送到RabbitMQ。

  4. 消费端的消息确认机制:消费者可以通过设置消息的确认模式(ack mode),在成功处理消息后向RabbitMQ发送确认响应,以确保消息被成功消费。

  5. 异常捕获机制:RabbitMQ提供了异常捕获机制,可以在消息发送或接收过程中出现异常时进行捕获和处理,避免因异常导致消息丢失或未处理。

  6. 高可用集群:RabbitMQ支持高可用集群,多个RabbitMQ节点可以组成一个集群,共同分担消息存储和处理的任务,提高系统的可靠性和稳定性。

  7. 限流机制:在消费端,RabbitMQ提供了限流机制,可以限制消费者接收消息的速度,避免因消息处理速度过快导致消息丢失或处理失败。

  8. 幂等性:RabbitMQ支持幂等性操作,即对同一个队列进行多次相同的操作只会产生一次效果,避免因重复操作导致数据不一致。

RabbitMQ组件功能

RabbitMQ中有几个非常重要的组件:服务实体(Broker)、虚拟主机(Virtual Host)、交换机(Exchange)、队列(Queue)和绑定(Binging)等,如图12-2所示。

在这里插入图片描述

交换机

交换机(Exchange)的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,只是把消息分发给各自的队列。但是我们给交换机发送消息,**它怎么知道给哪个消息队列发送呢?**这里就要用到RoutingKeyBindingKeyBindingKey是交换机和消息队列绑定的规则描述。RoutingKey是消息发送时携带的消息路由信息描述。当消息发送到交换机(Exchange)时,通过消息携带的RoutingKey与当前交换机所有绑定的BindingKey进行匹配,如果满足匹配规则,则往BindingKey所绑定的消息队列发送消息,这样就解决了向RabbitMQ发送一次消息,可以分发到不同的消息队列,实现消息路由分发的功能

交换机有DirectTopicHeadersFanout四种消息分发类型。不同的类型在处理绑定到队列方面的行为时会有所不同。

1)Direct:其类型的行为是“先匹配,再发送”,即在绑定时设置一个BindingKey,当消息的RoutingKey匹配队列绑定的BindingKey时,才会被交换机发送到绑定的队列中。

在这里插入图片描述

2)Topic:按规则转发消息(最灵活)。支持用“”或“#”的模式进行绑定。“”表示匹配一个单词,“#”表示匹配0个或者多个单词。比如,某消息队列绑定的BindingKey为“*.user.#”时,能够匹配到RoutingKey为usd.user和eur.user.db的消息,但是不匹配user.hello。

在这里插入图片描述

3)Headers:设置header attribute参数类型的交换机。根据应用程序消息的特定属性进行匹配,这些消息可能在绑定key中标记为可选或者必选。

在这里插入图片描述

4)Fanout:转发消息到所有绑定队列(广播)。将消息广播到所有绑定到它的队列中,而不考虑队列绑定的BindingKey的值。

在这里插入图片描述

消息发送模式

  1. 基本消息模型 (简单队列) :这是RabbitMQ的核心,生产者不断向队列中添加新的消息,消费者则不断的从队列中获取并且处理这些消息。

    在这里插入图片描述

  2. 工作消息模型 (Work模式) :这种模型下,当消费者无法处理消息时,会将该消息重新放回队列中,以便其他消费者进行处理。

    • 一个生产者、2个消费者

    • 一个消息只能被一个消费者获取

    在这里插入图片描述

  3. 发布/订阅模型 (Direct模式) :在发布/订阅模型中,生产者将消息发送到交换机,然后交换机将消息路由到一个或多个队列,最后消费者从队列中获取并处理这些消息。

    • 1个生产者,多个消费者
    • 每一个消费者都有自己的一个队列
    • 生产者没有将消息直接发送到队列,而是发送到了交换机
    • 每个队列都要绑定到交换机
    • 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
      注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

在这里插入图片描述

  1. 路由模型 ( Fanout) :路由模型和发布/订阅模型相似,但不同之处在于交换机根据路由键来决定如何路由消息。

    • Fanout,也称为广播模式;

    • 可以有多个消费者;

    • 每个消费者有自己的queue(队列)

    • 每个队列都要绑定到Exchange(交换机)

    • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

    • 交换机把消息发送给绑定过的所有队列

    • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

    在这里插入图片描述

  1. 主题模型 ( Topic):在这种模型中,交换机将根据路由键和绑定键(主题)来决定如何路由消息。

    • 通配符模式

    • 同一个消息被多个消费者获取。

    • 一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

在这里插入图片描述

消息确认机制

虽然使用RabbitMQ可以降低系统的耦合度,提高整个系统的高并发能力,但是也使得业务变得复杂,可能造成消息丢失,导致业务中断的情况。

消息确认的场景

使用RabbitMQ很可能造成消息丢失,导致业务中断的情况,

例如:

  • 生产者发送消息到RabbitMQ服务器失败。

  • RabbitMQ服务器自身故障导致消息丢失。

  • 消费者处理消息失败。

针对上面的情况,RabbitMQ提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制Return消息机制消费端ACK和Nack机制3种消息确认模式。

生产者消息确认机制

生产者消息的确认是指生产者发送消息后,如果Broker收到消息,则会给生产者一个应答。生产者接收应答,用来确定这条消息是否正常地发送到Broker,这种方式也是消息可靠性发送的核心保障。如图12-21所示,当生产者发送消息到MQ Broker时,Broker会发送一个确认(Confirm),通知发送端已经收到此消息。

在这里插入图片描述

Return机制

我们知道,消息生产者通过指定一个Exchange和routingKey将消息送达某一个队列中,然后消费者监听队列进行消费处理操作。在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的routingKey路由不到,这个时候如果需要监听这种不可达的消息,可以使用RabbitMQ提供的Return机制处理一些不可路由的消息,如图12-24所示。

在这里插入图片描述

消费端ACK和NACK机制

消费者在处理消息时,由于业务异常,我们可以进行日志的记录,然后进行补偿。但是,如果由于服务器宕机等严重问题无法记录日志,如何确保消息被正确处理呢?这就需要消费端ACK和NACK机制,手工进行ACK确认,保障消费者成功处理消息,把未成功处理的消息再次发送,直到消息处理成功。RabbitMQ消费端的确认机制分为3种:nonemanualauto(默认)

none:表示没有任何应答会被发送。

manual:表示监听者必须通过调用channel.basicAck()来告知消息被处理。

① channel.basicAck(long,boolean):确认收到消息,消息将从队列中被移除,为false时只确认当前一个消费者收到的消息,为true时确认所有消费者收到的消息。

② channel.basicNack(long,boolean,boolean):确认没有收到消息,第一个boolean表示是一个消费者还是所有的消费者,第二个boolean表示消息是否重新回到队列,为true时表示重新入队。

③ channel.basicReject(long,boolean):拒绝消息,requeue=false表示消息不再重新入队,如果配置了死信队列,则消息进入死信队列。消息重回队列时,该消息不会回到队列尾部,仍在队列头部,这时消费者又会接收到这条消息,如果想让消息进入队列尾部,需确认消息后再次发送消息。

**auto:**表示自动应答,除非MessageListener抛出异常,这是默认配置方式。

① 如果消息成功处理,则自动确认。

② 当发生异常抛出AmqpRejectAndDontRequeueException时,则消息会被拒绝且不重新进入队列。

③ 当发生异常抛出ImmediateAcknowledgeAmqpException时,则消费者会被确认。

仍在队列头部,这时消费者又会接收到这条消息,如果想让消息进入队列尾部,需确认消息后再次发送消息。

**auto:**表示自动应答,除非MessageListener抛出异常,这是默认配置方式。

① 如果消息成功处理,则自动确认。

② 当发生异常抛出AmqpRejectAndDontRequeueException时,则消息会被拒绝且不重新进入队列。

③ 当发生异常抛出ImmediateAcknowledgeAmqpException时,则消费者会被确认。

④ 当抛出其他的异常时,则消息会被拒绝,且requeue=true时会发生死循环,可以通过setDefaultRequeueRejected(默认是true)设置抛弃消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1324755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Bert-vits2-v2.2新版本本地训练推理整合包(原神八重神子英文模型miko)

近日,Bert-vits2-v2.2如约更新,该新版本v2.2主要把Emotion 模型换用CLAP多模态模型,推理支持输入text prompt提示词和audio prompt提示语音来进行引导风格化合成,让推理音色更具情感特色,并且推出了新的预处理webuI&am…

图神经网络 (GNN) 概述

GNN 作者 with DALLE 3 一、说明 神经网络是受人脑工作启发的计算模型,能够从复杂的非结构化数据(如图像、文本、音频和视频)中学习。然而,还有许多其他类型的数据无法用传统的神经网络轻松表示,例如那些具有图形结构的…

实验4.2 默认路由和浮动静态路由的配置

实验4.2 默认路由和浮动静态路由的配置 一、任务描述二、任务分析三、具体要求四、实验拓扑五、任务实施1.路由器的基本配置。2.配置默认路由,实现全网互通。3.配置浮动静态路由,实现链路备份。 六、任务验收七、任务小结八、知识链接1.默认路…

tensorflow入门 自定义模型

前面说了自定义的层,接下来自定义模型,我们以下图为例子 这个模型没啥意义,单纯是为了写代码实现这个模型 首先呢,我们看有几个部分,dense不需要我们实现了,我们就实现Res,为了实现那个*3,我们…

Axure情形动作篇(ERP登录效验)

目录 一、ERP系统用户登录效验 1.1 完成步骤 1.2 最终效果 二、省市区联动 三、ERP菜单栏页面跳转 四、下拉加载效果实现 4.1 加载动画实现步骤 4.2 下划界面加载实现 4.3 最终效果 一、ERP系统用户登录效验 1.1 完成步骤 首先搭建ERP系统的登录界面(输入…

Codeforces Round 916 (Div. 3)(G未补)

目录 A. Problemsolving Log B. Preparing for the Contest C. Quests D. Three Activities E1.E2. Game with Marbles F. Programming Competition A. Problemsolving Log 题意:A任务需要一分钟完成,B任务需要两分钟完成,……以此类推…

Dockerfile指令参考

写在前面 这里是原文链接,本文学习Dockerfile中的指令。 指令表格 指令描述ADD添加本地文件或远程文件到imageARG环境变量CMD运行container时执行的命令COPY复制文件或目录到imageENTRYPOINT运行container时执行的命令(优先级高)ENV环境变…

Hudi Clustering

核心概念 Hudi Clustering对于在数据写入和读取提供一套相对完善的解决方案。它的核心思想就是: 在数据写入时,运行并发写入多个小文件,从而提升写入的性能;同时通过一个异步(也可以配置同步,但不推荐&…

SQL注入绕过正则及无列名注入

渗透测试 一、select\b[\s\S]*\bfrom正则二、科学计数法绕过三、过滤information四、无列名注入1、利用 join-using 注列名。2、无列名查询 五、报错注入7大常用函数1.ST_LatFromGeoHash()(mysql>5.7.x)payload 2.ST_LongFromGeoHash(mysq…

3 - Electron app BrowserWindow对象-关于窗口

优雅的打开应用~ 当加载缓慢,打开应用的一瞬间会出现白屏,以下方法可以解决 const mainWindow new BrowserWindow({ show: false }) mainWindow.once(ready-to-show, () > {mainWindow.show() }) 设置背景颜色 const win new BrowserWindow({ b…

OpenHarmony应用开发环境搭建指南

OpenHarmony的应用开发主要是基于Deveco Studio(目前只支持Windows及Mac平台)搭配相应的SDK进行,现对开发环境的搭建进行说明。 1:Deveco下载安装 下载对应平台的安装包即可。接下来以Windows平台为例,进行开发环境的搭建。 下载…

C#文件操作(一)

一、前言 学习心得:C# 入门经典第8版书中的第20章《文件》 二、操作文件的相关类 在C#应用程序中Syste.IO名称空间包含用于在文件中读写数据的类。在此我列举一下File、Directory、Path、FileInfo、DirectoryInfo、FileSystemInfo、FileSystemWatcher。其中在Syste…

pnpm :无法加载文件 D:\nodejs\node_global\pnpm.ps1,因为在此系统上禁止运行脚本

目录 一、问题描述 二、原因分析 三、解决问题 一、问题描述 pnpm : 无法加载文件 D:\learningsoftware\nodejs\node_global\pnpm.ps1,因为在此系统上禁止运行脚本。有关详细信息,请参阅 https:/go.microsoft.com/fwlink/?LinkID1351 70 中的 a…

文件包含 [SWPUCTF 2021 新生赛]include

打开题目 要求我们传入一个file进去,那我们get传入 /?file1 得到源码,并且提示我们flag在flag,php下 在源代码中,我们看见了allow_url_include函数,我们知道这涉及到文件包含。 一般默认allow_url_fopen是on的,那在…

线性回归中的似然函数、最大似然估计、最小二乘法怎么来的(让你彻底懂原理)收官之篇

图1 图2 图3 图4 问1:为什么要引入似然函数? 在线性回归中引入似然函数是为了通过概率统计的方法对模型参数进行估计。简单来说,我们希望找到一组参数,使得我们观测到的数据在给定这组参数的情况下最有可能发生。 问:1&#xf…

0155 - Java 数组

1 数组介绍 数组可以存放多个同一类型的数据。数组也是一种数据类型,是引用类型。 即:数(数据)组(一组)就是一组数据 2 数组的使用 2.1 使用方式一 2.2 使用方式二 3 数组使用注意事项和细节 数组是多个相同类型数据的组合,实现对这些数据…

Android Canvas状态save与restore,Kotlin

Android Canvas状态save与restore,Kotlin private fun f1() {val bitmap BitmapFactory.decodeResource(resources, R.mipmap.pic).copy(Bitmap.Config.ARGB_8888, true)val canvas Canvas(bitmap)val paint Paint(Paint.ANTI_ALIAS_FLAG)paint.color Color.RED…

信息收集 - 网站架构

网站架构组成 通常,一个典型的网站架构包括以下组件: 动态脚本语言:动态脚本语言用于处理网站的逻辑和动态内容生成。常见的动态脚本语言包括PHP、Python、Ruby和Node.js等。这些脚本语言可以根据用户请求生成动态的网页内容。 数据库:数据库用于存储网站的数据,包括用户…

网易面试:亿级用户,如何做微服务底层架构?

尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中,最近有小伙伴拿到了一线互联网企业网易、美团、字节、如阿里、滴滴、极兔、有赞、希音、百度、美团的面试资格,遇到很多很重要的面试题: 微服务改造,你是怎么做的&#xff1…

TransXNet实战:使用 TransXNet实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度,DP多卡,EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…