Vert.x 源码解析(4.x)——ClusteredEventBus入门使用和源码解析

news2025/1/18 10:48:21

目录

在这里插入图片描述

1. 简介

如果哪里有错误,欢迎指正。

如果哪里有不明白的地方,欢迎讨论

Vert.x集群器

Vert.x 集群管理器的可插拔性,可轻易切换至其它的集群管理器。

Vert.x 集群管理器包含以下几项功能:

  • 发现并管理集群中的节点
  • 管理集群的 EventBus 地址订阅清单(这样就可以轻松得知集群中的哪些节点订阅了哪些 EventBus 地址)
  • 分布式 Map 支持
  • 分布式锁
  • 分布式计数器

Vert.x 集群器 并不 处理节点之间的通信。在 Vert.x 中,集群节点间通信是直接由 TCP 连接处理的。

2. 集群器使用

Vert.x基于多种框架实现了集群器

比如Hazelcast 集群器只需要引入该依赖就行

依赖引入:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-hazelcast</artifactId>
 <version>4.4.0</version>
</dependency>

Vert.x初始化

ClusterManager mgr = new HazelcastClusterManager();

VertxOptions options = new VertxOptions().setClusterManager(mgr);

Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
  } else {
    // 失败
  }
});

其中涉及到不一样的类

3. 主要类简介

**ClusterManager:**管理和维护集群信息以支持分布式应用程序的开发和运行

**ClusteredHandlerHolder:**用于维护事件处理程序的信息,包括处理程序的地址、序列号以及其他与处理程序相关的属性。

ClusteredEventBus:事件总线的集群版本。它负责在集群中传递和分发事件和消息。

ClusteredMessage:传递的消息的表示。它包含消息的内容、包含编码解码等数据处理

RoundRobinSelector:集群中选择节点的算法

集群消息协议

  1. 协议版本号(Protocol Version):协议的第一个字节通常用于表示协议的版本号。这个版本号用于确保不同 Vert.x 节点之间使用相同的协议版本进行通信。如果不同节点的协议版本不匹配,可能会导致通信失败。
  2. 系统编解码器(System Codec):协议的第二个字节通常用于表示系统级别的消息编解码器。这个字节的值对应于一个系统内置的消息编解码器。系统编解码器用于处理一些特殊的系统级别消息,例如 Ping 和 Pong 消息,用于检测节点的健康状态。
  3. 用户自定义编解码器(User Codec):如果系统编解码器的字节值为 -1,表示后续的数据是用户自定义消息的编码信息。这包括消息的编码名称和消息体的二进制数据。在协议中,首先会包含一个整数,表示消息编码名称的长度,然后是编码名称的字节表示,最后是消息体的二进制数据。
  4. 发送标志(Send Flag):协议的下一个字节通常用于表示消息的发送标志。这个字节指示消息是要发送给其他节点还是用于发布(广播)给订阅者的。通常,0 表示发送给其他节点,1 表示发布给订阅者。
  5. 消息地址(Address):接下来的数据用于表示消息的地址。首先,会包含一个整数,表示地址的长度,然后是地址的字节表示。消息地址用于路由消息到正确的目标节点或订阅者。
  6. 回复地址(Reply Address):如果有的话,接下来的数据用于表示消息的回复地址,格式与消息地址类似。
  7. 发送者(Sender):之后的数据表示消息的发送者的标识,通常是节点的标识。
  8. 消息头(Headers):消息头是一个键值对的集合,用于存储与消息相关的元数据。消息头的数据结构可以因实现而异,但通常包括一系列键值对,用于描述消息的特性和属性。
  9. 消息体(Body):消息体包含实际的消息内容,它的格式和内容可以根据消息的类型和编码方式而变化。

4. 源码分析

4.1 源码分析案例

    Vertx v1 = Vertx.clusteredVertx(new VertxOptions()VertxOptions().setClusterManager(mgr)).toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
    Vertx v2 = Vertx.clusteredVertx(new VertxOptions()VertxOptions().setClusterManager(mgr)).toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);

    v1.eventBus().consumer("the-address").handler(msg -> {
      if ("ping".equals(msg.body())) {
        result.complete(null);
      } else {
        result.completeExceptionally(new Exception());
      }
    });
    v2.eventBus().send("the-address", "ping")

4.2 Vertx.clusteredVertx(初始化创建EventBus等)

这里我只介绍与EventBus相关的,整体的我后在后续文章中写出

clusteredVertx

static Future<Vertx> clusteredVertx(VertxOptions options) {
  return new VertxBuilder(options).init().clusteredVertx();
}

init()

  public VertxBuilder init() {
    //服务列表,VertxServiceProvider 就一个init方法
    Collection<VertxServiceProvider> providers = new ArrayList<>();
    //初始化集群管理器配置和集群管理器服务提供者
    initClusterManager(options, providers);
    return this;
  }

initClusterManager

private static void initClusterManager(VertxOptions options, Collection<VertxServiceProvider> providers) {
  ClusterManager clusterManager = options.getClusterManager();
  if (clusterManager == null) {
    String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
    if (clusterManagerClassName != null) {
      // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
      try {
        Class<?> clazz = Class.forName(clusterManagerClassName);
        clusterManager = (ClusterManager) clazz.getDeclaredConstructor().newInstance();
      } catch (Exception e) {
        throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
      }
    }
  }
  if (clusterManager != null) {
    providers.add(clusterManager);
  }
}

这边就是把clusterManager进行了初始化,重新回到clusteredVertx()方法

clusteredVertx

public Future<Vertx> clusteredVertx() {
  checkBeforeInstantiating();
  if (clusterManager == null) {
    throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
  }
  //初始化VertxImpl,这里面是整个Vertx初始化,与之主要相关的就是会初始化EventBus实例,根据clusterManager是否为空来初始化集群EvnetBus或本地EventBus
  VertxImpl vertx = new VertxImpl(
    options,
    clusterManager,
    clusterNodeSelector == null ? new DefaultNodeSelector() : clusterNodeSelector,
    metrics,
    tracer,
    transport,
    fileResolver,
    threadFactory,
    executorServiceFactory);
  return vertx.initClustered(options);
}

重点initClustered

Future<Vertx> initClustered(VertxOptions options) {
  //初始化节点选择器
  nodeSelector.init(this, clusterManager);
  // 初始化集群管理器
  clusterManager.init(this, nodeSelector);
  // 创建一个用于加入集群的Promise
  Promise<Void> initPromise = Promise.promise();
  Promise<Void> joinPromise = Promise.promise();
  joinPromise.future().onComplete(ar -> {
    //创建回调加入集群后的操作
    if (ar.succeeded()) {
      // 创建高可用管理器(HA Manager)
      createHaManager(options, initPromise);
    } else {
      initPromise.fail(ar.cause());
    }
  });
  // 加入集群
  clusterManager.join(joinPromise);
  return initPromise
    .future()
    .transform(ar -> {
      if (ar.succeeded()) {
        if (metrics != null) {
          metrics.vertxCreated(this);
        }
        return Future.succeededFuture(this);
      } else {
        log.error("Failed to initialize clustered Vert.x", ar.cause());
        return close().transform(v -> Future.failedFuture(ar.cause()));
      }
    });
}

createHaManager

创建高可用以及开启EventBUs

private void createHaManager(VertxOptions options, Promise<Void> initPromise) {
  //是否开启高可用
  if (options.isHAEnabled()) {
    this.<HAManager>executeBlocking(fut -> {
      //创建HAManager(高可用,负责节点转移等)
      haManager = new HAManager(this, deploymentManager, verticleManager, clusterManager, clusterManager.getSyncMap(CLUSTER_MAP_NAME), options.getQuorumSize(), options.getHAGroup());
      fut.complete(haManager);
    }, false).onComplete(ar -> {
      if (ar.succeeded()) {
        //成功后打开EvnentBus
        startEventBus(true, initPromise);
      } else {
        initPromise.fail(ar.cause());
      }
    });
  } else {
    //打开EvnentBus
    startEventBus(false, initPromise);
  }
}

startEventBus

private void startEventBus(boolean haEnabled, Promise<Void> initPromise) {
  Promise<Void> promise = Promise.promise();
  //执行eventBus开始事件
  eventBus.start(promise);
  promise.future().onComplete(ar -> {
    if (ar.succeeded()) {
      if (haEnabled) {
        //高可用初始化,里面主要就是把当前节点进行存储,以及使用clusterManager.nodeListener监听节点的加入和离开
        initializeHaManager(initPromise);
      } else {
        initPromise.complete();
      }
    } else {
      initPromise.fail(ar.cause());
    }
  });
}

eventBus.start(promise);

这里会启动一个服务,所以实际每个集群节点都会起来一个服务用来传输信息。

@Override
public void start(Promise<Void> promise) {
  NetServerOptions serverOptions = getServerOptions();
  // 创建网络服务器
  server = vertx.createNetServer(serverOptions);
  // 设置连接处理器,处理其他节点的连接请求
  server.connectHandler(getServerHandler());
  // 集群端口和主机地址
  int port = getClusterPort();
  String host = getClusterHost();
  ebContext.runOnContext(v -> {
    // 启动网络服务器并监听指定端口和主机
    server.listen(port, host).flatMap(v2 -> {
      // 获取集群公共端口和主机地址
      int publicPort = getClusterPublicPort(server.actualPort());
      String publicHost = getClusterPublicHost(host);
      // 创建本节点的信息
      nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata());
      nodeId = clusterManager.getNodeId();
      // 设置本节点信息到集群管理器
      Promise<Void> setPromise = Promise.promise();
      clusterManager.setNodeInfo(nodeInfo, setPromise);
      return setPromise.future();
    }).andThen(ar -> {
      if (ar.succeeded()) {
        started = true;
        nodeSelector.eventBusStarted();
      }
    }).onComplete(promise);
  });
}

4.3 getServerHandler(接受数据)

这里再分析一下这个方法,他里面包含了解析数据

它的头部四字节是长度,后面是它具体协议的分布。解析的方式就i是通过fixedSizeMode方法设置下次读取长度

private Handler<NetSocket> getServerHandler() {
  return socket -> {
    //创建解析器,四字节
    RecordParser parser = RecordParser.newFixed(4);
    Handler<Buffer> handler = new Handler<Buffer>() {
      int size = -1;

      public void handle(Buffer buff) {
        if (size == -1) {
          //如果是开始解析,则直接解析头四字节
          size = buff.getInt(0);
          //将读取出来的长度设置下次解析size长度的数据
          parser.fixedSizeMode(size);
        } else {
          //创建信息
          ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
          //这边的buff相当于是上面通过size以及解析出来的数据
          received.readFromWire(buff, codecManager);
          // 如果有metrics则记录消息的读取事件
          if (metrics != null) {
            metrics.messageRead(received.address(), buff.length());
          }
          // 重置消息解析器的固定大小为 4(用于解析下一条消息的长度字段)
          parser.fixedSizeMode(4);
          size = -1;
          // 如果消息包含错误信息
          if (received.hasFailure()) {
            received.internalError();
          } else if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
            // 如果消息是 Ping 消息,则直接回复 Pong 消息到连接的套接字
            // Just send back pong directly on connection
            socket.write(PONG);
          } else {
            //调用父类的方法,这里还是一样进行信息接受处理
            deliverMessageLocally(received);
          }
        }
      }
    };
    //设置处理器和接受信息处理
    parser.setOutput(handler);
    socket.handler(parser);
  };
}

我这里就讲下readFromWire 接受数据的作用,其他比如deliverMessageLocally这些就是本地调用的处理方法

received.readFromWire(buff, codecManager);

数据编码协议

/**
 * 前面四字节长度(整个数据的长度)+
 * 读取数据 协议版本(1位)+系统编码器索引(1位)+自定义编码器名字长度(4位)+自定义编码器名字长度
 * +发送还是发布(1位)
 * +读取地址长度(4位)+地址长度
 * +回复地址长度(4位)+地址长度
 * +发送者长度(4位)+发送者名字长度
 * +消息头的长度(4位)+头的长度
 * @param buffer
 * @param codecManager
 */
public void readFromWire(Buffer buffer, CodecManager codecManager) {
  int pos = 0;
  // 从缓冲区中读取协议版本号
  // Overall Length already read when passed in here
  byte protocolVersion = buffer.getByte(pos);
  if (protocolVersion > WIRE_PROTOCOL_VERSION) {
    setFailure("Invalid wire protocol version " + protocolVersion + " should be <= " + WIRE_PROTOCOL_VERSION);
  }
  pos++;
  //读取系统编解码器的编码索引
  byte systemCodecCode = buffer.getByte(pos);
  pos++;
  //如果等于-1则就是用户自定义编码器
  if (systemCodecCode == -1) {
    // 读取长度,占四个字节,所以+4
    int length = buffer.getInt(pos);
    pos += 4;
    //根据读取到的长度和原先pos的位置获取自定义编码器名字
    byte[] bytes = buffer.getBytes(pos, pos + length);
    //
    String codecName = new String(bytes, CharsetUtil.UTF_8);
    messageCodec = codecManager.getCodec(codecName);
    //如果获取不到则记录失败
    if (messageCodec == null) {
      setFailure("No message codec registered with name " + codecName);
    }
    pos += length;
  } else {
    //这里就是有索引就直接获取
    messageCodec = codecManager.systemCodecs()[systemCodecCode];
  }
  byte bsend = buffer.getByte(pos);
  //判断是发送还是发布
  send = bsend == 0;
  pos++;
  //读取消息地址 4位
  int length = buffer.getInt(pos);
  pos += 4;
  byte[] bytes = buffer.getBytes(pos, pos + length);
  address = new String(bytes, CharsetUtil.UTF_8);
  pos += length;
  //读取回复地址
  length = buffer.getInt(pos);
  pos += 4;
  //如果地址不等于0则获取回复地址以及pos上加上长度
  if (length != 0) {
    bytes = buffer.getBytes(pos, pos + length);
    replyAddress = new String(bytes, CharsetUtil.UTF_8);
    pos += length;
  }
  //发送者4字节
  length = buffer.getInt(pos);
  pos += 4;
  bytes = buffer.getBytes(pos, pos + length);
  sender = new String(bytes, CharsetUtil.UTF_8);
  pos += length;
  //前面全部长度赋值
  headersPos = pos;
  //获取消息头的长度
  int headersLength = buffer.getInt(pos);
  pos += headersLength;
  //接下来就是body的起始位置
  bodyPos = pos;
  wireBuffer = buffer;
  fromWire = true;
}

4.4 consumer(订阅主题)

这里的方法和本地的是一样的,就是创建一个MessageConsumerImpl,具体解析在上一篇有

  @Override
  public <T> MessageConsumer<T> consumer(String address) {
    //检查是否开始,如果没开始则抛出异常
    checkStarted();
    //检查地址是否为空
    Objects.requireNonNull(address, "address");
    return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address,  false);
  }

4.5 handler(添加消费者)

这里与本地的EventBus一样

不一样的是里面register后续的执行步骤

@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
  if (h != null) {
    //保证只有一个handler注册
    synchronized (this) {
      handler = h;
      if (!registered) {
        registered = true;
        Promise<Void> p = result;
        Promise<Void> registration = context.promise();
        //调用父类注册方法
        register(null, localOnly, registration);
        registration.future().onComplete(ar -> {
          if (ar.succeeded()) {
            p.tryComplete();
          } else {
            p.tryFail(ar.cause());
          }
        });
      }
    }
  } else {
    unregister();
  }
  return this;
}

register

synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
  if (registered != null) {
    throw new IllegalStateException();
  }
  //将该MessageConsumer用户添加到bus
  registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
  if (bus.metrics != null) {
    metric = bus.metrics.handlerRegistered(address, repliedAddress);
  }
}

addRegistration

protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
  HandlerHolder<T> holder = addLocalRegistration(address, registration, replyHandler, localOnly);
  //集群方法
  onLocalRegistration(holder, promise);
  return holder;
}

addLocalRegistration

private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
                                                  boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  ContextInternal context = registration.context;

  //创建HandlerHolder,作用就是把要执行(集群方法)
  HandlerHolder<T> holder = createHandlerHolder(registration, replyHandler, localOnly, context);

  //将新的holder添加到到的该地址的ConcurrentCyclicSequence里
  ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
  ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
    address,
    handlers,
    (old, prev) -> old.add(prev.first()));

  //如果是部署的话增加关闭回调
  if (context.isDeployment()) {
    context.addCloseHook(registration);
  }

  return holder;
}

集群方法createHandlerHolder

这里唯一不同的就是会生成一个唯一码

 private final AtomicLong handlerSequence = new AtomicLong(0);
protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, ContextInternal context) {
  return new ClusteredHandlerHolder<>(registration, replyHandler, localOnly, context, handlerSequence.getAndIncrement());
}
public class ClusteredHandlerHolder<T> extends HandlerHolder<T> {

  //生成唯一标识
  private final long seq;

  public ClusteredHandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly, ContextInternal context, long seq) {
    super(handler, replyHandler, localOnly, context);
    this.seq = seq;
  }

  @Override
  public long getSeq() {
    return seq;
  }
}

集群方法onLocalRegistration

protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
  //判断是否是回复处理器
  if (!handlerHolder.isReplyHandler()) {
    //如果是的话就创建RegistrationInfo,它内部就是节点信息nodeId,seq就是处理器序列号以及是否本地处理器
    RegistrationInfo registrationInfo = new RegistrationInfo(
      nodeId,
      handlerHolder.getSeq(),
      handlerHolder.isLocalOnly()
    );
    //接着把它添加进集群管理器进行注册
    clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise));
  } else if (promise != null) {
    promise.complete();
  }
}

4.6 send(发送生产信息和编码数据)

@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
   //createMessage创建信息这里调用的是集群方法
  MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
  sendOrPubInternal(msg, options, null, null);
  return this;
}

集群方法createMessage

这里与本地主要的区别是创建了clusteredMessage,这个方法构造函数多了一个参数就是nodeId,他是我们当前节点的唯一标识

  // 本节点的唯一标识符
private String nodeId;
@Override
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
  Objects.requireNonNull(address, "no null address accepted");
   //选择信息解码器
  MessageCodec codec = codecManager.lookupCodec(body, codecName, false);
  @SuppressWarnings("unchecked")
   //创建ClusteredMessage
  ClusteredMessage msg = new ClusteredMessage(nodeId, address, headers, body, codec, send, this);
  return msg;
}
public ClusteredMessage(String sender, String address, MultiMap headers, U sentBody,
                        MessageCodec<U, V> messageCodec, boolean send, EventBusImpl bus) {
  super(address, headers, sentBody, messageCodec, send, bus);
  this.sender = sender;
}

sendOrPubInternal

这里都是一样的就是判断是否开始,接着开始调用OutboundDeliveryContext方法,执行监听器和下一步操作

  public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
    //判断是否开始
    checkStarted();
    senderCtx.bus = this;
    senderCtx.metrics = metrics;
    //就是调用刚刚的context执行next方法,主要作用就是执行拦截器以及发送方法
    senderCtx.next();
  }

senderCtx.next()

这边都是一样的就是执行监听器以及execute方法

  @Override
  public void next() {
    //判断现在是否正在执行
    if (invoking) {
      invokeNext = true;
    } else {
      //当前拦截器id是否小于拦截器数量
      while (interceptorIdx < interceptors.length) {
        Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
        invoking = true;
        interceptorIdx++;
        //判断当前执行线程是否与总线线程相同
        if (context.inThread()) {
          //是的话则直接执行
          context.dispatch(this, interceptor);
        } else {
          try {
            //如果线程不同,则直接执行拦截器方法
            interceptor.handle(this);
          } catch (Throwable t) {
            context.reportException(t);
          }
        }
        //设置false并检查是否继续调用下一个拦截器
        invoking = false;
        if (!invokeNext) {
          return;
        }
        invokeNext = false;
      }
      //所有拦截器执行完后将id设置为0
      interceptorIdx = 0;
      //调用execute方法,该方法实在子类里进行实现的
      execute();
    }
  }
}

execute

该方法也是一样的

接下来才是重点了

bus.sendOrPub(this)则是调用的集群方法

@Override
protected void execute() {
  VertxTracer tracer = ctx.tracer();
  //确认是否有追踪器
  if (tracer != null) {
    //如果信息为为被追踪,则将标为信息起点
    if (message.trace == null) {
      src = true;
      BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
      TracingPolicy tracingPolicy = options.getTracingPolicy();
      //没有指定则使用默认追踪策略
      if (tracingPolicy == null) {
        tracingPolicy = TracingPolicy.PROPAGATE;
      }
      //创建发送追踪请求
      message.trace = tracer.sendRequest(ctx, SpanKind.RPC, tracingPolicy, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
    } else {
      //如果消息存在就直接发送
      // Handle failure here
      tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
    }
  }
  // 实际执行消息的发送
  bus.sendOrPub(this);
}

集群方法 bus.sendOrPub(this);

这里也会区分是回复信息还是本地信息,还是发送和发布

这里重点讲

serializer:消息处理器

nodeSelector::selectForSend 节点选择器

protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
  //如果消息回复信息不为空则调用clusteredSendReply方法回复
  if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) {
    clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext);
  } else if (sendContext.options.isLocalOnly()) {
    //如果是本地,则直接本地回复
    super.sendOrPub(sendContext);
  } else {
    //没有回复信息
    Serializer serializer = Serializer.get(sendContext.ctx);
    //点对点发送
    if (sendContext.message.isSend()) {
      Promise<String> promise = sendContext.ctx.promise();
      //选择目标节点
      serializer.queue(sendContext.message, nodeSelector::selectForSend, promise);
      promise.future().onComplete(ar -> {
        // 如果选择目标节点成功,发送消息到目标节点
        if (ar.succeeded()) {
          sendToNode(sendContext, ar.result());
        } else {
          // 如果选择目标节点失败,处理发送失败
          sendOrPublishFailed(sendContext, ar.cause());
        }
      });
    } else {
      // 如果消息是发布消息
      Promise<Iterable<String>> promise = sendContext.ctx.promise();
      // 使用序列化器排队消息并选择多个目标节点
      serializer.queue(sendContext.message, nodeSelector::selectForPublish, promise);
      promise.future().onComplete(ar -> {
        if (ar.succeeded()) {
          // 如果选择目标节点成功,发送消息到多个目标节点
          sendToNodes(sendContext, ar.result());
        } else {
          sendOrPublishFailed(sendContext, ar.cause());
        }
      });
    }
  }
}

Serializer serializer = Serializer.get(sendContext.ctx);

public static Serializer get(ContextInternal context) {
  // 从上下文中获取上下文数据
  ConcurrentMap<Object, Object> contextData = context.contextData();
  // 尝试从上下文数据中获取 Serializer 实例
  Serializer serializer = (Serializer) contextData.get(Serializer.class);
  if (serializer == null) {
    // 如果未找到 Serializer 实例,则创建一个新的 Serializer 实例
    Serializer candidate = new Serializer(context);
    //将Serializer添加到contextData
    Serializer previous = (Serializer) contextData.putIfAbsent(Serializer.class, candidate);
    //返回Serializer
    if (previous == null) {
      serializer = candidate;
    } else {
      serializer = previous;
    }
  }
  return serializer;
}

new Serializer

这里context一定是eventloop,目的就是为了执行数据的时候是顺序执行

private Serializer(ContextInternal context) {
  ContextInternal unwrapped = context.unwrap();
  //判断当前是否是EventLoopContext,如果不是的话则通过Vertx直接创建
  if (unwrapped.isEventLoopContext()) {
    ctx = unwrapped;
  } else {
    VertxInternal vertx = unwrapped.owner();
    ctx = vertx.createEventLoopContext(unwrapped.nettyEventLoop(), unwrapped.workerPool(), unwrapped.classLoader());
  }
  //创建一个queues
  queues = new HashMap<>();
  if (unwrapped.isDeployment()) {
    unwrapped.addCloseHook(this);
  }
}

serializer.queue

public <T> void queue(Message<?> message, BiConsumer<Message<?>, Promise<T>> selectHandler, Promise<T> promise) {
  ctx.emit(v -> {
    String address = message.address();
    //根据地址创建一个SerializerQueue队列
    SerializerQueue queue = queues.computeIfAbsent(address, SerializerQueue::new);
    //将消息、选择处理器和 Promise 添加到队列中
    queue.add(message, selectHandler, promise);
  });
}

queue.add

这里创建了一个SerializedTask,内部就是消息和选择执行器。task则是一个linkedlist为了保证数据顺序

 private final Queue<SerializedTask<?>> tasks;
//SerializerQueue构造方法
 SerializerQueue(String address) {
      this.address = address;
     //linkedlist
      this.tasks = new LinkedList<>();
 }

<U> void add(Message<?> msg, BiConsumer<Message<?>, Promise<U>> selectHandler, Promise<U> promise) {
  // 创建一个 SerializedTask 对象,将消息、选择处理器和 Promise 关联
  SerializedTask<U> serializedTask = new SerializedTask<>(ctx, msg, selectHandler);
  Future<U> fut = serializedTask.internalPromise.future();
  //在 Future 完成时执行与 Promise 关联的操作
  fut.onComplete(promise);
  //在 Future 完成时执行 SerializedTask
  fut.onComplete(serializedTask);
  // 将任务添加到队列中
  tasks.add(serializedTask);
  // 检查是否有待处理的任务
  checkPending();
}

checkPending

void checkPending() {
  if (!running) {
    running = true;
    while (true) {
      SerializedTask<?> task = tasks.peek();
      // 循环处理队列中的任务
      if (task != null) {
        task.process();
        if (tasks.peek() == task) {
          // Task will be completed later
          break;
        }
      } else {
        queues.remove(address);
        break;
      }
    }
    running = false;
  }
}

task.process()

这里就是最开始传进来的节点选择方法,由她来做具体操作

void process() {
  selectHandler.accept(msg, internalPromise);
}

nodeSelector::selectForSend

这里主要是调用selectorswithselector方法,那我们继续跟下去

 private Selectors selectors;
public void selectForSend(Message<?> message, Promise<String> promise) {
  // 确保方法仅用于发送消息
  Arguments.require(message.isSend(), "selectForSend used for publishing");
  // 目标节点选择
  selectors.withSelector(message, promise, (prom, selector) -> {
    prom.tryComplete(selector.selectForSend());
  });
}

withSelector

public <T> void withSelector(Message<?> message, Promise<T> promise, BiConsumer<Promise<T>, RoundRobinSelector> task) {
  //获取消息地址
  String address = message.address();
  //如果地址不存在 则创建新的SelectorEntry
  //已经存在但不是就绪状态则增加计数
  SelectorEntry entry = map.compute(address, (addr, curr) -> {
    return curr == null ? new SelectorEntry() : (curr.isNotReady() ? curr.increment() : curr);
  });
  if (entry.isNotReady()) {
    //是否初始化,节点数量0则要初始化
    if (entry.shouldInitialize()) {
      //初始化
      initialize(address);
    }
    // 在 SelectorPromise 完成后执行任务
    entry.selectorPromise.future().onComplete(ar -> {
      if (ar.succeeded()) {
        task.accept(promise, ar.result());
      } else {
        promise.fail(ar.cause());
      }
    });
  } else {
    // 如果地址已经就绪,直接执行任务,并将 Promise 和选择器传递给任务处理函数
    task.accept(promise, entry.selector);
  }
}

previous.data(accessible)

选择相应的选择器,这里选择器具体实现我后面会单独讲

到这里位置就把前面的初始化方法initialize(address);讲完了,重新回去讲withSelector方法

SelectorEntry data(List<String> nodeIds) {
  if (nodeIds == null || nodeIds.isEmpty()) {
    return null;
  }
  //根据节点计算权重
  Map<String, Weight> weights = computeWeights(nodeIds);
  RoundRobinSelector selector;
  //判断是否均匀分布
  if (isEvenlyDistributed(weights)) {
    //如果是均匀分布则创建SimpleRoundRobinSelector(因为均匀分布吗,所以他是按照顺序)
    selector = new SimpleRoundRobinSelector(new ArrayList<>(weights.keySet()));
  } else {
    //否则是WeightedRoundRobinSelector(它按照权重来)
    selector = new WeightedRoundRobinSelector(weights);
  }
  return new SelectorEntry(selector, selectorPromise, counter);
}

task.accept(promise, ar.result());

withSelector方法最后还是执行task.accept

task就是我们这里withSelector方法的第三个参数

prom.tryComplete(selector.selectForSend());

这个方法就是选取相应的节点

selectors.withSelector(message, promise, (prom, selector) -> {
  prom.tryComplete(selector.selectForSend());
});

接着重新回到sendOrPub方法内部

当节点获取成功后执行的是sendToNode方法

sendToNode(sendContext, ar.result());

private <T> void sendToNode(OutboundDeliveryContext<T> sendContext, String nodeId) {
  if (nodeId != null && !nodeId.equals(this.nodeId)) {
    sendRemote(sendContext, nodeId, sendContext.message);
  } else {
    super.sendOrPub(sendContext);
  }
}
  private void sendRemote(OutboundDeliveryContext<?> sendContext, String remoteNodeId, MessageImpl message) {
    // We need to deal with the fact that connecting can take some time and is async, and we cannot
    // block to wait for it. So we add any sends to a pending list if not connected yet.
    // Once we connect we send them.
    // This can also be invoked concurrently from different threads, so it gets a little
    // tricky
    //获取远程连接
    ConnectionHolder holder = connections.get(remoteNodeId);
    //如果为空就创建一个远程连接
    if (holder == null) {
      // When process is creating a lot of connections this can take some time
      // so increase the timeout
      holder = new ConnectionHolder(this, remoteNodeId);
      ConnectionHolder prevHolder = connections.putIfAbsent(remoteNodeId, holder);
      if (prevHolder != null) {
        // Another one sneaked in
        holder = prevHolder;
      } else {
        holder.connect();
      }
    }
    //开始发送信息
    holder.writeMessage(sendContext);
  }

holder.connect();

void connect() {
  Promise<NodeInfo> promise = Promise.promise();
  //获取该节点相关信息
  eventBus.vertx().getClusterManager().getNodeInfo(remoteNodeId, promise);
  //接着通过client连接,client在ClusteredEventBus初始化的时候就已经创建好了
  promise.future()
    .flatMap(info -> eventBus.client().connect(info.port(), info.host()))
    .onComplete(ar -> {
      if (ar.succeeded()) {
        //成功连接后的信息处理
        connected(ar.result());
      } else {
        log.warn("Connecting to server " + remoteNodeId + " failed", ar.cause());
        close(ar.cause());
      }
    });
}

connected

private synchronized void connected(NetSocket socket) {
  // 获取当前socket
  this.socket = socket;
  //设置为已连接
  connected = true;
  //异常处理
  socket.exceptionHandler(err -> {
    close(err);
  });
  //关闭处理
  socket.closeHandler(v -> close());
  //处理从socket内接受到的数据
  socket.handler(data -> {
    // Got a pong back
    vertx.cancelTimer(timeoutID);
    schedulePing();
  });

schedulePing

这里会发送一个ping的消息,就关乎到EventBus的协议组成,我放在真正发送信息的地方讲

private void schedulePing() {
  EventBusOptions options = eventBus.options();
  pingTimeoutID = vertx.setTimer(options.getClusterPingInterval(), id1 -> {
    // If we don't get a pong back in time we close the connection
    timeoutID = vertx.setTimer(options.getClusterPingReplyInterval(), id2 -> {
      // Didn't get pong in time - consider connection dead
      log.warn("No pong from server " + remoteNodeId + " - will consider it dead");
      close();
    });
    ClusteredMessage pingMessage =
      new ClusteredMessage<>(remoteNodeId, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus);
    Buffer data = pingMessage.encodeToWire();
    socket.write(data);
  });
}

holder.writeMessage(sendContext);

这里重新讲回sendRemote方法里的writeMessage方法,这里进行数据编码之后直接调用socket.write直接发送

synchronized void writeMessage(OutboundDeliveryContext<?> ctx) {
  //判断是否连接
  if (connected) {
    //组成协议
    Buffer data = ((ClusteredMessage) ctx.message).encodeToWire();
    if (metrics != null) {
      metrics.messageWritten(ctx.message.address(), data.length());
    }
    //发送数据
    socket.write(data).onComplete(ctx);
  } else {
    //如果没连接先将信息存储起来
    if (pending == null) {
      if (log.isDebugEnabled()) {
        log.debug("Not connected to server " + remoteNodeId + " - starting queuing");
      }
      pending = new ArrayDeque<>();
    }
    pending.add(ctx);
  }
}

encodeToWire

这个就是协议组成部分编码好协议后调用**socket.write(data).onComplete(ctx);**进行数据发送

public Buffer encodeToWire() {
  toWire = true;
  //定义buffer长度
  int length = 1024; // TODO make this configurable
  Buffer buffer = Buffer.buffer(length);
  // 先占位 4 字节,表示整个消息的长度,后面会修改
  buffer.appendInt(0);
  //协议版本
  buffer.appendByte(WIRE_PROTOCOL_VERSION);
  //解析器id
  byte systemCodecID = messageCodec.systemCodecID();
  buffer.appendByte(systemCodecID);
  //-1的话就是自定义协议名字
  if (systemCodecID == -1) {
    // User codec
    writeString(buffer, messageCodec.name());
  }
  //发送还是发布
  buffer.appendByte(send ? (byte) 0 : (byte) 1);
  //发送地址
  writeString(buffer, address);
  //回复地址不为空则加上
  if (replyAddress != null) {
    writeString(buffer, replyAddress);
  } else {
    buffer.appendInt(0);
  }
  //发送者
  writeString(buffer, sender);
  //头部数据编码
  encodeHeaders(buffer);
  //body数据编码
  writeBody(buffer);
  //修改最开始占位的数据
  buffer.setInt(0, buffer.length() - 4);
  return buffer;
}

encodeHeaders

private void encodeHeaders(Buffer buffer) {
  //如果headers不为空
  if (headers != null && !headers.isEmpty()) {
    int headersLengthPos = buffer.length();
    //先缓存占位
    buffer.appendInt(0);
    //获取头部数据的长度写入
    buffer.appendInt(headers.entries().size());
    List<Map.Entry<String, String>> entries = headers.entries();
    //把map数据循环写入
    for (Map.Entry<String, String> entry: entries) {
      writeString(buffer, entry.getKey());
      writeString(buffer, entry.getValue());
    }
    //设置原先占位的长度
    int headersEndPos = buffer.length();
    buffer.setInt(headersLengthPos, headersEndPos - headersLengthPos);
  } else {
    //为空就一个长度
    buffer.appendInt(4);
  }
}

4.7 节点算法

4.7.1 说明

它有一个接口内部方法就是选择单个节点和获取全部节点

一共默认两种实现方式,

第一种就是平均分配。

第二种就是根据权重分配

都是实现RoundRobinSelector接口

public interface RoundRobinSelector {

  String selectForSend();

  Iterable<String> selectForPublish();java

}

//这个类是用来取数的

class Index implements IntUnaryOperator {

  private final int max;
  private final AtomicInteger idx = new AtomicInteger(0);

  Index(int max) {
    this.max = max;
  }

  int nextVal() {
    //继承了IntUnaryOperator所以根据applyAsInt来计算值
    //如果等于max-1则从0开始不然就是+1
    return idx.getAndUpdate(this);
  }

    //当到达max-1就直接归0
  @Override
  public int applyAsInt(int i) {
    return i == max - 1 ? 0 : i + 1;
  }
}

4.7.2 轮询节点选择

class SimpleRoundRobinSelector implements RoundRobinSelector {

  private final List<String> nodeIds;
  private final Index index;

  SimpleRoundRobinSelector(List<String> nodeIds) {
    if (nodeIds.size() > 1) {
      this.nodeIds = Collections.unmodifiableList(nodeIds);
      //就是Index里面包含一个最大值和顺序增加的值
      index = new Index(nodeIds.size());
    } else {
      this.nodeIds = Collections.singletonList(nodeIds.get(0));
      index = null;
    }
  }

  @Override
  public String selectForSend() {
     //获取就是根据顺序来
    if (index == null) return nodeIds.get(0);
    return nodeIds.get(index.nextVal());
  }

  @Override
  public Iterable<String> selectForPublish() {
    return nodeIds;
  }
}

4.7.3 权重选择器

它的原理就是先按照权重小到大进行排序,根据你的权重构造执行范围区间,在区间内来循环目前所在的节点

class WeightedRoundRobinSelector implements RoundRobinSelector {

  //存储唯一值nodeId
  private final List<String> uniqueIds;
  private final TreeMap<Integer, Integer> offsets = new TreeMap<>();
  private final Index index;

  WeightedRoundRobinSelector(Map<String, Weight> weights) {
    List<String> uniqueIds = new ArrayList<>(weights.size());
    // 创建一个用于排序节点的列表,按照权重值排序
    List<Map.Entry<String, Weight>> sorted = new ArrayList<>(weights.entrySet());
    sorted.sort(Map.Entry.comparingByValue());
    // 计算节点的总权重
    int totalWeight = 0;
    int increment, limit;
    for (int i = 0; i < sorted.size(); i++) {
      Map.Entry<String, Weight> current = sorted.get(i);
      uniqueIds.add(current.getKey());
      //获取当前节点的权重
      int weight = current.getValue().value();
      totalWeight += weight;
      if (i < sorted.size() - 1) {
        //如果i==0则前一个权重为0,如果不为0则获取前一个权重的值,并且将当前权重减去前一个权重=权重增加值
        increment = weight - (i == 0 ? 0 : sorted.get(i - 1).getValue().value());
        //上一个节点的偏移量,如果位第一个则为0 + 为计算权重数量乘以权重增量,这是因为要确定范围值
        limit = (i == 0 ? 0 : offsets.lastKey()) + (weights.size() - i) * increment;
        //将计算出来的偏移量进行存储
        offsets.put(limit, i + 1);
      }
    }
    //构造不可改变list
    this.uniqueIds = Collections.unmodifiableList(uniqueIds);
    //初始化,上线用全部的权重值相加,为了当总权重次数用完了哦,处于
    index = new Index(totalWeight);
  }

  @Override
  public String selectForSend() {
    //从index顺序获取
    int idx = index.nextVal();
    //获取小于idx最小的键值对
    Map.Entry<Integer, Integer> entry = offsets.floorEntry(idx);
    //如果找不到,则用idx的余数。保证在循环内
    if (entry == null) return uniqueIds.get(idx % uniqueIds.size());
    //获取偏移量值,获取相应的值
    int offset = entry.getValue();
    //如果偏移量等于节点列表的大小减一,那么这表示它是最后一个节点,因此选择最后一个节点。
    if (offset == uniqueIds.size() - 1)
      return uniqueIds.get(offset);
    //uniqueIds.size() - offset) 偏移量位置到列表末尾的节点数
    //idx % (uniqueIds.size() - offset)轮询索引除以剩余节点数的余数,这里的idx决定了我们在这三个剩余节点中选择哪一个
    //举例子,如果idx为0,那么offset + idx % (uniqueIds.size() - offset)就等于1 + 0% 3,也就是选择节点B。
    return uniqueIds.get(offset + idx % (uniqueIds.size() - offset));
  }

  @Override
  public Iterable<String> selectForPublish() {
    return uniqueIds;
  }
}

5. 总结

整体流程:

  1. 节点会起来一个接受信息的SocketServer服务,以及一个用于发送信息的SocketClient客户端
  2. 并且会把当前节点通过ClusteredMessage注册到集群管理器中
  3. 当要发送信息的时候根据选择选择相应的节点
  4. 再根据是发送还是发布,选择所有的注册点来进行处理。
  5. 如果是发送选择一个节点,被选中的节点选择其中一个处理器进行处理。
  6. 如果是发布则选择所有的节点进行信息发送

如果哪里有错误,欢迎指正。

如果哪里有不明白的地方,欢迎讨论

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/983045.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

node.js下载安装使用

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

WangEditor在Vue前端的应用

1、在Vue项目中安装WangEditor 对于Vue2&#xff1a; npm install wangeditor/editor-for-vue --save 或者 yarn add wangeditor/editor-for-vue 对于Vue3&#xff1a; npm install wangeditor/editor-for-vuenext --save 或者 yarn add wangeditor/editor-for-vuenext 2、将Wa…

站在AI大模型十字路口:实地探访2023服贸会

服贸会恰是一面镜子。小到针对蓝领市场的刷脸招聘机器&#xff0c;大到向政企展示的生活服务数据监测平台&#xff0c;无一不在折射出&#xff0c;中国的数字化服务已渗透到个人生活与企业管理的方方面面。 作者|思杭 出品|产业家 处暑过后的北京&#xff0c;仍留着夏天些…

DevEco Studio开发工具无法预览的问题处理

预览如上图报错信息 解决办法&#xff1a; 1. 首先打开 "SDK管理" 下载后即可解决

Docker使用及本地Yolov5打包教程

1. Docker的安装 注意&#xff1a;官方也提供了直接Pull Yolov5的渠道&#xff1a; docker pull ultralytics/yolov5 详见&#xff1a;https://hub.docker.com/r/ultralytics/yolov5 --------------------------------------------------以下正文------------------------…

QT之形态学操作

形态学操作包含以下操作&#xff1a; 腐蚀 (Erosion)膨胀 (Dilation)开运算 (Opening)闭运算 (Closing)形态梯度 (Morphological Gradient)顶帽 (Top Hat)黑帽(Black Hat) 其中腐蚀和膨胀操作是最基本的操作&#xff0c;其他操作由这两个操作变换而来。 腐蚀 用一个结构元素…

ABAP 一般采购申请创建、服务类型采购申请创建BAPI_REQUISITION_CREATE

前言 此文的示例同时可创建一般采购申请或服务采购申请&#xff0c;当采购类型为Z007时&#xff0c;触发服务采购申请相关字段填写 正文 创建服务类型采购申请的时候参数间互相绑定很绕&#xff0c;看了这篇博客才理明白&#xff0c;有兴趣的可以阅读原文 https://wiki.scn.…

Stable Diffusion插件:StyleSelectorXL 之七十七种绘画风格任君选择

本文给大家分享一个应用于 SDXL 的新插件&#xff1a;StyleSelectorXL。通过在UI界面上简单的选择&#xff0c;我们就可以生成多种多样的风格图片&#xff0c;如动漫、水彩、平面、3D、线稿、涂鸦、剪纸、朋克、童话等等。 基本介绍 用过 SDXL 的同学&#xff0c;应该能切身感…

算法leetcode|79. 单词搜索(rust重拳出击)

文章目录 79. 单词搜索&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;提示&#xff1a;进阶&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 79. 单词搜索&#xff1a; …

提高设计效率,你还需要掌握FeatureManager这些小窍门

FeatureManager 设计树使多种选择和过滤器操作变得更为方便&#xff0c;并在处理模型时提供对多个文件夹和有用工具的访问。 FeatureManager 设计树和图形区域为动态链接。可在任一窗格中选择特征、草图、工程视图和构造几何线。 您可以分割 FeatureManager 设计树&#xff0c;…

力扣每日一题---2594. 修车的最少时间

文章目录 思路解题方法复杂度Code 思路 请注意&#xff0c;能力值越低&#xff0c;修车越快&#xff0c;应该翻译成「排名」&#xff0c;排名越靠前&#xff0c;修车越快。&#xff09;根据题意可以知道r * n * n < t 的&#xff0c;所以可以利用数学知识进行改变公式&#…

灰度变换与空间滤波

灰度变换与空间滤波 背景知识 空间域指包含图像像素的平面&#xff0c;灰度变换与空间滤波均在空间域进行&#xff0c;即直接在图像像素上操作&#xff0c;表示为 g ( x , y ) T [ f ( x , y ) ] g(x,y)T[f(x,y)] g(x,y)T[f(x,y)] &#xff0c;其中 T T T 是在点 ( x , y…

【SWT】 使 ScrolledComposite 中内容动态变化后依然可以滚动

引言&#xff1a; 在用户界面设计中&#xff0c;有时需要在有限的空间内显示大量内容。如果内容超过可视区域的大小&#xff0c;滚动功能可以帮助用户滚动并查看所有内容。本文将介绍如何使用 Eclipse SWT 库中的 ScrolledComposite 控件来实现在滚动区域中显示可滚动的标签。 …

Discuz论坛帖子标题随机高亮颜色,拒绝千篇一律!

DZ论坛帖子标题默认是没有高亮、加粗效果的&#xff0c;如果是要实现某篇帖子标题高亮、加粗&#xff0c;站长或是版主可以点开这篇帖子&#xff0c;在发帖的下方可以看到精华、高亮、图章、置顶等操作&#xff0c;然后点击高亮&#xff0c;可以选择高亮颜色&#xff0c;是否加…

直播预告 | 博睿学院 Bonree ONE接入zabbix数据源提高可观测运维能力

Zabbix是业界覆盖面非常普遍的监控工具。本课程将介绍目前公有云的基础监控体系的构建思路&#xff0c;讲述One产品对接Zabbix数据的必要性与可观测性赋能效果。 课程中会分享数据接入的过程&#xff0c;重点讲解zabbix工作机制&#xff0c;深入分析zabbix数据库表结构&#x…

TCP IP网络编程(四) 基于TCP的服务器端、客户端

文章目录 理解TCP、UDPTCP/IP协议栈链路层IP层TCP/UDP层应用层 实现基于TCP的服务器端、客户端TCP服务器端的默认函数调用顺序进入等待连接请求状态受理客户端连接请求TCP客户端的默认函数调用顺序基于TCP的服务器端、客户端函数调用关系 实现迭代服务器端、客户端实现迭代服务…

算法通关村第十六关:青铜挑战-滑动窗口其实很简单

青铜挑战-滑动窗口其实很简单 1. 滑动窗口基本思想 数组引入双指针的背景&#xff1a; 很多算法会大量移动数组中的元素&#xff0c;频繁移动元素会导致执行效率低下或者超时&#xff0c;使用两个变量能比较好的解决很多相关问题 数组双指针&#xff0c;之前介绍过 对撞型 和…

无涯教程-JavaScript - COMPLEX函数

描述 COMPLEX函数将实系数和虚系数转换为x yi或x yj形式的复数。 语法 COMPLEX (real_num, i_num, [suffix])争论 Argument描述Required/Optionalreal_numThe real coefficient of the complex number. Requiredi_numThe imaginary coefficient of the complex number.Re…

Java基础知识点汇总

一、Java基础知识点整体框架 详细知识点见链接资源&#xff0c;注&#xff1a;框架是用Xmind App完成&#xff0c;查看需下载。 二、基础知识各部分概况 2.1 认识Java 2.2 数据类型和变量 2.3 运算符 2.4 程序逻辑控制 2.5 方法的使用 2.6 数组的定义和使用 2.7 类和对象 2.8 …

Python入门学习14(面向对象)

一、内置方法 二、封装 1. 封装的概念是指&#xff1f; 将现实世界事物在类中描述为属性和方法&#xff0c;即为封装。 2. 什么是私有成员&#xff1f;为什么需要私有成员&#xff1f; 现实事物有部分属性和行为是不公开对使用者开放的。同样在类中描述属性和方法的时…