全网最细RocketMQ源码二:Producer

news2025/1/11 22:39:41

入口

这里分析源码用的入口是:
org.apache.rocketmq.example.quickstart

package org.apache.rocketmq.example.quickstart;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

在发送消息的时候用的是DefaultMQProducer

DefaultMQProducer

这个就是我们业务层经常使用的对象,用来发送消息

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    /**
     * Wrapping internal implementations for virtually all methods presented in this class.
     */
    // 生产者实现类对象
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    private final InternalLogger log = ClientLogger.getLog();
    /**
     * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
     * important when transactional messages are involved. </p>
     *
     * For non-transactional messages, it does not matter as long as it's unique per process. </p>
     *
     * See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
     */
    // 生产者组(发送事务消息,broker端进行事务回查时,可以选择 当前 生产者组下的任意一个生产者 进行 事务回查)
    private String producerGroup;

    /**
     * Just for testing or demo program
     */
    // TBW102 :broker写死的主题队列信息,当发送消息指定的topic在 nm 未找到路由信息,则使用 该TBW102 作为 模板 去创建 主题发布信息。
    private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;

    /**
     * Number of queues to create per default topic.
     */
    // 默认broker每个主题 创建的 队列数量
    private volatile int defaultTopicQueueNums = 4;

    /**
     * Timeout for sending messages.
     */
    // 发送消息超时限制 默认:3s
    private int sendMsgTimeout = 3000;

    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    // 压缩阈值,当msg body 超过 “4k” 后,选择使用压缩。
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
     *
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    // 同步发送失败之后,重试发送次数:2  再加上第一次发送 =》 3
    private int retryTimesWhenSendFailed = 2;

    /**
     * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     *
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    // 异步发送失败之后,重试发送次数:2
    private int retryTimesWhenSendAsyncFailed = 2;

    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    // 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    /**
     * Maximum allowed message size in bytes.
     */
    // 消息体最大限制,默认值:4mb
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

其中MQProducer是生产者接口,定义生产者发送消息的一些规范
在这里插入图片描述

ClientConfig是客户端配置类,继承ClientConfig说白了就是共享一些通用的客户端配置属性和功能
在这里插入图片描述

  • 构造方法
    在这里插入图片描述

  • start方法
    在这里插入图片描述
    最终还是调用的是defaultMQProducderImpl.start方法

  • send方法
    在这里插入图片描述
    最终调用的还是defaultMQProducderImpl.send方法

DefaultMQProducerImpl详解

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();

    // 生成invokeID,无实际业务意义,打印日志使用
    private final Random random = new Random();

    // 生产者门面对象,在这里主要当做 config 使用。
    private final DefaultMQProducer defaultMQProducer;

    // 主题发布信息映射表
    // key:主题
    // value:主题的发布信息
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

    // 发送消息的钩子,留给用户扩展框架使用的。
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

    // rpc Hook 最终会传递给 NettyRemotingClient。 留给用户扩展框架使用的。
    private final RPCHook rpcHook;

    // 异步发送消息,异步任务线程池使用的队列
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

    // 缺省的异步送法消息线程池
    private final ExecutorService defaultAsyncSenderExecutor;

    // 定时任务,执行:RequestFutureTable.scanExpiredRequest();
    private final Timer timer = new Timer("RequestHouseKeepingService", true);

    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;

    // 状态
    private ServiceState serviceState = ServiceState.CREATE_JUST;

    // 客户端实例对象,生产者启动后 需要注册到该客户端对象内。(观察者模式)
    private MQClientInstance mQClientFactory;

    // 注意和 SendMessageHook 区别,它可以抛异常,控制消息 是否发送。
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

    // zip 压缩算法 压缩级别,默认:5
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

    // 选择队列容错策略
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    // 异步消息发送线程池,如果指定的话,就不在选择使用defaultAsyncSenderExecutor这个线程池了..
    private ExecutorService asyncSenderExecutor;
  • 构造方法
    在这里插入图片描述

  • start方法

 /**
     * 正常路径:startFactory => true
     */
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 修改状态为 启动失败,后面启动成功后会修改...
                this.serviceState = ServiceState.START_FAILED;

                // 判断生产者组名,不能是空,也不能是 “DEFAULT_PRODUCER”
                this.checkConfig();

                // 条件成立:说明当前生产者 不是 内部生产者 (什么是内部生产者?  处理消息回退这种情况使用的生产者 )
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    // 修改生产者实例名称为:当前进程的 PID
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                // todo 获取当前进程的RocketMQ 客户端实例对象
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 将生产者自己注册到 RocketMQ 客户端实例内 (观察者模式)
                // todo 将生产者自己注册到rockemt 客户端实例
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // 添加一个主题发布信息
                // key:TBW102   value:空对象
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());


                if (startFactory) {
                    // 启动RocketMQ 客户端实例对象 入口:
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 设置生产者实例状态为:运行中
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        // 强制RocketMQ 客户端实例 向 已知的broker节点 发送一次心跳。(讲 客户端定时任务时 再聊..)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // request 发送的消息 需要 消费者 回执一条消息。
        // 怎么实现的呢?
        // 生产者 msg 加了一些信息  关联ID  客户端ID ,发送到 broker 之后
        // 消费者 从 broker 拿到 这条 消息,检查msg 类型 发现 是一个 需要回执的消息
        // 处理完消息 之后,根据 msg 关联ID 和 客户端ID  生成一条消息 (封装响应给 生产者的结果) 发送到 broker
        // Broker 拿到这条消息之后,它知道这是一条 回执消息,根据 客户端ID 找到 ch ,将消息 推送给 生产者。
        // 生产者 这边 拿到 回执消息之后,读取出来 关联ID 找到 对应的  RequestFuture ,将阻塞的线程 唤醒。
        // 类似于 生产者 和 消费者 之间 进行了 一次 RPC ,只不过 中间 由  broker 代理完成的。

        // 定时任务 处理 回执太慢的情况..
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
  • send方法
    在这里插入图片描述
 // 同步发送
    // 参数1:msg
    // 参数2:发送模式 (同步)
    // 参数3:回调  同步发送时,不需要提供这个参数。 只有异步时才需要!
    // 参数4:发送超时时间(默认 3秒 )
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确定生产者状态 是 运行中,否则抛异常。
        this.makeSureStateOK();

        // 判断消息规格
        Validators.checkMessage(msg, this.defaultMQProducer);

        // 生成一个 调用ID,打日志使用
        final long invokeID = random.nextLong();

        // 发送的初始时间
        long beginTimestampFirst = System.currentTimeMillis();

        // 本轮发送开始时间
        long beginTimestampPrev = beginTimestampFirst;

        // 本轮发送结束时间
        long endTimestamp = beginTimestampFirst;

        // 获取当前消息 主题的发布信息,需要依赖它里面的 MessageQueues 信息,选择 一个队列 后面去发送消息使用。
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // 是否超时
            boolean callTimeout = false;
            // 选中的队列
            MessageQueue mq = null;
            // 异常
            Exception exception = null;
            // 发送结果
            SendResult sendResult = null;
            // timesTotal 发送总尝试次数,同步模式发送时:1 + "2" = 3      ,异步情况 重试次数: 1
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            // 当前第几次发送
            int times = 0;
            // 下标值 代表 发送的第几次, 值 代表 这次发送 选择的 brokerName
            String[] brokersSent = new String[timesTotal];

            // 循环发送,什么时候跳出循环?  1. 发送成功   2. 发送尝试次数 达到上限
            for (; times < timesTotal; times++) {
                // 上次发送时的 brokerName ,第一次发送时 lastBrokerName 值为 null,其它情况 就是 上次发送时的 BrokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();

                // 从 主题发布信息中 选择一个 队列
                // 参数1:主题发布信息
                // 参数:上次发送失败的BrokerName
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

                // 条件成立:说明已经选择出来一个 可以 发送的 MessageQueue
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();

                    try {
                        // 记录 本轮发送的开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 计算执行到这里时的耗时,如果已经超过 timeout 限制,则直接不发送消息了,跳出循环。
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 参数1:msg
                        // 参数2:mq,选择的队列
                        // 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
                        // 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
                        // 参数5:主题发布信息
                        // 参数6:计算出一个剩余的超时限制
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

                        // 本轮发送结束时间
                        endTimestamp = System.currentTimeMillis();

                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);


                        switch (communicationMode) {
                            // 异步 或者 单向,直接返回null
                            // 异步:返回值由sendCallback和回调线程处理。
                            // 单向:服务器不返回任何数据
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;

                            case SYNC:
                                // 条件成立:说明 服务端broker 存储失败..
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        // 重试
                                        continue;
                                    }
                                }
                                // 正常 从这里返回
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            // 执行到这里 会有几种情况? 3种情况
            // 1. for循环尝试发送了 几次 都没能发送成功
            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            // 3. 发送超时


            if (sendResult != null) {// 1. for循环尝试发送了 几次 都没能发送成功
                return sendResult;
            }

            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            // 3. 发送超时

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);

            if (callTimeout) {      // 3. 发送超时
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        // 未找到当前主题的路由数据异常...没办法发送消息。
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

总结一下做了几件事:

  1. 获取当前消息 主题的发布信息
    在这里插入图片描述

  2. 从主题发布信息中,选择一个队列
    在这里插入图片描述
    选的时候,用的算法就是一个自增值%队列数目

  3. 调用sendKernelImpl

 // 参数1:msg
    // 参数2:mq,选择的队列
    // 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
    // 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
    // 参数5:主题发布信息
    // 参数6:计算出一个剩余的超时限制
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 开始时间
        long beginStartTime = System.currentTimeMillis();
        // 获取指定brokerName的主机地址 master 节点 addr
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        // 执行sendMessageHook 时 使用的 context
        SendMessageContext context = null;
        if (brokerAddr != null) {

            // 如果客户端开启了 走 VIP 通道的话,broker地址的 端口 将是 VIP 通道的端口号。
            // broker 启动的时候,会绑定两个服务器端口,一个是 普通端口  一个是 VIP 端口。(服务器端 根据 不同端口 创建 的NioSocketChannel 提供 线程
            // 资源 不是同一组。)
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            // 获取消息体
            byte[] prevBody = msg.getBody();
            try {

                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    /**
                     * msgId 由 前缀 + 内容 组成:
                     * 前缀
                     * ip 地址,进程号,classLoader 的 hashcode
                     * 内容
                     * 时间差(当前时间减去当月一日),计数器
                     */
                    // 给消息生成唯一ID,即在 msg.properties.put("UNIQ_KEY", "msgId")
                    // 服务器 broker 会给 消息 按照 UNIQ_KEY 建立一个 hash索引。
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }


                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // 返回true 说明消息 已经压缩了
                if (this.tryToCompressMessage(msg)) {
                    // 设置标记位 ,表明此条消息 被压缩过。
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }


                // 用户扩展点,用户可以注册 CheckForbiddenHook 控制消息发送。
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                // 用户扩展点,执行 msgHook.before 方法。(比如实现 监控埋点...)
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }


                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 生产者组
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 消息主题
                requestHeader.setTopic(msg.getTopic());
                // 缺省主题:TBW102
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 生产者主题队列数:4
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // 选中的消息队列 队列ID
                requestHeader.setQueueId(mq.getQueueId());
                // 系统标记变量
                requestHeader.setSysFlag(sysFlag);
                // 消息创建时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                // 消息 flag
                requestHeader.setFlag(msg.getFlag());
                // 消息properties
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);

                // 消息重试的逻辑...以后讲消费时 再说。
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }


                // 发送结果
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        // 单向 和 同步 都是走这里!

                        // 当前耗时,如果已经大于 timeout 限制,则抛异常,不再发送消息了...
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }

                        // 获取API对象,调用它的 发送方法,完成发送!
                        // 参数1:broker地址
                        // 参数2:brokerName
                        // 参数3:消息
                        // 参数4:SendMessageRequestHeader
                        // 参数5:剩余的超时限制
                        // 参数6:发送模式
                        // 参数7:context
                        // 参数8:生产者对象
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);

                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

这里做了几件事:

  1. 获取指定brokerName主机地址master节点地址,在MQClientInstance里面存放了broker 映射表,具体内容如下:在这里插入图片描述
    通过brokerName就知道了broker节点地址

  2. 创建了requestHeader,带了很多发送message的元数据

  3. 调用mQClientFactory.getMQClientAPIImpl().sendMessage()发送消息

MQClientInstance 详解

在sendMessage的过程中一直用到这个类,我们来讲讲这个类是做什么用

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    // 客户端配置
    private final ClientConfig clientConfig;
    // 索引值,一般是0,因为 客户端实例 一般都是一个进程 只有一个。
    private final int instanceIndex;
    // 客户端ID,ip@pid
    private final String clientId;
    // 客户端启动时间
    private final long bootTimestamp = System.currentTimeMillis();

    // 生产者 消费者 映射表,key:组名  value:生产者 或者 消费者
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

    // 客户端网络层(netty)配置
    private final NettyClientConfig nettyClientConfig;

    // 核心的一个API实现,它几乎包含了 所有 服务端的API,它的作用就是将 MQ业务层的数据 转换为 网络层 RemotingCommand 对象,
    // 然后使用内部的 NettyRemotingClient 的invoke 系列方法 完成 网络IO。
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;


    // 客户端本地路由数据
    // key:主题名称  value:主题路由数据
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();

    // broker 物理节点映射表
    // key: brokerName 逻辑层面的东西
    // value: map<long /* brokerId 0 的节点是 master,其它的是slave*/, String /* addr ip:port*/>
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();

    // broker 物理节点版本映射表
    // key:brokerName 逻辑层面的东西
    // value:map<String /* addr 物理节点地址*/, Integer /* 版本号 */>
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();

    // 单线程的调度线程池,用于执行定时任务
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });

    // 客户端协议处理器,用于处理 IO 事件
    private final ClientRemotingProcessor clientRemotingProcessor;

    // 拉消息服务
    private final PullMessageService pullMessageService;
    // 消费者负载均衡服务
    // todo 消费者负载均衡 源头
    //  1、内部线程调用rebalanceService 2、查询 consumerTable中所有的消费者,然后调用消费者的doReblance 3、最后调用消费者的rebalanceImpl.doRebalance
    private final RebalanceService rebalanceService;

    // 内部生产者实例,用于处理 消费端 消息回退。
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;

    // 心跳次数统计
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);

    // RocketMQ 客户端状态
    private ServiceState serviceState = ServiceState.CREATE_JUST;

通过属性其实也看的出来,这个类做的几件事:

  1. 缓存生产者、消费者等数据:
    在这里插入图片描述
    在这里插入图片描述

其中DefaultMQProducerImpl就是MQProducerInner的实现类
3. 缓存客户端路由数据:ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable
在这里插入图片描述

  1. 缓存borker元数据:ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId /, String/ address */>> brokerAddrTable
    在这里插入图片描述

  2. 客户端统一rpc:MQClientAPIImpl mQClientAPIImpl,使用他来发送消息的

MQClientAPIImpl

客户端与server通信的实现,通过名字就能看得出来

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }
    // 客户端网络层对象,管理 客户端 与服务器 之间 连接 NioSocketChannel 对象。
    // 通过  它 提供的 invoke 系列方法,客户端可以与服务端进行远程调用。
    // 服务器 也可以 直接 调用 客户端
    private final RemotingClient remotingClient;

    private final TopAddressing topAddressing;

    private final ClientRemotingProcessor clientRemotingProcessor;

    private String nameSrvAddr = null;

还是继续之前的sendMessage流程:

  public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
            // 在封装RemotingCommand, 很有意思的东西,后面学习网络通信的可以再研究
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        // 将消息的消息体 放到 网络传输层的body中。
        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                // 当前耗时,如果已经超过 超时限制,则异常
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                // 进行同步调用,将消息传递到 broker ,broker完成存储后 或者其他 情况 都会返回。
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

在这里插入图片描述
这里会使用remotingClient.invokeSync(客户端网络通信)

在这里插入图片描述
在这里插入图片描述
最终会使用netty 将request writeAndFlush

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1374867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构之排序二叉树

排序二叉树 基本概念 二叉树是一种从上往下的树状结构的数据结构&#xff0c;从根节点开始每个节点最多有两个子节点&#xff0c;左边的为左子节点&#xff0c;右边的为右子节点。 排序二叉树–有顺序&#xff0c;且没有重复元素的二叉树。顺序为&#xff1a; 对每个节点而…

CMake入门教程【高级篇】配置文件(configure_file)

😈「CSDN主页」:传送门 😈「Bilibil首页」:传送门 😈「动动你的小手」:点赞👍收藏⭐️评论📝 文章目录 1.configure_file作用2.详细使用说明3.完整代码示例4.实战使用技巧与注意事项5.总结分析1.configure_file作用

Netty-Netty组件了解

EventLoop 和 EventLoopGroup 回想一下我们在 NIO 中是如何处理我们关心的事件的&#xff1f;在一个 while 循环中 select 出事 件&#xff0c;然后依次处理每种事件。我们可以把它称为事件循环&#xff0c;这就是 EventLoop 。 interface io.netty.channel. EventLoo…

js中的class类

目录 class构造函数方法原型方法访问器方法静态方法 继承super minxin关于多态 class 在ES6中之前如果我们想实现类只能通过原型链和构造函数的形式&#xff0c;不仅难以理解步骤也十分繁琐 在ES6中推出了class关键字&#xff0c;它可以在js中定一个类&#xff0c;通过new来实…

力扣日记1.10-【二叉树篇】701. 二叉搜索树中的插入操作

力扣日记&#xff1a;【二叉树篇】701. 二叉搜索树中的插入操作 日期&#xff1a;2024. 参考&#xff1a;代码随想录、力扣 —————————————————————— 天哪&#xff0c;上次打开力扣还是2023&#xff0c;转眼已经2024&#xff1f;&#xff01; 两个星期过去…

2024.1.11

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);speechnew QTextToSpeech(this);id1startTimer(1000);//设置文本到中间ui->sys_label->setAlignment(Qt:…

国产系统-银河麒麟桌面版V10安装字体-wps安装字体

安装系统:银河麒麟V10 demodemo-pc:~/桌面$ cat /proc/version Linux version 5.10.0-8-generic (builddfa379600e539) (gcc (Ubuntu 9.4.0-1kylin1~20.04.1) 9.4.0, GNU ld (GNU Binutils for Ubuntu) 2.34) #33~v10pro-KYLINOS SMP Wed Mar 22 07:21:49 UTC 20230.系统缺失…

智慧医院之定位导航解决方案

移动端LBS应用 通过绘制院方各楼栋各层平面图,利用无线/蓝牙技术可对终端进行实时定位,方便病人、家属等就医,提高就医体验,减少工作人员工作量,减少医患冲突,打造智慧医院。 移动端的LBS位置应用,可分为医院的室内地图展现、室内地图搜索、室内导航、室内定位、室内位…

TinyLlama-1.1B(小羊驼)模型开源-Github高星项目分享

简介 TinyLlama项目旨在在3万亿tokens上进行预训练&#xff0c;构建一个拥有11亿参数的Llama模型。经过精心优化&#xff0c;我们"仅"需16块A100-40G的GPU&#xff0c;便可在90天内完成这个任务&#x1f680;&#x1f680;。训练已于2023-09-01开始。项目地址&#…

档案统一管理的优点有哪些?

档案统一管理是一种有效的档案管理方式&#xff0c;能够提高档案资料的管理效率和利用价值&#xff0c;适用于各种组织和机构。 档案统一管理的优点包括&#xff1a; 1. 提高档案的管理效率&#xff0c;减少档案的丢失和遗漏。 2. 提升档案利用价值&#xff0c;方便用户查找和使…

01.11

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent) :QWidget(parent),ui(new Ui::Widget) {ui->setupUi(this);//label1 时间snew QTimer(this);s->start(1000);//两个qt5连接//第一个连接为 timeout信号和timeout函数连接…

Java并发编程——伪共享和缓存行问题

在Java并发编程中&#xff0c;伪共享&#xff08;False Sharing&#xff09;和缓存行&#xff08;Cache Line&#xff09;是与多线程访问共享数据相关的两个重要概念。 伪共享指的是多个线程同时访问同一个缓存行中的不同变量或数据&#xff0c;其中至少一个线程对其中一个变…

Blazor快速开发框架Known-V2.0.0

Known2.0 Known是基于Blazor的企业级快速开发框架&#xff0c;低代码&#xff0c;跨平台&#xff0c;开箱即用&#xff0c;一处代码&#xff0c;多处运行。 官网&#xff1a;http://known.pumantech.comGitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;ht…

docker微服务案例

文章目录 建立简单的springboot项目(boot3)boot2建立通过dockerfile发布微服务部署到docker容器编写Dockerfile打包成镜像运行镜像微服务 建立简单的springboot项目(boot3) 1.建立module 2. 改pom <?xml version"1.0" encoding"UTF-8"?> <…

Java面试之并发篇(二)

1、前言 本篇主要基于Java面试题之并发篇&#xff08;一&#xff09;继续梳理java中关于并发相关的高频面试题。本篇的面试题基于网络整理&#xff0c;和自己编辑。在不断的完善补充哦。 2、synchronized 的原理是什么? synchronized是 Java 内置的关键字&#xff0c;它提供…

【LeetCode:49. 字母异位词分组 | 哈希表】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

k8s--集群调度(kube-scheduler)

了解kube-scheduler 由之前博客可知kube-scheduler是k8s中master的核心组件之一 scheduler&#xff1a;负责调度资源。把pod调度到node节点。他有两种策略&#xff1a; 预算策略&#xff1a;人为部署&#xff0c;指定node节点去部署新建的pod 优先策略&#xff1a;通过算法选…

js 数据回调 异步 Promise

回调顺序 JavaScript 函数按照它们被调用的顺序执行。而不是以它们被定义的顺序。 js数据顺序问题 <!DOCTYPE html> <html> <body><h2>JavaScript 函数序列</h2><p>JavaScript 函数按照它们被调用的顺序执行。</p><p id"de…

社区团购配送超市与小程序的共赢之路

对于社区服务来说&#xff0c;搭建一个小程序可以提供更加便捷、高效的服务&#xff0c;提升用户体验。下面我们将详细介绍如何通过乔拓云第三方平台搭建一个社区团购小程序。 首先&#xff0c;你需要打开乔拓云第三方平台&#xff0c;这是一个专门为小程序开发提供的平台。在浏…

【HarmonyOS4.0】第九篇-ArkUI布局容器组件(一)

容器组件指的是它可以包含一个或多个子组件的组件&#xff0c;除了前边介绍过的公共属性外。 一、线性布局容器&#xff08;Row、Column&#xff09; 线性容器类表示按照水平方向或者竖直方向排列子组件的容器&#xff0c;ArkUI开发框架通过 Row 和 Colum 来实现线性布局。 …