RocketMQ-Producer

news2025/1/21 12:20:23

消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。

启动流程

代码:DefaultMQProducerImpl#start

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
                this.checkConfig();
        // ...
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
        // ...
        //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
        // ...
        // 启动生产者
        mQClientFactory.start();
        // ...
        
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        RequestFutureHolder.getInstance().startScheduledTask(this);
    }
}

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();

同一个clientId只会创建一个MQClientInstance。

MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道

org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

消息发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

  1. 验证消息
    // 消息验证
    Validators.checkMessage(msg, this.defaultMQProducer);
    
    
        public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
            if (null == msg) {
                throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
            }
            // topic
            Validators.checkTopic(msg.getTopic());
            Validators.isNotAllowedSendTopic(msg.getTopic());
    
            // body
            if (null == msg.getBody()) {
                throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
            }
    
            if (0 == msg.getBody().length) {
                throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
            }
    
            if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
                throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
            }
        }
  2. 查找路由
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    
        private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }

     

    public class TopicPublishInfo {
        private boolean orderTopic = false;	//是否是顺序消息
        private boolean haveTopicRouterInfo = false; 
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();	//该主题消息队列
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();//每选择一次消息队列,该值+1
        private TopicRouteData topicRouteData;//关联Topic路由元信息
    }

     代码:MQClientInstance#updateTopicRouteInfoFromNameServer

    TopicRouteData topicRouteData;
    //使用默认主题从NameServer获取路由信息
                        if (isDefault && defaultMQProducer != null) {
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout());
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
        //使用指定主题从NameServer获取路由信息
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                        }
    if (topicRouteData != null) {
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            boolean changed = topicRouteData.topicRouteDataChanged(old);
        //判断路由是否需要更改
                            if (!changed) {
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
    if (changed) {
         // Update endpoint map
                                {
                                    ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                                    if (!mqEndPoints.isEmpty()) {
                                        topicEndPointsTable.put(topic, mqEndPoints);
                                    }
                                }
    
                                // Update Pub info
                                {
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // Update sub info
                                if (!consumerTable.isEmpty()) {
                                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                    for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                        MQConsumerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                        }
                                    }
                                }
    }

选择队列

  •  默认不启用Broker故障延迟机制

代码:org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //第一次选择队列
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
        //遍历消息队列集合
            for (int i = 0; i < this.messageQueueList.size(); i++) {
            //sendWhichQueue自增后取模
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = index % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
            //规避上次Broker队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
        //如果以上情况都不满足,返回sendWhichQueue取模后的队列
            return selectOneMessageQueue();
        }
    }
  •  启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //Broker故障延迟机制
    if (this.sendLatencyFaultEnable) {
        try {
            //对sendWhichQueue自增
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            //对消息队列轮询获取一个队列
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //验证该队列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    //可用
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
			//从规避的Broker中选择一个可用的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //获得Broker的写队列集合
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //获得一个队列,指定broker和队列ID并返回
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

 原理分析

代码:DefaultMQProducerImpl#sendDefaultImpl

endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);


    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        //计算broker规避的时长
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        //更新该FaultItem规避时长
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }


    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    //获得原FaultItem
        FaultItem old = this.faultItemTable.get(name);
    //为空新建faultItem对象,设置规避时长和开始时间
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
        //更新规避时长和开始时间
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

        4.发送消息

消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(
    final Message msg,	//待发送消息
    final MessageQueue mq,	//消息发送队列
    final CommunicationMode communicationMode,		//消息发送内模式
    final SendCallback sendCallback,	pp	//异步消息回调函数
    final TopicPublishInfo topicPublishInfo,	//主题路由信息
    final long timeout	//超时时间
    )

 代码:DefaultMQProducerImpl#sendKernelImpl

        long beginStartTime = System.currentTimeMillis();
        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
//获得broker网络地址信息
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    //没有找到从NameServer更新broker网络地址信息
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
        }
//为消息分类唯一ID
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
}
//消息大小超过4K,启用消息压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
//如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}

代码:SendMessageHook

public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}

 代码:DefaultMQProducerImpl#sendKernelImpl

//构建消息发送请求包
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//主题
requestHeader.setTopic(msg.getTopic());
//默认创建主题Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//该主题在单个Broker默认队列树
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//队列ID
requestHeader.setQueueId(mq.getQueueId());
//消息系统标记
requestHeader.setSysFlag(sysFlag);
//消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息标记
requestHeader.setFlag(msg.getFlag());
//消息扩展信息
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//是否是批量消息等
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
//如果注册了钩子函数,则发送完毕后执行钩子函数
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

2.3.4 批量消息发送

 2.4.7 刷盘机制

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/468978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

轻松掌握消息队列RabbitMQ在SpringAMQP中的实践知识点

1、介绍 spring集成了rabbitmq&#xff0c;可以对rabbitmq先进行安装简单了解。参考我的rabbitmq文章。 2、使用 1、基本消息队列BasicQueue案例 一个消息消费者&#xff0c;上个消息未处理完&#xff0c;队列中的消息将阻塞&#xff0c;导致内存泄漏 1、引入AMQP依赖 2、添…

读书:《科技论文写作与发表教程(第6版)》

科技写作是指以符合标准格式的科技论文形式在科技期刊上陈述原创性的研究。 另外&#xff0c;还有广义上的科技写作。 科技写作的最主要特点是表达清晰。科技写作不需要漂亮的文学修饰&#xff0c;要把信息清楚地传递给读者。 IMRAD格式&#xff1a;Introduction Methods Re…

Redis常见问题整理

一、Redis使用场景相关问题 0. 单机版Redis部署 系统环境&#xff1a;CentOS7 1、下载Redis所需要的镜像 yum install -y gcc tcl2、下载redis安装包 mkdir /soft cd /soft wget https://download.redis.io/releases/redis-6.2.4.tar.gz3、解压缩&#xff1a; tar -xvf redi…

QT-DAY3

实现ui 字体、颜色、保存文件、打开文件 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//字体按钮对应的槽函数 void Wid…

性能测试技术笔记:如何设计一个压测平台 ?

目录 为什么需要压测平台&#xff1f; 压测平台功能设计思路 压测平台技术实现方案 总结 为什么需要压测平台&#xff1f; 从实际工作场景出发&#xff0c;如果只有一两个人做性能测试工作&#xff0c;那其实没必要开发专门的压测平台&#xff0c;原因如下&#xff1a; 成…

【备战秋招】权限常见面试题

本文的面试题和答案均为本人自己收集&#xff0c;如有错误或者不足&#xff0c;欢迎大家指出 目录 你做过的系统&#xff0c;权限是怎么管理的 Shiro是靠什么做认证和授权的 什么是RBAC模型 如果让你手写一个Web过滤器验证权限&#xff0c;你会怎么写 Shiro的anon和authc都…

锅炉燃烧自动控制系统

串级控制 以外环温度、内环煤气流量为例 重点1 主PID输出0~100需要经过线性转换模块进行转换&#xff0c;转换成与煤气流量相同量纲才能作为副PID的设定值。假设煤气流量量程100000&#xff1b;则副PID设定值如下&#xff1a; secSET mainLMN/100*100000&#xff1b; 重点2…

创造者基金 2023 年 4 月亮点

隆重推出创作者基金的 2023 年 4 月亮点 NFT 系列——一系列令人着迷且令人惊叹的数字资产&#xff0c;让大家为之震撼&#xff01;该系列的角色令人惊叹&#xff0c;包括阿兹特克酋长、维京战士、残酷的国王、传奇的九尾狐等等&#xff0c;是收藏家和爱好者的必备之物。 无论你…

优化 | 随机原始对偶混合梯度(SPDHG)算法及在图像处理中的应用

论文解读者&#xff1a;陈宇文&#xff0c;胡明杰&#xff0c;史铭伟&#xff0c;赵田田 许多实际问题都可以建模为凸优化问题。相比于直接求解原问题&#xff0c;将问题转化为鞍点问题往往会带来好处。求解鞍点问题的一种常用算法是原对偶混合梯度算法 (PDHG)&#xff0c;它在…

ubuntu-server22.04编译Redis7.0.11源码支持TLS

1.克隆redis源码: git clone https://github.com/redis/redis.git 编译前确认已安装GCC11与G++11和cmake及make及pkg-config 安装命令如下: apt install gcc -y apt install g++ -y apt install cmake -y apt install pkg-config 因为要支持TLS所以要安装OPENSSL开发库 ap…

阿里云CPFS与OSS之间数据双向流动机制

随着云上对象存储成本的逐渐降低&#xff0c;越来越多的企业利用阿里云OSS存储他们的大量数据并构建数据湖。现在阿里云文件存储CPFS与对象存储OSS实现了深度集成&#xff0c;客户可以在三十分钟内创建一个高性能CPFS文件系统并链接到他们的OSS bucket。当链接到OSS bucket以后…

实验二 ROS结合OpenCV示例——人脸识别

ROS结合OpenCV示例——人脸识别 一、实验原理&#xff1a;二、实验步骤&#xff1a;<1> 安装opencv 以及串口功能包<2> 测试opencv串口是否安装成功 三、程序分析&#xff1a; 一、实验原理&#xff1a; Opencv库是一个基于BSD许可发行的跨平台开源计算机视觉库&a…

STM32WB55_NUCLEO开发(9)----接收手机数据点亮LED

概述 本篇文章主要介绍如何使用STM32CubeMX对生成STM32WB工程&#xff0c;并通过与STM32WB配对&#xff0c;向该特征写入一个任意字节&#xff0c;绿色LED会切换。 硬件准备 首先需要准备一个开发板&#xff0c;这里我准备的是NUCLEO-WB55RG 的开发板&#xff1a; 选择芯片…

加拿大各省接受公立教育的初始年龄汇总 — 供携子女赴加的访学、博后参考

近年来到加拿大从事访问学者和博士后研究的申请者日益增多&#xff0c;有些申请者想带孩子同去上公立学校。因为加拿大各省教育局政策有差异&#xff0c;所以入学&#xff08;包括学前班&#xff09;年龄不同&#xff0c;为此知识人网小编整理本文为大家解惑答疑。 加拿大为本国…

GitHub Actions自动发布Package到Pub.dev

一、创建package或plugin 先创建一个package或者plugin 二、手动上传第一个版本到pub.dev flutter packages pub publish --serverhttps://pub.dartlang.org 三、在admin配置自动化发布 打开pub.dev中的对应的package按照以下图片配置 四、在项目跟目录配置发布脚本 1、在…

XMLMapperBuilder解析*mapper.xml

springboot的MybatisAutoConfiguration自动配置类会创建SqlSessionFactory&#xff0c;创建过程就是填充configuration属性&#xff0c;调用buildSqlSessionFactory()方法完成SqlSessionFactory创建&#xff0c;这其中就会创建XMLMapperBuilder解析mapper.xml和XMLConfigBuilde…

UHD在DPDK下进行编译

1.安装choco windows环境,用管理员权限打开 powershell 命令行界面。 输入命令:Set-ExecutionPolicy AllSigned 继续输入命令:Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]…

C++---区间DP/高精度计算---凸多边形的划分(每日一道算法2023.4.27)

注意事项&#xff1a; 本题是"区间DP—能量项链"的扩展题&#xff0c;可以先理解下那道题。 本题使用了"高精度乘法"和"高精度加法"&#xff0c;可以去这两篇文章看&#xff0c;有详解。 题目&#xff1a; 给定一个具有 N 个顶点的凸多边形&…

什么是SSO?

SSO&#xff08;Single Sign On&#xff09;单点登录。SSO是在多个应用系统中&#xff0c;用户只需要登录一次就可以访问所有相互信任的应用系统。它包括可以将这次主要的登录映射到其他应用中用于同一个用户的登录的机制。它是目前比较流行的企业业务整合的解决方案之一。 当…

中山大学_程序设计新手赛2023_4题的小朋友_总结

送自己一句话&#xff1a;即使生活很不顺利&#xff0c;也不要成为一个连自己都讨厌的人 那样&#xff0c;当你有一天回首过往&#xff0c;只不过是在演戏中匆匆做过这一生 题目&#xff1a;见我上传的资源 A:关于时间复杂度 解&#xff1a; 1.关键&#xff1a; 法一&…