RocketMQ消息队列-@RocketMQMessageListener实现原理

news2024/12/23 8:10:09

使用Spring-RocketMQ时,只需要引入rocketmq-spring-boot-starter包,并且定义以下消费者,就可以很简单的实现消息消费

@Component
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "my-producer-group", selectorExpression = "tag1")
public class RocketMQConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

可以看到只需要添加@RocketMQMessageListener注解,并实现RocketMQListener接口就可以完成消息的接受、处理逻辑


@RocketMQMessageListener实现原理

在这里插入图片描述

可以看到在ListenerContainerConfiguration中获取了所有加了RocketMQMessageListener注解的bean

 ListenerContainerConfiguration#afterSingletonsInstantiated
     
     //ListenerContainerConfiguration实现了SmartInitializingSingleton接口,会在bean都实例化完之后,触发afterSingletonsInstantiated方法
	@Override
    public void afterSingletonsInstantiated() {
     //获取所有加了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
	    //循环调用registerContainer方法
        beans.forEach(this::registerContainer);
    }
    private void registerContainer(String beanName, Object bean) {
        ......
            
        //拿到RocketMQMessageListener注解
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
	   //获取注解上定义的consumerGroup
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        //获取注解上定义的topic
        String topic = this.environment.resolvePlaceholders(annotation.topic());

        //定义beanName
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        //注册bean  调用createRocketMQListenerContainer初始化一些属性
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                //调用start方法
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

createRocketMQListenerContainer里面就是初始化了DefaultRocketMQListenerContainer这个对象,并且设置了一些消费相关的属性,比如nameServertopictagsconsumerGroup消费者组,rocketMQListener我们定义的消费监听者等

可以看到这里面并没有定义具体的消费者实例

//DefaultRocketMQListenerContainer定义  实现了InitializingBean接口,在bean初始化的时候会调用afterPropertiesSet方法
public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
        
       
     @Override
    public void afterPropertiesSet() throws Exception {
        //通过方法名可以看到是初始化MQ消费者实例
        initRocketMQPushConsumer();

        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);
    }
    private void initRocketMQPushConsumer() throws MQClientException {
        ......
            
        if (Objects.nonNull(rpcHook)) {
            //初始化DefaultMQPushConsumer对象
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
            consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
	    //消息模式 广播还是集群
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        //筛选方式  TAG和SQL92
        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }
	   //消费模式 顺序和并发
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

    }

回到上面registerContainer,最后拿到DefaultRocketMQListenerContainer的bean,调用start方法

    DefaultRocketMQListenerContainer#start
        
	@Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }

        try {
            //当前consumer就是上面分析的DefaultMQPushConsumer
            consumer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);

        log.info("running container: {}", this.toString());
    }

    DefaultMQPushConsumer#start
	@Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        //可以看到调用的是defaultMQPushConsumerImpl.start()方法
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

defaultMQPushConsumerImpl是什么时候初始化的呢

上面说到初始化DefaultMQPushConsumer对象时,点进去构造方法

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        //可以看到就在构造方法里面初始化的,通过名字可以猜想就是DefaultMQPushConsumer的实现类,但是并不是通过实现接口的方式,而是组合的方式
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
                dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                    new ConsumeMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

接口看DefaultMQPushConsumerImplstart方法

    DefaultMQPushConsumerImpl#start
        
	public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
			   //顺序消息 ConsumeMessageOrderlyService处理
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                //并发消息 ConsumeMessageConcurrentlyService处理
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();
                //调用start方法
                mQClientFactory.start();
    }
     MQClientInstance#start
         
	public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service 可以看到这里开启拉取消息
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
    ServiceThread#start
        
	public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        //new了一个Thread对象,this表示自己就是一个Runnable对象
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        //调用start方法
        this.thread.start();
    }
//ServiceThread实现了Runnable接口,并且是抽象的,找实现类
public abstract class ServiceThread implements Runnable {
}

在这里插入图片描述

可以看到PullMessageService和我们找的有关,找到它的run方法

    PullMessageService#run
        
	@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        //通过while循环拉取消息
        while (!this.isStopped()) {
            try {
                //消息存入LinkedBlockingQueue中,通过take方法阻塞获取
                PullRequest pullRequest = this.pullRequestQueue.take();
                //调用pullMessage处理消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
    private void pullMessage(final PullRequest pullRequest) {
        //选择一个消费者实例
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            //转换为DefaultMQPushConsumerImpl对象,应该很熟悉吧
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //调用pullMessage方法继续处理
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
    public void pullMessage(final PullRequest pullRequest) {
        ......
        
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
         
                //调用consumeMessageService的submitConsumeRequest方法
                //consumeMessageService上面提到过,包含顺序和并发消费
                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
             ......
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };
    }
   ConsumeMessageOrderlyService#submitConsumeRequest
       
	@Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            //ConsumeRequest是一个Runnable
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            //提交到线程池中处理
            this.consumeExecutor.submit(consumeRequest);
        }
    }

来看ConsumeRequestrun方法

       ConsumeMessageOrderlyService.ConsumeRequest#run
           
	   @Override
        public void run() {
         ......
         //核心在这里消费消息       
		status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        }
DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly#consumeMessage        

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    //处理消息
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    DefaultRocketMQListenerContainer#handleMessage
        
	private void handleMessage(
        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListener != null) {
            //可以看到最终调用到onMessage方法,也就是开头我们实现的接口中的onMessage方法
            rocketMQListener.onMessage(doConvertMessage(messageExt));
        } else if (rocketMQReplyListener != null) {
            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
            Message<?> message = MessageBuilder.withPayload(replyContent).build();

            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) {
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                    } else {
                        log.info("Consumer replies message success.");
                    }
                }

                @Override public void onException(Throwable e) {
                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                }
            });
        }
    }

至此整个流程也就通了


总结

@RocketMQMessageListener相当于定义一个消费者,topic、consumerGroup、selectorExpression、consumeMode、messageModel定义了消费者的一些属性

实现RocketMQListener接口来处理具体消费逻辑

每个消费者初始化了一个DefaultRocketMQListenerContainer对象,该对象中包含消费实例和消费者的属性

服务启动的时候开启一个线程轮训队列中的消息,如果没有就一直阻塞,拿到消息后,最终会调用自己实现的onMessage方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/964573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

过滤器的应用-Filter

过滤器 1.工作原理 2.创建Filter 2.1通过注解的方式实现 //创建一个类&#xff0c;实现Filter接口 WebFilter(urlPatterns "/myfilter") //urlPatterns表示需要拦截的路径 public class MyFilter implements Filter {Overridepublic void doFilter(ServletReques…

深度解读NeuS代码(1):输入数据格式

准备训练数据 在preprocess_cutsom_data中&#xff0c;笔者采取第二个经过colmap的方法。但是这个方法其实需要一个前置结果&#xff0c;即colmap与LLFF的处理。接下来分别讨论&#xff1a; Colmap 一个非常好的操作流程在这里&#xff1a; https://zhuanlan.zhihu.com/p/57…

Kubernetes(k8s)安装NFS动态供给存储类并安装KubeSphere

Kubernetes安装NFS动态供给存储类并安装KubeSphere KubeSphere介绍环境准备KubeSphereNFS动态供给 安装NFS动态供给搭建NFS下载动态供给驱动修改驱动文件安装动态供给 安装KubeSphere下载KubeSphere的yaml资源清单文件安装KubeSphere 使用KubeSphere部署应用创建项目部署MySQL …

Android 13 - Media框架(9)- NuPlayer::Decoder

这一节我们将了解 NuPlayer::Decoder&#xff0c;学习如何将 MediaCodec wrap 成一个强大的 Decoder。这一节会提前讲到 MediaCodec 相关的内容&#xff0c;如果看不大懂可以先跳过此篇。原先觉得 Decoder 部分简单&#xff0c;越读越发现自己的无知&#xff0c;Android 源码真…

nginx-缓存

disk cache&#xff1a;磁盘缓存数据&#xff0c;有时间延迟&#xff0c;但是非常小&#xff0c;相对于直接请求服务器返回 对于用户来说基本无感知。 memory cache&#xff1a;磁盘缓存数据&#xff0c;基本上没有时间延迟 协商缓存&#xff08;nginx自带功能&#xff0c; 不…

机器人中的数值优化(五)——信赖域方法

本系列文章主要是我在学习《数值优化》过程中的一些笔记和相关思考&#xff0c;主要的学习资料是深蓝学院的课程《机器人中的数值优化》和高立编著的《数值最优化方法》等&#xff0c;本系列文章篇数较多&#xff0c;不定期更新&#xff0c;上半部分介绍无约束优化&#xff0c;…

18.kthread_worker:内核线程异步传输

目录 kthread_worker 驱动传输数据的方式 同步传输 异步传输 头文件 kthread_worker结构体 kthread_work结构体 kthread_flush_work结构体 init_kthread_worker()函数 为kthread_worker创建内核线程 init_kthread_work()函数 启动工作 刷新工作队列 停止内核线程…

3D点云测量:计算三个平面的交点

文章目录 0. 测试效果1. 基本内容文章目录:3D视觉测量目录微信:dhlddxB站: Non-Stop_0. 测试效果 1. 基本内容 计算三个平面的交点需要找到满足所有三个平面方程的点。三个平面通常由它们的法向量和通过它们的点(或参数形式的方程)来定义。以下是计算三个平面的交点的一般步…

在VScode中使用sftp传输本地文件到服务器端

安装SFTP 在VScode的扩展中安装sftp 注意这里需要在你没连接服务器的状态下安装&#xff0c;即本机需要有sftp 配置传输端口 安装成功后&#xff0c;使用快捷键"ctrlshiftp",输入sftp&#xff0c;选择Config 根据自己的实际情况修改配置文件&#xff0c;主要改h…

设计模式-6--装饰者模式(Decorator Pattern)

一、什么是装饰者模式&#xff08;Decorator Pattern&#xff09; 装饰者模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许你在不修改现有对象的情况下&#xff0c;动态地将新功能附加到对象上。这种模式通过创建一个包装类&#xff0c;…

排序之交换排序

文章目录 前言一、冒泡排序1、冒泡排序基本思想2、冒泡排序的效率 二、快速排序 -- hoare版本1、快速排序基本思想2、快速排序代码实现3、为什么最左边值做key时&#xff0c;右边先走 三、快速排序 -- 挖坑法1、快速排序 -- 挖坑法基本思想2、快速排序 -- 挖坑法代码实现3、为什…

stable diffusion实践操作-随机种子seed

系列文章目录 stable diffusion实践操作 文章目录 系列文章目录前言一、seed是什么&#xff1f;二、使用步骤1.多批次随机生成多张图片2.提取图片seed3. 根据seed 再次培养4 seed使用4.1 复原别人图4.1 轻微修改 三、差异随机种子1. webUI位置2. 什么是差异随机种子3.使用差异…

找redis大key工具rdb_bigkeys

github官网 https://github.com/weiyanwei412/rdb_bigkeys 在centos下安装go [roothadoop102 rdb_bigkeys-master]# wget https://dl.google.com/go/go1.13.5.linux-amd64.tar.gz [roothadoop102 rdb_bigkeys-master]# tar -zxf go1.13.5.linux-amd64.tar.gz -C /usr/local将g…

安装bpftrace和bcc的踩坑记录

最后在Ubuntu22.04使用Ubuntu提供的安装命令完成了安装。这里是记录尝试在Ubuntu18.04和Ubuntu22.04使用源码安装未果的过程。 文章目录 22版本安装bcc准备工具安装命令使用报错&#xff1a;iovisor封装的安装方式ubuntu的安装方式 For Bionic (18.04 LTS)官方提供的源码安装准…

SpringCloudGateway集成SpringDoc

SpringCloudGateway集成SpringDoc 最近在搞Spring版本升级&#xff0c;按客户要求升级Spring版本&#xff0c;原来用着SpringBoot 2.2.X版本&#xff0c;只需要升级SpringBoot 2.X最新版本也就可以满足客户Spring版本安全要求&#xff0c;可是好像最新的SpringBoot 2.X貌似也不…

Laravel chunk和chunkById的坑

在编写定时任务脚本的时候&#xff0c;经常会用到chunk和chunkById的API。 一、前言 数据库引擎为innodb。 表结构简述&#xff0c;只列出了本文用到的字段。 字段类型注释idint(11)IDtypeint(11)类型mark_timeint(10)标注时间&#xff08;时间戳&#xff09; 索引&#x…

手撕 视觉slam14讲 ch13 代码(1)工程框架与代码结构

在学习slam一年之后开始&#xff0c;开始自己理思路&#xff0c;全手敲完成ch13的整个代码 我们按照自己写系统的思路进行&#xff0c;首先确定好SLAM整体系统的流程&#xff0c;见下图&#xff0c;输入为双目图像&#xff0c;之后进入前端位姿估计和后端优化&#xff0c;中间…

滑动窗口实例3(最大连续1的个数Ⅲ)

题目&#xff1a; 给定一个二进制数组 nums 和一个整数 k&#xff0c;如果可以翻转最多 k 个 0 &#xff0c;则返回 数组中连续 1 的最大个数 。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1,0,0,0,1,1,1,1,0], K 2 输出&#xff1a;6 解释&#xff1a;[1,1,1,0,0,1,1…

stable diffusion实践操作-宽高设置以及高清修复

系列文章目录 stable diffusion实践操作 文章目录 系列文章目录前言一、SD宽高怎么设置&#xff1f;1.1 宽高历史 二、高清修复总结 前言 主要介绍SD的宽高设置以及高清修复 一、SD宽高怎么设置&#xff1f; 1.1 宽高历史 SD生成256256图片效果最好。512512是SD一直使用的画…

【管理运筹学】第 7 章 | 图与网络分析(1,图论背景以及基本概念、术语)

文章目录 引言一、图与网络的基本知识1.1 图与网络的基本概念1.1.1 图的定义1.1.2 图中相关术语1.1.3 一些特殊图类1.1.4 图的运算 写在最后 引言 按照正常进度应该学习动态规划了&#xff0c;但我想换换口味&#xff0c;而且动态规划听说也有一定难度&#xff0c;还不一定会考…