1.RabbitMQ是高性能的异步通讯组件
何为异步通讯
打电话就是同步通讯,微信聊天可以理解为异步通讯,不是实时的进行通讯:时效性差。
同步调用的缺点:
拓展性差(需求不尽提)
性能下降
级联失败
优势:时效性强,等到结果再返回
异步调用
异步调用是基于消息通知形式实现,一般包含三个角色:消息发送者 消息代理 消息接收者
消息代理:负责管理转存转发消息
优势:解除耦合,拓展性强
无需等待,性能好
故障隔离
缓存消息、削峰填谷
缺点:不能立即得到结果
不确定下游服务是否成功
依赖某一个业务
技术选型
2.RabbitMQ基本介绍
MQ整体架构及核心概念
-
publisher:消息发送者
-
consumer:消息的消费者
-
queue:队列,存储消息
-
virtual-host:虚拟主机,起到数据隔离作用
-
exchange:交换机,负责路由消息
3.数据隔离
在MQ中创建虚拟主机实现数据隔离
4.SpringAMQP
AMQP:消息通信协议与语言无关,更符合微服务中独立性的要求
Spring AMQP:基于AMQP协议定义的API规范,提供模板来发送和接受消息。->spring rabbit默认实现
AMQP依赖:父工程加入依赖
配置文件:每个微服务端引入MQ服务的信息
发送消息:RabbitTemplate.converAndSend(队列名,消息体内容)
消息接受:
4.1 WorkQueue
实现一个队列绑定多个消费者
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,但并不会考虑消费者是否处理完消息,可能会出现消息堆积
可以通过修改配置文件,设置preFetch为1,确保同一时刻最多传递给消费者1条消息
Work模型的使用:
1.多个消费者绑定到一个队列,可以加快消息的处理速度
2.同一条消息只会被一个消费者处理
3.通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
4.2 Fanout交换机
fanout的交换机会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
交换机的作用:
接收生产者发送的消息
将消息按照规则路由到与之绑定的队列
Fanout交换机会将消息路由到每个绑定的队列
4.3 Direct交换机
Direct交换机会将接受到的消息根据规则路由到指定的queue,因此称为定向路由
每一个queue都与exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到 BKey和Rkey一致的队列
生产者:
4.4 Topic交换机
与Direct交换机类似,区别在于routingKey可以是多个单词的列表,并且以.分割
queue与exchange指定bindingKey时,可以使用通配符:
*:代指一个单词;#:代指0个或多个单词
描述二者差异:
direct的routingKey必须是具体topic可以是多个单词,用.分割,并且队列与交换机绑定时的routingkey可以使用通配符
4.5 用java代码声明队列和交换机
SpringAMQP提供几个类,用来声明队列、交换机及其绑定关系
代码demo:
除了上述基于Bean形式声明队列和交换机还有一种方便的方式:
4.6 消息转换器
Spring的对消息对象的处理是由MessageConverter来处理的,而默认实现是simpleMessageConverter,基于JDK的ObjectOutputStream
有几个问题:
- JDK的序列化有安全性问题
- JDK序列化的消息太大
- JDK序列化的消息可读性差
推荐使用JSON序列化代替默认的JDK序列化:
publisher和consumer都要引入jackson依赖
publisher和consumer配置messageConverter
5.消息传输可靠性
5.1 生产者可靠性
5.1.1 生产者重连
利用重试机制可有效提高消息发送的成功率,不过SpringAMQP提供的重试机制是阻塞式的重试,线程是被阻塞的,会影响到业务性能
5.1.2 生产者确认
原理:
代码具体实现:
在生产者微服务中添加配置:
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
Confirmcallback 发消息
demo:
- 生产者确认需要额外的资源,尽量不使用
- 如果一定使用,无需开启Publisher-Return机制,因为一般路由失败是业务问题
- 对于nack消息可以有限次重试,依然失败则记录异常信息
5.2 MQ的可靠性
5.2.1 数据持久化
在默认情况下,RabbitMQ会将收到的信息保存到内存中以降低消息收发的延迟。这样会导致俩个问题:
- 一旦MQ宕机,内存的消息就会丢失
- 内存空间有限,消费者发生故障或处理过慢,会导致消息积压,引发MQ阻塞
实现数据持久化:
交换机持久化
队列持久化
消息持久化
5.2.2 Lazy Queue
从MQ3.6.0版本之后,引入懒惰队列的概念,懒惰队列的特征:
1、接收到消息不保存到内存中,而是存入磁盘(内存只保留最近的消息,默认是2048条)
2、消费者要消费信息时才会从磁盘读取消息
3、支持百万级别的消息存储
3.12版本之后队列只有懒惰队列,无法更改
配置方法:
5.3 消费者的可靠性
为了确认消费者是否成功处理消息,MQ提供了消费者确认机制,当消费者处理消息结束后,应该向MQ发送一个回执,告知自己的消息处理状态。回执有三种:
ack、nack、reject
5.3.1 消费者确认机制
SpringAMQP已经实现了消息确认功能,通过配置文件选择ACK处理方式:
5.3.2 消费失败处理
失败重试机制:
当消费者出现异常后,消息会不断requeue到队列,再重新发送给消费者,无限循环,导致MQ的消息处理飙升,带来不必要的压力
失败消息处理策略:
在开启重试模式后,重试次数耗尽,需要MessageRecoverer接口来处理
消费者如何保证消息一定被消费
开启消费者确认机制为Auto,由Spring确认消息并返回ack或者其他;
开启失败重传机制后,并设置了MessageRecoverer,多次重传失败后将消息投递给指定的交换机
5.3.3 业务幂等性
同一个业务,执行一次或多次对业务状态的影响是一致的
天生幂等业务:查询业务、删除业务
唯一消息id
源码:
第二种方案就是根据业务实现:
比如说修改订单状态,想要从已支付变成未支付的情况下,可以不用设置uuid,而是在改状态前加一个判断状态的实现就可以了。
6. 延迟消息
生产者发送消息时指定一个时间,消费者不会立刻收到消息而是等待时间之后才收到消息
6.1 死信交换机
如果一个队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会指定给这个交换机,这个交换机就是死新交换机
使用场景:
控制台实现绑定死信交换机
发送消息:需要用到消息后置处理器要不然抛异常~
消费者
6.2 延迟消息插件
MQ官方推出一个插件,原生支持延迟消息功能。其原理是设计一种支持延迟消息功能的交换机,当消息投递给交换机后可以暂存一定时间,到期后再投递给交换机。
装好插件之后,交换机加参数:
发送消息实现:
6.3 取消超时订单
首先来说这个延迟消息的发送时间的确定是根据具体业务实现,就比如说针对取消超时订单的场景下,用户支付确认之后有30min的时间进行支付,那么我们可以设置一个数组,每个数组的元素就是延迟消息,第一个是10s、10s、10s、30s、30s、1min.....这样的形式来优化延迟消息的发送。