RabbitMQ使用手册

news2024/11/16 21:40:45

SpringCloud微服务

什么是微服务

维基百科:微服务是一种软件架构风格,它是以专注于单一职责的很多小型项目为基础,组合出复杂的大型应用。

微服务拆分

拆分目标

高内聚:每个微服务的职责要尽量单一,包含的业务相互关联度高、完整度高。

低耦合:每个微服务的功能要相对独立,尽量减少对其它微服务的依赖。

拆分方式

纵向拆分:按照业务模块来拆分。

横向拆分:抽取公共服务,提高复用性。

RabbitMQ

同步调用

优点:时效性强,等待到结果后才返回

缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能执行)

异步调用

异步调用通常是基于消息通知的方式,包含三个角色:

消息发送者:投递消息的人,就是原来的调用者

消息接收者:接收和处理消息的人,就是原来的服务提供者

消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

image-20240620225011677

优点:

耦合度低,拓展性强

异步调用,无需等待,性能好

故障隔离,下游服务故障不影响上游业务

缓存消息,流量削峰填谷

缺点:

不能立即得到调用结果,时效性差

不确定下游业务执行是否成功

业务安全依赖于Broker(消息代理)的可靠性

MQ技术选型

image-20240620233034775

RabbitMQ安装

  1. 查找镜像:docker search rabbitmq

    image-20240622142056336

  2. 拉取镜像:docker pull rabbitmq:3.8.19,指定拉取版本为3.18.19,如果不指定则默认拉取latest

    image-20240622142226656

  3. 查看镜像:docker images

    image-20240622142422222

  4. 启动镜像:设置账号登录为admin,登录密码为admin,不指定镜像版本,默认启动rabbitmq:latest

    docker run \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=admin \
    --name mq \
    --hostname localhost \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3.8.19
    

    image-20240622142648269

  5. 查看容器:docker ps

    image-20240622142819875

  6. 进入RabbitMQ容器:docker exec -it 4df /bin/bash

    image-20240622143041949

  7. 开启RabbitMQ后台访问:rabbitmq-plugins enable rabbitmq_management

    image-20240622143150816

  8. 退出容器bash:exit

    image-20240622143240346

  9. 网页访问RabbitMQ后台:访问http://localhost:15672,账号admin,密码admin

    image-20240622143517023

常见问题:

  1. 后台管理系统的可视化界面中出现:All stable feature flags must be enabled after completing an upgrade

    **解决方案:**点击Admin -> Feature Flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。

    image-20240622214453608

  2. 后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    **解决方案:**进入RabbitMQ容器,运行命令:echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf,退出RabbitMQ容器,然后运行docker restart 容器id重启RabbitMQ容器。

    image-20240622215340267

  3. 后台管理系统的可视化界面中 Overview 不显示图形的问题

    解决方案:同《2. 后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node》

RabbitMQ介绍

  • publisher:消息发送者

  • comsumer:消息消费者

  • queue:队列-存储消息

  • exchange:交换机-接收发送者发送的消息,并将消息路由到与其绑定的队列

  • virtual-host:虚拟主机-将数据隔离(多个项目使用同一个RabbitMQ时,可以为每个项目建立一个virtual-host,将不同项目之间的exchange和queue隔离)

image-20240621002222015

Work Queues(任务模型)

任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处理。多个消费者绑定到一个队列,可以加快消息处理速度。

image-20240622235716183

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息,消费者处理完后再投递下一条消息。

image-20240623005915858

Fanout交换机

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 Fanout交换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。

image-20240623010648252

应用场景:用户支付成功后,交易服务更新订单状态,短信服务通知用户,积分服务为用户增加积分。

实现:交易服务的queue、短信服务的queue、积分服务的queue都绑定到Fanout交换机,用户支付成功后,支付服务将消息发送到Fanout交换机,这样交易服务、短信服务、积分服务九都能收到这条消息了。

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息

代码实现:

image-20240623013810879

发送者:

@SpringBootTest
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
        String exchangeName="hmall.fanout";
        String message="hello everyone";    						rabbitTemplate.convertAndSend(exchangeName,null,message);
    }
}

消费者:

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}

消费者输出:

image-20240623015031061

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个Bindingkey(可以为每一个Queue指定相同的Bindingkey,实现和Fanout交换机相同的功能)。
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

image-20240623124250467

应用场景:用户取消后,只需要给交易服务发送消息,通知交易服务更新订单状态,而不需要给短信服务和积分服务发送消息。

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall. direct,将两个队列与其绑定,routeKey 为blue时路由到direct.queue1,为yellow时路由到direct.queue2,为red时路由到direct.queue1和direct.queue2
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall. direct发送消息

代码实现:

image-20240623130217877

消费者

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "direct.queue1")
    public void listenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "direct.queue2")
    public void listenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}

发送者

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
  //交换机名称
  String exchangeName="hmall.direct";
  //消息
  String message_blue="hello blue";
  String message_yellow="hello yellow";
  String message_red="hello red";
  //发送消息
 rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
 rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
 rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
}

消费者输出:

image-20240623130816236

Topic交换机

TopicExchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,并且以.分割。

Queue与Exchange指定routingkey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

image-20240623131430290

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall. topic,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall. topic发送消息

代码实现:

image-20240623133252103

消费者:

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "topic.queue1")
    public void listenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "topic.queue2")
    public void listenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}

发送者1:

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
        //交换机名称
        String exchangeName="hmall.topic";
        //消息
        String message="中国新闻";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }

消费者输出:

image-20240623133933171

发送者2:

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
        //交换机名称
        String exchangeName="hmall.topic";
        //消息
        String message="中国天气";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
    }

消费者输出:

image-20240623134104501

AMQP

Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

RabbitMQ使用

后台可视化界面操作

  • 创建用户

    image-20240622221803997

  • 创建虚拟主机

    image-20240622221157604

  • 为用户添加可访问的虚拟主机

    image-20240622222126051

    image-20240622222326991

    注意:当前登录用户默认有权限访问其创建的所有虚拟主机。

  • 创建队列

    image-20240622232912672

    • Durability:

      Durable:持久化队列,Rabbit服务器重启后,这个队列还会存在

      Transient:临时队列,Rabbit服务器重启后,这个队列将会被删除

  • 查看队列的消费者

    image-20240623000921437

  • 向队列中发布消息

    image-20240623001452004

  • 获取队列中消息

    队列中可以存储消息。当队列中的消息未被消费时,消息将存储在队列中,此时可以查看队列中的消息。

    image-20240623002855751

    • Act Mode:

      Nack message requeue true:获取消息,但是不做ack应答确认,消息重新入队

      Ack message requeue false:获取消息,应答确认,消息不重新入队,将会从队列中删除

      reject requeue true:拒绝获取消息,消息重新入队

      reject requeue false:拒绝获取消息,消息不重新入队,将会被删除

    • Encoding:可以选择将消息进行base64编码

    • Messages:从队列中获取的消息数量

  • 清理消息

    image-20240623214725176

代码操作

  1. 引入依赖

    <!-- AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. application.yaml中配置RabbitMQ

    spring:
      rabbitmq:
        host: 192.168.1.2 # RabbitMQ地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
    
创建队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

image-20240623135444692

如果已经存在交换机、队列、绑定关系,运行代码时则不会进行创建,而且也不会报错。

通常发送者只需要关心消息发送,消费者关心队列、交换机、以及绑定关系,所以创建操作一般写在消费者中。

Sping提供了基于java bean和基于@RabbitListener注解两种方式创建。

  • 基于bean代码演示:
package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {
  //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
//        方式1
//        return new FanoutExchange("hmall.fanout");
//        方式2
        return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }
  //声明队列
    @Bean
    public Queue fanoutQueue1(){
//        方式1
//        return new Queue("fanout.queue1",true);
//        方式2
        return QueueBuilder.durable("fanout.queue1").build();
    }
  //将队列和交换机绑定
    @Bean
    public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue1'的bean作为参数传进来
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Queue fanoutQueue2(){
        return QueueBuilder.durable("fanout.queue2").build();
    }
    @Bean
    public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue2'的bean作为参数传进来
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  • 基于@RabbitListener注解代码演示:
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding( //将交换机和队列绑定
            value = @Queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //如果没有交换机hmall.direct则创建交换机
            key = {"blue","red"} //routingKey
            ))
    public void listenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue2",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"yellow","red"}
    ))
    public void listenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}
发送消息
  • 直接发送给队列

    方法:public void convertAndSend(String routingKey, final Object object),直接发给队列时,routingKey相当于队列名。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimp leQueue() {
      //队列名称
      String queueName = "simple. queue";
      //消息
      String message = "hello, spring amqp!";
      //发送消息
      rabbitTemplate.convertAndSend(queueName, message)}
    

    注意:队列不显示绑定交换机时,默认还是会绑定到defalut exchange上

    image-20240623143726683

  • 发送给Fanout Exchange

    方法:public void convertAndSend(String exchange, String routingKey, final Object object),使用Fanout Exchange时,routingKey相当于队列名,发送给Fanout Exchange时,routingKey传null或""

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      //交换机名称
      String exchangeName="hmall.fanout";
      //消息
      String message="hello everyone";
      //发送消息
    rabbitTemplate.convertAndSend(exchangeName,null,message);
    }
    
  • 发送给direct交换机

    方法:public void convertAndSend(String exchange, String routingKey, final Object object),routingKey就是交换机和队列绑定时的routingKey

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      //交换机名称
      String exchangeName="hmall.direct";
      //消息
      String message_blue="hello blue";
      String message_yellow="hello yellow";
      String message_red="hello red";
      //发送消息
     rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
     rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
     rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
    }
    
  • 发送给topic交换机

    方法:方法:public void convertAndSend(String exchange, String routingKey, final Object object)

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      //交换机名称
      String exchangeName="hmall.topic";
      //消息
      String message="中国新闻";
      //发送消息
      rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }
    
接收消息
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "队列名")
    public void listenerSimpleQueue(String message){
        log.info("消费者收到消息:{}",message);
    }
}
配置消息转换器

convertAndSend方法会先将消息进行序列化,然后再发送。

Spring的对消息对象的处理是由org.springframework.amap.support.converter.Messageconverter来处理的。而
默认实现是SimpleMessageConverter,如果消息实现了Serializable接口,则会使用serialize方法进行序列化,而serialize方法是基于JDK的Objectoutputstream完成序列化的。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

image-20240623151642723

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

  1. 在publisher和consumer中都要引入jackson依赖,发送者和消费者要使用相同的消息转换器:

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
    
  2. 在publisher和consumer中都要配置MessageConverter:

    @Bean
    public MessageConverter messageConverter() {
      return new Jackson2JsonMessageConverter();
    }
    

测试:

  • 使用默认的消息转换器

    发送者:

    package com.itheima.publisher;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.io.Serializable;
    
    @SpringBootTest
    class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            User jack = new User("jack", 18);
            rabbitTemplate.convertAndSend("testConvertMessage", jack);
        }
    }
    @Data
    @AllArgsConstructor
    class User implements Serializable { //要实现Serializable接口,否则convertAndSend方法进行消息转换时会抛出异常
        private String name;
        private Integer age;
    }
    

    查看消息:

    image-20240623155543865

    消费者:

    package com.itheima.consumer.mq;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.Serializable;
    
    @Component
    @Slf4j
    public class SpringRabbitListener {
        @RabbitListener(queues = "testConvertMessage")
        public void listenerWorkQueue(User message){
            log.info("消费者接收到消息:{}",message);
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class User implements Serializable {
        private String name;
        private Integer age;
    }
    

    消费者输出:

    image-20240623155819000

  • 配置消息转换器

    发送者:

    package com.itheima.publisher;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            User jack = new User("jack", 18);
            rabbitTemplate.convertAndSend("testConvertMessage", jack);
        }
    }
    @Data
    @AllArgsConstructor
    class User {
        private String name;
        private Integer age;
    }
    

    查看消息:

    image-20240623154053513

    消费者:

    package com.itheima.consumer.mq;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    @Slf4j
    public class SpringRabbitListener {
        @RabbitListener(queues = "testConvertMessage")
        public void listenerWorkQueue(User message){ //自动将json字符串转为User独享
            log.info("消费者接收到消息:{}",message);
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor //消费者将消息转为User对象时,User对象一定要有空参构造器
    class User {
        private String name;
        private Integer age;
    }
    

    消费者输出:

    image-20240623155128402

消息可靠性

消息丢失三种情况:

  • 发送者到MQ服务器时消息丢失
  • MQ服务器宕机导致消息丢失
  • MQ服务器将消息发送给消费者时消息丢失

发送者的可靠性

发送者重连

有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。

spring:
  rabbitmq:
    connection-timeout: 1s #设置MQ的连接超时时间,超过1秒钟还没有连上MQ则表示连接超时
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

案例演示:

  1. 停止MQ

    image-20240623173037488

  2. 开启重连

    spring:
      rabbitmq:
        host: 192.168.1.2 
        port: 5672 
        virtual-host: /hmall 
        username: jack 
        password: jack 
        connection-timeout: 1s 
        template:
          retry:
            enabled: true 
            initial-interval: 1000ms 
            multiplier: 1 
            max-attempts: 3 
    
  3. 发送者发送消息

    package com.itheima.publisher;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            rabbitTemplate.convertAndSend("testConvertMessage", "你好");
        }
    }
    
  4. 消息发送失败

    image-20240623173414691

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确人机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是MQ路由失败。此时会通过PublisherReturn返回路由异常原因,然后PublisherConfirm返回ACK,告知发送者投递成功
  • 临时消息投递到了MQ,并且入队成功,PublisherConfirm返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队且完成持久化,PublisherConfirm返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

image-20240623174629789

开启发送者确认机制:

  1. 开启配置

    spring:
      rabbitmq:
        publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开局publisher return机制
    

    这里publisher-confirm-type有三种模式可选:

    • none:关闭confirm机制

    • simple:同步阻塞等待MQ的回执消息

    • correlated:MQ异步回调方式返回回执消息

  2. 为RabbitTemplate配置ReturnsCallback

    每个RabbitTemplate只能配置一个ReturnsCallback,因此需要在项目启动过程中配置:

    image-20240623205437418

  3. 每次发送消息时,指定消息ID、消息ConfirmCallback

    image-20240623175841421

案例演示:

  1. 开启发送者确认配置

    spring:
      rabbitmq:
        host: 192.168.1.2 # RabbitMQ地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
        publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开局publisher return机制
    
  2. 定义ReturnsCallback

    package com.itheima.publisher;
    
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.PostConstruct;
    
    @Configuration
    @Slf4j
    @AllArgsConstructor
    public class MqConfig {
        private final RabbitTemplate rabbitTemplate;
        @PostConstruct
        public void init() {
            rabbitTemplate.setReturnsCallback(returnedMessage -> {
                log.info("监听到了消息return callback");
                log.info("exchange: {}", returnedMessage.getExchange());
                log.info("routingKey: {}", returnedMessage.getRoutingKey());
                log.info("message:{}", returnedMessage.getMessage());
                log.info("replyCode: {}", returnedMessage.getReplyCode());
                log.info("replyText: {}", returnedMessage.getReplyText());
            });
        }
    }
    
  3. 定义ConfirmCallback并发送消息

    3.1 发送成功

    package com.itheima.publisher;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootTest
    @Slf4j
    class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testConfirmCallback() {
            //0. 创建CorrelationData,并设置消息ID
            CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
            cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("spring amqp 处理确认结果异常", ex);
                }
                @Override
                public void onSuccess(CorrelationData.Confirm result) {
                    if (result.isAck()) {
                        log.info("收到ConfirmCallback ack,消息发送成功!");
                    } else {
                        log.info("收到ConfirmCallback nack,消息发送失败!", result.getReason());
                    }
                }
            });
            //1. 交换机名称
            String exchangeName = "hmall.direct";
            //2. 消息
            String message = "测试发送者确认";
            //3. 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
            //4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    image-20240623212257140

    3.2 发送失败-路由失败

    rabbitTemplate.convertAndSend(exchangeName, "blue22", message, cd);
    

    image-20240623212531629

注意:发送者确认机制需要发送者和MQ进行确认,会大大影响消息发送的效率,通常情况下不建议开启发送者确认机制。

MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化

RabbitMQ实现数据持久化包括3个方面,设置为持久化后,重启MQ,交换机、队列、消息也不会丢失。

  • 交换机持久化(新建交换机默认就是持久化)

    image-20240623213739596

    D表示持久化

    image-20240623214323156

  • 队列持久化(新建队列默认就是持久化)

    image-20240623213820123

  • 消息持久化(可视化界面发送消息时默认是非持久化,SpringAmqp发送消息时默认是持久化的)

    image-20240623213936822

案例演示:

MQ接收非持久化消息

发送者发送1百万条非持久化消息

image-20240623221207869

发送耗时:

image-20240623222816310

MQ收到了一百万条非持久化消息

注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)

image-20240623221117585

重启MQ后,一百万条非持久化消息全部丢失

image-20240623221416018

MQ接收持久化消息

发送者发送1百万条持久化消息

image-20240623222209093

发送耗时:

image-20240623222929928

MQ收到了一百万条持久化消息

注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)

image-20240623222150284

重启MQ后,一百万条持久化消息不会丢失

image-20240623222334921

结论

在接收非持久化消息时,MQ收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致MQ阻塞),然后再继续接收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。

在接收持久化消息时,MQ会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。

发送一千万条非持久化消息耗时:

image-20240623223503631

发送一千万条持久化消息耗时:

image-20240623223826459

从上面发送者发送一百万条消息的耗时来看,发送持久化消息比发送非持久化消息耗时更少(不需要paged out),而且持久化消息在MQ重启后不会丢失,所以建议发送持久化消息。

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

3.12版本之前的MQ设置Lazy Queue模式有三种方式:

  • 可视化界面设置

    要设置一个队列为情性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

    image-20240623224928445

  • Spring Bean方式设置

    image-20240623225142076

  • 注解方式设置

    image-20240623225259397

非Lazy Queue模式+持久化消息和Lazy Queue模式+持久化消息MQ接收消息速度对比:

image-20240623225516918

消费者的可靠性

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。MQ将一条消息发送给消费者后,MQ上的这条消息处理待确认状态,当消费者处理消息结束后,应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

image-20240624001937778

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式(默认模式)。SpringAMQP利用 AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常(throw new RuntimeException),会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示

  1. 消费者配置

    spring:
      rabbitmq:
        host: 192.168.1.2 # RabbitMQ地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
        listener:
          simple:
            prefetch: 1
            acknowledge-mode: auto
    
  2. 消费者

    image-20240624004654368

  3. 发送者

    image-20240624004733266

    查看消息状态:

    image-20240624004948237

    • 因为消费者抛出业务异常,所以会给MQ发送nack,然后MQ不停地向消费者投递消息

      image-20240624005751210

    查看消息内容

    image-20240624005115377

    • 查看队列中的消息,提示队列是空的,所以得出结论:待确认的消息不保存在队列中
失败重试机制

SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

image-20240624003901201

案例演示

  1. 消费者配置
spring:
  rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000ms
          multiplier: 1
          max-attempts: 3
          stateless: true
  1. 消费者

    image-20240624010523176

  2. 发送者

    image-20240624010548871

  3. 消费者输出

    image-20240624011143793

  4. 查看消息状态

    image-20240624011251261

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer(默认):重试耗尽后,给MQ返回reject,MQ收到reject后会将消息丢弃。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

将失败处理策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机、队列及其绑定关系。

  2. 然后,定义RepublishMessageRecoverer:

    image-20240624012025791

案例演示

  1. 定义接收失败消息的交换机、队列、绑定关系、RepublishMessageRecoverer

image-20240624012657886

  1. 消费者

    image-20240624012751968

  2. 消费者输出

    image-20240624013013748

  3. 查看error.queue上的消息

    image-20240624013151477

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),例如求绝对值的函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

image-20240624013938181

消除非幂等性的手段

  • 唯一消息id

    image-20240624014152269

案例演示:

  1. 配置消息转换器

    image-20240624014540109

  2. 发送者发送消息

    image-20240624014854578

  3. 查看消息

    image-20240624014817996

  4. 消费者使用Message接收

image-20240624015108951

  • 业务判断

    image-20240624015630994

延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务

image-20240624021057066

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter)

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20240624021939871

案例演示

  1. 消费者中定义死信交换机和队列,并监听

  2. 定义普通交换机,不需要消费者

    image-20240624022506823

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1857628.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

详解 | DigiCert EV代码签名证书

简介 DigiCert EV 代码签名证书是一种高级别的代码签名证书&#xff0c;它不仅提供了标准代码签名证书的所有安全特性&#xff0c;还增加了额外的身份验证流程&#xff0c;以确保软件开发者或发布者的身份得到最严格验证。这对于提升软件的信任度、防止恶意篡改和确保下载安全…

深入剖析Tomcat(十、十一) 详解StandardWrapper

《深入剖析Tomcat》第十章介绍了Tomcat的安全机制&#xff0c;主要就是对servlet的访问做安全验证&#xff0c;如果Tomcat中设置了某些servlet需要指定角色的用户才能访问&#xff0c;则需要客户端进行登录验证&#xff0c;如果用户名密码正确并且该用户拥有该角色的话&#xf…

Charles 显示内存不足解决方法

弹窗出现&#xff1a;Charles is running low on memory. Recording has been stopped. Please clear the session to free memory and continue recording. 官网解决方法&#xff1a; Charles runs out of memory After recording for a while Charles will run low on ava…

智能视频监控平台智能边缘分析一体机安防监控平台吸烟检测算法应用场景

智能边缘分析一体机吸烟检测算法是一种集成了先进图像处理、模式识别和深度学习技术的算法&#xff0c;专门用于实时监测和识别公共场所中的吸烟行为。以下是关于该算法的详细介绍&#xff1a; 工作原理 1、视频采集&#xff1a; 通过安装在公共场所的摄像头&#xff0c;实时…

js实现拖拽排序

<!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>拖拽排序</title><style>* {margin: 0;p…

成为一个NB程序员,必看的5大定律!

请把这篇文章读进脑子里去&#xff0c;且在现实中用起来 除了超有意思也真的能“镀金”~~ 顺便吆喝一声&#xff0c;如果你计算机、软件工程、电子等相关专业本科及以上学历&#xff0c;欢迎来共事。前后端/测试​均可投&#xff0c;技术大厂。 定律一&#xff1a;晕轮效应 又…

头歌——机器、深度学习——手写体识别

第1关&#xff1a;神经网络基本概念 任务描述 本关任务&#xff1a;根据本节课所学知识完成本关所设置的选择题。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a;1.神经网络基本概念。 神经网络基本概念 神经网络由输入层、隐藏层、输出层组成&#xff1b;…

IS022000与HACCP:提升食品安全管理的完美结合

国际标准化组织&#xff08;ISO&#xff09;于2005年9月发布了IS022000:2005标准&#xff0c;这是一项针对食品安全管理体系的国际标准。我国以等同采用的方式制定了国家标准GB/T 22000-2006《食品安全管理体系食品链中各类组织的要求》&#xff08;以下简称“GB/T22000”&…

# Kafka_深入探秘者(4):kafka 主题 topic

Kafka_深入探秘者&#xff08;4&#xff09;&#xff1a;kafka 主题 topic 一、kafka 主题管理 1、kafka 创建主题 topic 命令 1&#xff09;命令&#xff1a; # 切换到 kafka 安装目录 cd /usr/local/kafka/kafka_2.12-2.8.0/# 创建一个名为 heima 的 主题 bin/kafka-topic…

Java项目:基于SSM框架实现的电子竞技管理平台【ssm+B/S架构+源码+数据库+毕业论文】

一、项目简介 本项目是一套基于SSM框架实现的电子竞技管理平台 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能…

最新AI智能聊天对话问答系统源码(图文搭建部署教程)+AI绘画,文生图,TTS语音识别输入,文档分析

一、人工智能语言模型和AI绘画在多个领域广泛应用 人工智能语言模型和AI绘画在多个领域都有广泛的应用。以下是一些它们的主要用处&#xff1a; 人工智能语言模型 内容生成 写作辅助&#xff1a;帮助撰写文章、博客、报告、剧本等。 代码生成&#xff1a;自动生成或补全代码&…

python项目加密和增加时间许可证

1.bat&#xff0c;执行如下的命令&#xff0c;第一句是更新或增加许可证 第二句是加密draw_face.py python offer.py pyarmor obfuscate -O dist draw_face.py绘制自制人脸.py&#xff0c;调用加密的代码draw_face代码 import sys import os import cv2# 添加加密模块所在的路…

国内顶级汽车制造厂的创新实践:如何利用实时数据湖为更多业务提供新鲜数据?

使用 TapData&#xff0c;化繁为简&#xff0c;摆脱手动搭建、维护数据管道的诸多烦扰&#xff0c;轻量代替 OGG、DSG 等同步工具&#xff0c;「CDC 流处理 数据集成」组合拳&#xff0c;加速仓内数据流转&#xff0c;帮助企业将真正具有业务价值的数据作用到实处&#xff0c…

如何在服务器之间同步文件?

业务需求 因业务需求需要在多台服务器之间做文件资源的双向同步&#xff0c;选择 ownCloud davfs2 rsync 来实现 ownCloud ownCloud 是一个开源免费专业的私有云存储项目&#xff0c;它能帮你快速在个人电脑或服务器上架设一套专属的私有云文件同步网盘。 ownCloud 能让你…

ArkTS开发系列之导航 (2.5.2 页面组件导航)

上篇回顾: ArkTS开发系列之导航 (2.5.1 页面路由&#xff09; 本篇内容&#xff1a;主要学习页面内组件导航 一、 知识储备 1. Navigation 一般作为页面的根容器&#xff0c;包括单页面、分栏和自适应三种显示模式。 自适应模式 (NavigationMode.Auto) &#xff0c;需要注意…

三相变压器:应用和连接配置

变压器的功能和应用 变压器的类型和用途多种多样&#xff0c;可根据其应用、结构类型和尺寸进行分类。 一般来说&#xff0c;变压器的主要功能是改变交流电&#xff08;AC&#xff09;的电压水平&#xff0c;提高电压以供长距离传输或降低电压以供家庭和工业消费者使用。 它…

优先级队列模拟实现

目录 1.堆的概念 2.堆性质堆中的某个元素小于或大于他的左右孩子 3.小根堆实例 4.堆创建 4.1调整思路 4.2向下调整思路 4.3代码实现&#xff08;大根堆&#xff09; 5.堆的删除 6.堆的插入 7.常用接口 7.1PriorityQueue和PriorityBlockingQueue 1.堆的概念 如果有一…

常见硬件工程师面试题(二)

大家好&#xff0c;我是山羊君Goat。 对于硬件工程师&#xff0c;学习的东西主要和电路硬件相关&#xff0c;所以在硬件工程师的面试中&#xff0c;对于经验是十分看重的&#xff0c;像PCB设计&#xff0c;电路设计原理&#xff0c;模拟电路&#xff0c;数字电路等等相关的知识…

微服务(服务治理)

服务远程调用时存在的问题 注册中心原理 服务治理中的三个角色分别是什么&#xff1f; 服务提供者&#xff1a;暴露服务接口&#xff0c;供其它服务调用服务消费者&#xff1a;调用其它服务提供的接口注册中心&#xff1a;记录并监控微服务各实例状态&#xff0c;推送服务变更信…

软件工程体系概念

软件工程 软件工程是应用计算机科学、数学及 管理科学等原理开发软件的工程。它借鉴 传统工程的原则、方法&#xff0c;以提高质量&#xff0c;降 低成本为目的。 一、软件生命周期 二、软件开发模型 1.传统模型 瀑布模型、V模型、W模型、X 模型、H 模型 (1)瀑布模型 瀑布…