RabbitMQ 2025/3/5

news2025/3/10 21:14:03

高性能异步通信组件。

同步调用

以支付为例:

可见容易发生雪崩。

异步调用

以支付为例:

支付服务当甩手掌柜了,不管后面的几个服务的结果。只管库库发,后面那几个服务想取的时候就取,因为消息代理里可以一直装,缓存消息。

消息代理(英文Broker)

消息代理相关的技术MQ技术

Erlang面向并发的语言

 

RabbitMQ安装部署

可直接采用Docker,方便。

RabbitMQ消息流转的过程(整体架构)

消费者监听队列,发送者不直接发给队列,而是发给exchange交换机,交换机会根据规则把消息路由给不同队列。

因为RabbitMQ的性能很强,每秒钟可以达到数万的并发,所以企业有多个项目的话,往往部署一套RabbitMQ就够了,多个项目可以共享RabbitMQ服务。但是大家一起的话,很可能交换机出现冲突,所以引出RabbitMQ的新概念virtual-host(虚拟主机),类似与MySQL里的database(众所周知,MySQL里可以创建多个database,每个database的表和其他database的表直接是相互隔离的),每个项目都创一个自己的virtual-host,就可以相互隔离开了。

例子:MQ入门-06.RabbitMQ-快速入门_哔哩哔哩_bilibili

交换机不存只负责转发。交换机和队列必须有一个关系,才能给队列发消息。binding绑定关系。

数据隔离

虚拟主机是实现了数据隔离。

不同的项目创建不同的用户。为新建用户建一个虚拟主机。

视频举例:MQ入门-07.RabbitMQ-数据隔离_哔哩哔哩_bilibili

RabbitMQ的java客户端

这里我们不采用Rabbit官方提供的java客户端,而是Spring AMQP,它是基于AMQP协议(消息收发与语言和平台无关),官方提供的java客户端使用起来繁琐,所以使用Spring AMQP。

4:00  MQ入门-08.Java客户端-快速入门_哔哩哔哩_bilibili

因为是简单入门案例,可以省去交换机这个步骤。

控制台创建队列->pom里引入spring-amqp依赖->yaml里配置RabbitMQ服务端信息(如地址主机名、端口、虚拟主机名、用户、密码),这样微服务才能连接到RabbitMQ->发送消息(SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息)->接受消息

接收者接受消息的代码:加一个@Comonent把它注册成Spring的一个Bean,这个类的内部要有一个方法,这个方法要加上@RabbitListener(也就是消费者的监听者)的注解后面再带上队列名,现在只要队列有消息,方法就可以拿到了。方法参数自己设。

WorkQueues

任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。

那么队列中的消息会被哪个消费者收到呢?

模拟WorkQueue(实现一个队列绑定多个消费者)

MQ入门-09.Java客户端-WorkQueue_哔哩哔哩_bilibili

在RabbitMQ控制台建立一个队列->一个发消息的->2个消费者

下图两个消费者,一个发送者。

 

队列同一个消息只能被一个消费者处理,很多条消息的话均匀分配(默认轮询,1人1条)。(上图2个消费者结果有奇偶规律)

把很多消息平均分给消费者,可以加快消息处理速度。

每个消费者能力不一样(通过sleep(ms)修改),均匀分配消息(默认)肯定是不合理的,只需要修改application.yml,设置preFetch值为1,实现能者多劳。

Fanout交换机(广播交换机)

再看一下以前学的这个发消息的过程。带有交换机的完整模型。

交换机的作用是接受发送者发送的消息,并将消息路由到与其绑定的队列。

那么什么是Fanout交换器?

特征:把接收到的消息路由到每一个和他绑定的队列。(队列中的消息只能被一个消费者处理,有了Fanout交换机,发的消息就可以被多个消费者处理了。我们完全可以给每个微服务创建一个队列,然后队列绑到交换机上,fanout交换机想广播一样,给每个队列群发/复制消息。)

案例步骤:声明2个队列,一个交换机exhanges,然后banding绑定,第一种方法可以直接用控制台发消息然后查看,还有一种方法java代码,2段接受代码,1段发送代码(这回调3个参数的api,exchange,null,message,之前写的例子用到的都是两个参数,队列名和消息)。

Direct交换机(定向)

MQ入门-11.Java客户端-Direct交换机_哔哩哔哩_bilibili

和原来差不多,这回交换机选direct,队列设置bindingkey,发送时发送者的参数分别设好exchange,routingkey值,message。其余的步骤和原来一样。

所以两个key值一样,direct交换机也能实现Fanout交换机的功能。一个queue可以设多个bindingkey。

Topic交换机(话题)

 队列bindingkey可以通过通配符简易设置。

topic和direct比除了多了一个统配符,功能差不多。

声明队列交换机

之前队列和交换机的创建都是依靠控制台,这次学习用代码声明队列交换机,这样项目一启动就会自己创建队列和交换机了。

可以用new的方式。(更简单)

也可以用builder方式。

发送者只管发,什么也不关心,所以通常我们在消费者这一端,声明队列交换机及绑定关系。

步骤(以Fanout交换机为例):创建FanoutConfiguration类,声明交换机,队列,绑定关系。可以new(比较简单),也可以builder构建(比较专业)。

上述基于JavaBean绑定太麻烦了,这回学习注解@RatbbitListener。

基于JavaBean还是基于注解,完全个人喜好了。(但是如果基于javabean,声明direct交换机好像没法写很多bindingKey)

消息转换器

以前我们发消息一直用到rabbitTemplate的convertAndSend方法。我们发消息是可以传任何java对象作为消息,网络传输其实是以字节传的,直接对象不行,因此传的时候要把java对象转换成字节。这个转换就是由消息转换器转的。

java里有一种JDK自带的序列化的一个工具(能把任一java对象序列化成字节的形式),所以这个消息转换的过程就是采用JDK自带的序列化方式。

ObjectOutputStream:对象流,jdk自带的序列化工具,能把任意java对象序列化成字节。

但是推荐使用JSON的消息转换器。

发送方和接收方一定要用相同的消息转换器。

pom配置好之后到MAVEN里刷新一下。

给发送方和接收方都配置一个消息转换类。

对比图(下边的JSON的消息转换器)

业务改造

把业务从OpenFeign的同步调用改成基于MQ的异步调用。

OpenFeign:02-基本概念_哔哩哔哩_bilibili

nacos可以抽取共享配置,不用重复进行同样的配置。

共同代码可以写到common里

消息可靠性

发送者的可靠性

发送者重连

yml

发送者确认

路由失败的两种原因:

exchange2交换机没连队列,路由失败。返回ACK(因为消息确实发出去了)

routingkey和bandingkey没有匹配上的。

如果返回NACK要重发消息。

none是关闭(默认)

simple是同步阻塞

correlated异步的

至此,ConfirmCallback和ReturnCallback全部写完了。

唯一,uuid,最简单的随机算法

MQ的可靠性

消息传到MQ也不一样可靠,因为MQ本身也可能把消息弄丢。

MQ内存储存的方式,可能弄丢消息,还有可能导致MQ性能下降和阻塞。

为此,产生两个方案:数据持久化(把数据提前持久化存到磁盘,提前的意思:不是等到满了再存到磁盘,提前就开始往磁盘存了)、LazyQueue

数据持久化

持久化的一种方法:写出到磁盘,这样就永久保存了。

交换机的持久化(默认都是持久的交换机、durable属性)(Spring的AMQP代码生成的交换机默认也是持久化的)

队列持久化(默认持久化durable属性)(Spring的AMQP代码生成的默认也是持久化的)

消息持久化(手动设置为Persistent)

LazyQueue

上节课写了数据持久化,当我们把交换机、队列、消息持久化了以后,就不用再担心MQ宕机而导致消息丢失了,不仅如此,RabbitMQ也不会再因为消息堆积配置out而出现阻塞了。但是数据持久化后,不仅要在内存里写,还要再磁盘里写一份,这样每条消息处理的耗时就增加了,这也就导致它的整体并发能力有点下降。为了解决这个问题,引入了一种新的队列模式,lazy Queue(消息直接写磁盘里)。它不仅仅具有数据持久化的优势,同时还解决了并发能力下降的问题。

那么怎样去设置一个队列变成Lazy Queue模式呢

第一种方法,控制台添加

第二种方法,代码添加(声明bean的方式或者注解都可以)

消费者的可靠性

消费者确认机制

三种状态(ack,nack,reject):不管哪种状态,都不能在刚收到消息的时候就返回,我们应该根据处理结果去做判断,也就是consumer处理完的时候。

注意:这回配置的是消费者下的application文件。

有个缺点:消息异常了就会像踢球一样反复被踢来踢去,下面学的失败重试机制就可以解决这个问题(设置最大尝试次数)

失败重试机制

第一种:重试次数耗尽后直接reject,不要了

第二种:重试次数耗尽后,返回nack,消息回到队列,再重试

第三种:重试次数耗尽后,消费者将失败消息扔进另一个交换机,这个交换机连着另一个队列

下面以第三种方式为例(相比于前两种更复杂,多了一个交换机和一个新队列)

业务幂等性

解决重复消费问题(比如事务发到消费者那了,本来消费者要给MQ返回一个ACK,因为断电没发送出去,MQ以为消费者宕机了,于是恢复电的时候,MQ又会重新给消费者发送一次,可消费者已经处理完这个事务了,如果这个事务涉及到支付,那消费者就会白白消费两次。)这时就用到业务幂等性了。

下面以消息重复提交导致业务被重复执行的这个场景为例的几个方案:

1.唯一id(我们可以给消息带上唯一的id)

这种方案存在业务侵入的问题:本来是没有id这个属性的,现在的消息都带了id属性,并且还放在了数据库里!!!!!

2.业务判断

加了一个业务判断,当订单来了后,不急着标记订单为已支付,而是每次都在标记前查询一下订单状态。

延迟消息

在上面三个可靠性都失败下的兜底方案——延迟消息

死信交换机

一旦我们通过dead-letter-exchange属性指定了一个交换机,那么死信就都会被投到这个交换机里,不会被丢弃了。

我们已经有了一组交换机和队列,再准备一组特殊的交换机和队列,给上面的队列通过dead-letter-exchange属性指定下面的交换机,现在下面的交换机就变成上面的队列的死信交换机了。我们不会给上面的队列绑定消费者,通常是给死信交换机的队列绑定消费者。发送者向上面的交换机发消息,发消息时给消息设置过期时间TTL=30s,然后消息就发到上面的队列,因为上面的队列没有消费者,就在这里停下来了开始等待,有个计时器开始等待数30秒,30秒后消息变成死信进入死信交换机。这样就通过死信交换机的形式实现了延迟效果。

延迟消息插件

用了这个插件,只要简单的把delay属性配置成true就好了,然后就可以实现延迟消息。

步骤:首先下载配置好插件->声明交换机并设置其delay属性为true(可用注解或bean方式)->发消息的时候要设置一个过期时间

利用延迟消息解决用户订单超时未支付问题

可以分成几步:下单之后发消息,完成延迟消息的发送。编写消息监听,它要干的事是去修改订单状态。利用延迟消息实现延时任务的效果。

MQ高级-13.延迟消息-取消超时订单_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2312887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JSP+Servlet实现对数据库增删改查功能

前提概要 需要理解的重要概念 ​MVC模式: Model(person类):数据模型View(JSP):显示界面Controller(Servlet):处理业务逻辑 ​请求流程: 浏览器 …

C++【类和对象】

类和对象 1.this 指针2.类的默认成员函数3.构造函数4.析构函数5.拷贝构造函数 1.this 指针 接上文 this指针存在内存的栈区域。 2.类的默认成员函数 定义:编译器自动生成的成员函数。一个类,我们不写的情况下会默认生成六个成员函数。 3.构造函数 函…

GStreamer —— 2.13、Windows下Qt加载GStreamer库后运行 - “教程13:播放控制“(附:完整源码)

运行效果(音频) 简介 上一个教程演示了GStreamer工具。本教程介绍视频播放控制。快进、反向播放和慢动作都是技术 统称为 Trick Modes,它们都有一个共同点 修改 Normal playback rate。本教程介绍如何实现 这些效果并在交易中添加了帧步进。特别是,它 显…

MongoDB winx64 msi包安装详细教程

首先我们可以从官网上选择对应版本和对应的包类型进行安装: 下载地址:Download MongoDB Community Server | MongoDB 这里可以根据自己的需求, 这里我选择的是8.0.5 msi的版本,采用的传统装软件的方式安装。无需配置命令。 下载…

WinUI 3 支持的三种窗口 及 受限的窗口透明

我的目标 希望能够熟悉 WinUI 3 窗口的基本使用方式,了解可能出现的问题 。 WinUI 3 支持三种窗口模式,分别为:常规窗口模式、画中画模式、全屏模式。 窗口模式:常规 即我们最常见的普通窗口。 支持:显示最大化按钮…

如何借助 ArcGIS Pro 高效统计基站 10km 范围内的村庄数量?

在当今数字化时代,地理信息系统(GIS)技术在各个领域都发挥着重要作用。 特别是在通信行业,对于基站周边覆盖范围内的地理信息分析,能够帮助我们更好地进行网络规划、资源分配以及市场分析等工作。 今天,就…

Linux网络之数据链路层协议

目录 数据链路层 MAC地址与IP地址 数据帧 ARP协议 NAT技术 代理服务器 正向代理 反向代理 上期我们学习了网络层中的相关协议,为IP协议。IP协议通过报头中的目的IP地址告知了数据最终要传送的目的主机的IP地址,从而指引了数据在网络中的一步…

如何使用 PyInstaller 打包 Python 脚本?一看就懂的完整教程!

PyInstaller 打包指令教程 1. 写在前面 通常,在用 Python 编写完一个脚本后,需要将它部署并集成到一个更大的项目中。常见的集成方式有以下几种: 使用 PyInstaller 打包。使用 Docker 打包。将 Python 嵌入到 C 代码中,并封装成…

解锁DeepSpeek-R1大模型微调:从训练到部署,打造定制化AI会话系统

目录 1. 前言 2.大模型微调概念简述 2.1. 按学习范式分类 2.2. 按参数更新范围分类 2.3. 大模型微调框架简介 3. DeepSpeek R1大模型微调实战 3.1.LLaMA-Factory基础环境安装 3.1大模型下载 3.2. 大模型训练 3.3. 大模型部署 3.4. 微调大模型融合基于SpirngBootVue2…

Hadoop、Hive、Spark的关系

Part1:Hadoop、Hive、Spark关系概览 1、MapReduce on Hadoop 和spark都是数据计算框架,一般认为spark的速度比MR快2-3倍。 2、mapreduce是数据计算的过程,map将一个任务分成多个小任务,reduce的部分将结果汇总之后返回。 3、HIv…

基于VMware虚拟机的Ubuntu22.04系统安装和配置(新手保姆级教程)

文章目录 一、前期准备1. 硬件要求2. 软件下载2-1. 下载虚拟机运行软件 二、安装虚拟机三、创建 Ubuntu 系统虚拟机四、Ubuntu 系统安装过程的配置五、更换国内镜像源六、设置静态 IP七、安装常用软件1. 编译工具2. 代码管理工具3. 安装代码编辑软件(VIM&#xff09…

基于SpringBoot的历史馆藏系统设计与实现(源码+SQL脚本+LW+部署讲解等)

专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…

蓝桥杯[每日两题] 真题:好数 神奇闹钟 (java版)

题目一:好数 题目描述 一个整数如果按从低位到高位的顺序,奇数位(个位、百位、万位 )上的数字是奇数,偶数位(十位、千位、十万位 )上的数字是偶数,我们就称之为“好数”。给定…

基于BMO磁性细菌优化的WSN网络最优节点部署算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 无线传感器网络(Wireless Sensor Network, WSN)由大量分布式传感器节点组成,用于监测物理或环境状况。节点部署是 WSN 的关键问…

学习笔记:Python网络编程初探之基本概念(一)

一、网络目的 让你设备上的数据和其他设备上进行共享,使用网络能够把多方链接在一起,然后可以进行数据传递。 网络编程就是,让在不同的电脑上的软件能够进行数据传递,即进程之间的通信。 二、IP地址的作用 用来标记唯一一台电脑…

Laya中runtime的用法

文章目录 0、环境:2.x版本1、runtime是什么2、使用实例情景需要做 3、script组件模式 0、环境:2.x版本 1、runtime是什么 简单来说,如果创建了一个scene,加了runtime和没加runtime的区别就是: 没加runtime&#xff…

OpenCV计算摄影学(16)调整图像光照效果函数illuminationChange()

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 对选定区域内的梯度场应用适当的非线性变换,然后通过泊松求解器重新积分,可以局部修改图像的表观照明。 cv::illuminati…

【爬虫】开篇词

一、网络爬虫概述 二、网络爬虫的应用场景 三、爬虫的痛点 四、需要掌握哪些技术? 在这个信息爆炸的时代,如何高效地获取和处理海量数据成为一项核心技能。无论是数据分析、商业情报、学术研究,还是人工智能训练,网络爬虫&…

力扣-股票买入问题

dp dp元素代表最大利润 f[j][1] 代表第 j 次交易后持有股票的最大利润。在初始状态,持有股票意味着你花钱买入了股票,此时的利润应该是负数(扣除了买入股票的成本),而不是 0。所以,把 f[j][1] 初始化为负…

微服务保护:Sentinel

home | Sentinelhttps://sentinelguard.io/zh-cn/ 微服务保护的方案有很多,比如: 请求限流 线程隔离 服务熔断 服务故障最重要原因,就是并发太高!解决了这个问题,就能避免大部分故障。当然,接口的并发…