[RocketMQ] Producer生产者启动源码 (五)

news2024/12/29 16:34:30

DefaultMQProducer的关系图:

在这里插入图片描述

文章目录

      • 1.创建DefaultMQProducer实例
      • 2.start启动生产者
        • 2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance
        • 2.2 registerProducer注册生产者
        • 2.3 start启动MQClientInstance
        • 2.4 startScheduledTask启动各种定时任务
          • 2.4.1 updateTopicRouteInfoFromNameServer更新topic路由信息
            • 2.4.1.1 updateTopicRouteInfoFromNameServer更新topic路由信息
            • 2.4.1.2 getTopicRouteInfoFromNameServer从nameServer获取路由信息
            • 2.4.1.3 getAndCreateChannel随机选择nameServer建立连接
          • 2.4.2 cleanOfflineBroker清除下线的broker
          • 2.4.3 sendHeartbeatToAllBrokerWithLock发送心跳包
            • 2.4.3.1 sendHeartbeatToAllBroker给所有broker发送心跳包
            • 2.4.3.2 prepareHeartbeatData准备心跳数据包
      • 3.总结

DefaultMQProducer的构造器和start方法。

1.创建DefaultMQProducer实例

DefaultMQProducer的构造器有很多, 但是最终调用的是下面3个参数的构造函数:

/**
 * Constructor specifying namespace, producer group and RPC hook.
 * 指定命名空间、生产者组和RPC钩子的构造函数。
 *
 * @param namespace Namespace for this MQ Producer instance.
 * @param producerGroup Producer group, see the name-sake field.
 * @param rpcHook RPC hook to execute per each remoting command execution.
 */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    //命名空间
    this.namespace = namespace;
    //生产者组
    this.producerGroup = producerGroup;
    //根据RPC钩子创建DefaultMQProducerImpl实例,负责发送消息
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

指定了命名空间、生产者组和RPC钩子的构造器, 内部创建了一个DefaultMQProducerImpl实例, DefaultMQProducer可以看作是DefaultMQProducerImpl的包装类, 是给开发人员用的。DefaultMQProducer中的几乎所有的方法内部都是由DefaultMQProducerImpl实现的, 这是门面设计模式。

DefaultMQProducerImpl的构造器, 初始化了一个异步发送信息的线程池。

/**
 * @param defaultMQProducer defaultMQProducer
 * @param rpcHook           rpc钩子
 */
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    //保存defaultMQProducer和rpcHook
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    /*
     * 异步发送消息的线程池队列
     */
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    /*
     * 默认的异步发送消息的线程池
     * 核心线程和最大线程数量都是当前服务器的可用线程数
     */
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
}

2.start启动生产者

在这里插入图片描述

start方法内部会执行很多初始化操作, 需要在发送或者查询消息前调用该方法。

/**
 * 启动生产者实例
 * 为了准备这个实例,需要执行许多内部初始化过程,因此,必须在发送或查询消息之前调用这个方法。
 */
@Override
public void start() throws MQClientException {
    //根据namespace和producerGroup设置生产者组
    this.setProducerGroup(withNamespace(this.producerGroup));
    //默认生产者实现启动
    this.defaultMQProducerImpl.start();
    //消息轨迹跟踪服务,默认null
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

defaultMQProducerImpl#start方法, 该方法实现生产者的启动。

在这里插入图片描述

  1. 调用checkConfig方法检查生产者的ProducerGroup是否符合规范, 如果ProducerGroup为空, 或者长度大于255个字符, 或者包含非法字符, 或者生产者组名为默认组名DEFAULT_PRODUCER, 满足以上任意条件都校验不通过抛出异常。
  2. 调用getOrCreateMQClientInstance方法, 根据clientId获取或者创建CreateMQClientInstance实例并赋给mQClientFactory变量。
  3. 将当前生产者注册到MQClientInstance实例的producerTable属性中。
  4. 添加一个默认topic “TBW102”, 将会在isAutoCreateTopicEnable属性开启时在broker上自动创建, RocketMQ会基于该Topic的配置创建新的Topic。
  5. 调用mQClientFactory#start方法启动CreateMQClientInstance客户端通信实例, 初始化netty服务, 各种定时任务, 拉取消息服务, rebalanceService服务等等。
  6. 主动调用一次sendHeartbeatToAllBrokerWithLock发送心跳信息给所有broker。
  7. 启动一个定时任务, 移除超时的request方法的请求, 执行异常回调, 时间间隔为1s。
/**
 * DefaultMQProducerImpl的启动方法
 */
public void start() throws MQClientException {
    this.start(true);
}

/**
 * DefaultMQProducerImpl的启动方法
 *
 * @param startFactory
 * @throws MQClientException
 */
public void start(final boolean startFactory) throws MQClientException {
    //根据服务状态选择走不同的代码分支
    switch (this.serviceState) {
        /**
         * 服务仅仅创建,而不是启动状态,那么启动服务
         */
        case CREATE_JUST:
            //首先修改服务状态为服务启动失败,如果最终启动成功则再修改为RUNNING
            this.serviceState = ServiceState.START_FAILED;
            /*
             * 1 检查生产者的配置信息
             * 主要是检查ProducerGroup是否符合规范,
             * 如果ProducerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者生产者组名为默认组名DEFAULT_PRODUCER
             * 满足以上任意条件都校验不通过抛出异常。
             */
            this.checkConfig();
            //如果ProducerGroup不是CLIENT_INNER_PRODUCER,那么将修改当前的instanceName为当前进程pid,PID就是服务的进程号。
            //CLIENT_INNER_PRODUCER是客户端内部的生产者组名,该生产者用于发送消息回退请求
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            /*
             * 2 获取MQClientManager实例,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            /*
             * 3 将当前生产者注册到MQClientInstance实例的producerTable属性中
             */
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            //如果注册失败,那么设置服务属性为CREATE_JUST,并抛出异常
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
            }
            //添加一个默认topic “TBW102”,将会在isAutoCreateTopicEnable属性开启时在broker上自动创建,RocketMQ会基于该Topic的配置创建新的Topic
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            /*
             * 4 启动CreateMQClientInstance客户端通信实例
             * netty服务、各种定时任务、拉取消息服务、rebalanceService服务
             */
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
            //服务状态改为RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        /**
         * 服务状态是其他的,那么抛出异常,即start方法仅能调用一次
         */
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        default:
            break;
    }
    /*
     * 5 发送心跳信息给所有broker
     */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    /*
     * 6 启动一个定时任务,移除超时的请求,并执行异常回调,任务间隔1s
     */
    RequestFutureHolder.getInstance().startScheduledTask(this);

}

2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance

在这里插入图片描述

先生成clientId, 格式为clientIP@instanceName@unitName。然后从本地缓存factoryTable中查找该clientId的MQClientInstance实例, 如果缓存没有的, 创建并加入缓存。

MQClientInstance封装了RocketMQ底层网络处理API, Producer, Consumer都使用MQClientInstance, 是Producer和Consumer与Broker和NameServer打交道的网络通道。

一个clientId对应一个MQClientInstance, 一个应用中多个producer和consumer使用同一个MQClientInstance实例。

/**
 * MQClientManager的方法
 *
 * @param clientConfig 生产者客户端配置类
 * @param rpcHook      rpc钩子
 * @return MQClientInstance
 */
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    /*
     * 构建clientId,格式为 clientIP@instanceName@unitName
     */
    String clientId = clientConfig.buildMQClientId();
    //从本地缓存factoryTable中,查找该clientId的MQClientInstance实例
    MQClientInstance instance = this.factoryTable.get(clientId);
    //如果不存在则创建并存入factoryTable
    if (null == instance) {
        instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

创建MQClientInstance

在这里插入图片描述

创建MQClientInstance的时候, 会初始化netty客户端, 各种服务实例。

在这里插入图片描述

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    //创建netty客户端配置类实例
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    //客户端请求处理器
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    //创建客户端远程通信API实现类的实例,内部持有一个remotingClient
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    //更新namesrvAddr,即根据“;”将namesrvAddr字符串拆分为namesrvAddrList集合
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    //客户端id
    this.clientId = clientId;
    //MQ的admin控制台操作的实现
    this.mQAdminImpl = new MQAdminImpl(this);
    //push模式下,拉取消息的服务
    this.pullMessageService = new PullMessageService(this);
    //消息消费的负载均衡服务
    this.rebalanceService = new RebalanceService(this);
    //客户端内部的生产者,该生产者用于发送消息回退请求
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    //消费者状态管理器
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

2.2 registerProducer注册生产者

在这里插入图片描述

registerProducer方法尝试将当前生产者组和生产者实例的映射关系加入到MQClientInstance内部的producerTable属性中。如果存在相同生产者组的数据, 则不会再次添加返回false。

/**
 * MQClientInstance的方法
 *
 * @param group    生产者组
 * @param producer 生产者实例
 * @return 是否加入成功
 */
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    //任何一个为null直接返回false
    if (null == group || null == producer) {
        return false;
    }
    //如果生产者组不存在,则添加到producerTable集合中,并返回null,否则返回已存在的producer
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    //如果已存入相同的生产者组的生产者,则返回false
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}

2.3 start启动MQClientInstance

DefaultMQProducerImpl#start() -> MQClientInstance#start() -> MQClientAPIImpl#start()

该方法是producer启动的核心方法。启动CreateMQClientInstance客户端通信实例, 会初始化netty服务, 各种定时任务, 拉取消息服务, rebalanceService服务, 内部的生产者服务等等。

在这里插入图片描述

在这里插入图片描述

/**
 * MQClientAPIImpl的方法
 */
public void start() {
    //调用NettyRemotingClient#start
    this.remotingClient.start();
}

内部就是调用的NettyRemotingClient#start方法。

/**
 * NettyRemotingClient的方法
 * 创建netty客户端,并没有真正启动
 */
@Override
public void start() {
    //创建默认事件处理器组,默认4个线程,线程名以NettyClientWorkerThread_为前缀。
    //主要用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作
    //其工作时间位于IO线程组之后,process线程组之前
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });
    /*
     * 初始化netty客户端
     */
    //eventLoopGroupWorker线程组,默认一个线程
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            //对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关
            .option(ChannelOption.TCP_NODELAY, true)
            //对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态
            .option(ChannelOption.SO_KEEPALIVE, false)
            //用来设置连接超时时长,单位是毫秒,默认3000
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    //为defaultEventExecutorGroup,添加handler
                    pipeline.addLast(
                            defaultEventExecutorGroup,
                            //RocketMQ自定义的请求解码器
                            new NettyEncoder(),
                            //RocketMQ自定义的请求编码器
                            new NettyDecoder(),
                            //Netty自带的心跳管理器,主要是用来检测远端是否存活
                            //即测试端一定时间内未接受到被测试端消息和一定时间内向被测试端发送消息的超时时间为120秒
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                            //连接管理器,他负责连接的激活、断开、超时、异常等事件
                            new NettyConnectManageHandler(),
                            //服务请求处理器,处理RemotingCommand消息,即请求和响应的业务处理,并且返回相应的处理结果。这是重点
                            //例如broker注册、producer/consumer获取Broker、Topic信息等请求都是该处理器处理
                            //serverHandler最终会将请求根据不同的消息类型code分发到不同的process线程池处理
                            new NettyClientHandler());
                }
            });
    if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
        log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
        handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
    }
    if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
        log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
        handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    }
    if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
                nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
        handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
    }
    /*
     * 启动定时任务,初始启动3秒后执行,此后每隔1秒执行一次
     * 扫描responseTable,将超时的ResponseFuture直接移除,并且执行这些超时ResponseFuture的回调
     */
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
    /*
     * 启动netty事件监听器,处理各种事件
     */
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

start方法最后, 启动了一个定时任务scanResponseTable, 主要用于处理通信时异常情况, RocketMQ会将请求结果封装为一个ResponseFuture并且存入responseTable中。

发送消息时, 如果服务器没有response返回给客户端或者response因网络而丢失情况, 此时可能会造成responseTable中的ResponseFuture积累, 因此该任务会每隔一秒扫描一次responseTable, 将超时的ResponseFuture直接移除, 并且执行超时ResponseFuture的回调。

2.4 startScheduledTask启动各种定时任务

在这里插入图片描述

  1. 如果没有手动指定namesrvAddr, 那么每隔2s从nameServer地址服务器拉取最新的nameServer地址并更新。
  2. 每隔30S尝试从nameServer更新topic路由信息。
  3. 每隔30S尝试清除无效的broker信息, 以及发送心跳信息给所有broker。
  4. 每隔5S尝试持久化消费者偏移量, 消费进度。广播消费模式下持久化到本地, 集群消费模式下推送给broker端。该定时任务针对消费者。
  5. 每隔1m尝试调整push模式的消费线程池的线程数量, 该定时任务针对消费者。

这些定时任务是MQClientInstance实例启动的, 而一个应用的所有consumer和producer共用该MQClientInstance实例, 所以这些定时任务也可以说属于该应用的每一个consumer和producer。

/**
 * MQClientInstance的方法
 */
private void startScheduledTask() {
    /**
     * 1 如果没有手动指定namesrvAddr,那么每隔2m从nameServer地址服务器拉取最新的nameServer地址并更新
     * 要想动态更新nameServer地址,需要指定一个地址服务器的url
     */
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    /**
     * 2 每隔30S尝试从nameServer更新topic路由信息
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    /**
     * 3 每隔30S尝试清除无效的broker信息,以及发送心跳信息给所有broker
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    /**
     * 4 每隔5S尝试持久化消费者偏移量,即消费进度
     * 广播消费模式下持久化到本地,集群消费模式下推送到broker端
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    /**
     * 5 每隔1min尝试调整push模式的消费线程池的线程数量,目前默认没有实现该功能
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
2.4.1 updateTopicRouteInfoFromNameServer更新topic路由信息

在这里插入图片描述

每隔30s从nameServer拉取并更新topic的路由信息的定时任务方法。

首先会从MQClientInstance内部的consumerTable以及producerTable这两个map中获取配置的所有topic集合topicList, 包含consumer订阅的topic集合以及producer中topicPublishTable集合中的数据。

/**
 * MQClientInstance的方法
 * 从nameServer拉取并更新topic信息
 */
public void updateTopicRouteInfoFromNameServer() {
    //使用set集合存储topic,去除重复的数据
    Set<String> topicList = new HashSet<String>();

    // Consumer
    /*
     * 从consumerTable中获取所有的内部生产者DefaultMQPushConsumerImpl的实例,然后获取内部的rebalanceImpl#subscriptionInner集合中的topic
     */
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    /*
     * 从producerTable中获取所有的内部生产者DefaultMQProducerImpl的实例,然后获取内部的topicPublishInfoTable集合中的topic
     */
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }
    /*
     * 遍历每一个topic,从nameServer获取topic的路由信息
     * 然后比较topic是否改变,如果改变了,对客户端本地保存的路由信息进行更新
     */
    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}
2.4.1.1 updateTopicRouteInfoFromNameServer更新topic路由信息

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

该方法用于从nameServer拉取并更新topic的路由信息, topic的路由信息由broker上报给nameServer。

从nameSerer拉取到topic路由信息之后, 调用topicRouteDataIsChange方法与本地的旧topic路由信息比较看是否更改, 比较的数据为: topic的队列信息queueDatas, topic的broker信息brokerDatas, 顺序topic配置orderTopicConf, 消息过滤信息filterServerTable。

当判断是否需要更新的时候, 会更新本地topic缓存:

  1. 更新brokerName到brokerAddr的地址的映射关系, brokerAddrTable。
  2. 更新生产者的producerTable集合, 更新MQProducerInner的topicPublishInfoTable属性。
  3. 更新消费者的consumerTable集合, 更新MQConsumerInner的rebalanceImpl.topicSubscribeInfoTable属性。
  4. 更新topicRouteTable集合, 更新本地topic路由信息。
/**
 * MQClientInstance的方法
 * 从nameServer拉取并更新topic信息
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                  DefaultMQProducer defaultMQProducer) {
    try {
        //加锁防止并发
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                /*
                 * 1 通过默认topic和producer去获取路由信息时走此逻辑
                 */
                if (isDefault && defaultMQProducer != null) {
                    //获取指定的topic “TBW102” 的路由信息
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                    if (topicRouteData != null) {
                        //更新队列数据,设置读写队列数量
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    /*
                     * 2 定时任务会走该逻辑
                     * 从nameServer获取当前指定topic的路由信息
                     */
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                }
                /*
                 * 3 如果获取到的路由信息不为空,那么比较看是否需要更新本地路由信息
                 */
                if (topicRouteData != null) {
                    //从topicRouteTable中获取本地缓存的topic路由信息
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    /*
                     * 4 比较nameServer的topic路由和本地topic路由,看是否不相同
                     */
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        //如果相同,看本地是否还存在当前topic的配置,如果不存在则还是需要更新topic路由
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }
                    /*
                     * 5 如果需要更新本地路由信息,那么执行更新操作
                     */
                    if (changed) {
                        //克隆一个TopicRouteData
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                        /*
                         * 更新更新brokerName到brokerAddr的地址的映射关系,即brokerAddrTable
                         */
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        /*
                         * 更新生产者的producerTable集合,更新MQProducerInner的topicPublishInfoTable属性
                         */
                        if (!producerTable.isEmpty()) {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        /*
                         * 更新消费者的consumerTable集合,更新MQConsumerInner的rebalanceImpl.topicSubscribeInfoTable属性
                         */
                        if (!consumerTable.isEmpty()) {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        /*
                         * 更新topicRouteTable集合,更新本地topic信息
                         */
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                }
            } catch (MQClientException e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } catch (RemotingException e) {
                log.error("updateTopicRouteInfoFromNameServer Exception", e);
                throw new IllegalStateException(e);
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}
2.4.1.2 getTopicRouteInfoFromNameServer从nameServer获取路由信息

在这里插入图片描述

通过remotingClient发起请求调用远程接口, 从nameServer获取当前指定topic的路由信息。

/**
 * MQClientAPIImpl的方法
 * <p>
 * 从nameServer获取当前指定topic的路由信息
 *
 * @param topic         指定topic名字
 * @param timeoutMillis 超时时间3s
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
    //允许topic不存在
    return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                                                      boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    //构建请求头
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    //topic设置到请求头中
    requestHeader.setTopic(topic);
    //获取请求命令对象,请求的Code为GET_ROUTEINFO_BY_TOPIC,105
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    //同步远程调用,请求nameServer服务,注意这里的addr参数为nll,即producer会随机连接一个nameServer
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            //如果响应成功,那么对响应体进行解码
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

最终通过remotingClient.invokeSync方法同步请求nameServer获取topic路由信息,。

/**
 * NettyRemotingClient的方法
 * <p>
 * 同步调用
 */
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    //根据addr建立连接,获取channel,随机选择nameServer连接的逻辑就在这里
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            //执行rpc钩子的doBeforeRequest方法
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
            }
            //执行同步远程调用,获得调用结果
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            //执行rpc钩子的doAfterResponse方法
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
2.4.1.3 getAndCreateChannel随机选择nameServer建立连接

在这里插入图片描述

在这里插入图片描述

getAndCreateChannel方法根据addr建立连接获取channel。

  1. 首先获取一个随机位于nameServer集合长度范围内的整数, 作为第一个被选取的nameServer的地址。
  2. 然后建立长连接, 如果建立成功, 那么该长连接放入缓存, 以后直接使用。
  3. 如果没有建立成功, 那么选择当前位置开始的下一个nameServer地址继续尝试建立, 直到成功为止。如果所有的都失败了, 抛出异常。
/**
 * NettyRemotingClient的方法
 * <p>
 * 获取并建立连接
 *
 * @param addr 地址
 */
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    /*
     * 如果addr为null,则获取并且创建nameServer长连接
     */
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }
    /*
     * 否则,尝试从缓存中获取长连接,获取不到才会创建
     */
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}

/**
 * NettyRemotingClient的方法
 * 获取并建立nameServer连接
 */
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
    //如果此前已经选择了nameServer,并且通道存在且是激活状态,那么直接获取该nameServer的通道
    //即通道可以被不同的consumer和producer复用
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    //加锁
    if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        //加锁之后再次判断,防止并发
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            /*
             * 选择一个nameServer建立长连接
             */
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    //获取下一个nameServer的索引,初始化数据是随机生成的
                    int index = this.namesrvIndex.incrementAndGet();
                    //取绝对值
                    index = Math.abs(index);
                    //对nameServer集合长度取余数
                    index = index % addrList.size();
                    //获取最终确定的nameServer地址
                    String newAddr = addrList.get(index);
                    //设置到被选择的namesrvAddrChoosed属性中
                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    //针对被选择的地址建立长连接
                    Channel channelNew = this.createChannel(newAddr);
                    //如果建立连接成功就返回,否则集训循环选择下一个nameServer地址创建
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
                //没有一个nameServer地址建立成功,那么抛出异常
                throw new RemotingConnectException(addrList.toString());
            }
        } finally {
            //解锁
            this.namesrvChannelLock.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}
2.4.2 cleanOfflineBroker清除下线的broker

在这里插入图片描述

每隔30s清除下线的broker的定时任务方法。遍历并且更新brokerAddrTable这个map集合。

获取每一个address, 去本地路由信息集合topicRouteTable中查找判断broker地址是否存在于topicRouteTable的任意一个topic的路由信息中, 如果不存在, 则表示下线, 清除broker地址。

/**
 * MQClientInstance的方法
 * <p>
 * 清除下线broker
 */
private void cleanOfflineBroker() {
    try {
        //加锁
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
            try {
                ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();

                Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                //遍历brokerAddrTable
                while (itBrokerTable.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();

                    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                    cloneAddrTable.putAll(oneTable);

                    Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> ee = it.next();
                        String addr = ee.getValue();
                        //判断broker地址是否存在于topicRouteTable的任意一个topic的路由信息中,如果不存在则移除该broker地址
                        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                            it.remove();
                            log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                        }
                    }
                    //如果集合为空,则移除brokerAddrTable的此项键值对
                    if (cloneAddrTable.isEmpty()) {
                        itBrokerTable.remove();
                        log.info("the broker[{}] name's host is offline, remove it", brokerName);
                    } else {
                        //否则更新剩下的broker地址信息
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                }
                //更新brokerAddrTable的信息
                if (!updatedTable.isEmpty()) {
                    this.brokerAddrTable.putAll(updatedTable);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
    } catch (InterruptedException e) {
        log.warn("cleanOfflineBroker Exception", e);
    }
}

在这里插入图片描述

isBrokerAddrExistInTopicRouteTable方法用判断给定的broker地址是否存在于topicRouteTable的任意一个topic的路由信息中。

/**
 * MQClientInstance的方法
 * broker地址是否存在于topicRouteTable的任意一个topic的路由信息中
 */
private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
    //遍历topicRouteTable
    Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, TopicRouteData> entry = it.next();
        TopicRouteData topicRouteData = entry.getValue();
        List<BrokerData> bds = topicRouteData.getBrokerDatas();
        for (BrokerData bd : bds) {
            //broker地址是否存在于topicRouteTable的任意一个topic的路由信息中
            if (bd.getBrokerAddrs() != null) {
                boolean exist = bd.getBrokerAddrs().containsValue(addr);
                if (exist)
                    return true;
            }
        }
    }

    return false;
}
2.4.3 sendHeartbeatToAllBrokerWithLock发送心跳包

每隔30s向所有broker发送心跳包的定时任务方法。客户的consumer和producer都是通过该定时任务发送心跳数据包的。
在这里插入图片描述

/**
 * MQClientInstance的方法
 * 发送心跳包给所有broker
 */
public void sendHeartbeatToAllBrokerWithLock() {
    //加锁
    if (this.lockHeartbeat.tryLock()) {
        try {
            //发送心跳包给所有broker
            this.sendHeartbeatToAllBroker();
            //上传过滤类到Broker对应的所有Filtersrv,push模式消费使用
            //但是在RocketMQ5.0.0中filterServer被整体移除,推荐使用MessageSelector
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}
2.4.3.1 sendHeartbeatToAllBroker给所有broker发送心跳包

该方法用于给所有broker发送心跳包, 首先会prepareHeartbeatData准备心跳包, 如果数据包中没有任何生产者和消费者的信息, 那么不会发送心跳包, 直接返回。

然后会遍历brokerAddrTable中的broker地址, 循环发送心跳包。如果数据包中没有消费者的信息并且当前broker不是Master节点, 无需向该broker发送心跳包。因为此时该应用中只有生产者启动, 而生产者只会给Master发送消费数据。

发送心跳包的请求编码为请求编码为HEART_BEAT。

/**
 * MQClientInstance的方法
 * 给所有broker发送心跳包
 */
private void sendHeartbeatToAllBroker() {
    /*
     * 1 准备心跳数据包
     */
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    //如果没有任何关于生产者和消费者的数据,那么不需要发送心跳包
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }

    if (!this.brokerAddrTable.isEmpty()) {
        //发送心跳的次数自增1
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        //循环遍历brokerAddrTable
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        /*
                         * 2 如果消费者配置为空,并且当前broker节点不是Master,那么跳过当前broker注册,因为producer只需要向和Master维持心跳即可
                         * 如果当前应用中启动了消费者,那么就会向所有的broker注册
                         */
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            /*
                             * 3 给broker发送心跳数据包,返回版本
                             */
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            //每二十次心跳,打印一次日志
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}
2.4.3.2 prepareHeartbeatData准备心跳数据包

在这里插入图片描述
构建心跳数据包, 包含客户端id、消费者信息集合(消费者组名、消费类型、消费模式、启动消费者时从哪开始消费、订阅信息)、生产者信息集合(生产者组名)。

/**
 * MQClientInstance的方法
 * 准备心跳信息
 */
private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Consumer
    /*
     * 消费者心跳信息
     */
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            //构建ConsumerData数据
            ConsumerData consumerData = new ConsumerData();
            //消费者组名
            consumerData.setGroupName(impl.groupName());
            //消费类型 pull push
            consumerData.setConsumeType(impl.consumeType());
            //消费模式 集群 广播
            consumerData.setMessageModel(impl.messageModel());
            //启动消费者时从哪开始消费
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            //订阅信息,包括过滤消息相关标签、SQL规则
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());
            //加入到消费者数据集合中
            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // Producer
    /*
     * 生产者心跳信息
     */
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            //生产者组名
            producerData.setGroupName(entry.getKey());

            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}

3.总结

DefaultMQProducer的构造器和start方法。

  • 在Producer启动和初始化过程中, 会获取或者创建一个非常重要的对象MQClientInstance, MQClientInstance封装了RocketMQ底层网络处理API, 其本身位于通信层, 客户端层的Producer、Consumer都会使用到这个类, 是Producer、Consumer与NameServer、Broker 打交道的网络通道。

  • 在同一个应用中, 即使存在多个不同的consume和producer实例, 该MQClientInstance实例的start方法中的初始化操作也仅会被调用一次。

  • MQClientInstance的start方法将会初始化客户端的netty服务、启动各种定时任务(例如定时更新topic信息、定时发送心跳数据等等)、拉取消息服务(后面消费者模块再学习)、rebalanceService负载均衡服务(后面消费者模块再学习)、内部的生产者服务等等服务, 这些服务只需要启动一次就好。

  • 由于同一个应用中不同的consume和producer实例共享这个MQClientInstance实例, 因此MQClientInstance中的很多方法均需加锁。

生产者源码中的设计模式:

  1. 门面设计模式: DefaultMQProducer是我们使用的生产者, 但是方法几乎均委托给内部的DefaultMQProducerImpl来实现。
  2. 单例设计模式: 如MQClientManager, 典型的饿汉单例模式。
  3. 状态设计模式: MQClientInstance和DefaultMQProducerImpl中均有状态字段serviceState, 根据不同的状态方法进行不同的方法调用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/682307.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】切换内核版本(Centos)

1.查看服务器已安装的所有内核版本 awk -F\ $1"menuentry " {print i " : " $2} /etc/grub2.cfg0 : CentOS Linux (3.10.0-1160.83.1.el7.x86_64) 7 (Core) 1 : CentOS Linux (3.10.0-1160.80.1.el7.x86_64) 7 (Core) 2 : CentOS Linux (3.10.0-1127.el7.…

求最小函数依赖集(例题讲解)超详细,易理解

在关系R<U,F>中&#xff0c;UABCDEG F{BG->C&#xff0c;BD->E,DG->C,ADG->BC,AG->B,B->D} 先进行第一大步&#xff1a; 先看右边&#xff1a; 如果有BG->C,G-->C,因为单G就可以推出C了就不需要BG--->C了&#xff0c;可以把BG--->C这个…

Segment Anything Model(SAM)

Segment Anything Model&#xff08;SAM&#xff09;是Facebook Research近来开源的一种新的图像分割任务、模型。Segment Anything Model&#xff08;SAM&#xff09;可以从输入提示&#xff08;如点或框&#xff09;生成高质量的对象掩模&#xff0c;并可用于生成图像中所有对…

1、一条 SQL 查询语句是如何执行的?

基础篇 第一课、一条 SQL 查询语句是如何执行的&#xff1f; 下面是 MySQL 的基本架构示意图&#xff0c;从中可以清楚地看到 SQL 语句在 MySQL 的各个功能模块中的执行过程。 大体来说&#xff0c;MySQL 可以分为 Server 层和存储引擎层两部分。 Server 层包括连接器、查询缓存…

AI近十年盘点:纵览AI发展历程,探寻AI未来走向

编者按&#xff1a;当我们回顾过去十年的人工智能发展历程时&#xff0c;可以看到一场现在还正在进行的变革&#xff0c;对我们的工作方式、商业运营模式和人际交往行为都产生了深远的影响。从2013年的AlexNet到变分自编码器&#xff0c;再到最近的生成式大模型&#xff0c;人工…

Vue 2 - 常见面试题汇总大全

文章目录 一 基础知识1.vue 的生命周期1-1 钩子函数1-2 钩子函数的使用方法1-3 父子组件生命周期构造函数执行顺序 2 v-show 与 v-if v-for2-1.v-show 与 v-if 的区别2-2 v-if 和v-for的优先级 3.组件通信有哪些方式&#xff1f;4 computed和watch &#xff0c;有什么区别4-1.c…

西门子S7-1200定时器

一、西门子S7-1200定时器介绍 S7-1200的定时器为IEC定时器&#xff0c;用户程序中可以使用的定时器数量仅仅受CPU的存储器容量限制。 使用定时器需要使用定时器相关的背景数据块或者数据类型为IEC_TIMER&#xff08;或TP_TIME、TON_TIME、TOF_TIME、TONR_TIME&#xff09;的D…

别再弄混流程挖掘、BPM和BI

信息时代以来&#xff0c;各类信息系统不仅在企业中得到了广泛的应用和普及&#xff0c;种类和选择伴随全球数字技术的发展也逐渐变得丰富多样。而随着数字化转型的不断深入&#xff0c;企业对系统管理的需求面临着全新的变化。 如何配置最适合当下业务的系统&#xff1f;这个…

Axure教程——图片轮播(纵向滚动)

本文介绍的时用Axure中的动态面板制作图片轮播之纵向滚动 一、预览效果 预览地址&#xff1a;https://c1s4i1.axshare.com 二、功能描述 图片纵向自动滚动播放 三、元件制作 拖入一个动态面板&#xff0c;命名为1&#xff0c;设置大小为375*155&#xff0c;并设置四个状态&…

文科生从0学Python转数据分析学习建议避坑指南

我本科是财务管理&#xff0c;文科专业&#xff0c;零基础学习Python转行数分后&#xff0c;现在我的日常工作都离不开它。 接下来&#xff0c;给各位跟我一样无编程经验的朋友一些学习的建议 目标导向&#xff1a;先搞清楚为啥要学 Python几乎可以做任何事&#xff0c;但我…

2023汽车供应链大会暨爱普搜采购商交流会

2023汽车供应链大会暨爱普搜采购商交流会 Automotive Supply Chain Conference & APSOTO Purchasing Exchange Meeting 补链强链 绿色发展--打造中国汽车供应链新生态 北京是我国汽车工业重地之一&#xff0c;产业实力雄厚&#xff0c;发展强劲。北京车企及零部件公司分布…

springboot高校宿舍报修管理系统计算机毕设 附源码83946

springboot高校宿舍报修管理系统 摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实…

VUE L 事件处理 ⑤

目录 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持✨ V u e j s Vuejs Vuejs E v e n t j s Eventjs Eventjs总结 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持✨ ⡖⠒⠒⠒⠤⢄⠀⠀⠀…

Java反射机制以及应用

Java反射机制以及应用 1、Java反射 在运行状态中&#xff0c;对于任意一个类&#xff0c;都能够获取到这个类的所有属性和方法&#xff0c;对于任意一个对象&#xff0c;都能够调用它的任意一个方法和属性(包括私有的方法和属性)&#xff0c;这种动态获取的信息以及动态调用对…

【推荐】免费PDF编辑修改软件排行榜Top11!

随着组织进入数字时代&#xff0c;他们需要处理大量数字文档&#xff0c;尤其是 PDF。PDF 是数字文档中使用最广泛的文件格式之一&#xff0c;能够编辑 PDF 文件至关重要。PDF编辑并不是任何软件都能轻松完成的。不过&#xff0c;网络上有大量免费、开源的 PDF 编辑软件可以帮助…

基于Springboot+vue+协同过滤+前后端分离+电子产品推荐系统(用户,多商户,管理员)+全套视频教程

基于Springbootvue协同过滤前后端分离电子产品推荐系统(用户,多商户,管理员)(毕业论文11000字以上,共33页,程序代码,MySQL数据库) 代码下载 链接&#xff1a;https://pan.baidu.com/s/1ckCKm4zKT0m4RqfHULYyOg 提取码&#xff1a;8888 【运行环境】Idea JDK1.8 Maven MySQL …

基于Servlet+JDBC实现的基础博客系统>>系列2 -- 前端基础页面

目录 1. 博客公共页面样式 2. 博客列表页 3. 博客详情页 4. 博客登录页 5. 博客编辑页 1. 博客公共页面样式 导航栏以及背景图设置 <body> <!-- 1.navigation 导航栏 --><div class"nav"><!-- logo --><img src"image/logo.png&q…

【深圳触觉智能技术分享】Purple Pi OH Android11 ROOT 方法

为了让应用程序可以直接调用su执行系统命令和获取root权限&#xff0c;本文基于Purple Pi OH主板的Android SDK&#xff0c;介绍如果修改和编译一个root版本的Android11系统,以下为sdk源码修改方法。 Purple Pi OH作为一款兼容树莓派的开源主板&#xff0c;采用瑞芯微RK3566 (…

【yocto2】利用yocto工具构建嵌入式Linux系统

1.定制化嵌入式linux系统 在实际项目中&#xff0c;一款嵌入式产品往往具有不同的硬件平台和软件需求&#xff0c;因此需要对嵌入式Linux系统进行定制&#xff0c;以满足不同的产品需求。之前的章节中基于Freescale官方提供的例程&#xff0c;构建了运行于imx6ull14x14evk硬件…

OpenCVForUnity(一)简介与插件

文章目录 前言:下载地址&#xff1a;官方文档/教学地址&#xff1a;主要模块的功能&#xff1a; 前言: 端午假期到了&#xff0c;最近我发现了一个非常有意思的课题——OpenCV&#xff0c;而我需要研究的则是OpenCVForUnity。它可以用于在Unity里处理图像、视频和摄像头数据&am…