文章目录
- 一、流控
- 1.1 流控机制
- 1.2 流控原理
- 1.3 流控状态显示
- 1.4 流控对象
- 1.5 性能提升
- 二、镜像队列
- 2.1 机制原理
- 2.1.1 集群结构
- 2.2 镜像结构
- 2.2.1 组播GM
- 2.2.1.1 实现原理
- 2.2.1.2 加入新节点
- 2.2.1.3 节点宕机的影响
- 2.3 配置镜像队列
- 2.3.1 定义参数
- 2.3.2 命令配置
- 2.3.4 相关命令
- 2.3.4.1 查看节点消息同步状态
- 2.3.4.2 取消某队列同步消息功能
- 2.3.4.3 手动同步某队列消息
一、流控
- 前面我们在讲解存储机制时,提到过流量控制。当消息积压时,消息会进入到队列深处,消费消息会使服务器性能大大降低。而内存告警和磁盘告警就是通过设置阈值来预防此情形,当达到阈值后,阻塞集群中所有的Connection,直到对应项恢复正常,属于全局性的流量控制 (Global Flow Control)。而这里将要提到的流控是针对连接Connection来的(Per-Connection Flow Control或者Internal Flow Control)。
- 流控作用:
- 流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形,从而保障服务器资源在安全范围内,提高业务的稳定性。
1.1 流控机制
流控是对什么进行控制?
- 是对进程邮箱的阈值进行控制。
rabbitmq进程邮箱:
- 进程通信方式有四种,分别为主从式、会话式、消息传递(邮箱机制)、共享存储区。而Erlang 进程之间并不共享内存 (binary 类型的除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱(mailbox)。
- 默认情况下,Erlang并没有对进程邮箱的大小进行限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出而崩溃。
- 在rabbitmq中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到内存阈值。
流控机制是什么?
- rabbitmq使用了一种基于信用证算法(credit-based algorithm)的流控机制,来限制发送消息的速率来解决进程邮箱大小达到内存阈值而导致服务器崩溃问题。
- 通过监控各个进程的进程邮箱,当某个进程负载过高而来不及处理消息时,这个进程的进程邮箱就会开始堆积消息。
- 当堆积到一定量时,就会阻塞而不接收上游的新消息。从而慢慢地,上游进程的进程邮箱也会开始堆积消息。当堆积到一定量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程阻塞而暂停接收新的数据。
1.2 流控原理
- 流控原理流程:
- 进程XXX——> 进程A——> 进程B——> 进程C—— 进程…,每个进程中都有一对关于收发消息的credit值。
- 拿进程B来看,有两个参数:
- {{credit_from,C},value]参数,表示能发送多少条消息给进程C,每发送一条消息该值减 1,当为 0 时,进程 B 不再往进程 C 发送消息也不再接收进程 A 的消息。
- [{credit_to,A},value}参数,表示再接收多少条消息就向进程A 发送增加credit值的通知,进程 A 接收到该通知后就增加{{credit_from,B),value] 所对应的值,这样进程 A 就能持续发送消息。
- 当上游发送速率高于下游接收速率时,credit值就会被逐渐耗光,此时进程被阻塞,阻塞情况会一直传递到最上游。
- 当上游进程收到来自下游进程的增加 credit 值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始按收更上游进程的消息,一个一个传导最终能够解除最上游的阻塞状态。
- 综上分析可知,基于信用证的流控机制最终将消息发送进程的发送速率限制在消息处理进程的处理能力范围之内。
1.3 流控状态显示
- 当一个连接(Connection)触发流控时会处于“flow”的状态,代表此Connection状态每秒在 blocked 和 unblocked 之间来回切换数次。以此可以将消息发送的速率控制在服务器能够支撑的范围之内。web页面可以查看,也可以通过 rabbitmactl list_connections 命令查看。
1.4 流控对象
流控机制对象:
- 连接(Connection)
- 信道 (Channel)
- 队列(Queue)
如下图,从Connection进程——> Channel进程——> Queue进程——> 消息持久化存储进程,是一条消息从接收到存储的一个必需的流控连,对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。
主要进程:
- rabbit_reader进程:连接的处理进程,负责接收、解析 AMQP 协议数据包等。
- rabbit_channel进程:信道的处理进程,负责处理AMQP协议的各种方法、进行路由解析等。
- rabbit_amqqueue_process进程:队列的处理进程,负责实现队列的所有逻辑。
- rabbit_msg_store进程:负责实现消息的持久化。
各进程状态情形分析:
- 当某个Connection处于 flow 状态时,但这个 Connection 中没有一个 Channel 处于 flow 状态,代表这个Connection中有一个或者多个 Channel 出现了性能瓶颈。
- 存在情形:某些 Channel进程的运作(比如处理路中逻辑) 导致服务器 CPU 的负载过高,尤其是在发送大量较小的非持久化消息时最容易发生。
- 当某个 Connection处于flow 状态,这个Connection 中也有若干个 Channel 处于 flow状态,但没有任何一个对应的队列处于 flow 状态时,这就意味着有一个或者多个队列出现了性能瓶颈。
- 存在情形:消息存入队列时导致服务器CPU 负载过高;持久化消息入盘导致服务器 I/O 负载过高,尤其是在发送大量较小的持久化消息时最容易发生。
- 当某个 Connection处于 flow 状态,这个 Connection 中的若干个 Channel 处于 flow状态,并且也有若干个对应的队列处于 flow 状态时,这就意味着在消息持久化时出现了性能瓶颈。
- 存在情形:持久化消息入盘导致服务器 I/O 负载过高,尤其是在发送大量较大的持久化消息时最容易发生。
1.5 性能提升
- 一般情况下,向一个队列里推送消息时,往往会在rabbit_amqqueue_process中(即队列进程中) 产生性能瓶颈。
- 提升队列性能方式:
- 第一种,若Erlang版本在18.x以上,可以开启HiPE 功能,可以提高 30%~40%的性能。
- 第二种,代码层面提升,将单个rabbit_amqqueue_process替换成多个rabbit_amqqueue_process。这里并不是使用多个队列,而是将交换器、队列、绑定关系、生产和消费的方法全部进行封装,这样对于应用来说好是在操作一个(逻辑) 队列。至于怎么封装的,纯运维就不用去了解了,开发就自行百度测试。
二、镜像队列
- 镜像队列在解决什么?
- 镜像队列 (Mirror Queue) 机制,可以将队列镜像到集群中的其他Broker节点之上。当集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上,以此可以保证服务的可用性,而不是节点实现其上的队列不可用。
2.1 机制原理
- 一般情况下,给每一个配置镜像的队列,即镜像队列都包含一个主节点(master) 和若千个从节点 (slave)。如下图,给qingjun_queue队列配置镜像包含了一个主节点和两个从节点。这里要注意镜像的主从节点和rabbitmq服务主从节点的区别。
- 实现原理:
- 镜像队列的slave节点会准确地按照镜像队列的master节点执行命令的顺序进行动作,所以镜像队列的master节点和slave节点数据是相同的,维护状态也应该是相同的。
- 当镜像队列的master节点失效,"资历最老"的镜像队列slave1就会被提升为新的master。选主机制是根据镜像队列save节点加入时间顺序来的,时间最长的slave 即为“资历最老”。
- 发送到镜像队列的所有消息会被同时发往 master 和所有的 slave 上,如果此时 master 挂掉了,消息还会在 slave 上,这样 slave提升为 master 的时候消息也不会丢失。除发送消息 (Basic.Publish) 外的所有动作都只会向 master 发送,然后再由 master 将命令执行的结果广播给各个 slave。
- 注意事项:
- 若消费者与slave建立连接并进行订阅消费,其实质上都是从 master 上获取消息,只不过看似是从 slave 上消费而已。
- 比如消费者与 slave 建立了 TCP 连接之后执行一个 Basic.Get的操作,那么首先是由 slave 将 Basic.Get 请求发往 master,再由 master 准备好数据返回给slave,最后由 slave 投递给消费者。
2.1.1 集群结构
- 如下图,集群一共三个节点,broker1、broker2、broker3。每个broker节点都包含1个镜像队列的master 和2 个镜像队列的slave。
- qingjun_queue镜像队列的负载大多都集中在broker1上,baimu_queue镜像队列的负载大多都集中在broker2上,wuhan_queue镜像队列的负载大多都集中在broker3 上。
- 只要确保镜像队列的 master 节点均匀散落在集群中的各 Broker 节点上就可以确保很大程度上的负载均衡。为什么不是绝对?因为每个队列的流量会有不同,因此均匀散落各个队列的 master 也无法确保绝对的负载均衡)。
- 注意事项:
- rabbitmq镜像队列同时支持发送方确认机制(publisher confirm )和事务机制。
- 在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx.Commit-ok 的消息。
- 在publisher confirmm 机制中,生产者进行当前消息确认的前提是该消息被全部进行所接收了。
2.2 镜像结构
- 镜像队列结构和普通队列结构不同点在于backing_queue的不同,普通队列使用的是rabbit_variable_queue,而镜像队列使用的backing_queue内部包裹了普通backing_queue进行本地消息消息持久化处理,在此基础上增加了将消息和 ack 复制到所有镜像的功能。
- 如下图,master使用的backing_queue是 rabbit_mirror_queue_master,slave使用的backing_queue是rabbit_mirror_queue_slave。
- 普通队列结构
- 镜像队列结构
2.2.1 组播GM
- GM的作用:
- 所有对 rabbit_mirror_queue_master 的操作都会通过组播 GM(GuaranteedMulticast)的方式同步到各个 slave 中。
- GM 负责消息的广播,rabbit_mirror_queue_slave 负责回调处理,而 master 上的回调处理是由 coordinator 负责完成的。
- 除了Basic.Publish,所有的操作都是通过 master 来完成的,master 对消息进行处理的同时将消息的处理通过 GM 广播给所有的 slave,slave 的 GM 收到消息后,通过回调交由rabbit_mirror_queue_slave进行实际的处理。
2.2.1.1 实现原理
- GM 模块是通过组播通信协议来实现,该协议能够保证组播消息的原子性。换言之,保证组中活着的节点要么都收到消息,要么都收不到。
- 实现过程:
- 将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点。
- 当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上。
- 当有节点失效时,相邻的节点会接管以保证本次广播的消息会复制到所有的节点。
- master 和 slave 上的组播GM会形成一个组 (gm group),这个组的信息会记录在 Mnesia 中。
- 不同的镜像队列形成不同的组。操作命令从 master 对应的 GM 发出后,顺着链表传送到所有的节点。由于所有节点组成了一个循环链表,master 对应的 GM 最终会收到自己发送的操作命令,这个时候 master 就知道该操作命令都同步到了所有的 slave 上。
2.2.1.2 加入新节点
- 如下图,新增了一个节点4,整个过程就像在链表中间插入一个节点。
- 注意事项:
- 每当个节点加入或者重新加入到这个镜像链路中时,之前队列保存的内容会被全部清空。
2.2.1.3 节点宕机的影响
- 当 slave 挂掉之后,除了与 slave 相连的客户端连接全部断开,没有其他影响。
- 当 master 挂掉之后,会有以下连锁反应:
- 第一步,先与 master 连接的客户端连接全部断开。
- 第二步,选举最老的slave作为新的 master。因为最老的 slave 与旧的 master 之间的同步状态是最好的。如果此时所有 slave 处于未同步状态,则未同步的消息会丢失。
- 第三步,新的 master 重新入队所有 unack 的消息。因为新的 slave 无法区分这些 unack 的消息是否已经到达客户端,或者是 ack 信息丢失在老的 master 链路上,再或者是丢失在老的 master 组播 ack 消息到所有 slave 的链路上,所以出于消息可靠性的考虑,重新入队所有 unack 的消息,带来的弊端就是客户端可能会有重复消息。
- 第四步,若客户端连接着 slave,并且 Basic.Consume 消费时指定了 x-cancel-on-ha:failover 参数,那么断开时客户端会收到一个 Consumer Cancellation Notifcation 的通知,消费者客户端中会回调 consumer 接口的 handleCancel 方法。如果未指定 x-cancel-on-ha-failover 参数,那么消费者将无法感知 master 宕机。
2.3 配置镜像队列
镜像队列的配置主要是通过添加相应的 Policy 来完成的,两种方式,一是在代码里提前写好配置,二是使用管理命令配置。
注意事项:
- 镜像队列中最后一个停止的节点会是 master,启动顺序必须是 master 先启动。
- 如果 slave先启动,它会先等待30 秒等待master 的启动,Master节点加入到集群中后,slave才启动。
- 如果 30 秒内 master没有启动,slave 会自动停止。
- 当所有节点因故(断电等) 同时离线时,每个节点都认为自己不是最后一个停止的节点,要恢复镜像队列,可以尝试在 30 秒内启动所有节点。
2.3.1 定义参数
参数含义:
- ha-mode:指明镜像队列的模式,可选项有 all、exactly、nodes,默认为 all。
- all:表示在集群中所有的节点上进行镜像。
- exactly:表示在指定个数的节点上进行镜像,节点个数由 ha-params 指定。
- nodes: 表示在指定节点上进行镜像,节点名称由 ha-params 指定,节点的名称通常类似于 rabbit@hostname ,可以通过rabbitmqctl_cluster_status命令查看。
- ha-params:不同的 ha-mode 配置中需要用到的参数。
- ha-sync-mode:队列中消息的同步方式,可选项有 automatic(自动) 和manual(手动)。
- ha-promote-on-shutdown:可选参数when-synced(何时同步)和always(始终同步)。
- ha-promote-on-failure:可选参数when-synced(何时同步)和always(始终同步)。
注意事项:
- ha-mode参数对排他(exclusive) 队列不生效,因为排他队列是连接独占的,当连接断开时队列会自动删除,所以实际上这个参数对排他队列没有任何意义。
- 将新节点加入已存在的镜像队列时,默认情况下 ha-sync-mode 取值为 manual(手动),镜像队列中的消息不会主动同步到新的 slave 中,除非显式调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行其他操作,直到同步完成。
- 当 ha-sync-mode 设置为 automatic(自动)时,新加入的 slave 会默认同步已知的镜像队列。由于同步过程的限制,所以不建议对生产环境中正在使用的队列进行操作。
- 当所有slave都出现未同步状态,且ha-promote-on-shutdown参数设为 always,那么不论 master 因为何种原因停止,slave 都会接管 master,优先保证可用性,存在消息丢失情况。当ha-promote-on-shutdown参数设置为 when-synced(默认)时,分两种情况:
- 若master 因主动原因停掉,比如通过rabbitmgctl stop命令停止,那么slave不会接管 master,此时镜像队列不可用。
- 若master 因被动原因停掉,比如 Erlang 虚拟机或者操作系统崩溃,那么 slave 会接管 master。
2.3.2 命令配置
- 命令格式:
- rabbitmqctl set_policy [-p vhost] [–prioritypriority] [–apply-to apply-to] {name} {pattern} {definition}
- 参数释义:
- [-p vhost]参数:可选参数,针对指定vhost下的queue进行设置。
- [–prioritypriority]:可选参数,policy的优先级。
- [–apply-to apply-to ]参数:可选参数,指定对象为交换器还是队列。
- {name}参数:自定义策略名称。
- {pattern}参数: queue的匹配模式(正则表达式)
- {definition}参数:ha-mode 、ha-params、ha-sync-mode三部分。
1.查看要进行镜像的队列。
2. 设置策略。
# 优先级为0。
#指定对象为所有交换器。
#策略名称为qingjun_policy。
#匹配正则为"以qingjun开头的所有交换器"。
#镜像策略为,在3个节点上进行镜像,并设置消息同步方式为自动同步。
[root@node1 ~]# rabbitmqctl set_policy --priority 0 --apply-to exchanges qingjun_policy "^qingjun*" '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
- 查看结果。
4. 查看状态
2.3.4 相关命令
2.3.4.1 查看节点消息同步状态
[root@node1 ~]# rabbitmqctl list_queues name slave_pids synchronised_slave_pids
2.3.4.2 取消某队列同步消息功能
#取消qingjue_queue队列的消息同步功能,注意这个队列是能匹配到上面设置的镜像队列策略的。
[root@node1 ~]# rabbitmqctl cancel_sync_queue qingjue_queue
2.3.4.3 手动同步某队列消息
# 手动给qingjue_queue队列进行消息同步,注意这个队列是能匹配到上面设置的镜像队列策略的。
[root@node1 ~]# rabbitmqctl sync_queue qingjue_queue