MQ分类
Acitvemq
kafka
优点:性能好,吞吐量高百万级,分布式,消息有序
缺点:单机超过64分区,cpu会飙高,消费失败不支持重试 ,
Rocket
阿里的mq产品
优点:单机吞吐量也很高10w级,分布式,可做到消息9丢失
缺点:支持的客户端语言不错
RabbitMQ
优点:高并发,吞吐量万级,基于AMQP(高级消息队列协议),支持多语言
缺点:商业版收费
MQ选择 :
Kafka :日志采集首选kafka ,适合大量数据的场景
ROcket :适合可靠性高的场景,电商等
RabbitMQ :性能时效 微妙级,数据量没有太大可以使用,功能比较完备
RabbitMQ是什么
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
AMQP协议
AMQP(Advanced Message Queuing Protocol)是一种开放标准的消息队列协议,它定义了消息的传递格式、消息模型和交互协议,用于在应用程序之间可靠地传递消息。
下面是AMQP协议的一些关键特点和核心概念:
消息模型:
生产者(Producer):发送消息的应用程序。
消费者(Consumer):接收和处理消息的应用程序。
代理(Broker):负责消息的路由和传递的中间件组件。
消息传递:
消息(Message):AMQP协议中的基本单元,包含消息头和消息体。消息头包括元数据信息,如消息ID、时间戳、优先级等。消息体则包含实际的数据。
队列(Queue):用于存储消息的容器。消费者从队列中接收消息,生产者将消息发送到队列中。
交换器(Exchange):接收生产者发送的消息,并将其路由到一个或多个队列。交换器根据预定义的规则(路由键)将消息发送给相应的队列。
绑定(Binding):将队列和交换器关联起来,定义了消息从交换器到队列的路由规则。
路由模型:
直连交换器(Direct Exchange):根据消息的路由键将消息发送到与之完全匹配的队列。
主题交换器(Topic Exchange):根据消息的路由键模式将消息发送到与之匹配的队列。路由键可以使用通配符进行匹配。
扇形交换器(Fanout Exchange):将消息广播到与之绑定的所有队列,忽略路由键。
可靠性和事务:
消息确认机制:AMQP支持生产者确认机制,生产者可以等待来自代理的消息确认,确保消息已被成功接收和处理。
事务支持:AMQP支持事务机制,生产者可以将一组消息放在一个事务中发送,保证这些消息要么全部成功发布,要么全部回滚。
RabbitMQ的优势:
**可靠性(Reliablity):**使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
**灵活的路由(Flexible Routing):**在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
**消息集群(Clustering):**多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
**高可用(Highly Avaliable Queues):**队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。
RabbitMQ 对比 Kafka
1. 设计目标:
Kafka:Kafka是一个分布式的、高吞吐量的发布/订阅消息系统,旨在处理大规模的实时数据流。它特别适合用于日志处理、指标收集、流处理等场景。
RabbitMQ:RabbitMQ是一个开源的、可靠的企业级消息队列系统,旨在支持各种消息传递模式,如点对点、发布/订阅、请求/响应等。它适用于异步通信、任务队列、事件驱动等应用。
2. 架构:
- Kafka:Kafka采用分布式、分区、复制的架构。消息以主题(topics)的形式进行组织,每个主题可以分为多个分区(partitions),并且每个分区可以有多个副本(replicas)。Kafka的消息存储采用持久化日志(log)的方式,允许高效的顺序读写。
- RabbitMQ:RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,采用中心化的架构。消息发送者将消息发送到交换器(exchange),交换器根据规则将消息路由到队列(queue),消息接收者从队列中接收消息。RabbitMQ支持多种消息传递模式,通过不同的交换器类型(如直连交换器、主题交换器、扇形交换器等)来实现。
3. 可用性和可靠性:
- Kafka**:Kafka通过分区和副本机制提供了高可用性和容错性。每个分区都有一个主副本(leader)和多个副本(follower),当主副本发生故障时,可以从副本中选举出新的主副本。此外,Kafka还支持数据复制和持久化,确保消息不会丢失**。
- RabbitMQ:RabbitMQ通过消息确认机制和持久化存储来保证消息的可靠性。发送者可以通过等待接收到确认消息来确保消息已经成功发送到队列中,并且可以将消息标记为持久化,以防止消息在服务器故障时丢失。
4. 性能:
**- Kafka:Kafka的设计目标之一是高吞吐量和低延迟。**它通过顺序写磁盘、零拷贝技术和批量压缩等方式来提高性能,并且能够处理大规模的消息流。
- RabbitMQ:RabbitMQ的性能较Kafka略低,但对于大多数应用场景来说已经足够。它使用内存缓冲区来提高性能,并且支持消息预取和持久化等机制。
5. 生态:
- Kafka:Kafka拥有丰富的生态系统,广泛应用于大数据领域。它与Hadoop、Spark等工具集成紧密,可用于数据管道、流处理、日志收集等场景。
- RabbitMQ:RabbitMQ也有一定的生态系统,适用于各种企业应用。它提供了多种语言的客户端库,并且易于与其他系统集成,可用于任务分发、事件驱动、微服务架构等。
6.应用场景
ps(Kafka和RabbitMQ的使用场景并不是互斥的,有些场景可能两者都可以胜任。选择合适的消息传递系统应该根据具体的需求、系统架构和团队技术栈来进行评估和决策。)
kafka的使用场景:
数据流处理:Kafka适用于处理大规模的实时数据流。它可以接收和传输大量的数据,支持高吞吐量和低延迟。这使得Kafka在大数据处理、流式数据分析和实时监控等领域非常有用。
**日志收集和聚合:**Kafka的持久化日志特性使其成为一个理想的日志收集和聚合工具。它可以接收来自多个源的日志数据,并将其传递给日志处理系统,如ELK(Elasticsearch, Logstash, Kibana)堆栈,用于存储、索引和分析日志。
RabbitMQ的使用场景:
异步任务处理:**RabbitMQ是一个可靠的消息队列系统,适合用于处理异步任务。**应用程序可以将任务发布到RabbitMQ中,然后由消费者异步地处理任务。这样可以有效地解耦和分离任务处理逻辑,提高系统的可伸缩性和可靠性。
**事件驱动架构:**RabbitMQ支持各种消息传递模式,如直连、主题和扇形交换器,以及消息路由和过滤等功能。这使得RabbitMQ成为构建事件驱动架构的理想选择,应用程序可以通过发布和订阅事件来实现松耦合的系统集成和通信。
**系统之间的数据同步:**RabbitMQ可以用作系统之间的数据同步工具。当一个系统产生数据更新时,可以将更新事件发布到RabbitMQ,然后其他系统订阅这些事件并进行相应的数据同步操作。这种方式可以确保数据的一致性和可靠性。
RabbitMQ结构
四大核心 :生产者、 消费者、交换机、队列
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
RabbitMq 各组件功能
Broker:标识消息队列服务器实体.
Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个Broker有多个vhost,每个vhost相互隔离
**RabbitMQ服务器,**拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等
RabbitMQ交换机
rabbitMQ有 四种类型的交换机:direct、fanout、topic、headers
direct
路由键与队列名完全匹配交换机, 此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,需要根据消息的RoutingKey进行匹配
只将消息发送到完全匹配到此RoutingKey的队列
比如:如果一个队列绑定到交换机要求路由键为转发 key 则不会转发 key.1〞 。
fanout
扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RountingKey参数无效
Fanout类型交换机下发送一条,无论RoutingKey,queue1.queue2,queue3,queue4都可以收到消息
topic
主题类型交换机 :和direct类似,也是通过RountingKey匹配 ,但是 topic 是模糊匹配
1: topic中 将RountingKey 通过 “.” 来分为多个部分
2: “*”代表一个部分
3: “#” 代表0个或多个部分 (如果绑定的 路由键为 # 时,则接受所有消息,因为路由键所有都匹配)
上图中 :发送 key1.key2.key3.key4 则 queue 1 2 3 4 都可以匹配到
headers
header 匹配AMQP 消息的header 而不是路由键,此外 header和direct完全一只,但性能差很多
消费方指定 的 header中必须要有一个 x-match 键
x -match 键值 有两个 :
x-match = all :所有的键值对都匹配才能接收到消息
x-match = any 任意键值对匹配就能接受到消息
Rabbit消息确认机制
生产者息确认机制
生产者通过 再 rabbit服务器上注册 回调函数 来实现消息确认机制
当消息被生产者成功发送到 RabbitMQ 服务器并得到确认或未确认时,RabbitMQ 服务器将触发确认回调。
在 RabbitMQ 中,确认回调通常是通过注册一个回调函数来实现的。一旦生产者成功将消息发送到 RabbitMQ 服务器并收到确认或未确认的响应,RabbitMQ 服务器将调用已注册的确认回调函数。
消费者消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
1、自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
2、根据情况确认, 这个不做介绍
3、手动确认 ,
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后**,手动调用basic.ack/basic.nack/basic.reject后**,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3``个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
RabbitMQ持久化
持久化消息:
消息的持久化需要满足两个条件:消息本身必须是持久化的,且消息所在的队列必须是持久化的。
在发布消息时,通过将消息的 **delivery_mode 属性设置为 2 来标记消息为持久化消息。**这个属性告诉 RabbitMQ 服务器将消息写入磁盘而不仅仅是存储在内存中。
请注意,将消息标记为持久化并不能完全保证消息不会丢失。它只是确保在 RabbitMQ 服务器重启后,消息能够从磁盘中恢复。如果需要更高级别的消息可靠性保证,可以使用发布确认机制。
持久化队列:
队列的持久化需要在创建队列时指定 durable 参数为 true。
当一个持久化队列被创建时,RabbitMQ 服务器会将队列的元数据(包括队列的名称、持久化标记等)存储在磁盘上,以便在服务器重启后能够恢复队列。
请注意,如果一个队列已经被声明为持久化队列,那么后续对该队列的操作也必须使用相同的参数,否则会引发错误。
交换机和绑定的持久化:
交换机和绑定的持久化与队列的持久化类似。在创建交换机和绑定时,可以通过将参数 durable 设置为 true 来指定它们为持久化的。
持久化的交换机和绑定将在 RabbitMQ 服务器重启后重新创建,以便恢复与持久化队列之间的关系。
持久化日志文件:
RabbitMQ 使用持久化日志文件(transaction log)来记录消息和元数据的变化。该日志文件存储在磁盘上,并用于在服务器重启后恢复消息和队列的状态。
持久化日志文件记录了消息的发布、路由和传递过程,以及队列和交换机的创建、绑定和删除操作。
RabbitMQ 使用 **Write-Ahead Log(WAL)技术来确保持久化日志的可靠性,并将写入磁盘的顺序操作缓冲在内存中,**以提高性能。
WAL 索引:
RabbitMQ 使用 WAL 索引来提供快速的持久化日志访问和检索。 索引与持久化日志文件分开存储
**索引用于追踪消息的位置和偏移量,**以便在需要时快速定位和读取特定的日志记录。
**WAL 索引通常是基于 B+ 树或其他高效的数据结构实现的,**以支持快速的插入、查找和范围查询操作。
索引通常包括日志记录的偏移量、消息的标识符(如消息 ID 或交换机/队列名称)、记录类型等信息。这些信息被用于在恢复时正确地解析和处理持久化日志文件。
恢复过程:
在 RabbitMQ 服务器启动时,它会读取存储在磁盘上的持久化日志文件,并使用索引来重建消息和队列的状态。
通过读取持久化日志文件中的日志记录,并根据索引信息进行解析和处理,RabbitMQ 服务器可以恢复消息的发布、路由和传递过程,以及队列和交换机的创建、绑定和删除操作。
恢复过程中的索引可以加速日志文件的读取和定位,从而提高恢复的性能和效率
RabbitMQ集群
集群优点:
消费者或生产者 在某个节点崩溃的情况下继续运行
增加节点可以使我们的MQ 处理更多的消息,承载更多的业务量
搭建集群方式
1 从已经搭建好的RabbitMQ 中 克隆出两台机器
2 修改克隆机器的IP地址
3 修改三台机器的 hostname(比如:node1,node2,node3)修改完重启
4 修改三台机器 host 文件
5 把第一个节点的cookie文件复制到另外两台机器上 ,确保公用同一个cookie
6 启动RabbitMq服务,启动Erlang虚拟机, RabbitMQ应用
镜像队列
虽然集群共享队列,但默认情况下,消息只会被路由到某个符合条件的队列,不会同步到其他节点,会有消息丢失的风险
队列镜像:镜像队列是主队列的副本,分布在不同节点上,,消息就会被拷贝到处于同一个镜像分组的所有队列上
- Name:policy 的名称
- Appliy to:指定该策略用于交换器还是队列,或是两者
- Pattern:一个用来匹配队列或交换器的匹配模式(正则表达式)
- priority:可选参数,指定策略的优先级
- Definition:镜像定义,包括三个部分 ha-mode, ha-params, ha-sync-modeha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定
nodes:表示在指定的节点上进行镜像,节点名称通过 ha-params 指定ha-params:设置镜像队列的参数,根据 ha-mode 的取值,该 ha-params 的设置值有所不同。如果 ha-mode 为 all,则不使用该参数;如果 ha-mode 为 exactly,则为数字;如果 ha-mode 为 nodes,则为字符串列表。ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic(自动方式)和 manual(手动方式)。
slave 升级为 master
镜像队列 master 出现故障时,最老的 slave 会被提升为新的 master。如果新提升为 master 的这个副本与原有的 master 并未完成数据的同步,那么就会出现数据的丢失,
RabbitMQ 提供了ha-promote-on-shutdown,ha-promote-on-failure两个参数让用户决策是保证队列的可用性,还是保证队列的一致性;
两个参数分别控制正常关闭、异常故障情况下 slave 是否提升为 master
when-synced:从节点与主节点完成数据同步,才会被提升为主节点
always:无论什么情况下从节点都将被提升为主节