RocketMQ 源码解析:生产者投递消息 DefaultMQProducer#send(一)

news2025/1/9 20:31:35

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • Normal Message Send
    • SYNC
    • ASYNC
    • ONEWAY
    • sendDefaultImpl
  • tryToFindTopicPublishInfo
  • RPC Message Send
  • MessageQueue Message Send
  • MessageSelectorQueue Message Send
  • DefaultMQProducerImpl#sendKernelImpl
  • MQClientAPIImpl#sendMessage
  • 总结

前言

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生产者实例启动源码分析

RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector

在这里插入图片描述
在 send 方法中分为三种方式单向、同步、异步,而在异步方式投递消息时,采用的是异步线程池的方式去进行处理的,在初始化 DefaultMQProducerImpl 实例时,会构造一个默认的线程池,核心最大线程数为 CPU 核数,当然如果自定义了线程池就会采用自定义的线程池进行处理,如下:

// 初始化默认异步线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
  Runtime.getRuntime().availableProcessors(),
  Runtime.getRuntime().availableProcessors(),
  1000 * 60,
  TimeUnit.MILLISECONDS,
  this.asyncSenderThreadPoolQueue,
  new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
    }
});

// 选择自定义线程池或默认的线程池
public ExecutorService getAsyncSenderExecutor() {
  return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}

Send 方法还提供了更多选项,支持自定义选择的 MessageQueue 以及 MessageQueueSelector

MessageQueue:选择好 Topic 下指定的一个 MessageQueue 进行消息投递,而普通模式下是轮询或随机选择一个可用的 MessageQueue

MessageQueueSelector:通过自定义的 MessageSelector 按自定义算法将业务上需要按顺序执行的消息分配到同一个 MessageQueue 下进行存储

同时还提供 request 方法可以保证 RPC 通信方式,request-response 之间通过阻塞的方式等待消息投递以及消息处理成功

围绕在生产者、消费者两侧处理完成以后才返回.

Normal Message Send

当前消息属于普通消息时,比如采用同步、异步、单向

同步:send(Message msg)

异步:send(Message msg, SendCallback sendCallback)

单向:sendOneway(Message msg)

SYNC

/**
 * DEFAULT ASYNC -------------------------------------------------------
 */
public void send(Message msg,
                 SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
  send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

同步方式:发送消息的超时时间 sendMsgTimeout,默认为 3 秒钟,支持自定义

ASYNC

public void send(final Message msg, final SendCallback sendCallback, final long timeout)
  throws MQClientException, RemotingException, InterruptedException {
  final long beginStartTime = System.currentTimeMillis();
  ExecutorService executor = this.getAsyncSenderExecutor();
  try {
    executor.submit(() -> {
      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeout > costTime) {
        try {
          sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
        } catch (Exception e) {
          sendCallback.onException(e);
        }
      } else {
        sendCallback.onException(
          new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
      }
    });
  } catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
  }
}

异步方式:要自定义一个 SendCallback 回调函数来处理消息投递成功或失败时要做的处理,在异步情况下,会对消息进行重试两次.

ONEWAY

/**
 * DEFAULT ONEWAY -------------------------------------------------------
 */
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
  try {
    this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
  } catch (MQBrokerException e) {
    throw new MQClientException("unknown exception", e);
  }
}

单向方式:无返回值无回调,无论消息是否投递成功,不作任何处理.

sendDefaultImpl

从以上三种方式发送时,都同样调用了 sendDefaultImpl 方法,该方法就是如何处理普通消息处理的相关源码:

private SendResult sendDefaultImpl(
  Message msg,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback,
  final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  // 检查生产者服务是否正常启动
  this.makeSureStateOK();
  // 校验 Topic 是否符合命名规则、Message 大小是否符合条件
  Validators.checkMessage(msg, this.defaultMQProducer);
  final long invokeID = random.nextLong();
  long beginTimestampFirst = System.currentTimeMillis();
  long beginTimestampPrev = beginTimestampFirst;
  long endTimestamp = beginTimestampFirst;
  // 优先从本地缓存中获取 Topic 元数据信息,不存在时在从 nameserver 中进行拉取
  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
      String lastBrokerName = null == mq ? null : mq.getBrokerName();
      // 从 Topic 中选择好一个队列
      MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
      if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
          beginTimestampPrev = System.currentTimeMillis();
          if (times > 0) {
            //Reset topic with namespace during resend.
            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
          }
          // 计算花费时间,是否调用超时
          long costTime = beginTimestampPrev - beginTimestampFirst;
          if (timeout < costTime) {
            callTimeout = true;
            break;
          }
          // 发起真正的请求
          sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          switch (communicationMode) {
            case ASYNC:
              return null;
            case ONEWAY:
              return null;
            case SYNC:
              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // MQ 提供的容错机制,针对 sendLatencyFaultEnable 参数是否开启故障转移的功能
                // 当该参数未开启配置时,RetryAnotherBroker 参数是无意义的.
                // Producer:retryAnotherBrokerWhenNotStoreOK -> true、Broker:sendLatencyFaultEnable -> true  = OK
                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                  continue;
                }
              }
              return sendResult;
            default:
              break;
          }
        } catch (RemotingException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          continue;
        } catch (MQClientException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          continue;
        } catch (MQBrokerException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
            continue;
          } else {
            if (sendResult != null) {
              return sendResult;
            }
            throw e;
          }
        } catch (InterruptedException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());

          log.warn("sendKernelImpl exception", e);
          log.warn(msg.toString());
          throw e;
        }
      } else {
        break;
      }
    }
    if (sendResult != null) {
      return sendResult;
    }

    String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                                times,
                                System.currentTimeMillis() - beginTimestampFirst,
                                msg.getTopic(),
                                Arrays.toString(brokersSent));

    info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

    MQClientException mqClientException = new MQClientException(info, exception);
    if (callTimeout) {
      throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
    }

    if (exception instanceof MQBrokerException) {
      mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
    } else if (exception instanceof RemotingConnectException) {
      mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
    } else if (exception instanceof RemotingTimeoutException) {
      mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
    } else if (exception instanceof MQClientException) {
      mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
    }

    throw mqClientException;
  }
  // 校验 nameserver 配置是否存在,不存在抛出异常:No name server address, please set it.
  validateNameServerSetting();
  // 否则提示无法找到该 Topic
  throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                              null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

CommunicationMode:SYNC、ASYNC、ONEWAY

  1. 检查生产者服务是否正常启动,其实就是是否调用了 start 方法;校验 Topic 是否符合命名规则、Message 大小是否符合条件

    Topic 命名规则对应的正则:regex: ^[%|a-zA-Z0-9_-]+$

    Message 字节大小超过:1024 * 1024 * 4,也不可满足.

  2. 调用 DefaultMQProducerImpl#tryToFindTopicPublishInfo 方法从 nameserver 中获取到 Topic、MessageQueue 元数据信息,主要目的是为了从其中选择一个 MessageQueue 进行消息的存储,将消息落盘到 Topic 指定的一个 MessageQueue 下

  3. 获取 Topic、MessageQueue 元数据信息成功以后,判别当前属于什么模式

    若当前模式属于同步发送时,会将当前循环遍历的总次数设为 3,当消息投递出现异常时,确保能够通过重试的方式将消息进行可靠存储

    
    
  4. 通过 DefaultMQProducerImpl#selectOneMessageQueue 方法轮询或随机选择 Topic 其中一个 MessageQueue 进行消息投递

  5. 最终通过 DefaultMQProducerImpl#sendKernelImpl 方法去组装请求头以及消息体向 Broker 发起请求,该方法的实现后续展开.

  6. 只有同步模式情况下才会一直等待投递结果的到达,分为以下几种情况对消息是否进行重新投递,如下:

    1. 当 Broker 返回不是 SendStatus.SEND_OK 时,判别生产者是否开启 retryAnotherBrokerWhenNotStoreOK「是否在内部重试失败时重试另外一个」 参数配置,未开启时,当前消息就不会投递成功会直接返回给客户端投递失败的结果,开启时还有另外的限制,如下:

      当开启 retryAnotherBrokerWhenNotStoreOK 参数的目的是为了转换 Broker 进行消息的投递,开启它时必须保证 sendLatencyFaultEnable「故障转移的功能」 参数也是开启的,Producer:retryAnotherBrokerWhenNotStoreOK -> true、Broker:sendLatencyFaultEnable -> true = OK,然后在 DefaultMQProducerImpl#selectOneMessageQueue 方法选择 MessagQueue 会转换目标,选择另外一个可用的 Broker 进行消息投递

    2. 当处理消息持久化出现异常:RemotingException、MQClientException、MQBrokerException 时,会进行重试,默认的重试次数为 3

      当出现 MQBrokerException 异常时,在 Broker 端处理时出现如下错误码说明是要进行重试处理的,其他的异常错误码会直接抛出不作后续的工作

      this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode()
      retryResponseCodes = [
      	ResponseCode.TOPIC_NOT_EXIST,
      	ResponseCode.SERVICE_NOT_AVAILABLE,
      	ResponseCode.SYSTEM_ERROR,
      	ResponseCode.NO_PERMISSION,
      	ResponseCode.NO_BUYER_ID,
      	ResponseCode.NOT_IN_CURRENT_UNIT
      
      

tryToFindTopicPublishInfo

在这里插入图片描述
投递的消息是否会向 broker 发起远程调用请求,先要看 Topic 是否存在以及 Broker 参数的配置,分为以下几种情况:

  1. 当 Topic 不存在时,Broker 中开启了 autoCreateTopicEnable 参数配置时,那么会返回默认 Topic:TBW102 元数据信息返回给生产者,默认的读写队列数为 4,未来当请求到达 broker 端时,它会创建当前真实的 Topic 以及队列数
  2. 当 Topic 不存在时,Broker 中未开启 autoCreateTopicEnable 参数配置,在 sendDefaultImpl 方法时就会直接抛出:No route info of this topic: 异常.
  3. 当 Topic 存在时,直接将该 Topic 信息以及队列数返回即可.

核心的源码如下:

/**
 * 优先从本地缓存中获取 Topic 元数据信息,不存在时在从 nameserver 中进行拉取
 * MQInstance 会调用 updateTopicPublishInfo 方法更新本地的 Topic 对应的元数据信息
 * SendMessageProcessor#preSend 该方法
 *
 * @param topic 主题
 * @return 发布的 Topic 信息:Topic Queue、Topic 元数据
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 从 nameserver 获取目标 topic 路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }
  // 当 Topic 信息为空时,说明该 Topic 处于未创建的状态,就会将默认 Topic:TBW102 赋值过来
  // 当开启自动创建 Topic 时,等到 Broker 处理消息时再调用 SendMessageProcessor#preSend 方法去进行创建
  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 从 nameserver 获取默认 topic 路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

RPC Message Send

RPC 方式就是将生产者与消费者之间采用 RPC 方式通信,当生产者生产完消息以后必须等到消费者将消息消费完成,然后通知到你生产者侧才算整个流程完成。

RPC :Message request(final Message msg, final long timeout)

如下是通过 request 方法 RPC 调用的源码:

public Message request(Message msg,
                       long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginTimestamp = System.currentTimeMillis();
  prepareSendRequest(msg, timeout);
  final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

  try {
    final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
    RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

    long cost = System.currentTimeMillis() - beginTimestamp;
    this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        requestResponseFuture.setSendRequestOk(true);
      }

      @Override
      public void onException(Throwable e) {
        requestResponseFuture.setSendRequestOk(false);
        requestResponseFuture.putResponseMessage(null);
        requestResponseFuture.setCause(e);
      }
    }, timeout - cost);

    return waitResponse(msg, timeout, requestResponseFuture, cost);
  } finally {
    RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
  }
}

每当通过 request 发出消息以后会实例化一个 RequestResponseFuture 对象,它内部有一个 CountDownLatch 长度为 1 的属性,用于 waitResponse 方法,它是采用异步的方式将消息发送到 Broker 端,有一个回调机制去处理消费者侧对该条消息作出的回复反应.

当消费者端处理完消息的业务逻辑以后通过 MessageUtil#createReplyMessage 方法构建出回复消息以后,Broker 端会通过 ReplyMessageProcessor#processReplyMessageRequest 方法处理这种回复消息,最终 Broker 端会通过 ReplyMessageProcessor#pushReplyMessage 方法向生产者端推送一条消息,生产者客户端通过 ClientRemotingProcessor#processReplyMessage 方法处理回复的消息,最终就是会 RequestResponseFuture 对象里属性信息:requestResponseFuture.putResponseMessage(replyMsg)

Broker 有一个配置项,参数:storeReplyMessageEnable 是否开启存储回复消息

当开启时,每次的回复消息都会被持久化起来

在生产者通过 request 方法投递消息时,会一直阻塞直到消费者将消息处理完再把回复的消息发出来,它在此期间会一直等待,观察 waitResponse 方法的源码一目了然:

private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture,
                             long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
  // requestResponseFuture.waitResponseMessage = countDownLatch.await(timeout, TimeUnit.MILLISECONDS)
  Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
  if (responseMessage == null) {
    if (requestResponseFuture.isSendRequestOk()) {
      throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
                                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
    } else {
      throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
    }
  }
  return responseMessage;
}

MessageQueue Message Send

send 方法带有 MessageQueue 参数时,说明它是要将消息直接存储到某一个指定的 Queue 中,类似我们在投递 Kafka 时只往一个 partition 中塞入消息一样的道理.

/**
 * KERNEL SYNC -------------------------------------------------------
 */
public SendResult send(Message msg, MessageQueue mq)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, MessageQueue mq, long timeout)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();
  Validators.checkMessage(msg, this.defaultMQProducer);
  // 投递消息 Topic != 指定 MessageQueue 所在的 Topic
  if (!msg.getTopic().equals(mq.getTopic())) {
    throw new MQClientException("message's topic not equal mq's topic", null);
  }
  long costTime = System.currentTimeMillis() - beginStartTime;
  if (timeout < costTime) {
    throw new RemotingTooMuchRequestException("call timeout");
  }
  return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
}

在此处只是对生产者服务状态以及 Topic 命名规则、消息体大小进行校验,随机就采用同步的方式调用 sendKernelImpl 方法进行后续处理,同时它也支持异步、单向的方式进行投递.

MessageSelectorQueue Message Send

通过自定义 MessageSelectorQueue#select 算法选举 Messag Queue 将消息投递到指定的 Message Queue 中,它的相关源码如下:

/**
 * SELECT SYNC -------------------------------------------------------
 */
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendSelectImpl(
  Message msg,
  MessageQueueSelector selector,
  Object arg,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();
  Validators.checkMessage(msg, this.defaultMQProducer);
  // 获取 Topic 元数据、MessageQueue 信息
  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
    MessageQueue mq = null;
    try {
      List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
      Message userMessage = MessageAccessor.cloneMessage(msg);
      String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
      userMessage.setTopic(userTopic);
      // 调用自定义 MessageQueueSelector#select 方法选中一个 MessageQueue
      mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
    } catch (Throwable e) {
      throw new MQClientException("select message queue threw exception.", e);
    }

    long costTime = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTime) {
      throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
    }
    if (mq != null) {
      return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
    } else {
      throw new MQClientException("select message queue return null.", null);
    }
  }
  validateNameServerSetting();
  throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

核心方法:sendSelectImpl,它处理的工作分为以下几步走:

  1. 校验生产者服务状态以及 Topic 命名规则、消息体大小是否满足条件
  2. 通过 DefaultMQProducerImpl#tryToFindTopicPublishInfo 方法从 nameserver 中获取到 Topic、Message Queue 元数据信息
  3. 解析获取到的 MessageQueue 集合,再调用自定义的 MessageQueueSelector#select 方法按规则选中一个 MessageQueue
  4. 最终通过 DefaultMQProducerImpl#sendKernelImpl 方法去组装请求头以及消息体向 Broker 发起请求

同时它也支持异步、单向的方式进行投递.

DefaultMQProducerImpl#sendKernelImpl

无论是单向、同步、异步,指定 MessageQueue、MessageQueueSelector,它最终都是指向的 DefaultMQProducerImpl#sendKernelImpl 方法,接下来我们就分析这个方法的源码做了那一些事情.

/**
 * @param msg               消息内容
 * @param mq                存储 MessageQueue
 * @param communicationMode 交流模式:单向、同步、异步
 * @param sendCallback      异步时要处理的回调函数
 * @param topicPublishInfo  Topic 元数据信息
 * @param timeout           可用时间
 * @return 投递消息的结果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  // 获取 MASTER Broker 节点地址
  String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  }
  SendMessageContext context = null;
  if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
      //for MessageBatch,ID has been set in the generating process
      // 当前类型属于批次消息时,由 MessageClientIDSetter 生成 UNIQ_KEY 属性
      if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
      }
      boolean topicWithNamespace = false;
      if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
      }
      int sysFlag = 0;
      boolean msgBodyCompressed = false;
      // 批次消息不支持压缩,单条消息超过 4K 会进行压缩
      if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
      }
      // 事务消息设置 sysFlag = 4
      final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
      if (Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
      }
      if (hasCheckForbiddenHook()) {
        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
        checkForbiddenContext.setCommunicationMode(communicationMode);
        checkForbiddenContext.setBrokerAddr(brokerAddr);
        checkForbiddenContext.setMessage(msg);
        checkForbiddenContext.setMq(mq);
        checkForbiddenContext.setUnitMode(this.isUnitMode());
        this.executeCheckForbiddenHook(checkForbiddenContext);
      }
      // 当生产者实例化时开启了 enableMsgTrace 消息追踪时会触发钩子之前的方法调用
      if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        // 事务半消息
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
          context.setMsgType(MessageType.Trans_Msg_Half);
        }
        // 延迟消息
        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
          context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
      }
      // 组装发送请求头:Topic、QueueId
      SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
      requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
      requestHeader.setTopic(msg.getTopic());
      requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
      requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
      requestHeader.setQueueId(mq.getQueueId());
      requestHeader.setSysFlag(sysFlag);
      requestHeader.setBornTimestamp(System.currentTimeMillis());
      requestHeader.setFlag(msg.getFlag());
      requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
      requestHeader.setReconsumeTimes(0);
      requestHeader.setUnitMode(this.isUnitMode());
      // 判别是否批次发送的方式
      requestHeader.setBatch(msg instanceof MessageBatch);
      // 重试 Topic:设置重试次数
      if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
          requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }
        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
          requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
      }
      SendResult sendResult = null;
      // 判别发送方式:异步、同步、单向发送,最终都是调用 MQClientAPIImpl#sendMessage 方法
      switch (communicationMode) {
        case ASYNC:
          Message tmpMessage = msg;
          boolean messageCloned = false;
          if (msgBodyCompressed) {
            // If msg body was compressed, msgbody should be reset using prevBody.
            // 使用压缩消息体克隆新消息并恢复原始信息
            // Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
          }
          if (topicWithNamespace) {
            if (!messageCloned) {
              tmpMessage = MessageAccessor.cloneMessage(msg);
              messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
          }
          long costTimeAsync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
          break;
        case ONEWAY:
        case SYNC:
          long costTimeSync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
          break;
        default:
          assert false;
          break;
      }
      // 当生产者实例化时开启了 enableMsgTrace 消息追踪时会触发钩子之后的方法调用
      if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
      }
      return sendResult;
    } catch (RemotingException | MQBrokerException | InterruptedException e) {
      if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
      }
      throw e;
    } finally {
      msg.setBody(prevBody);
      msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
    }
  }
  throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
  1. 首先获取 Broker-Master 地址,当获取不到时,重新从 nameserver 中进行一次 Topic、MessageQueue 元数据拉取请求,请求返回的这些信息中包含了 Broker 信息,会更新 ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable 集合元素.

  2. 若当前消息类型属于批次消息-MessageBatch 发送时,由 MessageClientIDSetter 生成 UNIQ_KEY 属性

  3. 判断消息大小是否超过 4K,超过了 4K 在此处会对其进行压缩操作,在判断是否属于事务消息类型:TRAN_MSG,是则更新其 sysFlag 标记值

  4. 组装 SendMessageRequestHeader 发送消息体,设置其批次类型、重试次数.

  5. 判别其发送方式:异步、同步、单向发送,在后面最终调用的都是 MQClientAPIImpl#sendMessage 方法

    • 当发送方式为异步时,先判别其发送的消息是否被压缩过,若被压缩过,将其消息 Message 深拷贝为一个 Message 对象,再调用 sendMessage 方法

      在异步方式投递消息时,会出现失败的情况,在对其进行重试时,不是在当前 sendKernelImpl 方法进行重试的,而在当前方法的 finally 块时,会对其进行重新赋值,赋值为未压缩之前的 Message 对象,所以,为了避免在异步发送失败进行重试时所投递的消息不是未被压缩过的,故而言之在这里采用了深拷贝赋值,具体的更多描述

      Fix bug:https://github.com/apache/rocketmq-externals/issues/66

    • 当发送方式为单向、同步时,直接调用 sendMessage 方法

MQClientAPIImpl#sendMessage

MQClientAPIImpl#sendMessage 方法也是所有投递消息的方式都要进行调用的方法,该方法的源码如下:

public SendResult sendMessage(
  final String addr,
  final String brokerName,
  final Message msg,
  final SendMessageRequestHeader requestHeader,
  final long timeoutMillis,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback,
  final TopicPublishInfo topicPublishInfo,
  final MQClientInstance instance,
  final int retryTimesWhenSendFailed,
  final SendMessageContext context,
  final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  RemotingCommand request = null;
  String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
  // RocketMQ 4.6.0 版本中,新增的 Request-Reply 特性,消费者通过 MessageUtil.createReplyMessage 组装 replyMessage 再通过生产者发送.
  // 允许在调用 Producer#request 方法时以同步或异步的方式等待 consumer 消息完成以后返回响应消息,类似 RPC 调用效果
  boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
  // SendMessageRequestHeaderV2 对比普通的 SendMessageRequestHeader 来说,缩短了字段名长度,可以加载序列化、反序列化的速度
  if (isReply) {
    if (sendSmartMsg) {
      SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
    } else {
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    }
  } else {
    if (sendSmartMsg || msg instanceof MessageBatch) {
      SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
      request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
  }
  request.setBody(msg.getBody());
  switch (communicationMode) {
    case ONEWAY:
      this.remotingClient.invokeOneway(addr, request, timeoutMillis);
      return null;
    case ASYNC:
      final AtomicInteger times = new AtomicInteger();
      long costTimeAsync = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTimeAsync) {
        throw new RemotingTooMuchRequestException("sendMessage call timeout");
      }
      this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, context, producer);
      return null;
    case SYNC:
      long costTimeSync = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendMessage call timeout");
      }
      return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
      assert false;
      break;
  }
  return null;
}

该方法的处理过程大致分为以下几步,如下:

  1. 先获取 Message 的属性:MSG_TYPE,若其属性值为 reply,说明其是采用的 MQ RPC 的方式投递消息的,在生产者侧通过 request 方法调用,在一直阻塞,在消费者接收到该消息时并且处理完以后,再发送一条回复消息给到 Broker,Broker 再将回复消息的内容通知给到生产者端,在不超时的情况下,生产者端正常结束处理.

    属于 RPC 方式时,发出的请求码为 SEND_REPLY_MESSAGE 或 SEND_REPLY_MESSAGE_V2(324、325)

    不采用 RPC 方式时,发出的请求码为 SEND_MESSAGE、SEND_MESSAGE_V2、SEND_BATCH_MESSAGE(10、310、320)

    带 _V2 后缀的与不带的区别只是缩短了字段名长度,可以加快序列化、反序列化的速度,如下代码所示:

    public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final SendMessageRequestHeader v1) {
      SendMessageRequestHeaderV2 v2 = new SendMessageRequestHeaderV2();
      v2.a = v1.getProducerGroup();
      v2.b = v1.getTopic();
      v2.c = v1.getDefaultTopic();
      v2.d = v1.getDefaultTopicQueueNums();
      v2.e = v1.getQueueId();
      v2.f = v1.getSysFlag();
      v2.g = v1.getBornTimestamp();
      v2.h = v1.getFlag();
      v2.i = v1.getProperties();
      v2.j = v1.getReconsumeTimes();
      v2.k = v1.isUnitMode();
      v2.l = v1.getMaxReconsumeTimes();
      v2.m = v1.isBatch();
      return v2;
    }
    
  2. 发送方式不同,在此处调用的方法也各有不同,当属于单向发送时,调用 NettyRemotingClient#invokeOneway 方法;当属于异步发送时,调用 MQClientAPIImpl#sendMessageAsync 以及 NettyRemotingClient#invokeAsync 方法,此处会传入当异步发送失败时,要进行重试的次数:2,它在 sendMessageAsync 方法内进行执行,后续在介绍;当属于同步方式时,调用 NettyRemotingClient#invokeSync 方法

总结

该篇博文主要先简单分析几种不同方式的投递消息方式:SYNC、ASYNC、ONEWAY,以及支持通过指定的 MessageQueue、MessageQueueSelector 方式来对消息进行投递,同步模式投递情况下是通过 DefaultMQProducerImpl#sendDefaultImpl 方法进行重试的,而在异步模式是通过 MQClientAPIImpl#onExceptionImpl 方法重试的,这在后面一篇文章展开介绍,在消息发送之前,会对消息进行压缩,确保客户端与服务端之间交互的数据包大小是最小的,关于 NettyRemotingClient#invokeOneway、MQClientAPIImpl#sendMessageAsync、MQClientAPIImpl#sendMessageSync 方法在下篇文章再具体介绍.

在这里插入图片描述

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1394535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

区间预测 | Matlab实现GRU-Adaboost-ABKDE的集成门控循环单元自适应带宽核密度估计多变量回归区间预测

区间预测 | Matlab实现GRU-Adaboost-ABKDE的集成门控循环单元自适应带宽核密度估计多变量回归区间预测 目录 区间预测 | Matlab实现GRU-Adaboost-ABKDE的集成门控循环单元自适应带宽核密度估计多变量回归区间预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实…

Linux shell美化 zsh+oh-my-zsh+power10k

文章目录 安装zsh安装on-my-zsh安装power10k主题安装power10k将oh-my-zsh主题改为power10k字体 设置安装字体配置字体 power10k配置相关插件安装zsh-autosuggestionszsh-syntax-highlighting安装插件完成&#xff0c;重新加载配置文件 美化效果示意&#xff1a; 安装zsh 安装…

连接超时的问题

连接超时的问题 通用第三方工具连接超时 connect timeout 方案一&#xff1a; /etc/ssh/sshd_config node1上操作&#xff0c;图是错的 方案二&#xff1a; windows上Hosts文件域名解析有问题 比如&#xff1a; 192.168.xx.100 node1 192.168.xx.161 node1 两个都解析成node…

Failed to start OpenSSH server daemon-SSH启动失败

一、SSH服务启动失败 或者报错误&#xff1a; journalctl -xe sshd.service 二、查看SSHD的服务状态 3、重新安装openssh [rootzbx ~]# yum -y remove openssh 卸载原来的 [rootzbx ~]# yum -y install openssh openssh-clients openssh-server 重新安装 [rootzbx ~]# system…

阿里云云原生助力安永创新驱动力实践探索

云原生正在成为新质生产力变革的核心要素和企业创新的数字基础设施。2023 年 12 月 1 日&#xff0c;由中国信通院举办的“2023 云原生产业大会”在北京召开。在大会“阿里云云原生”专场&#xff0c;安永科技咨询合伙人王祺分享了对云原生市场的总览及趋势洞见&#xff0c;及安…

代码随想录算法训练营第三十六天 | 435.无重叠区间、763.划分字母区间、56.合并区间

435.无重叠区间 题目链接&#xff1a;435.无重叠区间 给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 文章讲解/视频讲解&#xff1a;https://programmercarl.com/0435.%E6%9…

Git学习笔记(第2章):Git安装

官网地址&#xff1a;Githttps://git-scm.com/ Step1&#xff1a;查看Git的GNU协议 → 点击“Next” Step2&#xff1a;设置Git的安装位置(非中文、无空格的目录) → 点击“Next” Step3&#xff1a;选择Git的选项配置(推荐默认设置) → 点击“Next” Step4&#xff1a;设置Git…

What is `addFormattersdoes` in `WebMvcConfigurer` ?

addFormatters 方法在SpringMVC框架中主要用于向Spring容器注册自定义的格式化器&#xff08;Formatter&#xff09; SpringMVC内置了一系列的标准格式化器&#xff0c;用于处理日期、数字和其他常见类型的转换。 开发者也可以通过实现 WebMvcConfigurer 接口&#xff0c;并重写…

笔记本电脑如何连接显示屏?

目录 1.按下快捷键 winP,选择扩展 2.连接显示器&#xff0c;连好接线 3.笔记本驱动有问题&#xff0c;显示错误如下&#xff1a; 4.驱动已经下载完成&#xff0c; 按下快捷键&#xff0c;还是显示第3步中的错误 5.驱动已经下载完成&#xff0c; 按下快捷键&#xff0c;参照…

Page268~270 11.3.4 wxWidgets项目配置

项目w28_gui的项目配置&#xff1a; 一&#xff0c;编译选项&#xff0c; -pipe -mthreads [[if (GetCompilerFactory().GetCompilerVersionString(_T("gcc")) > _T("4.8.0")) print(_T("-Wno-unused-local-typedefs"));]] 1, -pipe&#…

京东获得JD商品详情 API (jd.item_get):电商发展中的中重要性

数据整合与同步&#xff1a;对于许多电商企业来说&#xff0c;商品数据的管理是一个重要的环节。通过JD商品详情API&#xff0c;商家可以方便地获取京东平台上的商品详情&#xff0c;实现数据的整合与同步。这有助于确保商品信息的准确性&#xff0c;提高库存管理和订单处理的效…

前端react入门day04-useEffect与Hook函数

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 useEffect 的使用 useEffect 的概念理解 useEffect 依赖项参数说明 useEffect — 清除副作用 自定义Ho…

新数智空间:阿里云边缘云持续保持中国公有云市场第一

全球领先的 IT 市场研究和咨询公司 IDC 发布 《中国边缘云市场解读&#xff08;2023H1&#xff09;》报告 中国边缘公有云服务市场 阿里云持续第一 稳居市场第一&#xff0c;“边缘”逆势生长 近日&#xff0c;全球领先的 IT 市场研究和咨询公司 IDC 最新发布《中国边缘云市…

通信入门系列——信号的频谱分析

一、信号频谱 信号的频谱&#xff0c;指的是一段频率范围内的情况&#xff0c;信号的幅度和相位的情况。 以一个频率为1Hz的余弦电压信号进行说明&#xff0c;这个信号的傅里叶变换为X(ω)πδ(ω-2π)πδ(ω2π)&#xff0c;也就是所谓的频谱密度&#xff0c;单位为V/(rad/…

Manjora 中使用idm,linux通用

说明 在Mnajora中的idm需要在wine中运行&#xff0c;idm是一款很不错的下载工具&#xff0c;但是在linux不能直接使用&#xff0c;借助wine也无法使用浏览器的集成插件&#xff0c;在网上偶然发现一款第三方插件能够在linux的浏览器中将链接捕捉到idm中&#xff0c;虽然使用起…

Java的便捷输入方法及解析

在 Java 中&#xff0c;有多种便捷的输入方法可以从用户那里获取输入。下面是一些常见的便捷输入方法及解析&#xff1a; 使用 Scanner 类&#xff1a;在上述示例中&#xff0c;首先导入了 java.util.Scanner 类&#xff0c;创建了一个 Scanner 对象&#xff0c;并使用 System…

PXE批量高效网络装机

总结 1实验流程只能抄老师&#xff0c;记忆浅 2排错能力几乎无 3 指令用的太死&#xff0c; 一 系统装机的三种引导方式 启动 操作 系统 1.硬盘 2.光驱&#xff08;u盘&#xff09; 3.网络启动 pxe 重装系统&#xff1f; 在已有操作系统 新到货了一台服务器&#xff…

【数据结构】二叉树(遍历,递归)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​​​ 目录 二叉树遍历规则 前序遍历 ​…

数组练习 Leetcode 566.重塑矩阵

在 MATLAB 中&#xff0c;有一个非常有用的函数 reshape &#xff0c;它可以将一个 m x n 矩阵重塑为另一个大小不同&#xff08;r x c&#xff09;的新矩阵&#xff0c;但保留其原始数据。 给你一个由二维数组 mat 表示的 m x n 矩阵&#xff0c;以及两个正整数 r 和 c &#…

使用docker部署RStudio容器并结合内网穿透实现公网访问

文章目录 前言1. 安装RStudio Server2. 本地访问3. Linux 安装cpolar4. 配置RStudio server公网访问地址5. 公网远程访问RStudio6. 固定RStudio公网地址 前言 RStudio Server 使你能够在 Linux 服务器上运行你所熟悉和喜爱的 RStudio IDE&#xff0c;并通过 Web 浏览器进行访问…