目录
消息分发
负载均衡
幂等性保障
顺序性保障
顺序性保障方案
二号策略:分区消费
三号策略:消息确认机制
四号策略:
消息积压
RabbitMQ集群
选举过程
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,要么全部成功,要么全部失败,
1.不采用事务:
正常来说,这种情况发一个,然后一个出错了,,第一条消息成功,这样就会第二条消息会不成功
@RequestMapping("/trans") public String trans(){ System.out.println("trans test ...."); rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 1.."); int num=5/0; rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 2.."); return "消息发布成功"; }
2.采用事务
还需要下面这两个一个创建Manager,还要加上@Transactional,事务,要么都成功,要么都失败
消息分发
当队列中有多个消费者时候,队列会把消息分派给不同的消费者,每条消息只会发送给订阅列表里面的一个消费者
channel.basicQos方法:来限制当前信道上的消费者所能保持的最大未确认消息的数量
场景理解:
通过设置prefetchCount参数,同时必须要设置消息应答方式为手动应答.
prefetchCount:控制消费者从队列中预取(prefetch)消息的数量,以此来满足流控制和负载均衡
未确认5个,等待发送15个
通过这里可以看到1号处理速度快,2号速度慢
负载均衡
公平分发,五五分成,一个处理任务快,一个处理慢,就会造成快的一直处于忙的状态,而
幂等性保障
幂等性是数学和计算机中某些运算的性质,可以被多次应用,而不改变初始应用结果
数据库的select操作,不同时间两次查询结果可能不同,但是这个操作符合幂等,幂等事对资源的影响,而不是返回结果,查询操作对资源本身不会影响,之所以结果不同,可能是中间有其他操作对资源造成了修改。
应用上的幂等性,对于同一个订单,订单系统多次调用支付系统,支付系统只能处理一次扣款
MQ幂等性介绍,同等消息多次消费,对系统的影响是相同的。
Exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次(没人实现,性能太低)
At most once:可能会丢失,但是绝对不会重复传输
At least once:最少一次,消息绝对不会消失,但是有可能会重复(比如支付系统,出现这个就麻烦了,
解决方案:(所有MQ都面临这个问题)当消费者收到重复消息,如何处理
全局唯一ID:
1.每一条消息分派唯一ID(UUID,自增ID,业务ID,时间戳+业务ID,时间戳+业务ID)
2.消费者收到消息后,先用id判断该消息是否已经消费过,假如已经消费过,则放弃处理
3.如果未消费过,消费者开始消费消息,业务处理成功后,把唯一ID保存起来(数据库/redis)redis 原子性操作setnx(set if not exists)来保证幂等性,唯一ID作为key放到redis中,返回1,说明之前没有消费过,正常消费返回0,说明这条消息已经消费过,抛弃。
业务逻辑判断
检查数据库中是否存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者处理消息之前,先检查相关事务状态,确保消息对应操作尚未执行,然后才进行处理,具体根据业务场景来处理.
顺序性保障
消费者,消费消息的顺序和生产者生产消息的顺序保持一致,比如生产者发送消息的顺序为msg1,msg2,msg3,那么消费者也安装msg1,msg2,msg3顺序进行消费
哪些情况可能会打破RabbitMQ的顺序性呢?
1.多个消费者:队列配置多个消费者时候,消息可能被不同的消费者并行处理,从而导致消息处理的顺序性无法保证。
2.网络抖动或者异常:消息传递过程中,出现网络波动或者异常
3.消息重试:消费者处理消息后未能及时发送确认或者确认消息在传输过程中丢失,那么MQ可能会认为消息未被成功消费而重试
4.消息路由:在复杂的路由场景中,可能会根据路由键被发送到不同队列,无法保障顺序
5.死信队列:消息因为某些原因被放入死信队列,死信队列被消费,无法保证消息的顺序和生产者发送顺序一致。
顺序性保障方案
局部顺序性保证,全局顺序性保障
二号策略:分区消费
当需要多个消费者来提高处理速度时候,可以使用分区消费,把一个队列分割成多个分区,每个分区由一个消费者处理,以此来保持每个分区消息的顺序性.
订单状态修改:1.创建 2.取消 3.已支付 4.已经删除
同一个订单ID的消息,保持顺序性就可以了
业务逻辑:根据订单ID,进行hash(或者其他算法),同个订单ID,经过这个算法,得到的队列名称是一致的(基于spring-clou-stream操作)
三号策略:消息确认机制
保证在一个队列中,是顺序性的
四号策略:
有时候,即使消息乱序,但是也可以在业务层实现顺序控制,如在消息中嵌入序列号等,并在消费时候,根据这些消息来处理。
消息积压
消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力(消费者处理不过来)
消费者处理能力不足:消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压
1)消费端业务逻辑复杂,消耗时间长
2)消费端代码性能低
3)系统资源限制,如cpu,内存,磁盘IO等也会限制消费者处理消息的速率。
4)异常处理不当:消费者在处理消息时候,出现异常,导致消息无法被正常处理和确认
网络问题:网络延迟或则会不稳定,消费者处理无法及时接收或者确认消息,最终导致消息积压
RabbitMQ配置过低
解决方案:
1.提高消费者效率:
1)增加消费者实例数量,比如新增机器
2)优化业务逻辑,比如实现多线程来处理业务
3)设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者
4)消息发生异常,设置合适的重试策略,或者转入到死信队列
2.限制生产者速率:比如流量控制,限流算法等
a.流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率
b.限流:使用限流工具,为消息发送速率设置一个上限
c.设置过期时间.如果消息未被消费,可以配置死信队列,以避免消息丢失,并且减少对主队列的压力
3.资源与配置优化:
比如升级RabbitMQ服务器硬件,调整RabbitMQ的配置参数等等。
RabbitMQ集群
多机多节点:要求在同一个局域网上,这样(他会没有延迟)
三台机器分别安装RabbitMQ,三个版本最好一致
ifconfig(内网IP) more /etc/hostname #查看主机名称
vi /etc/hosts (在这里面编辑,获取完上面两个后粘贴进去)不能写错
比如本地是7Z,我们在b
scp /var/lib/rabbitmq/.erlang.cookie root@iZ2xxxxx8Z:/var/lib/rabbitmq/
单机多节点:伪集群
主节点关闭后,队列从节点也会自动消失
换句话说,数据只存在于主节点,从节点并不存在,如果关闭的是rabbit2,那么testQueue2的数据会消失.
仲裁队列:
一种基于Raft一致性算法实现的持久化实现的队列,仲裁队列提高队列复制能力,保证数据的高可用和安全性,使用仲裁队列可用在RabbitMQ节点进行队列数据的复制,从而达到在一个节点宕机时,队列仍然可以提供服务的效果.
Raft是一种共识算法,旨在实现高可用性和数据的持久性,通过节点间复制数据,来保证分布式系统中的一致性.
选主:
每个节点处于以下三种角色之一:
Leader(领导制):负责处理所有的客户请求,并将这些请求作为日志复制项,复制到所有Foolower,Leader定期向所有Foller发送心跳消息,以维持领导者地位,防止Follower进入选举过程
Follow(跟随者):接收来自Leader的日志条目,并且在本地应用这些条目,跟随者不直接处理客户请求
Candidate(候选者):当跟随者在一段时间内没有收到来自Leader的心跳消息时候,他就不确定Leader是否可用,这种情况下,跟随者变成候选者,并且尝试投票过程选举Leader.
正常情况:
集群中只有一个Leader,其余都是follow
任期:
Raft将时间划分任意长度的任期,每一段任期从一次选举开始,此时会有一个或者多个candidate尝试变成leader,在成功一次完成选举后,这个leader会一致管理集群,直到任期结束,在某些情况下,一次选举选不出来leader,这个时候任期会以没有leader而结束
RequestVote RPC:请求投票,由candidate在选举过程中发出
AppendEntries RPCs:追加条目,由leadeer发出,用来做日志复制和提供心跳机制。
选举过程
当服务器启动,所有节点都是follow状态,如果follower在election timeout内没有收到来自leader的心跳,则会主动发起选举
为了解决情况三循环:Raft采取选举超时时间,确保很少产生无结果投票,并且就算发生了也能很快解决,为了防止选票一开始就瓜分,一半超时时间有一个固定区间(150-300ms)随机选择,一半超时时间短了,会重新选举更快,这样就可以赢得选举,并且在其他服务器超时之前,发送心跳
仲裁队列,如果集群中节点少于5个,一主两从
大于5个,则一主四从,假如7个,也是一主四从
使用HAProxy进行负载均衡,电脑不咋好使,就不截图了。
单个节点宕机,并不影响整个集群的使用。