RabbitMQ 学习整理1 - 基础使用

news2025/3/27 13:05:10

项目代码:RabbitMQDemo: 学习RabbitMQ的一些整理

基本概念

  1. RabbitMQ是一种基于AMQP协议的消息队列实现框架
  2. RabbitMQ可以用于在系统与系统之间或者微服务节点之间,进行消息缓存,消息广播,消息分配以及限流消峰处理
  3. RabbitMQ-Server本身是一个可独立运行的第三方服务,开发者并不需要单独维护,相当于一个虚拟主机存储了RabbitMQ使用过程中所需要的各种变量和对象,同时也提供了UI级别的操作接口

关于消息队列的特点

形象一点:理解成快递驿站,快递员投放,客人取货,快递站暂存

  1. 生产者发起消费者订阅:生产者主动将消息发送到消息队列,消费者再从队列中获取消息。与HTTP请求消费者向生产者发起请求的数据流向相反。
  2. 异步而非同步:生产者推送消息到队列后生产者推逻辑已经结束,不再线程拥塞等待消费者接收消息后传回处理结果
  3. 推送而非调用:生产者只负责将信息推出去,消息怎么处理,有怎样的回应,不是他考虑的事,也不是RabbitMQ中间件需要考虑的事情,类似于广播机制

RabbitMQ核心概念

  1. 生产者:产生数据发送消息的程序是生产者。
  2. 交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个是由交换机类型决定的。
  3. 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
  4. 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

安装与控制台

  1. 安装Erlang:http://www.erlang.org/download,目前版本最好要求26 +
  2. 安装RabbitMQ服务端:http://www.rabbitmq.com/download.html ,需要梯子
  3. 启动可视化界面进行管理:
    1. 进入安装目录,rabbitmq_server-3.4.1\sbin,执行rabbitmq-plugins enable rabbitmq_management
    2. 重启WIndows下的RabbitMQ服务
    3. 登录网址http://127.0.0.1:15672/,用户名密码:guest/guest

在这里插入图片描述

开发准备

不一定非要按此结构构建项目这么写,只是为了方便测试和练习

父项目POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    <packaging>pom</packaging>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>
    <groupId>com</groupId>
    <artifactId>RabbitMQDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>RabbitMQDemo</name>
    <description>RabbitMQDemo</description>
    <properties>
        <java.version>8</java.version>
        <spring-boot.version>2.7.18</spring-boot.version>
        <spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version>
        <spring.cloud.version>2021.0.6</spring.cloud.version>
        <hutol.version>5.5.7</hutol.version>
        <dubbo.version>3.2.14</dubbo.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <!-- 统一管理,配置在此模块下的,子模块要引入依赖必须声明groupId和artifactId,不需要声明版本-->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>5.8.16</version>
            </dependency>

        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

公共模块

定义一些消费者和生产者公用的常量或者数据类型

在这里插入图片描述

消费者和生产者POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <artifactId>实际模块名</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <version>1.0</version>
    <parent>
        <groupId>com</groupId>
        <artifactId>RabbitMQDemo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../../RabbitMQDemo</relativePath>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
		<!--公共基础包 -->
        <dependency>
            <groupId>com</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

生产者测试接口

@RestController
@RequestMapping("/rabbitMQDemo/具体调用地址")
public class DirectProviderController {
    @Resource
    DirectProviderService directProviderService;
    @GetMapping("/send")
    public String send() {
        return  directProviderService.sendMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
    }
}

其他代码

依据不同交换机类型案例专项给出

交换机类型

直连模式-单一消费者

即一个生产者–一个队列–一个消费者

主要步骤
  1. 生产者创建队列,交换机对象,实现交换机与队列的绑定,并将这些信息登记到RabbitMQ服务中
  2. 生产者指明目标交换机和交换机-队列绑定关系后,向RabbitMQ服务推送消息
  3. 消费者订阅队列,并从队列中获取消息

可以看到,一般情况下消费者不用和交换机打交道,只需要知道自己要订阅哪个队列即可

生产者实现
  1. 配置文件

    server:
      port: 8881
    spring:
      application:
        name: RabbitMQ-Direct-Provider
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
    
  2. 创建队列等内容

    @Configuration
    //这里只要创建了SpringBean对象即可,具体调用时后台会自动将这些对象注册到RabbitMQ-Sevrver中
    public class DirectConfiguration {
        @Bean
        public Queue rabbitmqDemoDirectQueue() {
            /**
             * 1、name:    队列名称
             * 2、durable: 是否持久化
             * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
             * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
             * */
            return new Queue(DirectRabbitMQTag.Direct_TOPIC, true, false, false);
        }
    
        @Bean
        public DirectExchange rabbitmqDemoDirectExchange() {
            //Direct交换机
            return new DirectExchange(DirectRabbitMQTag.Direct_EXCHANGE, true, false);
        }
    
        @Bean
        public Binding bindDirect() {
            //链式写法,绑定交换机和队列,并设置匹配键
            return BindingBuilder
                    //绑定队列
                    .bind(rabbitmqDemoDirectQueue())
                    //到交换机
                    .to(rabbitmqDemoDirectExchange())
                    //并设置匹配键
                    .with(DirectRabbitMQTag.Direct_ROUTING_KEY);
        }
    }
    
  3. 配置消息转换器(该步骤目的,请参考常见问题:消息序列化一节内容)

    @Configuration
    public class RestTemplateConfiguration {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
        {
            
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数
            rabbitTemplate.setMandatory(true);
            //这里配置在RabbitMQ传递消息时,将请求实体对象先转为JsonObject在进行投递
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }
    
  4. 发送消息逻辑

    @Service
    @Slf4j
    public class DirectProviderService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public String sendMsg(String msg) {
            String result = "OK";
            try {
                //采用自定义消息体
                DirectDto directDto = new DirectDto();
                directDto.setMsg(msg);
                directDto.setMsgId(UUID.randomUUID().toString());
                directDto.setMsgType("test");
                //利用RabbitTempate对象进行发送,比使用原生RabbitMQ对象更加简单
                //注意时异步的,消息发送到交换机后函数就执行完成了,至于消息有没有被谁解析,无从得知
                rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                result = "FAIL";
            }
            return result;
        }
    }
    
  5. 检查步骤:登录RabbitMQ-Server查看队列和交换机是否创建
    在这里插入图片描述
    在这里插入图片描述

消费者实现
  1. 配置文件

    server:
      port: 8882
    spring:
      application:
        name: RabbitMQ-Direct-Client
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
    
  2. 创建消息转换器(该步骤目的,请参考常见问题:消息序列化一节内容)

    @Configuration
    public class RabbitMQCustomerConfiguration {
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    }
    
  3. 创建订阅并解析

    @Component
    @Slf4j
    public class DirectReceiver {
    
        //对于消费者而言并不知道交换机编号或者交换机类型是什么,只关心队列名称,以便实现订阅
        @RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
        public void process(Message message) {
            //这里采用默认Message对象接收RabbitMQ的消息,
            //因为消费者和生产者都做了Json正反序列化,因此从message.getBody()获取的二进制流可以直接转换为消息体对应的JsonObjectStr
            log.info("Received message: {}", new String(message.getBody()));
            //实际返回结果 Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"}
        }
    }
    
    

直连模式-多个消费者

  1. 即一个生产者–一个队列–一个消费者
  2. 对于一个队列绑定了多个消费者的情况,RabbitMQ会议轮询的方式,将生产者的消息依次按顺寻分别分发到所有的消费者中,其效果非常类似等权值负载均衡
生产者代码
  1. 新增一个服务接口,以便测试功能

    @Service
    @Slf4j
    public class DirectProviderService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public String sendMsg(String msg) {
            String result = "OK";
            try {
                DirectDto directDto = new DirectDto();
                directDto.setMsg(msg);
                directDto.setMsgId(UUID.randomUUID().toString());
                directDto.setMsgType("test");
                rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                result = "FAIL";
            }
            return result;
        }
    
    	//新增功能,重复投递十个消息到队列,Controller层代码省略,正常调用即可
        public String sendMsg2() {
            for (int index = 0; index < 6; index++) {
                this.sendMsg("index is " + index);
            }
            return "OK";
        }
    }
    
消费者代码
  1. 复制一份上一节消费者代码,并让两者分别在8882,8883端口同时启动
  2. 注意一定要保证都订阅DirectRabbitMQTag.Direct_TOPIC队列
执行结果

在这里插入图片描述
在这里插入图片描述

扇出模式

  1. 一个生产者–多个队列-每个队列一个消费者
  2. 这种情况下,每个队列的消费者都会收到相同的消息,类似于广播机制
  3. 这种模式,交换机和队列没有Key绑定
生产者实现
  1. 配置文件

    server:
      port: 8883
    spring:
      application:
        name: RabbitMQ-Fanout-Provider
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
    
  2. 创建队列等内容

    package com.provider.configuration;
    
    import com.common.config.FanoutRabbitMQTag;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfiguration {
    
        @Bean
        public Queue fanoutExchangeQueue01() {
            //队列A
            return new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_01, true, false, false);
        }
    
        @Bean
        public Queue fanoutExchangeQueue02() {
            //队列B
            return new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_02, true, false, false);
        }
    
        @Bean
        public FanoutExchange rabbitmqDemoFanoutExchange() {
            //创建FanoutExchange类型交换机
            return new FanoutExchange(FanoutRabbitMQTag.FANOUT_EXCHANGE, true, false);
        }
    
        @Bean
        public Binding bindFanoutA() {
            //队列A绑定到FanoutExchange交换机,这里不需要提供队列和交换机KEY值
            return BindingBuilder.bind(this.fanoutExchangeQueue01()).to(this.rabbitmqDemoFanoutExchange());
        }
    
        @Bean
        public Binding bindFanoutB() {
            //队列B绑定到FanoutExchange交换机,这里不需要提供队列和交换机KEY值
            return BindingBuilder.bind(this.fanoutExchangeQueue02()).to(this.rabbitmqDemoFanoutExchange());
        }
    
    }
    
    
  3. 配置消息转换器

    与直连模式相同,不与累述

  4. 发送消息逻辑(controller层省略)

    @Service
    @Slf4j
    public class FanoutProviderService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public String sendMsg(String msg) {
            String result = "OK";
            try {
                String msgId = UUID.randomUUID() + "\t" + msg;
                //没有KEY值,第二个参数传一个“”即可
                rabbitTemplate.convertAndSend(FanoutRabbitMQTag.FANOUT_EXCHANGE, "", msgId);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                result = "FAIL";
            }
            return result;
        }
    }
    
    
消费者实现
  1. 配置文件

    server:
      port: 8884
    spring:
      application:
        name: RabbitMQ-Fanout-Client
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
    
  2. 创建消息转换器

    与直连模式相同,不与累述

  3. 创建订阅并解析

    @Component
    @Slf4j
    public class FanoutReceiver {
    	
        //注意:这里只订阅了队列1
        @RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_01})
        public void process(String msg) {
            log.info("FanoutReceiver process message: {}", msg);
        }
    }
    
    
  4. 复制上述工程让其可以同时在8885启动,并订阅队列2

    @Component
    @Slf4j
    public class FanoutReceiver {
         //注意:这里只订阅了队列2
        @RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_02})
        public void process(String msg) {
            log.info("FanoutReceiver process message: {}", msg);
        }
    }
    
执行结果
-- 8884,8885两个端口的服务均会收到相同的消息
FanoutReceiver process message: 3fdd6882-97f3-489d-9c45-79bb3c598821	2025-03-24 10:42:47

常见问题:消息序列化

问题描述

  1. RabbitMQ在推送消息到队列时,支持传递自定义bean对象,但默认会直接转为二进制流字符数组(btye[])
  2. 消费者在接收消息时,很有可能无法正确的将byte[]反序列化为bean对象,进而引发Failed to convert message异常
  3. 即便将自定义Bean对象类型抽出到公用模块,在有些版本中依然不能解决问题

解决策略

  1. 强制修改RabbitMQ在推送消息过程中的转换器不转化为二进制对象,具体步骤包括
    1. 配置生产者转化器对象,让RabbitMQ将Bean转化为Json在转化为二进制byte[]传递
    2. 配置消费者解析器对象,让RabbintMQ将byte[]转化为JsonStr,再自行按需转换为Bean类型

实现步骤

生产者配置
@Configuration
public class RestTemplateConfiguration {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
    {
        //设置开启Mandatory,才能触发回调函数,⽆论消息推送结果怎么样都强制调⽤回调函数
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}
生产者推送
@Service
@Slf4j
public class DirectProviderService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public String sendMsg(String msg) {
        String result = "OK";
        try {
            DirectDto directDto = new DirectDto();
            directDto.setMsg(msg);
            directDto.setMsgId(UUID.randomUUID().toString());
            directDto.setMsgType("test");
            rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            result = "FAIL";
        }
        return result;
    }

}
消费者配置
@Configuration
public class RabbitMQCustomerConfiguration {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
消费者解析
@Component
@Slf4j
public class DirectReceiver {

    @RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
    public void process(Message message) {
        log.info("Received message: {}", new String(message.getBody()));
    }
}
//输出结果
//Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"}

其他说明

  1. 对于String,int,double这样的原子类型为消息体时
    1. 消费者和生产者不配置Json转换器,可以正常传输与解析
    2. 消费者和生产者配置了Json转换器,可以正常传输与解析,不会与Bean传输冲突

高级应用:手动确认

问题描述

  1. 默认情况下RabbitMQ采用自动签收模式,这种情况下【生产者】并不知道以下内容
    1. 消息是否正确的从生产者发送到了交换机:可能目标交换机被析构了,或者目标交换机明写错了
    2. 消息是否正确的从交换机投递到了指定队列:可能时交换机找不到队列了
  2. 因此生产者为了知道投递消息到交换机以及交换机推送到队列的情况,可以使用手工确认模式

其他说明

  1. 手工确认模式分为生产者修改和消费者修改,一般而言
    1. 生产者开启了手工确认,则消费者也要使用手工确认
    2. 生产者没有开启手工确认,消费者也可以使用手工确认,比如消费重试,死信队列等场景,但这种模式下,生产这无法知道投递状态
  2. 手工确认模式不会让生产者处理消费者在从队列拿到消息后处理失败的问题,即生产者最远只关心到消息有没有到队列,一旦到了队列后续再出错就不是生产者的职责范围了

生产者的修改内容

  1. 开启手工确认模式,即允许监听投递交换机失败和推送队列失败的回调
  2. 实现ConfirmCallback回调,处理投递到交换机失败的异常
  3. 实现ReturnsCallback回调,处理推送到队列失败的异常
修改Yml文件
server:
  port: 8881
spring:
  application:
    name: RabbitMQ-Direct-Provider
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 开启手工确认模式
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true # 消息投递失败返回客户端
实现回调
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //必须开启此参数,否则Returns回调不会被触发
        rabbitTemplate.setMandatory(true);
        //设置回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息投递成功! {}" , correlationData);
            } else {
                log.error("消息投递失败! {} ", cause);
            }
        });
        rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("ReturnedMessage: {}", returnedMessage));
        return rabbitTemplate;
}

消费者的修改内容

  1. 在手工模式下消费者需要处理以下两个内容,特别注意这两个过程和生产者的confim,returns回调没有关系

    永远不要企图让生产者去捕获或者处理消费者解析消息时的异常!!!!!!

    1. 从队列拿到消息后,若正常处理需要告知RabbitMQ(不是生产者)当前消息需要从队列中出队
    2. 若处理异常要告知RabbitMQ(不是生产者),当前消息时回到队列等待再次拿取,还是丢弃不要了
  2. 关于处理异常时采用了回到队列这种模式的后续处理与注意事项,请参看【消费重试】一节内容

解析工程配置
@Configuration
public class RabbitMQCustomerConfiguration {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //对于配置了自定义类型转换的场景,如果想开启手工确认,必须在这里配置,yml中配置不生效!!!
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

一定要注意,如果注册了自定义解析工厂,那么在Yml无论怎么配置listener.simple.acknowledge-mode还是listener.direct.acknowledge-mode都是不起效的!!!!

修改接收逻辑
@Component
@Slf4j
public class DirectReceiver {

    //对于手工签收,一定要使用Message对象,因此不建议搭配@RabbitHandler强制解析消息体类型,因此直接对解析方法增加@RabbitListener
    //Channel参数为RabbitMQ自动注入的,不需要额外关心
    @RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
    public void process(Message message, Channel channel) throws IOException {
        //获取消息传递的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //处理业务逻辑
            log.info("开始解析消息...");
            log.info(new JSONObject(new String(message.getBody())).toString());
            //模拟随机出错的可能
            int temp = 1 / RandomUtil.randomInt(0,2);
            // 手动签收:第一个参数:表示收到的标签, 第二个参数:表示是否执行批处理确认
            channel.basicAck(deliveryTag, true);
            log.info("签收成功...");
        } catch (Exception e) {
            log.error("处理异常{},退回队列 ", e.getMessage());
            //如果处理业务逻辑的时候发生了错误,则要拒绝签收
            //参数1 multiple 是否批量处理
            //参数2 requeue = true 表示将错误的消息,重新放回到队列中并且一般放置在队列头部的位置,等待重提取
            //参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
            channel.basicNack(deliveryTag, true,true);
        }
    }
}

测试结果

-- 生产者日志输出
2025-03-21 10:15:54.952  INFO 21220 --- [nectionFactory1] c.p.c.RestTemplateConfiguration          : 消息投递成功! null
-- 消费者输出
2025-03-21 10:15:02.996  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 10:15:02.997  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:02.997 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常,退回队列 / by zero
2025-03-21 10:15:03.011  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 10:15:03.011  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.012 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常,退回队列 / by zero
2025-03-21 10:15:03.013  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 10:15:03.013  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.014  INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 签收成功...

测试结果说明

  1. 只要生产者投递消息到交换机就会有Confim回调(不论成功与否)
  2. 交换机若转发到队列无异常,则不会触发Returns回调
  3. 若客户端手动确认消息时选择了【重回队列】那么该消息又会立刻被消费者重新接收,直到该消息被正确处理并出队
  4. 显然,步骤3这种逻辑存在缺陷,如果消息体自身就是包含问题数据,则重回队列-重新接收步骤就会无限重复,关于此问题请参考【消费重试】一节内容

高级应用:消费重试

问题描述

  1. 如果消息体自身可能包含问题数据,或者解析过程中出现IO等运行时异常,导致消费者解析会出错时,一般有两种处理方式
    1. 接收消息,消息出队,捕获异常,直接处理:直接,简单,但容错率低
    2. 拒绝消息,消息回队,重新抓取,再次处理:容错率搞,但复杂
  2. 因为消费者处理过程出现异常存在随机性,因此方式2更为合理,
  3. 但如果处理过程必然出错,例如解析逻辑bug,脏数据等,则重回队列-重新接收步骤就会无限重复,进而拥塞系统造成运行瘫痪
  4. 所以,消费重试必须要有次数限制,也必须要有兜底策略

其他说明

  1. 消费重试是【消费者】对解析过程异常的处理,和【生产者】没有关系
  2. 不论采用【手工确认】还是【自动确认】都可以接入消费重试,只是接入和实现方式有所不同
    1. 自动确认实现便利,但不够灵活
    2. 手工确认实现复杂,但足够灵活
  3. RabbitMQ集成Springboot中有很多中消费重试的实现框架,比如利用retryTemplate,Channel,可以按照需要自由选择
  4. 关于兜底策略RabbitMQ推荐将出错的消息推送到【死信队列】进而在RabbitMQ中统一管理,实际开发过程中也可以选择或其他的实现方式
  5. 关于死信队列相关内容,可以参考【死信队列】一节内容

手工确认模式下的重试实现

  1. 这只是一种很简单的实现逻辑
  2. 关于自动模式下的自动处理,可以参考这篇博文,注意处理异常不捕获和yml配置即可:RabbitMQ重试-自动签收模式
  3. 注意:请参考【高级应用:手动确认-消费者的修改内容-解析工厂配置】一节内容,配置解析工厂,确保手工确认模式起效
@Component
@Slf4j
public class DirectReceiver {

    private final int maxRetryTimes = 3;

    //对于手工签收,一定要使用Message对象,因此不建议搭配@RabbitHandler强制解析消息体类型,因此直接对解析方法增加@RabbitListener
    //Channel参数为RabbitMQ自动注入的,不需要额外关心
    @RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
    public void process(Message message, Channel channel) throws IOException {
        //获取消息传递的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //处理业务逻辑
            log.info("开始解析消息...");
            log.info(new JSONObject(new String(message.getBody())).toString());
            //必然的解析出错
            int temp = 1 / 0;
            // 手动签收:第一个参数:表示收到的标签, 第二个参数:表示是否执行批处理确认
            channel.basicAck(deliveryTag, true);
            log.info("签收成功...");
        } catch (Exception e) {
            log.error("处理异常{},退回队列 ", e.getMessage());
            //如果处理业务逻辑的时候发生了错误,则要拒绝签收
            //参数1 multiple 是否批量处理
            //参数2 requeue = true 表示将错误的消息,重新放回到队列中并且一般放置在队列头部的位置,
            //参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
            //channel.basicNack(deliveryTag, true,true);
            this.handleRetry(message, channel, e);
        }
    }
    private void handleRetry(Message message, Channel channel, Exception processException) {
        try {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            Long retryCount = (Long) headers.getOrDefault("retry-count", 0L);
            if (retryCount < maxRetryTimes) {
                //手动进行应答, 接收此消息,先让消息出队,false表示只确认当前者一条消息出队
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                //增加重试次数标记
                headers.put("retry-count", retryCount + 1);
                //重新发送消息发送到MQ中原有队列的【队尾】
                channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                        message.getMessageProperties().getReceivedRoutingKey(),
                        new AMQP.BasicProperties().builder().contentType(message.getMessageProperties().getContentType()).headers(headers).build(),
                        message.getBody());
            } else{
                //兜底方案
                //参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
                //这里也可以存入将错误的请求体或者具体处理异常信息,存储日志,redis或者数据库中,方便人工查阅
                log.error("重试次数超限,处理异常原因" , processException);
            }
        } catch (Exception e) {
            log.error("重试处理异常{}, ", e.getMessage());
        }
    }
}

测试结果

2025-03-21 15:17:36.489  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 15:17:36.513  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.514 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常/ by zero,退回队列 
2025-03-21 15:17:36.519  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 15:17:36.520  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.520 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常/ by zero,退回队列 
2025-03-21 15:17:36.521  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 15:17:36.521  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.521 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常/ by zero,退回队列 
2025-03-21 15:17:36.524  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 开始解析消息...
2025-03-21 15:17:36.524  INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.524 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 处理异常/ by zero,退回队列 
2025-03-21 15:17:36.528 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : 重试次数超限,处理异常原因

java.lang.ArithmeticException: / by zero
	at com.customer.receiver.DirectReceiver.process(DirectReceiver.java:35) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_321]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_321]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_321]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_321]

测试结果说明

  1. 解析逻辑必然出错,触发异常后,消费者重试了三次
  2. 最终触发兜底方法,拒绝签收消息,让消息丢失

高级应用:死信队列

问题描述

  1. 在消费者处理信息时,可能因为一些环境原因导致解析过程发生异常,而消费者自身没有特殊的异常处理机制(自动重试,自动丢弃,本地存储等)
  2. 这时可以将引发异常的消息,从原有队列中迁出,转投递到死信队列,加以缓存,方便运维人员后期查询或者分析错误消息,

补充说明

  1. 与普通队列的相同点:
    1. 死信队列也是一种队列,需要搭配交换机才能完成,接收消息
    2. 死信队列也可以又消费者监听,进而实现消息从死信队列中出队
  2. 与普通队列的不同点:
    1. 死信队列不单独存在,一般需要和原有的业务队列进行绑定
    2. 死信队列的消息,不直接由生产者显式的投递,而是在特定条件下由RabbitMQ-Client自动投递
  3. 一般死信队列功能需要和【手工确认】搭配使用

生产者实现

配置文件
server:
  port: 8881
spring:
  application:
    name: RabbitMQ-Direct-Dead-Provider
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
#   不起效 暂时不用配置
#    listener:
#      type: simple
#      simple:
#        default-requeue-rejected: false
#        acknowledge-mode: manual
创建队列等内容

重点!!!!!

@Configuration
public class DirectConfiguration {

    public static final String DIRECT_EXCHANGE = "direct-exchange";
    public static final String DIRECT_ROUTING_KEY = "direct-routing-key";
    public static final String DIRECT_QUEUE = "direct-queue";

    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead-routing-key";
    public static final String DEAD_EXCHANGE = "dead-exchange";

    @Bean
    public DirectExchange rabbitmqDeadExchange() {
        //定义死信交换机,类型为Direct
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    @Bean
    public DirectExchange rabbitmqDirectExchange() {
        //定义业务交换机,类型为Direct
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    @Bean Queue rabbitmqDeadQueue() {
        //定义死信队列
        return new Queue(DEAD_QUEUE);
    }

    @Bean
    public Queue rabbitmqDirectQueue() {
        //定义业务队列,并将业务队列与死信队列绑定,以便RabbitMQ自动投递
        Map<String, Object> args = new HashMap<>(2);
        //x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        //x-message-ttl, 生命TLL,超过5s没有被消费消息,也会自动投递到死信队列
        args.put("x-message-ttl",5000);
        return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDirectQueue())
                //到交换机
                .to(rabbitmqDirectExchange())
                //并设置匹配键
                .with(DIRECT_ROUTING_KEY);
    }

    @Bean
    public Binding bindDirectDead() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDeadQueue())
                //到交换机
                .to(rabbitmqDeadExchange())
                //并设置匹配键
                .with(DEAD_ROUTING_KEY);
    }
}

显然,死信队列和业务队列一样,都有队列申明,交换机申明,队列交换机绑定三个步骤,不同的是,业务队列申明时需要额外与死信队列进行绑定

配置消息转换器

不予累述

发送消息逻辑
@Service
@Slf4j
public class DirectProviderService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public String sendMsg(String msg) {
        String result = "OK";
        try {
            DirectDto directDto = new DirectDto();
            directDto.setMsg(msg);
            directDto.setMsgId(UUID.randomUUID().toString());
            directDto.setMsgType(""+RandomUtil.randomInt(0,3));
            rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            result = "FAIL";
        }
        return result;
    }

}

消费者实现

配置文件
server:
  port: 8882
spring:
  application:
    name: RabbitMQ-Direct-Dead-Client
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
消费者配置

注意:请参考【高级应用:手动确认-消费者的修改内容-解析工厂配置】一节内容,配置解析工厂,确保手工确认模式起效

消费者解析-正常接收消息
@Component
@Slf4j
public class DirectReceiver {
	
    //手工模式确认
    @RabbitListener(queues = "direct-queue")
    public void receiveA(Message message, Channel channel) throws IOException {
        DirectDto directDto = new JSONObject(new String(message.getBody())).toBean(DirectDto.class);
        log.info("收到业务消息A, 准备处理:{}", directDto.toString());
        try{
            Thread.sleep(Integer.parseInt(directDto.getMsgType()));
        } catch (Exception e) {
           log.error(e.getMessage());
        }
        //这里模拟一下发生异常时,将消息投递到死信队列
        if (RandomUtil.randomBoolean()){
            log.error("消息消费发生异常");
            //参数2 requeue = false 表示将错误的消息,不放回队列,因为有死信队列,所以放入到死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            log.error("消息消费完成");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}
消费者解析-消费死信队列
@Component
@Slf4j
public class DeadReceiver {

    @RabbitListener(queues = "dead-queue")
    public void receiveA(Message message, Channel channel) throws IOException {
        log.info("收到死信消息:{}", new String(message.getBody()));
        //这里实际把消息从私信队列中迁出了,所以运行后RabbitMQ-Server在该私信队列中并没有看到记录
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

运行结果
2025-03-24 14:56:22.939  INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 收到业务消息A, 准备处理:DirectDto(msgId=67742847-101d-4074-a664-1ad0aa41ea41, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.941 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 消息消费完成
2025-03-24 14:56:22.942  INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 收到业务消息A, 准备处理:DirectDto(msgId=4e8236fb-1ae7-4427-b855-474603037c78, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.942 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 消息消费发生异常
2025-03-24 14:56:22.943  INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 收到业务消息A, 准备处理:DirectDto(msgId=428faf5f-107d-4295-abf7-33c6ac5da0bc, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.943 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 消息消费完成
2025-03-24 14:56:22.944  INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 收到业务消息A, 准备处理:DirectDto(msgId=d9c4121a-6e74-4944-80b3-3289a9c2d35d, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.944 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver     : 消息消费完成
代码说明
  1. 因为在正常解析步骤中,采用了channel.basicNack方法并指定了requeue=false,RabbitMQ就会自动去找direct-queue队列是否有绑定死信队列
  2. 因为direct-queue绑定了dead-queue,所以出错的消息会被自动投递到dead-queue中,并自动标记为Ready状态,等待消息投递与解析
  3. 而对于死信队列解析,示例中为了方便演示采用了channel.basicAck方法,让当前消息从死信队列中出对了,如果不调用此方法,死信队列就会将此消息记为Unacked状态一直保留在死信队列中,具体采取哪种处理方式,应由实际工作需要决定
  4. 至于死信队列中Unacked的状态的消息,要如何在运维环境下处理或者查阅,另起文注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2321846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式渲染与云渲染:技术与应用的黄金搭档

一、核心概念&#xff1a;先区分再关联 分布式渲染是通过多台设备并行计算拆分渲染任务的技术&#xff08;如将一帧拆分为 64 个小块&#xff0c;64 台电脑同时渲染&#xff09;&#xff1b; 云渲染是基于云计算的渲染服务&#xff0c;本质是分布式渲染的商业化落地—— 用户无…

【实战ES】实战 Elasticsearch:快速上手与深度实践-5.2.1 多字段权重控制(标题、品牌、类目)

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 电商商品搜索实战&#xff1a;多字段权重控制策略1. 业务场景与核心挑战1.1 典型搜索问题1.2 权重失衡的影响数据 2. 权重控制核心方案2.1 字段权重分配矩阵2.2 多策略组合方…

如何避免测试数据准备不充分或不可复用

避免测试数据准备不充分或不可复用的关键方法包括明确数据需求、统一数据管理工具、建立数据复用机制、定期维护更新测试数据以及加强团队沟通与协作。 其中&#xff0c;统一数据管理工具对确保数据质量和复用性尤为重要。例如&#xff0c;许多团队采用专门的测试数据管理工具以…

使用AI一步一步实现若依(23)

功能23&#xff1a;从后端获取路由/菜单数据 功能22&#xff1a;用户管理 功能21&#xff1a;使用axios发送请求 功能20&#xff1a;使用分页插件 功能19&#xff1a;集成MyBatis-Plus 功能18&#xff1a;创建后端工程 功能17&#xff1a;菜单管理 功能16&#xff1a;角色管理…

第一天学爬虫

阅读提示&#xff1a;我今天才开始尝试爬虫&#xff0c;写的不好请见谅。 一、准备工具 requests库&#xff1a;发送HTTP请求并获取网页内容。BeautifulSoup库&#xff1a;解析HTML页面并提取数据。pandas库&#xff1a;保存抓取到的数据到CSV文件中。 二、爬取步骤 发送请求…

W、M、C练题笔记(持续更新中)

web here are the flag 点击&#xff0c;页面跳转404.php&#xff0c;用bp抓包访问/flag.php页面&#xff0c;得到flag用base64解码 TryToFindFlag 打开后查看源代码 发现是robots协议&#xff0c;访问robots.txt 访问flllaaa......&#xff0c;得到空白页面&#xff0c;查看…

CVE-2021-45232未授权接口练习笔记

CVE-2021-45232 是 Apache APISIX Dashboard 中的一个严重权限漏洞&#xff0c;类似于攻击者无需密码即可拿到整个网关系统的“万能钥匙”。攻击者利用此漏洞&#xff0c;可直接操控网关流量转发规则&#xff0c;甚至远程执行代码&#xff0c;引发服务器沦陷。 默认账户密码导致…

贪心算法——c#

贪心算法通俗解释 贪心算法是一种"每一步都选择当前最优解"的算法策略。它不关心全局是否最优&#xff0c;而是通过局部最优的累积来逼近最终解。优点是简单高效&#xff0c;缺点是可能无法得到全局最优解。 一句话秒懂 自动售货机找零钱&#xff1a;用最少数量的…

Retrofit中scalars转换html为字符串

简介 在Retrofit中&#xff0c;如果你想直接获取HTML或其他文本格式的响应内容而不是将其映射到一个模型类&#xff0c;ScalarsConverterFactory 就派上用场了。ScalarsConverterFactory 是一个转换器工厂&#xff0c;它能够将响应体转换为Java基本类型如String、Integer或Byte…

【微服务架构】SpringCloud(七):配置中心 Spring Cloud Config

文章目录 配置中心为什么需要配置中心配置中心介绍 服务搭建基于GITHUB1.创建仓库2.新建微服务作为配置中心服务3.启动测试拉取 匹配规则分支读取 客户端配置配置文件引入依赖使用远程配置 刷新配置手动配置热更新自动刷新erlang安装RabbitMQ安装环境变量管理界面服务配置测试 …

Linux学习笔记(应用篇二)

基于I.MX6ULL.MINI开发板 开发板与电脑相互通信电脑与开发板互传文件 开发板与电脑相互通信 用网线将电脑与开发板连接 本人使用的是Ubuntu系统&#xff0c;不是虚拟机 一般来说刚开始电脑和开发板是ping不通的 首先查看电脑的 IP WinR&#xff0c;cmd调出终端 我使用的是…

记录一次部署k3s后,服务404 page not found,nginx显示正常

服务部署k3s后&#xff0c;正常入口端怎么返回都是80&#xff0c;且返回错误 TRAEFIK DEFAULT CERT ERR_CERT_AUTHORITY_INVALID ngnix显示也是正常&#xff0c;怎么找也找不到问题 后来通过 iptables -L -n -t nat|grep 80 发现入口端流量被DNAT转到新的服务 而k3s中&#…

mac上安装nvm及nvm的基本语法使用!!

种一棵树&#xff0c;最好是十年前&#xff0c;其次是现在&#xff01;想要改变&#xff0c;从此刻开始&#xff0c;一切都不晚&#xff01; 目录 nvm是什么&#xff1f;前提条件&#xff1a;安装homebrew如果系统已经有node版本&#xff1a;在mac上安装nvm&#xff1a;用nvm安…

(基本常识)C++中const与引用——面试常问

作者&#xff1a;求一个demo 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 内容通俗易懂&#xff0c;没有废话&#xff0c;文章最后是面试常问内容&#xff08;建议通过标题目录学习&#xff09; 废话不多…

dfs(深度优先)——太抽象了

1. 两种方法 #include<bits/stdc.h> using namespace std; //void dfs(int index,int n,vector<int> current) //{ // if(index>n){ // for(int i0;i<current.size();i){ // cout<<current[i]<<" "; // } // cout<<endl;…

python --face_recognition(人脸识别,检测,特征提取,绘制鼻子,眼睛,嘴巴,眉毛)/活体检测

dlib 安装方法 之前博文 https://blog.csdn.net/weixin_44634704/article/details/141332644 环境: python3.8 opencv-python4.11.0.86 face_recognition1.3.0 dlib19.24.6人脸检测 import cv2 import face_recognition# 读取人脸图片 img cv2.imread(r"C:\Users\123\…

redis解决缓存穿透/击穿/雪崩

文章目录 1.缓存穿透1.1 概念1.2 解决方案1.2.1 缓存空对象1.2.2 布隆过滤 1.2 店铺查询使用缓存穿透解决方案1.2.1 流程 2.缓存雪崩2.1 什么是缓存雪崩&#xff1f;2.2 雪崩解决方案 3.缓存击穿3.1 什么是缓存击穿&#xff1f;3.2解决方案3.2.1 基于互斥锁解决缓存击穿问题&am…

《TCP/IP网络编程》学习笔记 | Chapter 22:重叠 I/O 模型

《TCP/IP网络编程》学习笔记 | Chapter 22&#xff1a;重叠 I/O 模型 《TCP/IP网络编程》学习笔记 | Chapter 22&#xff1a;重叠 I/O 模型理解重叠 I/O 模型重叠 I/O本章讨论的重叠 I/O 的重点不在于 I/O 创建重叠 I/O 套接字执行重叠 I/O 的 WSASend 函数进行重叠 I/O 的 WSA…

python每日十题(10)

在Python语言中&#xff0c;源文件的扩展名&#xff08;后缀名&#xff09;一般使用.py。 保留字&#xff0c;也称关键字&#xff0c;是指被编程语言内部定义并保留使用的标识符。Python 3.x有35个关键字&#xff0c;分别为&#xff1a;and&#xff0c;as&#xff0c;assert&am…

LabVIEW液压振动锤控制系统

在现代工程机械领域&#xff0c;液压振动锤的高效与精准控制日益显得重要。本文通过LabVIEW软件&#xff0c;展开液压振动锤启停共振控制技术的研究与应用&#xff0c;探讨如何通过改进控制系统来优化液压振动锤的工作性能&#xff0c;确保其在复杂工况下的稳定性与效率。 ​ …