Apache Kafka - ConsumerInterceptor 实战 (1)

news2024/11/19 22:53:57

文章目录

  • 概述
  • 使用场景
  • 实战
    • 配置文件
    • 配置类
    • 自定义ConSumerInterceptor
    • 使用

在这里插入图片描述


概述

ConsumerInterceptor是Kafka中的一个重要组件,它允许开发人员在Kafka消费者端拦截和修改消息的处理过程。ConsumerInterceptor可以用于实现各种功能,从消息监控到数据转换和错误处理,为开发人员提供了更大的灵活性和可定制性。

ConsumerInterceptor的主要作用是在消息被消费之前和之后对其进行拦截和处理。它可以用于以下几个方面:

  1. 监控:通过ConsumerInterceptor,可以在消息被消费之前和之后记录和监控消息的元数据,例如消息的偏移量、主题、分区等信息。这对于跟踪和分析消息流的健康状况以及性能优化非常有用。

  2. 转换:ConsumerInterceptor还可以用于对消息进行转换和修改。通过拦截消息并对其进行操作,可以在消费者端对消息进行格式转换、数据解析或者其他自定义处理。例如,你可以将消息从一种格式转换为另一种格式,或者对消息进行特定的业务处理。

  3. 错误处理:当消费者在处理消息时发生错误或异常情况时,ConsumerInterceptor可以捕获这些错误并采取适当的措施。你可以在拦截器中实现自定义的错误处理逻辑,例如记录错误日志、发送告警通知或者进行重试操作,从而提高应用程序的可靠性和容错性。


使用场景

使用场景方面,ConsumerInterceptor可以在多种情况下发挥作用,例如:

  1. 监控和统计:你可以使用ConsumerInterceptor来收集和记录消费者端的统计信息,例如消费速率、处理延迟等。这样可以帮助你监控应用程序的性能并进行性能优化。

  2. 数据转换:如果你需要将消息从一种格式转换为另一种格式,例如将JSON消息转换为Avro格式,你可以使用ConsumerInterceptor来实现这个转换过程。

  3. 数据验证:ConsumerInterceptor可以用于验证消息的有效性和完整性。你可以在拦截器中实现验证逻辑,例如检查消息的签名或者校验消息的结构,以确保只有符合要求的消息被消费。

  4. 错误处理和重试:当消费者在处理消息时遇到错误,例如数据库连接失败或者网络故障,你可以使用ConsumerInterceptor来捕获这些错误并采取适当的措施。你可以在拦截器中实现自定义的错误处理逻辑,例如记录错误日志、发送告警通知或者进行消息重试。

总之,ConsumerInterceptor为开发人员提供了在消费者端对消息进行拦截、处理和定制的能力。通过使用ConsumerInterceptor,你可以实现一系列功能,包括监控、数据转换和错误处理,从而更好地控制和管理Kafka消费者端的消息处理过程。


实战

在这里插入图片描述

配置文件

spring:
  kafka:
    bootstrap-servers: 20.10.110.137:9888 # Kafka服务的地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
      acks: 1 # acks=0:表示producer不需要等待任何确认收到的信息。副本将立即加到socket缓冲区并认为已经发送。如果使用此选项,则存在丢失数据的风险,因为服务器在数据到达副本之前可能会崩溃。
      retries: 0 # 失败重试次数,0表示不启用重试机制
      batch-size: 16384 # 发送缓冲区大小,按照字节计算
      linger-ms: 1 # 发送延时,单位毫秒
      buffer-memory: 33554432 # 发送缓存区的大小,按照字节计算
      compression-type: gzip # 压缩类型,默认是none,可选snappy、gzip、lz4
    consumer:
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: earliest
      #是否开启自动提交
      enable-auto-commit: false
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #消费者组groupid
      group-id: process-group
      #消费者最大拉取的消息数量
      max-poll-records: 2000
      #消费者最大等待时间
      max-poll-interval-ms: 2000
    listener:
      type: batch
      ack-mode: manual # 手动提交
      concurrency: 12 # 并发数

配置类

package net.zf.module.system.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author artisan
 */
@Slf4j
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.group-id}")
    private String group_id;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;

    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;

    private final String consumerInterceptor = "net.zf.module.system.kafka.interceptor.FailureRateInterceptor";


    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(32);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );
        return props;
    }


    /**
     * 消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }




    /**
     * 异常处理器
     *
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
//            log.error("消息{} , 异常原因{}", message, exception.getMessage());
            log.error("consumerAwareListenerErrorHandler called");

            return null;
        };
    }

}

这段代码是一个用于配置Kafka消费者的Spring配置类。它使用了Spring Kafka库来设置Kafka的消费者配置和相关的监听器。

以下是代码的主要部分的解释:

  1. 通过@Configuration注解将该类标记为一个Spring配置类。
  2. 使用@Value注解注入配置属性,这些属性来自于应用的配置文件(比如application.properties)。
  3. consumerConfigs()方法创建了一个包含Kafka消费者配置信息的props对象,并将其返回。这些配置包括Kafka服务器地址、消费者组ID、序列化/反序列化类等。
  4. batchFactory()方法创建了一个ConcurrentKafkaListenerContainerFactory对象,并设置了相关的属性。它使用了前面定义的消费者配置,并设置了批量消费和并发处理的参数。
  5. consumerAwareListenerErrorHandler()方法创建了一个ConsumerAwareListenerErrorHandler对象,用于处理消费过程中出现的异常。在这个例子中,它只是打印了错误日志。

总体而言,这段代码的目的是配置Kafka消费者的相关属性,包括连接到Kafka服务器的配置、消费者组ID、序列化/反序列化类等。它还定义了一个批量消费的监听器工厂和一个异常处理器。这些配置可以通过注入KafkaListenerContainerFactoryConsumerAwareListenerErrorHandler来在应用中使用。


自定义ConSumerInterceptor

package net.zf.module.system.kafka.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @author artisan
 */

@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {

    /**
     * 消息消费前的拦截处理
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        // TODO
        log.info("FailureRateInterceptor#onConsume");

        // 根据设定的规则计算失败率,并进行判断是否跳过消息的消费
        // 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象

        return consumerRecords;
    }

    /**
     * 消息提交前进行拦截处理
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        log.info("FailureRateInterceptor#onCommit");
    }


    /**
     * 拦截器关闭前进行拦截处理(如果有的话)
     */
    @Override
    public void close() {
        log.info("FailureRateInterceptor#close");
    }

    /**
     * 初始化配置(如果有的话)
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        log.info("FailureRateInterceptor#configure");
    }
}

onConsume 可以控制 ConsumerRecords, 通过返回null ,可以暂停消费。

这段代码是一个自定义的Kafka消费者拦截器,实现了ConsumerInterceptor接口。拦截器可以在消息消费和提交的过程中插入自定义的逻辑,用于处理消息或拦截操作。

以下是代码的主要部分的解释:

  1. @Slf4j注解用于自动生成日志记录器。
  2. @Component注解将该类标记为Spring组件,使得它可以被自动扫描和注入到应用中。
  3. 实现了ConsumerInterceptor接口,并重写了其中的方法。
    • onConsume()方法在消费者消费消息之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • onCommit()方法在消息提交之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • close()方法在拦截器关闭之前被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
    • configure()方法在拦截器初始化配置时被调用。在这个例子中,它只是打印了日志信息,表示拦截器的执行。
  4. 拦截器的具体逻辑还没有实现,而是用// TODO标记了需要填充的部分。根据注释的描述,它可能会根据设定的规则计算消费失败率,并根据判断跳过或继续消费消息。

总体而言,这段代码定义了一个自定义的Kafka消费者拦截器。拦截器可以在消息消费和提交的过程中执行自定义的逻辑。在这个例子中,拦截器的逻辑还没有实现,只是打印了日志信息以表示拦截器的执行。你需要根据需求实现onConsume()方法中的拦截逻辑,以便根据设定的规则处理消息消费的失败率。


使用

package net.zf.module.system.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import net.zf.module.system.entity.AttackMessage;
import net.zf.module.system.executors.factory.MessageExecutorFactory;
import net.zf.module.system.service.es.AttackMessageESService;
import net.zf.module.system.util.constants.KafkaTopicConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * @author artisan
 */
@Component
@Slf4j
public class AttackKafkaConsumer {

    @Autowired
    private MessageExecutorFactory messageExecutorFactory;

    @Autowired
    private AttackMessageESService attackMessageESService;


    @KafkaListener(topicPattern = KafkaTopicConstant.ATTACK_MESSAGE + ".*",
            containerFactory = "batchFactory",
            errorHandler = "consumerAwareListenerErrorHandler")
    public void processMessage(List<String> records, Acknowledgment ack)  {
        log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ", Thread.currentThread().getId(), records.size());
        try {
            List<AttackMessage> attackMessages = new ArrayList();
            records.stream().forEach(record -> {
                messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);
            });
            if (!attackMessages.isEmpty()) {
                String response = attackMessageESService.addDocuments(attackMessages, false);
                log.info("AttackKafkaConsumer本次处理的数据总量:{}, 响应结果: {}", attackMessages.size(), response);
            }
        } finally {
            ack.acknowledge();
        }
    }
}

这段代码定义了一个名为AttackKafkaConsumer的类,它是一个Kafka消费者。它使用了Spring Kafka提供的@KafkaListener注解来指定消费者的相关配置。

以下是代码的主要部分的解释:

  1. @Component注解将该类标记为Spring组件,使得它可以被自动扫描和注入到应用中。
  2. @Slf4j注解用于自动生成日志记录器。
  3. AttackKafkaConsumer类中注入了MessageExecutorFactoryAttackMessageESService两个依赖,通过@Autowired注解实现自动注入。
  4. @KafkaListener注解标记了processMessage()方法作为Kafka消费者的消息处理方法。
    • topicPattern属性指定了要监听的Kafka主题的模式,使用了常量KafkaTopicConstant.ATTACK_MESSAGE并结合通配符.*
    • containerFactory属性指定了用于创建Kafka监听容器的工厂bean的名称,使用了名为batchFactory的工厂。
    • errorHandler属性指定了用于处理消费者异常的错误处理器的bean的名称,使用了名为consumerAwareListenerErrorHandler的错误处理器。
  5. processMessage()方法是消息的实际处理逻辑。它接收一个List<String>类型的消息记录和一个Acknowledgment对象作为参数。
    • 首先,它记录了当前线程ID和本次拉取的数据总量的日志信息。
    • 然后,它创建了一个空的AttackMessage列表,用于存储处理后的消息。
    • 使用records.stream().forEach()遍历每条消息记录,并通过messageExecutorFactory调用process()方法来处理每条记录,同时将处理结果添加到attackMessages列表中。
    • 在处理完所有消息后,如果attackMessages列表不为空,将调用attackMessageESServiceaddDocuments()方法将消息添加到Elasticsearch中,并记录处理的数据总量和响应结果的日志信息。
    • 最后,在finally块中调用ack.acknowledge()手动确认消费完成。

总体而言,这段代码定义了一个Kafka消费者类AttackKafkaConsumer,并使用@KafkaListener注解指定了监听的主题、容器工厂和错误处理器。processMessage()方法是处理消息的具体逻辑,它遍历消息记录并调用适当的执行器进行处理,最后将处理结果添加到列表中,并通过Elasticsearch服务将消息存储到数据库中。消费完成后,手动确认消息的消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/568969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python开发】FastAPI 01:hello world

FastAPI 是一个轻量级的后端框架&#xff0c;对于定制化不高或者功能简单的业务完全可以拿他作为后台。 FastAPI 一个比较重要的特性就是异步&#xff0c;简单来说就是相比 django 和 flask 快&#xff0c;FastAPI 和 flask 的语法类似。本篇文章介绍 FastAPI 运用的简单实例&a…

从索引结点出发探索软、硬链接

索引结点的初步认识 对于整个计算机系统的资源管理&#xff0c;我们可以认为&#xff0c;OS先将这些资源的数据信息&#xff0c;给描述起来构成一个部分&#xff0c;然后再将它们组织起来&#xff0c;就能够实现由OS集中管理。举一个最经典的例子&#xff0c;进程的引入是为了…

Cos上传(腾讯云):图片存储方案

Cos上传(腾讯云) 01.图片存储方案介绍 目标 了解主流的图片存储方案 两种常见方案 方案一&#xff1a;存到自己公司购买的服务器上 优点&#xff1a;好控制 缺点&#xff1a;成本高由于图片都存放到自己的服务器上&#xff0c;占据空间很大 方案二&#xff1a;存到三方…

总结SpringBoot常用读取配置文件的3种方法

文章目录 1、使用 Value 读取配置文件2、 使用 ConfigurationProperties 读取配置文件3、读取配置文件中的List 1、使用 Value 读取配置文件 注&#xff1a;这种方法适用于对象的参数比较少的情况 使用方法&#xff1a; 在类上添加configuration注解&#xff0c;将这个配置对…

Hotbit交易平台停运,百万用户待清退,币圈危机再度蔓延

“币圈”的危机似乎还没有走到尽头。5月22日&#xff0c;加密货币交易平台Hotbit发文宣布&#xff0c;决定从世界标准时间当日4:00停止所有CEX&#xff08;中心化交易所&#xff09;操作&#xff0c;希望所有用户在6月21日4:00之前提取剩余资产。据悉&#xff0c;该平台在其任期…

微前端乾坤

1. 乾坤 简介 qiankun 是一个基于 single-spa 的微前端实现库&#xff0c;旨在帮助大家能更简单、无痛的构建一个生产可用微前端架构系统 官网&#xff1a;https://qiankun.umijs.org/zh/guide 2.使用 背景&#xff1a; vue2.0 , vue-cli 5.0 主应用&#xff1a; 安装乾坤…

基于上下文折扣的多模态医学图像分割证据融合

文章目录 Evidence Fusion with Contextual Discounting for Multi-modality Medical Image Segmentation摘要本文方法Evidential SegmentationMulti-modality Evidence FusionDiscounted Dice Loss 实验结果 Evidence Fusion with Contextual Discounting for Multi-modality …

利用PaddleOCR识别增值税发票平台验证码(开箱即用)

前言:全国增值税发票查验平台验证码没什么好说的,根据指定的颜色识别验证码中的文字,图片如下 下面直接讲解利用paddleocr识别的思路,为什么使用paddleocr,因为paddle中集成了较好的ocr文字识别模型,开箱即用即可,废话不多说,剑指主题,识别思路步骤如下 步骤如下 1、…

BI技巧丨度量值的动态格式字符串

2023年4月版本新增了度量值的动态格式字符串功能。从字面上看可能小伙伴们不是很理解这个功能的用途&#xff0c;这里白茶给大家解释一下。 通俗一点来说&#xff0c;就是可以在数值中加入文本&#xff0c;将其转化为字符串&#xff0c;而不改变其原有的数据类型。 看到这里&…

Java调用第三方库JNA(C/C++)

GitHub - java-native-access/jna: Java Native Access 源代码 在Java 中使用C语言库的传统做法是使用JNI编程。但是现在有更好的替代方案&#xff0c;即JNA(Java Native Access)&#xff1b;JNA是一个开源的Java框架,是SUN公司推出的调用本地库方法的技术&#xff0c;是建立在…

传染病学模型 | Matlab实现SEIRS传染病学模型 (SEIRS Epidemic Model)

文章目录 效果一览基本描述模型介绍程序设计参考资料效果一览 基本描述 传染病学模型 | Matlab实现SEIRS传染病学模型 (SEIRS Epidemic Model) 模型介绍 SEIRS是一种基于计算机模拟的传染病学模型,用于研究人群中传染病的传播和控制。与其他传染病学模型不同,SEIRS模型考虑了…

第二章.­ Learning to Answer Yes­_No

第二章. Learning to Answer Yes_No 2.1 Perceptron Hypothesis Set 1.机器学习流程图&#xff1a; 在机器学习的整个流程中&#xff0c;模型的选择(Hypothesis Set)是非常重要的&#xff0c;它决定了机器学习的最终效果。 2.常用的机器学习模型——感知机&#xff08;Percep…

L2-001 紧急救援(dijkstra算法练习)

作为一个城市的应急救援队伍的负责人&#xff0c;你有一张特殊的全国地图。在地图上显示有多个分散的城市和一些连接城市的快速道路。每个城市的救援队数量和每一条连接两个城市的快速道路长度都标在地图上。当其他城市有紧急求助电话给你的时候&#xff0c;你的任务是带领你的…

Android 12系统源码_窗口管理(二)WindowManager对窗口的管理过程

前言 上一篇我们具体分析了窗口管理者WindowManagerService的启动流程&#xff0c;对于WindowManagerService有了一个初步的认识。在此基础上&#xff0c;我本打算应该进一步分析WindowManagerService是如何管理系统中的各种窗口的&#xff0c;然而由于Android系统的架构设计&…

如何搭建远程服务器-(cpolar)

文章目录 前言一、安装注册下载安装包认证开通指定端口监听开机自启动设置 二、使用步骤电脑端远程手机端远程 三、卸载软件安装说明&#xff1a; 总结 前言 之前已经有写到一篇文章《如何用树莓派搭建远程服务器 (zerotier)》&#xff0c;对此已经使用了很长一段时间。 优点…

MySQL 事务(w字)

目录 事务 首先我们来看一个简单的问题 什么是事务 为什么会出现事务 事务的版本支持 事务提交方式 事务常见操作方式 设置隔离级别 事物操作 事物结论 事务隔离级别 理解隔离性 隔离级别 查看与设置隔离性 注意可重复读【Repeatable Read】的可能问题&#xff…

AI数字人盛行,如何选择合适的AI数字人制作平台?

2023万象大会已然开启了直播&#xff0c;当AI照进生活、照亮你我&#xff0c;为我们的想象力插上翅膀&#xff0c;世界变得更加便捷、更加智能。可以说近年来&#xff0c;AI帮助人们解决了各种问题&#xff0c;在提高生产效率、改善生活质量等方面做出来很大的贡献&#xff0c;…

LeetCode: 二叉树的直径(java)

二叉树的直径 leetcode 543题。原题链接题目描述解题代码二叉树专题 leetcode 543题。原题链接 543题&#xff1a;二叉树的直径 题目描述 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也…

WICC · 出海嘉年华|嘉宾就位、话题揭晓,峰会 派对报名倒计时

双厨狂喜&#xff01;移步【融云全球互联网通信云】回复“地图”免费领 6 月 2 日即将在广州举办的“WICC 社交泛娱乐出海嘉年华”&#xff0c;将是一场集 WICC 通信行业大会高端峰会规格、前沿技术内容和社交泛娱乐出海务实场景落地、垂直圈子社交于一体的大型盛会。 大咖嘉…

【弹性分布式EMA】在智能电网中DoS攻击和虚假数据注入攻击(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…