关于RabbitMQ你了解多少?
文章目录
- 关于RabbitMQ你了解多少?
- 基础篇
- 同步和异步
- MQ技术选型
- 介绍和安装
- 数据隔离
- SpringAMQP
- 快速入门
- Work queues
- 交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 声明队列和交换机
- MQ消息转换器
- 高级篇
- 消息可靠性问题
- 发送者的可靠性
- 生产者重连
- 生产者确认
- SpringAMQP实现生产者确认
RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件
基础篇
同步和异步
同步调用:
- 优势:时效性强,等待到结果后才返回
- 不足:拓展性差、性能下降、级联失败问题
异步调用:
- 异步调用方式其实就是基于消息通知的方式,一般包含三个角色
- 消息发送者:投递消息的人,就是原来的调用方
- 消息代理:管理、暂存、转发消息
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
- 优势:解除耦合,拓展性强、无需等待,性能好、故障隔离、缓存消息,流量削峰填谷
- 不足:不能立刻得到调用结果,时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性
MQ技术选型
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
介绍和安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址
同样基于Docker来安装RabbitMQ,使用下面的命令即可:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management
-
15672:RabbitMQ提供的管理控制台的端口
-
5672:RabbitMQ的消息发送处理接口
-
安装完成后访问管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:
RabbitMQ对应的整体架构核心概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列,无存储能力
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
数据隔离
- 每个Virtual Host提供了一个独立的消息代理,它们之间完全隔离,相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host,可以实现数据的隔离。
- 每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样,消息只能在同一个Virtual Host内进行路由和传递,不会跨越Virtual Host。
- 通过使用Virtual Hosts,可以将不同的应用程序或服务的消息进行逻辑隔离,确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用,可以确保不同的租户之间的消息数据完全隔离,提高安全性和隐私性。
SpringAMQP
SpringAMQP的官方地址
- AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
- Spring AMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
快速入门
-
引入spring-amqp依赖,这样publisher和consumer服务都可以使用:
<!——AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置RabbitMQ服务端信息
spring: rabbitmq: host: 192.168.100.101 #主机名 port: 5672 # 端口 virtual-host: /liner #虚拟主机 username: admin #用户名 password: admin123 #密码
-
发送消息:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { //队列名称 String queueName = "simple.queue"; //消息 String message = "hello,spring amqp!"; //发送消息 rabbitTemplate.convertAndSend(queueName,message); }
-
接收消息:SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
@slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { log.info("spring消费者接收到消息:【"+ msg + "】"); if (true) { throw new MessageconversionException("故意的"); } log.info("消息处理完成"); } }
Work queues
Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息。
消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此需要修改application.yml
,设置preFetch
值为1,确保同一时刻最多投递给消费者1条消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
Fanout交换机
Fanout Exchange:会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。
Direct交换机
Direct Exchange :会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
Topic交换机
TopicExchange:与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 “.” 分割。
Queue与Exchange指定BIndingKey时可以使用通配符:
#
:代指0个或多个单词*
:代指一个单词
声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
@Configuration
public class FanoutConfiguration{
@Bean
public FanoutExchange fanoutExchange(){
//ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("liner.fanout");
}
@Bean
public Queue fanoutQueue(){
//QueueBuilder.durable("").build();
return new Queue("fanout.queue");
}
@Bean
public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
SpringAMQP还提供了基于 @RabbitListener 注解来声明队列和交换机的方式:
@RabbitListener(bindings = @QueueBinding(
value = Queue (name = "direct.queue",durable = "true"),exchange = @Exchange(name = "liner.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}
MQ消息转换器
Spring的消息发送代码接收的消息体是一个Object:
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题:数据体积过大、有安全漏洞、可读性差
因此建议采用JSON序列化代替默认的JDK序列化
- 在
publisher
和consumer
两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
- 配置消息转换器,在
publisher
和consumer
两个服务的启动类中添加Bean:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断
高级篇
消息可靠性问题
发送者的可靠性
生产者重连
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:
rabbitmq:
connection-timeout: 1s #设置MQ的连接超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 #最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
SpringAMQP实现生产者确认
-
在publisher这个微服务的application.yml中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型 publisher-returns: true #开启publisher return机制 #这里publisher-confirm-type有三种模式可选: # none: 关闭confirm机制 # simple: 同步阻塞等待MQ的回执消息 # correlated: MQ异步回调方式返回回执消息
-
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //设置ReturnCallback rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { log.info("消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{}", replyCode,replyText,exchange,routingKey,message.toString()); }); } }
-
发送消息,指定消息ID、消息ConfirmCallback
@Test void testPublisherConfirm() throws InterruptedException { //1.创建CorrelationData CorrelationData cd = new CorrelationData(); //2.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1.Future发生异常时的处理逻辑,基本不会触发 log.error("handle message ack fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ // result.isAck(), boolean类型, true代表ack回执,false代表nack回执 log.debug("发送消息成功,收到ack!"); }else{ // result.getReason(), String类型,返回nack时的异常描述 log.error("发送消息失败,收到nack,reason:{}",result.getReason()); } } }); //3.发送消息 rabbitTemplate.convertAndSend("liner.direct","red","hello",cd); }