2023最新谷粒商城笔记之MQ消息队列篇(全文总共13万字,超详细)

news2025/1/11 7:12:46

MQ消息队列

其实队列JDK中本身就有,不过这种队列也只能单体服务可能会使用,一旦项目使用的分布式架构,那么一定还是需要用到一个消息中间件的。我们引入消息队列的原因就是对我们的页面相应速度再优化,让用户的体验更好,原来下订单可能需要1s等待时间,引入队列之后可能只需要50ms。

消息中间件的好处

  1. 异步处理

    最开始我们执行任务时都是同步的,比如下图的第一种模式。我们必须等各个操作的做完才能返回响应,这样效率就会很慢。例如:发送邮件、发送短信,但它们能不能收到其实并不是侧重点。因此,可以启动两个线程来执行,也就是第二种模式(异步执行),但是使用消息中间件mq可以让效率更上一层楼,我们可以把要处理的任务放进mq中,然后直接返回结果,至于任务则可以慢慢在后面进行处理。

image-20221028192254906
  1. 应用解耦

    最开始我们如果需要调用不同服务直接的代码时需要在我们的代码中加上调用其他服务方法的逻辑,如果方法需要修改比如逻辑要修改,参数要修改,我们就要修改源代码,有了mq之后我们可以把这些调用都交给mq进行处理。即使下单时库存系统不能正常使用,也不影响正常下单。因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统库存系统的应用解耦

    image-20221028192912831
  2. 流量控制

    服务器接收用户的请求后,先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

    比如对于大并发量的情况(秒杀),我们可以先把请求放进mq中,不需要立刻处理,让服务根据能力处理mq中的请求就可以了,达到流量削峰的目的。

    image-20221028193053095

MQ的相关概念

消息中间件概述

1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

2.消息服务中两个重要概念:

  • 消息代理(message broker就是运行消息中间件的服务器,这个服务器替我们接收、发送消息)和目的地(destination)
  • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

3.消息队列主要有两种形式的目的地

  • 队列(queue): 点对点消息通信(point-to-point)
  • 主题(topic): 发布(publish)/订阅(subscribe)消息通信

4.点对点式

  • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列
  • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者,即点对点可以有很多的消息的接收者,但消息的接受者只能有一个,谁能拿到消息需要靠抢

5.发布订阅式:

  • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息

6.JMS(Java Message Service)JAVA消息服务:

  • 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

7.AMQP(Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
  • RabbitMQ是AMQP的实现

8.Spring支持

  • spring-jms提供了对JMS的支持
  • spring-rabbit提供了对AMQP的支持
  • 需要ConnectionFactory的实现来连接消息代理
  • 提供JmsTemplate、RabbitTemplate来发送消息
  • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
  • @EnableJms@EnableRabbit开启支持

9.Spring Boot自动配置

  • JmsAutoConfiguration
  • RabbitAutoConfiguration

10.市面上的MQ产品

ActiveMQRabbitMQRocketMQKafka

消息代理规范

  • JMS(Java Message Service)JAVA消息服务
    基于JVM消息代理的规范。ActiveMQ、HornetMQ是 JMS 实现
  • AMQP(Advanced Message Queuing Protocol)
    高级消息队列协议,也是一个消息代理的规范,兼容JMS
    RabbitMQ 是 AMQP 的实现

下面我们来看一看JMS和AMQP两个规范(协议):

在这里插入图片描述

RabbitMQ是基于AMQP协议实现的并且兼容JMS,ActiveMQ是基于JMS实现的。

JMS和AMQP的区别在于:JMS面向纯java平台不不支持跨平台而AMQP是可以跨平台,假如后台服务有用PHP编写则可以兼容。

JMS和AMQP的简单对比 :

①.AMQP的消息模型中direct exchange是类比JMS中P2P(Queue),AMQP的其它四种消息模型则是类比于JMS的Topic

②.JMS支持的各种消息类型,AMQP只支持byte[]但也无妨最后都可以json序列化后传输

RabbitMQ概念

RabbitMQ是一个由erlang开发的遵循AMQP(Advanved Message Queue Protocol)协议的开源消息队列实现。

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Exchange有4种类型:direct(默认)fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。

Exchange 和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhostAMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体。

在这里插入图片描述

工作流程: 首先,生产者客户端会向消息中间件发送Message,Message由消息头和消息体组成,消息头中有一个route-key属性用于标识存储的队列位置,消息中间件接收到消息之后会由相应的交换机将消息存储到指定的消息队列中,交换机和队列具有绑定关系,无论生产者还是消费者客户端想发送或者接收消息都需要使用connnection去创建一个长连接,长连接类似于高速公路,信道类似于高速公路中的每个车道。RabbitMQ还有一个虚拟主机即类似于Docker中的容器彼此互不干扰,不需要创建多个RabbitMQ只需要创建多个虚拟机即可实现向java后台、PHP后台发送消息(也可以用虚拟主机实现生产和开发环境,其提供一个隔离的RabbitMQ环境)。

长连接的好处是当客户端宕机之后,RabbitMQ将不会向消费者客户端发送消息而是将消息持久化保证消息不会丢失。

image-20221028195049347

docker安装RabbitMQ

这里不需要下载镜像,直接安装。默认会帮你下载

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

修改RabbitMQ配置为只要启动docker自动重启rabbitMQ

docker update rabbitmq --restart=always

下面为RabbitMQ中涉及的端口号:

对应端口号解释:

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口)

可访问的可视化RabbitMQ端口号 : 15672 访问RabbitMQ控制页面

下面关于在RabbitMQ可视化界面的操作就不记笔记了,因为大多需要粘贴图片,排版出来会很难看,相信小伙伴们看视频也是轻松get。

AMQP中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别, AMQP 中增加了 Exchange 和 Binding 的角色 :生产者把消息发布 到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列。

在这里插入图片描述

因此这里我们简要说明一下RabbitMQ中的自动创建出的四种消息模型的交换机即Exchange的类型

  1. Direct是点对点模式,一个消息只能发送给一个队列,且被一个消息接收者接收。只有消息中的路由键(routing key)和 Binding 中的 binding key 完全一致时, 交换器才会将消息发到对应的队列中,即路由键与队列名完全匹配,如果一个队列绑定到交换机要求其路由键为“dog”,那么交换机只会给这个队列转发 routing key 为“dog”的消息,不会转发 “dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。

    在这里插入图片描述

  2. headers这种也是点对点的,同direct。但headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。

  3. Fanout广播式,只要跟这个交换机绑定的队列都会收到发送到这个交换机的消息。Fanout 交换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。它很像子网广播,每台子网内 的主机都获得了一份复制的消息。Fanout 类型转发消息是最快的。

    在这里插入图片描述

  4. topic这种也是广播式,不过它会根据路由键进行匹配(可以通配符模糊匹配),只有匹配成功的队列才会接收到消息。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    识别通配符: #匹配 0 个或多个单词, *匹配一个单词

在这里插入图片描述

SpringBoot整合RabbitMQ

  1. 导入amqp依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

我们引入这个依赖之后,RabbitAutoConfiguration这个类也就自然而然引入了我们的项目中,这个配置类也就会自动注册进容器,将其配置信息生效,比如这个类就给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate等类

@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
  1. 添加RabbitMQ的配置信息(看到上面配置类中加载了一个配置文件,点进去就会发现这个类绑定了我们spring默认的属性配置文件

    @ConfigurationProperties(
        prefix = "spring.rabbitmq"
    )
    public class RabbitProperties {
    

由此我们就可以去application.properties文件中配置我们rabbitMQ的信息了

spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
#其他信息就不需要我们配置了,大部分都在RabbitProperties中配置了默认值
  1. 主启动类添加@EnableRabbit注解()
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(GulimallOrderApplication.class, args);
    }
}

RabbitMQ在Java中的使用

CachingConnectionFactory

RabbitTemplate使用CachingConnectionFactory作为连接工厂

配置类上标有这样的注解:@EnableConfigurationProperties(RabbitProperties.class)

向容器中注入CachingConnectionFactory的代码中是从配置文件中加载配置信息的。

spring.rabbitmq为配置的前缀,可以指定一些端口号,ip地址等信息。

AmqpAdmin

AmqpAdminorg.springframework.amqp.core下的类,这个类主要是用来进行一些资源的创建的,如创建交换机,队列,绑定信息等。通过此类,可以用代码的方式创建Exchange、Queue还有Binding:

@Autowired
AmqpAdmin amqpAdmin;

/**
 * 创建绑定
 */
@Test
public void createBinding() {
    // String destination 目的地
    // DestinationType destinationType 绑定类型:队列/交换机
    // String exchange 交换机名称
    // String routingKey 路由键
    //、Map<String, Object> arguments 参数
    Binding binding = new Binding("hello.queue" , Binding.DestinationType.QUEUE, "hello", "hello.queue",null);
    amqpAdmin.declareBinding(binding);
}

/**
 * 创建队列
 */
@Test
public void createMQ() {
    /**
     * @param name 队列的名称
     * @param durable 是否持久化队列
     * @param exclusive 是否声明为一个独占队列
     * @param autoDelete 如果服务不在使用时是否自动删除队列
     */
    Queue queue = new Queue("hello.queue", true, false, false);
    String s = amqpAdmin.declareQueue(queue);
    log.info("创建queue成功... {}", queue);
}

/**
 * 创建交换机
 * TopicExchange
 * FanoutExchange
 * DirectExchange
 */
@Test
public void createExchange() {
    // String name 交换机名称
    // boolean durable 是否持久化
    // boolean autoDelete 是否自动删除
    Exchange exchange = new DirectExchange("hello", true, false);
    amqpAdmin.declareExchange(exchange);
    log.info("创建exchange成功...");
}

RabbitTemplate

这个类就是用来控制消息的收发了。通过RabbitTemplate类中的方法,可以像使用Rabbit客户端一样向队列发送消息以及更多其他的操作,并且多个重载的”send“(发送消息)方法。

@Autowired
RabbitTemplate rabbitTemplate;

/**
* convertAndSend(String exchange, String routingKey, Object object)
* String exchange, 交换器
* String routingKey,   路由值
* Object object    消息,如果发送的消息是对象,我们会使用序列化机制将对象写出去。
 */
@Test
public void test() {
    // 发送消息
    rabbitTemplate.convertAndSend("hello", "hello.queue"  ,"msg");
}
  1. 发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
  2. 或者我们想要发送的对象序列化为JSON格式

通过指定不同的MessageConverter来实现,可以向容器中注入我们想要的MessageConverter从而使用。

在这里插入图片描述

配置MyRabbitConfig,让发送的对象类型的消息可以是一个json

添加“com.atguigu.gulimall.order.config.MyRabbitConfig”类,代码如下:

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

注意:

配置MyRabbitConfig配置类,向容器中添加一个Json的消息转换器,让发送的对象类型的消息可以是一个json(如果我们不添加,RabbitMQ就会用它配置类中自带的一个对象序列化的转换器,因为对象无法直接在网络中传输,需要转换成字符串。如果使用RabbitMQ自带的不要忘了在传输对象的类上实现Serialize序列化接口)。这里我们添加Json消息转换器,代码如下:

@RabbitListener和@RabbitHandler注解

监听消息:使用@RabbitListener和@RabbitHandler,主启动类必须有@EnableRabbit。

  • @RabbitListener: 类+方法上(监听哪些队列即可)
  • @RabbitHandler: 标在方法上(重载区分不同的消息)

@RabbitListener注解和@RabbitHandler都可以接受消息队列中的消息,并进行处理。

@RabbitListener注解:

使用@RabbitListener时主启动类必须有@EnableRabbit,其可以标记方法或类上进行使用

自定义方法的参数可以为以下类型:

1、Message message:原生消息详细信息。头 + 体

2、T <发送的消息的类型> 可以是我们自定义的对象

3、Channel channel :当前传输数据的信道。

@RabbitListener(queues = {"hello.queue"})
public String receiveMessage(Message message, OrderEntity content) {
    //消息体信息
    byte[] body = message.getBody();
    // 消息头信息
    MessageProperties messageProperties = message.getMessageProperties();
    log.info("收到的消息: {}", content);
    return "ok";
}

同时要注意:Queue可以由很多方法来监听,只要收到消息,队列就删除消息,并且只能有一个方法收到消息。并且一个方法接收消息是一个线性的操作,只有处理完一个消息之后才能接收下条消息。

@RabbitHandler注解:

@RabbitHandler标在方法上用于接受不同类型的消息对象。

@RabbitHandler标记的方法结合@RabbitListener,@RabbitHandler使用可以变得更加灵活:采用在类上加 @RabbitListener 注解,标识监听哪些消息队列。在方法上添加@RabbitHandler注解,重载区分不同的消息。

比如说,当两个方法对一个消息队列进行监听时,用于监听的两个方法用于接收消息内容的参数不同,根据消息的内容可以自动的确定使用那个方法。

@RestController
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @GetMapping("/sendMq")
    public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num){
        for (int i = 0; i < num; i++){
            //向一个队列中发送两种不同类型的消息
            if (i%2==0){
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(1L);
                orderReturnApplyEntity.setCreateTime(new Date());
                orderReturnApplyEntity.setReturnName("哈哈哈");
                //配置MyRabbitConfig,让发送的对象类型的消息,可以是一个json
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity, new 				 CorrelationData(UUID.randomUUID().toString()));
            }else {
                OrderEntity entity = new OrderEntity();
                entity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity, new 								CorrelationData(UUID.randomUUID().toString()));
            }
        }
        return "OK";
    }
}

修改“com.atguigu.gulimall.order.service.impl.OrderItemServiceImpl”类,代码如下:

@RabbitListener(queues = {"hello-java-queue"})//queues:声明需要监听的所有队列
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );
        return new PageUtils(page);
    }
     
    /**
     * 以下参数是我们自定义的,spring会自动帮我们解析
     * 参数1、Message message:原生消息详细信息。头+体
     * 参数2、T<发送的消息类型> OrderReturnApplyEntity content
     * 参数3、Channel channel 当前传输数据的通道
     * Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
     * 场景:
     *       1)、订单服务启动多个;同一个消息,只能有一个客户端收到
     *       2)、只有一个消息完全处理完,方法运行结束,才可以接收到下一个消息
     */
    //@RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void receiverMessage(Message message,OrderReturnApplyEntity content,
                                Channel channel) throws InterruptedException {
        //消息体
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + content);
		//Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getReturnName());
    }
    
    @RabbitHandler
    public void receiverMessage(OrderEntity orderEntity){
        System.out.println("接收到消息...内容:" + orderEntity);
    }
}

可靠投递-发送端确认

image-20221028221058895
  • 服务器收到消息 p->b:ConfirmCallback
    1. pring.rabbitmq.publisher-confirms=true
    2. 设置确认回调 ConfirmCallback
  • 消息抵达队列就回调 e->q:ReturnCallback
    1. spring.rabbitmq.publisher-returns: true
    2. spring.rabbitmq.template.mandatory: true
    3. 设置确认回调 ReturnCallback

ConfirmCallback

image-20221028221623629

ConfirmCallbackRetruhnCallback一样都是RabbitTemplate内部的接口。

消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。

也就是说当消息到达RabbitMQ的服务器就会执行回调方法。但是被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里,所以需要用到接下来的 returnCallback。

首先需要修改配置文件:

#开启发送端确认
spring.rabbitmq.publisher-confirms=true

然后准备一个发送消息使用的接口和两个用来监听消息队列并接收消息的方法

发送消息接口:

@RestController
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0) {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setId(1L);
                orderEntity.setMemberUsername("Tom");
                orderEntity.setReceiveTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
            } else {
                OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                orderReturnReasonEntity.setCreateTime(new Date());
                orderReturnReasonEntity.setId(2L);
                orderReturnReasonEntity.setName("test");
                orderReturnReasonEntity.setSort(1);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));
            }
        }
        return "ok";
    }
}

监听消息队列并接收消息的方法:

@RabbitListener(queues = {"hello.news"})
@Slf4j
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) {
        //消息体信息
        byte[] body = message.getBody();
        // 消息头信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("receiveMessage1 接收消息: " + content);
    }
    @RabbitHandler
    public void receiveMessage2(Message message, OrderEntity content, Channel channel) {
        //消息体信息
        byte[] body = message.getBody();
        // 消息头信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("receiveMessage2 接收消息: " + content);
    }
}

第三步,在配置类中定制RedisTemplate:

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @PostConstruct // 该注解表示在初始化构造器之后就调用,初始化定制 RabbitTemplate
    public void initRabbitTemplate() {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 当前消息的唯一相关数据 (这个是消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback... correlationData: [" + correlationData + "] ==> ack: [" + ack + "] ==> cause: [" + cause + "]");
            }
        });
    }
}

那么一旦消息抵达消息队列服务器就会调用我们自己定义的配置类中实现的回调方法(打印输出消息信息)。

ReturnCallback

首先修改application.properties,配置上回调确认

#开启发送端抵达队列确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true

被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

如果在交换机将消息投递到queue的过程中,发生了某些问题,最终导致消息投递失败,就会触发这个方法。

为定制的RabbitTemplate添加这个方法:

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    /**
     * @param message 投递失败的消息的详细信息
     * @param replyCode 回复的状态码
     * @param replyText 回复的文本内容
     * @param exchange 但是这个消息发给哪个交换机
     * @param routingKey 当时这个消息使用哪个路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("FailMessage: [" + message + "] ==> replyCode: [" + replyText + "] ==> exchange: [" + exchange + "] ==> routingKey: [" + routingKey + "]");
    }
});

我们在发送消息的一端故意写错路由键,致使exchange投递消息失败。最后会看到回调方法ReturnCallback中打印的内容:

FailMessage: [(Body:'{"id":2,"name":"test","sort":1,"status":null,"createTime":1641608721639}' MessageProperties [headers={spring_returned_message_correlation=b6b21f2d-73ad-473d-9639-feec76953c7b, __TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] ==> replyCode: [NO_ROUTE] ==> exchange: [hello-java-exchange] ==> routingKey: [hello.news1]

补充:在发送消息的时候还可以指定一个CorrelationData类型的参数(可以回顾上文的发送消息的方法),这个CorrelationData类的构造器参数可以填一个UUID,代表消息的唯一id,在重写ConfirmCallback中的方法的第一个参数就是这个,通过这个参数就可以获取消息的唯一id。

注意:监听方法返回值必须为void,否则控制台会不断打印报错信息。(血的教训)

可靠投递-消费端确认

image-20221029100951566

ACK(Acknowledge)消息确认机制

消费者获取到消息,成功处理,可以回复Ack给Broker

  • basic.ack用于肯定确认;broker可以删除此消息
  • basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
  • basic.reject用于否定确认;同上,但不能批量

在默认状况下,ACK消息确认机制是当消息一旦抵达消费方,客户端会自动确认,服务端就会删除这个消息(出队),但是如果在消息消费过程中服务器宕机了,这些消息也会被删除,这就造成了消息丢失的问题。
问题:
我们收到很多消息,客户端会自动回复给服务器ack进行确认,但如果只有一个消息处理成功,然后客户端宕机了。就会发生消息丢失
这时我们改为手动确认模式。只要我们没有明确告诉MQ,消息被签收了,也就是没有ACK,消息就一直unacked状态,
即使Consumer宕机。消息也不会丢失,状态会重新变为Ready,下一次有新的Consumer连接进来就发给他

通过配置可以开启消息需要经过手动确认,才能从队列中删除消息

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改我们的RabbitMQ的配置类MyRabbitConfig,代码如下:

@RabbitHandler
public void receiveMessage2(Message message, OrderEntity content, Channel channel) {
    //消息体信息
    byte[] body = message.getBody();
    // 消息头信息
    MessageProperties messageProperties = message.getMessageProperties();
    long deliveryTag = messageProperties.getDeliveryTag();
    //手动接收消息
    //long deliveryTag相当当前消息派发的标签,从messageProperties中获取,并且在Channel中自增的
    //boolean multiple 是否批量确认
    try {
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("receiveMessage2 接收消息: " + content);
}

我们在上方的代码打上断点并观察RabbitMQ客户端的状况:

在这里插入图片描述

对中总共有5条消息,并且进入了Unacked,即未被确认的状态。

但是这里使用debug模式启动然后关掉服务模拟服务器宕机会发生一个问题,就是在关闭服务之前,idea会将未执行完的方法先执行完再关闭服务。

所以可以在cmd杀掉进程模拟宕机。

这时,由于打了断点,没有走到消息确认的那一行代码,随机,服务器宕机,所有没有确认的消息都会从Unacked的状态回调Ready的状态。

有接收消息的方法就有拒绝消息的方法:basicNackbasicReject

//long deliveryTag 当前消息派发的标签
//boolean multiple 是否批量处理
//boolean requeue 拒绝后是否将消息重新入队
channel.basicNack(deliveryTag, false, true);
channel.basicReject(deliveryTag, true);

basicNackbasicReject都可以用来拒绝消息,但是basicNackbasicReject多了一个参数boolean multiple(是否批量处理)

如果将requeue设置为true,被拒绝的消息就会重新入队等待消费,false则拒绝消息就相当于丢弃此消息。

修改配置文件application.properties

#手动确认收货(ack)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitHandler
public void receiverMessage(Message message,OrderReturnApplyEntity content,
                                Channel channel) throws InterruptedException {
        //消息体
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        System.out.println("接收到消息...内容:" + content);
		//Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getReturnName());
        //channel内按顺序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag:"+deliveryTag);
        //签收货物,非批量模式
        try{
            if (deliveryTag % 2 == 0){
                //收货
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物。。。"+deliveryTag);
            }else {
                //退货第二个参数是multiple批量处理,第三个参数是丢弃的消息是否重新入队requeue=false 丢弃  requeue=true发回服务器,服务器重新将消息入队。
                channel.basicNack(deliveryTag,false,true);
                System.out.println("没有签收货物..."+deliveryTag);
            }
        }catch (Exception e){
            //网络中断
        }
}

如何签收信息:
业务成功就应该签收:channel.basicAck(deliveryTag,false);
业务处理失败就应该拒签,让别人处理:channel.basicNack(deliveryTag,false,true);


感谢耐心看到这里的同学,觉得文章对您有帮助的话希望同学们不要吝啬您手中的赞,动动您智慧的小手,您的认可就是我创作的动力!
之后还会勤更自己的学习笔记,感兴趣的朋友点点关注哦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/373941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ae:使用占位符

占位符 Placeholder本质上是一个静止的彩条图像&#xff0c;用来临时代替缺失的素材。自动占位符当 Ae 找不到源素材&#xff0c;比如被移动、删除或重命名&#xff0c;Ae 将自动生成占位符&#xff0c;在项目面板中用斜体显示&#xff0c;使用该素材的任何合成将用一个占位符图…

【R统计】R语言相关性分析及其可视化

&#x1f482; 个人信息&#xff1a;酷在前行&#x1f44d; 版权: 博文由【酷在前行】原创、需要转载请联系博主&#x1f440; 如果博文对您有帮助&#xff0c;欢迎点赞、关注、收藏 订阅专栏&#x1f516; 本文收录于【R统计】&#xff0c;该专栏主要介绍R语言实现统计分析的…

libxlsxwriter簇状柱形图绘制

libxlsxwriter的功能覆盖面很大&#xff0c;今天一起来看一下如何用这个库来生成带有簇状柱形图的表格。 1 簇形柱状图 首先来看一下Excel的样例表格&#xff0c;簇状柱形图往往是用来对比若干“系列”的数据在某一时间段内&#xff0c;或某一情境下的差异情况。在商务领域还…

小白量化《穿云箭集群量化》(4)指标公式写策略

小白量化《穿云箭集群量化》&#xff08;4&#xff09;指标公式写策略 穿云箭量化平台支持中文Python写量化策略&#xff0c;同时也直接支持股票公式指标写策略。下面我们看看是如何实现的。 股票软件的指标公式语法是一样的&#xff0c;不同仅仅是个别函数或绘图函数或绘图命令…

java多态理解和底层实现原理剖析

java多态理解和底层实现原理剖析多态怎么理解java中方法调用指令invokespecial和invokevirtual指令的区别invokeinterface指令方法表接口方法调用为什么不能利用方法表快速定位小结多态怎么理解 抽象事务的多种具体表现&#xff0c;称为事务的多态性。我们在编码过程中通常都是…

计算机网络 第4章 作业1

一、选择题 1. 由网络层负责差错控制与流量控制,使分组按序被递交的传输方式是_________&#xff08;C&#xff09; A&#xff0e;电路交换 B&#xff0e;报文交换 C&#xff0e;基于虚电路的分组交换 D&#xff0e;基于数据报的分组交换 2. TCP/IP 参考…

Bunifu.UI.WinForms 6.0.2 Crack

Bunifu.UI.WinForms为 WinForms创建令人惊叹的UI Bunifu.UI.WinForms我们为您提供了现代化的快速用户界面控件。用于 WinForms C# 和 VB.NET 应用程序开发的完美 UI 工具 简单 Bunifu.UI.WinForms没有臃肿的特征。正是您构建令人惊叹的 WinForms 应用程序所需要的。只需拖放然…

计算机网络高频知识点

目录 一、http状态码 二、强缓存与协商缓存 三、简单请求与复杂请求 四、PUT 请求类型 五、GET请求类型 六、GET 和 POST 的区别 七、跨域 1、什么时候会跨域 2、解决方式 八、计算机网络的七层协议与五层协议分别指的是什么 1、七层协议 2、五层协议 九、计算机网…

监控生产环境中的机器学习模型

简介 一旦您将机器学习模型部署到生产环境中&#xff0c;很快就会发现工作还没有结束。 在许多方面&#xff0c;旅程才刚刚开始。你怎么知道你的模型的行为是否符合你的预期&#xff1f;下周/月/年&#xff0c;当客户&#xff08;或欺诈者&#xff09;行为发生变化并且您的训练…

服务器部署—部署springboot之Linux服务器安装jdk和tomcat【建议收藏】

我是用的xshell连接的云服务器&#xff0c;今天想在服务器上面部署一个前后端分离【springbootvue】项目&#xff0c;打开我的云服务器才发现&#xff0c;过期了&#xff0c;然后又买了一个&#xff0c;里面环境啥都没有&#xff0c;正好出一期教程&#xff0c;方便大家也方便自…

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——ReduceTask工作机制

1、ReduceTask工作机制 ReduceTask工作机制&#xff0c;如下图所示。 &#xff08;1&#xff09;Copy阶段&#xff1a;ReduceTask从各个MapTask上远程拷贝一片数据&#xff0c;并针对某一片数据&#xff0c;如果其大小超过一定阈值&#xff0c;则写到磁盘上&#xff0c;否则直…

DHTMLX Suite 8.0.0 Crack

适用于现代 Web 应用程序的强大 JavaScript 小部件库 - DHTMLX 套件 用于创建现代用户界面的轻量级、快速且通用的 JavaScript/HTML5 UI 小部件库。 DHTMLX Suite 有助于推进 Web 开发和构建具有丰富功能的数据密集型应用程序。 DHTMLX Suite 是一个 UI 小部件库&#xff0c;用…

指针数组和数组指针的区别

数组指针&#xff08;也称行指针&#xff09;定义 int (*p)[n];()优先级高&#xff0c;首先说明p是一个指针&#xff0c;指向一个整型的一维数组&#xff0c;这个一维数组的长度是n&#xff0c;也可以说是p的步长。也就是说执行p1时&#xff0c;p要跨过n个整型数据的长度。如要…

【前端】JavaScript构造函数

文章目录概念执行过程返回值原型与constructor继承方式原型链其他继承方式&#xff08;还没写&#xff09;参考概念 在JS中&#xff0c;通过new来实例化对象的函数叫构造函数。实例化对象&#xff0c;也就是初始化一个实例对象。构造函数一般首字母大写。 构造函数的目的&…

Android性能调优 - 启动优化

一、APP启动优化1、 你对 APP 的启动有过研究吗? 有做过相关的启动优化吗?程序员&#xff1a;之前做项目的时候&#xff0c;我发现程序在冷启动时&#xff0c;会有 1s 左右的白屏闪现&#xff0c;低版本是黑屏的现象&#xff0c;在这期间我通过翻阅系统主题源码&#xff0c;发…

26 openEuler管理网络-使用ip命令配置网络

文章目录26 openEuler管理网络-使用ip命令配置网络26.1 配置IP地址26.1.1 配置静态地址26.1.2 配置多个地址26.2 配置静态路由26 openEuler管理网络-使用ip命令配置网络 说明&#xff1a; 使用ip命令配置的网络配置可以立即生效但系统重启后配置会丢失。 26.1 配置IP地址 使用…

JVM - G1垃圾收集器深入剖析

​​​​​​​1、G1收集器概述 HotSpot团队一直努力朝着高效收集、减少停顿(STW: Stop The World)的方向努力&#xff0c;也贡献了从串行Serial收集器、到并行收集器Parallerl收集器&#xff0c;再到CMS并发收集器&#xff0c;乃至如今的G1在内的一系列优秀的垃圾收集器。 G…

ER图、ERD图

ER图、ERD图1. 什么是ERD1.1 举例2. ERD符号指南2.1 实体2.2 属性2.3 主键2.4 外键2.4 关系2.5 基数2.5.1 一对一的基数的例子2.5.2 一对多的基数的例子2.5.3 多对多的基数的例子3.概念、逻辑和物理数据模型3.1 概念数据模型3.2 逻辑数据模型3.3 物理数据模型4.如何绘制ER图?5…

python的装饰器与设计模式中的装饰器模式

相信很多人在初次接触python中的装饰器时&#xff0c;会跟我一样有个疑问&#xff0c;这跟设计模式中的装饰器模式有什么区别吗&#xff1f;本质上是一样的&#xff0c;都是对现有对象&#xff0c;包括函数或者类的一种扩展。这篇文档将进行对比分析。 python的装饰器 装饰器…

Acwing 蓝桥杯 第一章 递归与递推

我上周在干什么&#xff0c;感觉我上周啥也没训&#xff0c;本来两天一次的vp也没v很寄啊&#xff0c;再这样下去真不行了先总结一下如何爆搜&#xff1a;先去确定好枚举的对象枚举的对象很重要&#xff01;&#xff01;这直接影响了复杂度然后就是去想递归树就好了一、确定状态…