RocketMQ MQClientInstance、生产者实例启动源码分析

news2025/1/22 18:58:04

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • 类结构
    • MQClientInstance
    • TopicRouteData
      • QueueData
      • BrokerData
  • DefaultMQProducer#start
    • DefaultProducerImpl#start
    • MQClientInstance#start
  • 总结

前言

从 RocketMQ 源码来看,在生产者、消费者上都是使用的一个相同的客户端实例类:MQClientInstance,在该篇博文会分析该类里面核心的结构信息以及介绍生产者它的一个启动过程.

DefaultProducetImpl、DefaultLitePullConsumerImpl、DefaultMQPushConsumerImpl

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

类结构

MQClientInstance

MQClientInstance 作为生产者、消费者共同使用的 MQ 客户端实例类,先来分析一下它内部重要的属性信息:

// 客户端id > IP+实例名称
private final String clientId;

// 生产者组 -> 生产者实例
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 消费者组 -> 消费者实例
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// Admin 组 -> MQ 管理实现类,比如:RocketMQ Dashboard,通过它来获取 MQ 集群、MQ Broker、MQ Topic 等元数据信息
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// Topic -> Topic Queue、Broker 信息
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

// NettyRemoteClient 使用的配置
// MQClientAPIImpl#start 启动就是启动 NettyRemoteClient 与 NettyServer 建立 TCP 连接,进行 Epoll 网络传输
// ClientRemotingProcessor 处理发起请求以及响应服务端请求的处理类,它是 MQClientAPIImpl 内的属性,由它负责发起请求
private final NettyClientConfig nettyClientConfig;
private final ClientRemotingProcessor clientRemotingProcessor;
private final MQClientAPIImpl mQClientAPIImpl;

// 从 nameserver 获取 Topic 元数据的同步锁|定时获取 broker 心跳,确保在同一个应用程序内多个生产者、多个消费者同时触发调用
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();

// 单线程定时任务调度
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r, "MQClientFactoryScheduledThread");
  }
});

// 消费推模式所使用的核心类、消费者数量发生重平衡问题如何再均衡分派的核心类
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;

以下是相关的类结构图

在这里插入图片描述

属性:ConcurrentMap<String /* Group */, MQProducerInner> producerTable 作用于生产者

属性:ConcurrentMap<String /* Group */, MQConsumerInner> consumerTable、PullMessageService、RebalanceService 作用于消费者

RebalanceService 具体的实现类会在实例化消费者时指定好.

生产者、消费者都是承当 NettyRemoteClient 角色,它里面的处理类型包含了生产者、消费者的请求,相关的请求编码可查看:ClientRemoteProcessor

TopicRouteData

生产者、消费者都会使用到 Topic 路由信息,同时都会定时从 nameserver 拉取更新这些元数据信息,以下是其内部属性结构:

// 多个 broker ; 分割
private String orderTopicConf;
// BrokerName、读队列数、写队列数、队列权限级别(读、写)
private List<QueueData> queueDatas;
// BrokerName、Broker-Id、Broker-Addr
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

这些属性都会提供对应的 getter/setter 方法

QueueData

它属于主题元数据的子类,队列数据:Broker-Name、读取队列、写队列、权限级别、主题标签

private String brokerName;
private int readQueueNums;
private int writeQueueNums;
// 代表权限级别:读-4、写-2
private int perm;
// 主题标签:默认为 0,在创建主题时重试 RETRY 为 2、其余主题为 1
private int topicSysFlag;

在创建主题时会给主题打上对应的标签

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
  // RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"
  if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
  } else {
    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
  }
}

读取、写入队列数量不指定时默认都是 4:DefaultMQProducer#defaultTopicQueueNums

BrokerData

在这里会存储好 Broker-Name、Broker-Id、Broker-Address

private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

总的可以通过 RocketMQ Dashboard 控制台打开查看 Topic Router 信息

在这里插入图片描述

在 RocketMQ 第一篇文章搭建 RocketMQ 集群时,采用的是「多 Master 多 Slave 模式—异步复制」集群模式,所以在上面能够看到两个 BrokerId -> Broker Address

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

DefaultMQProducer#start

一开始的入口就是要调用该方法启动生产者实例才能运用生产者投递消息

在这里插入图片描述

public void start() throws MQClientException {
  // 组装生产者组
  this.setProducerGroup(withNamespace(this.producerGroup));
  this.defaultMQProducerImpl.start();
  // 追踪消息日志的一个异步守护线程,通过 enableMsgTrace 参数开启
  if (null != traceDispatcher) {
    try {
      traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
      log.warn("trace dispatcher start failed ", e);
    }
  }
}

主要关注的是在 DefaultProducerImpl#start 做的事情

DefaultProducerImpl#start

在生产者实例的实现类来看,它主要是将客户端实例先进行实例化、初始化,然后再将其启动起来.

public void start(final boolean startFactory) throws MQClientException {
  switch (this.serviceState) {
      // 默认状态
    case CREATE_JUST:
      this.serviceState = ServiceState.START_FAILED;
      // 检查生产者组名是否满足要求
      this.checkConfig();

      if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
      }
      // 创建一个客户端实例
      this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
      // 在一个应用程序内通过不同的生产者组名来区分不同的 MQ 生产者,若出现一样的情况下会出现以下异常
      boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
      if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                    null);
      }
      // 自动创建 Topic 开启时指定的默认 Key,将其存在到 Topic 信息表中
      this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
      // 外部使用的客户端实例需要将其启动使用
      if (startFactory) {
        mQClientFactory.start();
      }
      log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
               this.defaultMQProducer.isSendMessageWithVIPChannel());
      // 将服务状态调整为运行态
      this.serviceState = ServiceState.RUNNING;
      break;
    case RUNNING:
    case START_FAILED:
    case SHUTDOWN_ALREADY:
      throw new MQClientException("The producer service state not OK, maybe started once, "
                                  + this.serviceState
                                  + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                  null);
    default:
      break;
  }

  this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  RequestFutureHolder.getInstance().startScheduledTask(this);

}

一开始 ServiceState 默认就是 CREATE_JUST,如果是多次调用 start 方法就会出现: MQClientException("The producer service state not OK, maybe started once

它的主要作用是为了将客户端实例 MQClientInstance 给启动起来,在它里面承载了向 Broker 发起请求以及元数据同步的事情,启动完成以后,会向所有的 Broker 发起请求建立 TCP 连接维持健康心跳.

MQClientInstance#start

public void start() throws MQClientException {
  synchronized (this) {
    switch (this.serviceState) {
      case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
        // If not specified,looking address from name server
        if (null == this.clientConfig.getNamesrvAddr()) {
          this.mQClientAPIImpl.fetchNameServerAddr();
        }
        // 开启客户端请求-响应的通道
        this.mQClientAPIImpl.start();
        // 定时任务调度更新信息
        this.startScheduledTask();
        // 启动推送消息的服务.
        this.pullMessageService.start();
        // 启动重平衡服务
        this.rebalanceService.start();
        // 不作为,在高版本中被删减,参数为 FALSE
        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
        log.info("the client factory [{}] start OK", this.clientId);
        this.serviceState = ServiceState.RUNNING;
        break;
      case START_FAILED:
        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
      default:
        break;
    }
  }
}

维持好一个既承当生产者、消费者的客户端实例,它里面包含了针对生产者的工作也包含了对消费者的工作处理

1、维护好客户端 API 调用 Broker 服务端的链 > MQClientAPIImpl

2、通过定时任务从 nameserver 获取 Topic 元数据信息、定时维持好与 Broker 之间的心跳 > startScheduledTask

3、生产者、消费者都是 Netty 客户端角色,无阻塞方式建立好 TCP 连接,接受来自 Broker 读、写以及向 Broker 发起读、写事件 > MQClientAPIImpl.NettyRemotingClient

MQClientAPIImpl 启动的工作就是将在实例化时构建好的 NettyRemotingClient 实例启动,以下是 NettyRemotingClient#start 启动时的源码:

public void start() {
  this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    // 默认线程数为 4
    nettyClientConfig.getClientWorkerThreads(),
    new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
      }
    });
  // 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互
  // NioSocketChannel 代表的就是非阻塞的 SocketChannel
  Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
    // 数据包组装为更大的帧然后进行发送
    .option(ChannelOption.TCP_NODELAY, true)
    // 定时发送探测包来探测连接的对端是否存活
    .option(ChannelOption.SO_KEEPALIVE, false)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (nettyClientConfig.isUseTLS()) {
          if (null != sslContext) {
            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
            log.info("Prepend SSL handler");
          } else {
            log.warn("Connections are insecure as SSLContext is null!");
          }
        }
        // DefaultEventExecutorGroup 用来执行以下五个 ChannelHandler
        pipeline.addLast(
          defaultEventExecutorGroup,
          // 编码 -> 处理请求
          new NettyEncoder(),
          // 解码 -> 处理响应
          new NettyDecoder(),
          new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
          new NettyConnectManageHandler(),
          // 远程调用->请求、响应处理器
          new NettyClientHandler());
      }
    });
  // 操作系统客户端发送缓冲区的大小
  if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
    log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
    handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
  }
  // 操作系统客户端接收缓冲区的大小
  if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
    log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
    handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
  }
  if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
    log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
             nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
    handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
      nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
  }
  // Timer 定时执行哪些请求过期的事件,每隔 3 秒
  this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
      try {
        NettyRemotingClient.this.scanResponseTable();
      } catch (Throwable e) {
        log.error("scanResponseTable exception", e);
      }
    }
  }, 1000 * 3, 1000);
  // 客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理
  if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
  }
}

在这里会将 Netty Worker Group Selector 创建好,等到客户端实例发起请求时再去建立 TCP 连接,事件到来有五个需要执行的类,区分为请求和响应

NettyEncoder:编码器-处理请求的事件

NettyDecoder:解码器-处理响应的事件

IdleStateHandler:当客户端空闲时,会处理的事件

NettyConnectManageHandler:处理连接的事件,当事件监听器不为空时同时会给监听器也派发连接的事件

NettyClientHandler:客户端的读写处理逻辑,非常重要!!

客户端都是以非阻塞的方式与服务端建立 TCP 连接的,如下图:

在这里插入图片描述

关于更多 RocketMQ 网络通信模型的核心类属性结构以及方法、请求-响应之间如何协调在后续文章再具体分析.

在 MQClientInstance#start 方法还会启动五个定时任务调度 MQClientInstance#startScheduledTask,用于更新元数据、维持健康心跳等

private void startScheduledTask() {
  if (null == this.clientConfig.getNamesrvAddr()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
        } catch (Exception e) {
          log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
      }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  }
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
      } catch (Exception e) {
        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
      }
    }
  }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.cleanOfflineBroker();
        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
      } catch (Exception e) {
        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
      }
    }
  }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.persistAllConsumerOffset();
      } catch (Exception e) {
        log.error("ScheduledTask persistAllConsumerOffset exception", e);
      }
    }
  }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.adjustThreadPool();
      } catch (Exception e) {
        log.error("ScheduledTask adjustThreadPool exception", e);
      }
    }
  }, 1, 1, TimeUnit.MINUTES);
}
  1. MQClientAPIImpl#fetchNameServerAddr:若 nameserver 信息为空时,则 2 分钟更新一次 nameserver 信息

  2. MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long)默认每隔 30 秒检查|更新一次 Topic 元数据信息 > TopicRouteData

    生产者:producerTable
    消费者:consumerTable
    判别是否需要更新「生产者-发布信息、消费者-订阅信息」取决于元数据是否变更

  3. MQClientAPIImpl#sendHearbeat默认每隔 30 秒检查|更新 broker 信息,对比是否 Topic 元数据中的 broker 匹配

  4. MQConsumerInner#persistConsumerOffset:取决于消费者的不同实现,默认每隔 5 秒持久化消费者的偏移量信息,向本地文件或其他 broker 发出请求同步持久

  5. DefaultMQPushConsumerImpl#adjustThreadPool:默认为 1 分钟执行一次动态调整「推」消费模式的核心线程数.

PullMessageService:一个单独的线程,当它启动以后,会从一个阻塞队列中获取元素,当元素不为空时就向 broker 发起拉取消息的请求

它有两个核心的方法:PullMessageService#executePullRequestLater、PullMessageService#executePullRequestImmediately

两个方法都是由推送模式的消费者实例调起:DefaultMQPushConsumerImpl#executePullRequestImmediately、DefaultMQPushConsumerImpl#executePullRequestLater

executePullRequestImmediately:直接往阻塞队列中塞入一个元素,元素就是拉取的请求信息 > PullRequest

executePullRequestLater:当消息在客户端未处理完或发生了异常,就延迟一会时间再进行拉取,这一块具体在讲解消费者源码时再详细说明,在 PullMessageService 就是通过定时任务延迟再调用 executePullRequestImmediately 方法

RebalanceService:一个单独的线程,当消费者数量变更以后,由 Broker 向消费者发起请求,消费者实例收到响应后再延迟一段时间触发重平衡的工作,在重平衡时处理时会区分消费模式属于集群模式还是广播模式,集群模式会选择对应的分配策略:AllocateMessageQueueStrategy 重新为 Topic 下 MessageQueue 重新均衡分配消息.

重平衡是一个很漫长的过程,会采用 ProcessQueue 加锁的方式为 Topic 下所有 Queue 重新分配以及维护好 Offset 工作.

总结

该篇博文主要介绍 MQClientInstance 客户端实例以及 TopicRouteData 主题元数据信息,结合 Dashboard 看会更加的清晰,后续介绍了生产者实例启动的一个过程,其中介绍了几个核心的地方:NettyRemotingClient Netty 客户端、MQClientInstance#startScheduledTask 涉及到的定时调度任务、PullMessageService 推送模式作用的类、RebalanceService 重平衡服务,希望该篇博文你能够喜欢,感谢三连支持❤️

后续文章会分享生产者如何发送消息至 Broker 以及它如何处理返回的.

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习】循环神经网络(一)

一、网络结构 RNN 处理输入序列时的信息流。 粗体箭头为各时间点信息流的活跃路径&#xff0c;虚线箭头显示当时不活动的连接。 一个简单RNN例子 RNN 不是一类网络&#xff0c;而是适用于不同问题的拓扑结构的集合。循环网络的一个有趣的方面是&#xff0c;有了足够的层和节点&…

手写视频裁剪框

<!-- 截取框 --><divv-show"isShow"class"crop-box":style"{width: cropWidth px,height: cropHeight px,left: cropX px,top: cropY px,}"ref"cropBox"mousedown"startInteraction"><!-- 内容在这里 --…

【信息论与编码】习题-单选题

目录 单选题1.下列说法正确的是&#xff08;B&#xff09;2.在信息论中&#xff0c;若用对数底2为&#xff0c;则信息量的单位为&#xff08;C&#xff09;3.率失真函数的下限为&#xff08;A&#xff09;4.给定xi条件下随机事件yj所包含的不确定度和条件自信息量p(yj /xi)。&a…

stable diffusion 人物高级提示词(二)衣物、身材

一、衣服大类 英文中文Shirt衬衫Blouse女式衬衫Dress连衣裙Skirt裙子Pants裤子Jeans牛仔裤Swimsuit泳衣Underwear内衣Bra文胸Panties内裤Stockings长筒袜Shoes鞋子Socks袜子 二、细分分类 dress 是连衣裙&#xff1a; 英文解释Formal Dress正式礼服&#xff0c;通常用于正式…

代码随想录刷题第三十八天| 理论基础 ● 509. 斐波那契数 ● 70. 爬楼梯 ● 746. 使用最小花费爬楼梯

代码随想录刷题第三十八天 动态规划基础理论 斐波那契数 (LC 509) 题目思路&#xff1a; 代码实现&#xff1a; class Solution:def fib(self, n: int) -> int:if n<1: return ndp [0 for _ in range(n1)]dp[1] 1for i in range(2, n1):dp[i] dp[i-1]dp[i-2] …

专业课130+,总分390+四川大学951信号与系统考研通信,电子信息经验分享

今年专业课130&#xff0c;总分390&#xff0c;顺利上岸&#xff0c;将近一年复习一路走来&#xff0c;感慨很多&#xff0c;希望以下经历可以给后来的同学提供一些参考。 初试备考经验 公共课&#xff1a;三门公共课&#xff0c;政治&#xff0c;英语&#xff0c;数学。在备…

软件测试|SQL TOP提取顶部数据该如何使用?

简介 在SQL查询语言中&#xff0c;TOP子句是一个非常有用的功能&#xff0c;它允许我们从数据库中提取指定数量的顶部数据记录。本文将深入探讨SQL TOP子句的使用方法&#xff0c;以及在实际应用中的一些常见场景和技巧。 SQL TOP SQL是一种用于管理和操作关系型数据库的强大…

16 Linux 内核定时器

一、Linux 时间管理和内核定时器简介 1. 内核时间管理简介 Linux 内核中有大量的函数需要时间管理&#xff0c;比如周期性的调度程序、延时程序、定时器等。 硬件定时器提供时钟源&#xff0c;时钟源的频率可以设置&#xff0c;设置好以后就周期性的产生定时中断&#xff0c;系…

提升图像分割精度:学习UNet++算法

文章目录 一、UNet 算法简介1.1 什么是 UNet 算法1.2 UNet 的优缺点1.3 UNet 在图像分割领域的应用 二、准备工作2.1 Python 环境配置2.2 相关库的安装 三、数据处理3.1 数据的获取与预处理3.2 数据的可视化与分析 四、网络结构4.1 UNet 的网络结构4.2 UNet 各层的作用 五、训练…

04、Kafka ------ CMAK 各个功能的作用解释(Cluster、集群、Broker、位移主题、复制因子、领导者副本、主题)

目录 启动命令&#xff1a;CMAK的用法★ 在CMAK中添加 Cluster★ 在CMAK中查看指定集群★ 在CMAK中查看 Broker★ 位移主题★ 复制因子★ 领导者副本和追随者副本★ 查看主题 启动命令&#xff1a; 1、启动 zookeeper 服务器端 小黑窗输入命令&#xff1a; zkServer 2、启动 …

编写一个弹跳小球的程序,小球在窗口中四处反弹(python)

import pygame import random# 初始化Pygame pygame.init()# 窗口尺寸 width 800 height 600# 创建窗口 screen pygame.display.set_mode((width, height)) pygame.display.set_caption("Bouncing Ball")# 小球初始位置和速度 ball_radius 20 ball_color (255, …

【kettle】pdi/data-integration 打开ktr文件报错“Unable to load step info from XML“

一、报错内容&#xff1a; Unable to load step info from XML step nodeorg.pentaho.di.core.exception.KettleXMLException: Unable to load step info from XMLat org.pentaho.commons.launcher.Launcher.main (Launcher.java:92)at java.lang.reflect.Method.invoke (Met…

【ARMv8架构系统安装PySide2】

ARMv8架构系统安装PySide2 Step1. 下载Qt资源包Step2. 配置和安装Qt5Step3. 检查Qt-5.15.2安装情况Step4. 安装PySide2所需的依赖库Step5. 下载和配置PySide2Step6. 检验PySide2是否安装成功 Step1. 下载Qt资源包 if you need the whole Qt5 (~900MB): wget http://master.qt…

数据结构(JS实现)

目录 链表链表的特点链表中的常见操作单链表append(data)尾部追加新节点toString()输出链表的节点数据插入节点insert(position,data)get(position)获取链表指定位置节点的数据indexOf(data)查找对应数据节点的位置update(position, newData)更新指定位置节点数据removeAt(posi…

Unity | NGO网络框架

目录 一、相关属性及变量 1.ServerRpc属性 2.ClientRpc属性 3.NetworkVariable变量 二、相关组件 1.NetworkManager 2.Unity Transport 3.Network Object 4.NetworkBehaviour&#xff1a; 5.NetworkTransform Syncing(Synchronizing) Thresholds Interpolation 三…

windows通过ssh连接Liunx服务器并实现上传下载文件

连接ssh 输入&#xff1a;ssh空格用户名ip地址&#xff0c;然后按Enter 有可能出现下图提示&#xff0c;输入yes 回车即可 输入 password &#xff0c;注意密码是不显示的&#xff0c;输入完&#xff0c;再按回车就行了 以上是端口默认22情况下ssh连接&#xff0c;有些公司它…

【VSCode】CMake Language Support 总是下载 .NET 超时,但又不想升级dotnet

错误信息 Error: Could not resolve dotnet path!An error occurred while installing .NET (6.0): .NET Acquisition Failed: Installation failed: Error: .NET installation timed out. You may need to change the timeout time if you have a slow connection. Please se…

VuePress部署到GitHub Pages

一、git push自动部署 1、创建用于工作流的文件 在项目根目录下创建一个用于 GitHub Actions 的工作流 .yml 文件 name: docson:# 每当 push 到 main 分支时触发部署push:branches: [main]# 手动触发部署workflow_dispatch:jobs:docs:runs-on: ubuntu-lateststeps:- uses: a…

湖仓架构的演进

1.数据仓库架构的历史演进 起初&#xff0c;业界数据处理首选方式是数仓架构。通常数据处理的流程是把一些业务数据库&#xff0c;通过ETL的方式加载到Data Warehouse中&#xff0c;再在前端接入一些报表或者BI的工具去展示。 数据仓库概念是 Inmon 于 1990 年提出并给出了完…

Spark Streaming的容错性与高可用性

在实时数据处理领域&#xff0c;容错性和高可用性是至关重要的。Apache Spark Streaming是一个强大的工具&#xff0c;用于实时数据处理和分析&#xff0c;具备卓越的容错性和高可用性。本文将深入探讨Spark Streaming的容错性机制&#xff0c;以及如何实现高可用性的实时数据处…