MQ消息队列

news2024/11/16 15:35:07

mq

    • 1、什么是消息队列
      • 1.1 MQ基本框架
      • 1.2 消息队列的优点
      • 1.3 消息队列的缺点
      • 1.4 消息队列比对
    • 2、RabbitMQ
      • 2.1、RabbitMQ如何保证消息不被重复消费
      • 2.2、RabbitMQ如何保证消息不丢失
        • 2.2.1 生产者丢数据
        • 2.2.2 消息队列丢数据
        • 2.2.3 消费者丢数据
      • 2.3、RabbitMQ如何保证消息有序
      • 2.4、RabbitMQ如何处理消息堆积情况
        • 2.4.1 原因
        • 2.4.2 临时扩容,快速处理积压的消息
        • 2.4.3 恢复队列中丢失的数据
        • 2.4.4 MQ长时间未处理导致MQ写满的情况如何处理
      • 2.5、RabbitMQ如何保证消息队列的高可用
        • 2.5.1 单机模式
        • 2.5.2 普通集群模式
        • 2.5.3 镜像集群模式
    • 3、Kafka
      • 3.1、Kafka如何保证消息不丢失
      • 3.2、Kafka如何保证消息有序
      • 3.3、Kafka如何实现幂等
      • 3.4、Kafka消费重试机制

1、什么是消息队列

消息队列是用队列这种数据结构存储消息,一般用于进程间通信或线程间通信。

1.1 MQ基本框架

JMS中规定的消费模式有两种:P2P模式、Pub/Sub模式。针对这两种模式存在两种MQ基础框架:

  • P2P模式:
    P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。接收者在成功接收消息之后需向队列应答成功。
    在这里插入图片描述
  • Pub/Sub模式:
    Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
    在这里插入图片描述

1.2 消息队列的优点

(1)消息通信
MQ的基础功能即为消息通信。使用MQ的客户端可以将消息发送到MQ中,也可以从MQ中消费消息。
(2)异步处理
生产者将消息生产放入MQ中,可以继续生产其他消息,不要求获得消费者的消费信号之后再生产。MQ也可以将RPC异步化。传统的RPC直接调用,可以转化为调用方生产消息,将原本RPC请求的内容放在消息体中,然后由RPC的被调用方去消费消息并进行处理。如果有返回结果,可以将结果再通过另外一个队列发送给调用方。
(3)应用解耦
将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的。消息生产者和消费者都不知道对方的存在;
(4)削峰/限流:将所有请求都写到消息队列中,消费服务器按照自身能够处理的请求数从队列中拿到请求,防止请求并发过高将系统搞崩溃;

1.3 消息队列的缺点

(1)系统的可用性降低:系统引用的外部依赖越多,越容易挂掉,如果MQ 服务器挂掉,那么可能会导致整套系统崩溃。这时就要考虑如何保证消息队列的高可用了
(2)系统复杂度提高:加入消息队列之后,需要保证消息没有重复消费、如何处理消息丢失的情况、如何保证消息传递的有序性等问题;
(3)数据一致性问题:A 系统处理完了直接返回成功了,使用者都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,就会导致数据不一致了

1.4 消息队列比对

ActiveMQRabbitMQRocketMQKafka
开发语言JavaerlangJavascala
单机吞吐量万级(较低)万级(较低)10万级,支持高吞吐10万级,支持高吞吐
时效性ms级us级,延迟最低ms级ms级以内
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢失经过参数化配置,可以做到0丢失经过参数化配置,可以做到0丢失
优点功能成熟强大,在业内大量的公司和项目都有应用基于erlang开发,并发能力强,性能极好,延时很低,管理界面非常棒,社区活跃1. 性能、吞吐量经过阿里高并发业务的验证 2. 可靠性和可用性都不错、分布式扩展也方便 3. 源码是Java,方便公司定制化开发,支持复杂MQ业务场景提供超高的吞吐量,极高的可靠性和可用性,分布式支持任意扩展
缺点1. 有较低的概率丢失数据 2. 官方社区对ActiveMQ维护越来越少1. 吞吐量较低 2. 基于erlang开发,国内缺乏对erlang源码级别的研究和定制的能力,依赖开源社区的维护和修复bug 2. 集群动态扩展会很麻烦1. 社区活跃一般,阿里出台的技术,有可能这个技术被抛弃 2. 接口没有按照标准的JMS规范,有些系统迁移需要修改大量代码1. 功能较为简单,只支持主要的MQ功能 2. 有可能消息重复消费,会对数据准确性造成影响
功能特性主要用于解藕和异步来用,较少在大规模吞吐的场景中使用基于erlang开发,并发能力强,性能极好,延时很低,管理界面较丰富MQ功能较为完善,分布式扩展性好天然适合大数据领域的实时计算以及日志收集

2、RabbitMQ

RabbitMQ 是 AMQP(高级消息队列协议) 协议的一个开源实现。

2.1、RabbitMQ如何保证消息不被重复消费

正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次消息发送给了其他消费者,从而造成重复消费的情况。
重复消费问题的解决思路是:保证消息幂等性

① 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
② 使用全局唯一ID,再配合第三组主键做消费记录,比如使用 redis 的 set 结构,生产者发送消息时给消息分配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不进行处理,如果没消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。

2.2、RabbitMQ如何保证消息不丢失

对于消息的可靠性传输,每种MQ都要从三个角度来分析:生产者丢数据、消息队列丢数据、消费者丢数据。

2.2.1 生产者丢数据

RabbitMQ提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息

(1)事务机制:
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())

该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量降下降。

(2)确认机制:
生产环境常用的是confirm模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。

Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。

2.2.2 消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到Ack信号而再次重发消息。

持久化设置如下(必须同时设置以下 2 个配置):
(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化queue里的数据;
(2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

2.2.3 消费者丢数据

消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。

解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。

但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ
    会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。

2.3、RabbitMQ如何保证消息有序

针对保证消息有序性的问题,解决方法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。
(1)方法一:拆分queue,使得一个queue只对应一个消费者。由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了
(2)方法二:对于多线程的消费同一个队列的情况,可以使用重试机制。

2.4、RabbitMQ如何处理消息堆积情况

2.4.1 原因

消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致的。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反复复重试造成的,也有可能是消费端出了问题,导致不消费了或者消费极其慢。

2.4.2 临时扩容,快速处理积压的消息

(1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;
(2)临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;
(3)接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据
(4)等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。

2.4.3 恢复队列中丢失的数据

如果使用的是 rabbitMQ,并且设置了过期时间,消息在 queue 里积压超过一定的时间会被 rabbitmq 清理掉,导致数据丢失。这种情况下,实际上队列中没有什么消息挤压,而是丢了大量的消息。所以就不能说增加 consumer 消费积压的数据了,这种情况可以采取 “批量重导” 的方案来进行解决。在流量低峰期,写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。

2.4.4 MQ长时间未处理导致MQ写满的情况如何处理

如果消息积压在MQ里,并且长时间都没处理掉,导致MQ都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低MQ的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据。

2.5、RabbitMQ如何保证消息队列的高可用

RabbitMQ 是基于主从(非分布式)做高可用性的,RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

2.5.1 单机模式

一般没人生产用单机模式

2.5.2 普通集群模式

普通集群模式用于提高系统的吞吐量,通过添加节点来线性扩展消息队列的吞吐量。也就是在多台机器上启动多个 RabbitMQ 实例,而队列 queue 的消息只会存放在其中一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,如果连接到了另外的实例,那么该实例就会从数据实际所在的实例上的queue拉取消息过来,就是说让集群中多个节点来服务某个 queue 的读写操作。

缺点:无高可用性,queue所在的节点宕机了,其他实例就无法从那个实例拉取数据;RabbitMQ 内部也会产生大量的数据传输。

2.5.3 镜像集群模式

镜像队列集群是RabbitMQ 真正的高可用模式,集群中一般会包含一个主节点master和若干个从节点slave,如果master由于某种原因失效,那么按照slave加入的时间排序,"资历最老"的slave会被提升为新的master。
镜像队列下,所有的消息只会向master发送,再由master将命令的执行结果广播给slave,所以master与slave节点的状态是相同的

(1)缺点:
① 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
② 非分布式,没有扩展性,如果 queue 的数据量大到这个机器上的容量无法容纳了,此时该方案就会出现问题了

3、Kafka

kafka是拉取模式的消息队列,是消费者控制什么时候拉取消息的;每条消息都有一个偏移量,每个消费者都会跟踪最近消费消息的偏移量。

3.1、Kafka如何保证消息不丢失

3.2、Kafka如何保证消息有序

3.3、Kafka如何实现幂等

kafka实现幂等的关键就是要实现broker的去重。为了实现消息发送的幂等性,kafka引入了两个概念:

  • pid。每个新的producer在初始化的时候会被分配一个唯一的PID,这个PID对于用户是不可见的。
  • Sequence Number。对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number。而borker端会对<PID,Topic,Partition>做缓存,当具有相同主键的消息提交的时,broker只会持久化一条。

3.4、Kafka消费重试机制

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184684.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于蜣螂算法改进的随机森林分类算法-附代码

基于蜣螂算法改进的随机森林分类算法 - 附代码 文章目录基于蜣螂算法改进的随机森林分类算法 - 附代码1.数据集2.RF模型3.基于蜣螂算法优化的RF4.测试结果5.Matlab代码摘要&#xff1a;为了提高随机森林数据的分类预测准确率&#xff0c;对随机森林中的树木个数和最小叶子点数参…

SVN关联PyCharm使用

前言 本人因为要搭建一个自动化测试的框架&#xff0c;编程语言选择的python&#xff0c;python编辑器选择的PyCharm&#xff0c;代码管理工具使用的SVN。为了方便协作开发&#xff0c;需要将SVN关联PyCharm进行使用。 一、SVN关联PyCharm 1.点击左上角File–>选择Settin…

mongodb的聚合操作

mongodb的聚合操作 学习目标 了解 mongodb的聚合原理掌握 mongdb的管道命令掌握 mongdb的表达式 1 mongodb的聚合是什么 聚合(aggregate)是基于数据处理的聚合管道&#xff0c;每个文档通过一个由多个阶段&#xff08;stage&#xff09;组成的管道&#xff0c;可以对每个阶…

【Python】生成本项目的requeirments.txt

有的时候&#xff0c;我们需要对自己写的项目生成一个requeirments.txt&#xff0c;方便其他使用者快速安装依赖项 参考https://www.cnblogs.com/shun7man/p/14080921.html 1.使用pip 如果你的项目本身就是在venv虚拟环境下跑的&#xff0c;那么可以直接用下面的语句生成一个依…

java集合类-List/Queue

List List集合代表一个元素有序、可重复的集合&#xff0c;集合中每个元素都有其对应的顺序索引。可以通过索引来访问指定位置的集合元素。List集合默认按元素的添加顺序设置元素的索引。 List接口&#xff08;被改进&#xff09;和ListIterator接口&#xff08;被改进&#xf…

四六级英语学习(一)医疗健康类

suffer from 遭受 经历 high blood pressurehypertension 高血压 take steps to take measures to 采取措施 silent 沉默的 silence 沉默 -er表示人 或机器 eg: killer computer strokes 中风 attack 攻击&#xff0c;进攻 almostnearly 几乎 差不多 disease 大病 illn…

【uniapp】H5和小程序动态导入模块的方法

做uniapp项目通常都是用import Module from "./../module.js"方式引用模块的&#xff0c;但是&#xff0c;这种方式是静态的&#xff0c;还是只能放在执行代码段的顶部&#xff08;或者外部&#xff09;&#xff0c;若想放在代码中执行&#xff0c;那就会报错的&…

微信小程序——生命周期,生命周期的分类,页面生命周期,生命周期函数的分类,应用的生命周期函数,页面的生命周期函数,wxs脚本概述

一.生命周期什么是生命周期生命周期&#xff08; Life Cycle &#xff09;是指一个对象从创建->运行->销毁的整个阶段&#xff0c;强调的是一个时间段。例如&#xff1a;&#xff0e;张三出生&#xff0c;表示这个人生命周期的开始&#xff0e;张三离世&#xff0c;表示这…

单源最短路的建图方式(Dijkstra)

由于是复习&#xff0c;所以不会解释太多。 主要为Dijkstra的堆优化板子和朴素版&#xff08;看数据范围&#xff09; 再次看看时间复杂度[ n 为点数&#xff0c;m 为边数 ]&#xff1a;朴素版&#xff1a;O()&#xff0c;堆优化版&#xff1a;O( (nm)logm )。 目录 1.热浪&…

CS专业学习回顾

前言 起因是想清理一下github的仓库&#xff0c;没错是清理&#xff0c;之前fork了很多仓库学习代码&#xff0c;还有自己随便上传的一些代码&#xff0c;因为感觉没有留着的必要&#xff0c;博客多少可以review&#xff0c;这些早期写的代码&#xff0c;如洪水猛兽&#xff0…

java集合类-操作集合

Collections&#xff08;操作集合的工具类&#xff09; 该工具类里提供了大量方法对集合元素进行排序、查询和修改等操作&#xff0c;还提供了将集合对象1.设置为不可变、对集合对象实现同步控制等方法。自行看API即可。 2.有查找、替换集合元素的类方法。 有同步控制的方法&am…

使用 Python 深度学习方法对电影评论进行情绪预测

情感分析是一种自然语言处理问题&#xff0c;可以理解文本并预测潜在意图。 在本文中&#xff0c;你将了解如何使用 Keras 深度学习库将电影评论的情绪预测为正面或负面。 看完这篇文章&#xff0c;你会知道&#xff1a; 关于自然语言处理的 IMDB 情感分析问题以及如何在 Ke…

核心表结构

核心表结构目录概述需求&#xff1a;参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denp…

Android重新签名APK

前提已经配置好了Java环境。在要签名的apk文件目录路径位置&#xff0c;输入cmd&#xff0c;打开命令窗口。在命令窗口中输入jarsigner&#xff0c;有相应的提示。然后输入重新签名指令如下&#xff1a;jarsigner -verbose -keystore E:\tmc\keystore\androidsign.jks -signedj…

51单片机学习笔记-10IIC总线

10 I2C总线 [toc] 注&#xff1a;笔记主要参考B站江科大自化协教学视频“51单片机入门教程-2020版 程序全程纯手打 从零开始入门”。 10.1 AT24C02和I2C介绍 10.1.1 存储器介绍 图10-1 存储器分类 一般来说&#xff0c;RAM读写速度极快&#xff0c;但掉电丢失&#xff1b;而…

InfluxDB OSS v2.6.0安装使用小结(ubuntu Linux)

1 InfluxDB简介 InfluxDB是一款用Go语言编写的开源分布式时序、事件和指标数据库。 官网&#xff1a;https://www.influxdata.com 1.1 特色 InfluxDB的主要特色 1&#xff09;无结构&#xff08;无模式&#xff09;&#xff1a;可以是任意数量的列 2&#xff09;可拓展的 3&…

学习云原生的阅读书单

以下是我从豆瓣阅读上找到的书单 《云原生服务网格lstio》 《云原生操作系统Kubernetes》 《OpenShift云原生架构&#xff1a;原理与实践》

[oeasy]python0066_控制序列_光标位置设置_ESC_逃逸字符_CSI

光标位置 回忆上次内容 上次讲了 三引号的输出三引号中 回车和引号 都会 被原样输出\ 还是需要从 \\转义 黑暗森林 快被摸排清了 还有哪个 转义序列 没 研究过吗&#xff1f;&#x1f914; \e是 干什么的&#xff1f;&#x1f914; 回忆转义 转义转义 转化含义 \反斜杠(…

CnOpenData劳务外包企业工商注册基本信息数据

一、数据简介 随着我国社会主义市场经济的发展&#xff0c;劳务市场中的用工方式也朝着多样化方向演变&#xff0c;劳务外包正是现代化人力资源管理和企业生产实际结合的一种独特的新模式。 在劳务外包过程中&#xff0c;企业将人事管理的部分或者全部工作外包给一个专门的服务…

Hadoop 复习 ---- chapter01【大数据概念】

Hadoop 复习 ---- chapter01【大数据概念】1. 什么是大数据大数据的简介从IT过渡到DT2. Hadoop生态系统工具HADOOPHBASEHIVESTORMZooKeeperSqoopMAHOUT1. 什么是大数据 大数据的简介 指“无法由现有软件工具进行提取、存储、搜索、共享、分析和处理的庞大而复杂的数据集”。 通…