目录
1. 简介
如果哪里有错误,欢迎指正。
如果哪里有不明白的地方,欢迎讨论
Vert.x集群器
Vert.x 集群管理器的可插拔性,可轻易切换至其它的集群管理器。
Vert.x 集群管理器包含以下几项功能:
- 发现并管理集群中的节点
- 管理集群的 EventBus 地址订阅清单(这样就可以轻松得知集群中的哪些节点订阅了哪些 EventBus 地址)
- 分布式 Map 支持
- 分布式锁
- 分布式计数器
Vert.x 集群器 并不 处理节点之间的通信。在 Vert.x 中,集群节点间通信是直接由 TCP 连接处理的。
2. 集群器使用
Vert.x基于多种框架实现了集群器
-
Hazelcast 集群器
基于 Hazelcast 实现的集群管理器。
-
Infinispan 集群
基于 Infinispan 实现的集群管理器。
-
Apache Ignite 集群
基于 Apache Ignite 实现的集群管理器。
-
Apache Zookeeper 集群
基于 Apache Zookeeper 实现的集群管理器。
比如Hazelcast
集群器只需要引入该依赖就行
依赖引入:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
<version>4.4.0</version>
</dependency>
Vert.x初始化
ClusterManager mgr = new HazelcastClusterManager();
VertxOptions options = new VertxOptions().setClusterManager(mgr);
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
} else {
// 失败
}
});
其中涉及到不一样的类
3. 主要类简介
**ClusterManager:**管理和维护集群信息以支持分布式应用程序的开发和运行
**ClusteredHandlerHolder:**用于维护事件处理程序的信息,包括处理程序的地址、序列号以及其他与处理程序相关的属性。
ClusteredEventBus:事件总线的集群版本。它负责在集群中传递和分发事件和消息。
ClusteredMessage:传递的消息的表示。它包含消息的内容、包含编码解码等数据处理
RoundRobinSelector:集群中选择节点的算法
集群消息协议
- 协议版本号(Protocol Version):协议的第一个字节通常用于表示协议的版本号。这个版本号用于确保不同 Vert.x 节点之间使用相同的协议版本进行通信。如果不同节点的协议版本不匹配,可能会导致通信失败。
- 系统编解码器(System Codec):协议的第二个字节通常用于表示系统级别的消息编解码器。这个字节的值对应于一个系统内置的消息编解码器。系统编解码器用于处理一些特殊的系统级别消息,例如 Ping 和 Pong 消息,用于检测节点的健康状态。
- 用户自定义编解码器(User Codec):如果系统编解码器的字节值为 -1,表示后续的数据是用户自定义消息的编码信息。这包括消息的编码名称和消息体的二进制数据。在协议中,首先会包含一个整数,表示消息编码名称的长度,然后是编码名称的字节表示,最后是消息体的二进制数据。
- 发送标志(Send Flag):协议的下一个字节通常用于表示消息的发送标志。这个字节指示消息是要发送给其他节点还是用于发布(广播)给订阅者的。通常,0 表示发送给其他节点,1 表示发布给订阅者。
- 消息地址(Address):接下来的数据用于表示消息的地址。首先,会包含一个整数,表示地址的长度,然后是地址的字节表示。消息地址用于路由消息到正确的目标节点或订阅者。
- 回复地址(Reply Address):如果有的话,接下来的数据用于表示消息的回复地址,格式与消息地址类似。
- 发送者(Sender):之后的数据表示消息的发送者的标识,通常是节点的标识。
- 消息头(Headers):消息头是一个键值对的集合,用于存储与消息相关的元数据。消息头的数据结构可以因实现而异,但通常包括一系列键值对,用于描述消息的特性和属性。
- 消息体(Body):消息体包含实际的消息内容,它的格式和内容可以根据消息的类型和编码方式而变化。
4. 源码分析
4.1 源码分析案例
Vertx v1 = Vertx.clusteredVertx(new VertxOptions()VertxOptions().setClusterManager(mgr)).toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
Vertx v2 = Vertx.clusteredVertx(new VertxOptions()VertxOptions().setClusterManager(mgr)).toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
v1.eventBus().consumer("the-address").handler(msg -> {
if ("ping".equals(msg.body())) {
result.complete(null);
} else {
result.completeExceptionally(new Exception());
}
});
v2.eventBus().send("the-address", "ping")
4.2 Vertx.clusteredVertx(初始化创建EventBus等)
这里我只介绍与EventBus相关的,整体的我后在后续文章中写出
clusteredVertx
static Future<Vertx> clusteredVertx(VertxOptions options) {
return new VertxBuilder(options).init().clusteredVertx();
}
init()
public VertxBuilder init() {
//服务列表,VertxServiceProvider 就一个init方法
Collection<VertxServiceProvider> providers = new ArrayList<>();
//初始化集群管理器配置和集群管理器服务提供者
initClusterManager(options, providers);
return this;
}
initClusterManager
private static void initClusterManager(VertxOptions options, Collection<VertxServiceProvider> providers) {
ClusterManager clusterManager = options.getClusterManager();
if (clusterManager == null) {
String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
if (clusterManagerClassName != null) {
// We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
try {
Class<?> clazz = Class.forName(clusterManagerClassName);
clusterManager = (ClusterManager) clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
}
}
}
if (clusterManager != null) {
providers.add(clusterManager);
}
}
这边就是把clusterManager进行了初始化,重新回到clusteredVertx()
方法
clusteredVertx
public Future<Vertx> clusteredVertx() {
checkBeforeInstantiating();
if (clusterManager == null) {
throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
}
//初始化VertxImpl,这里面是整个Vertx初始化,与之主要相关的就是会初始化EventBus实例,根据clusterManager是否为空来初始化集群EvnetBus或本地EventBus
VertxImpl vertx = new VertxImpl(
options,
clusterManager,
clusterNodeSelector == null ? new DefaultNodeSelector() : clusterNodeSelector,
metrics,
tracer,
transport,
fileResolver,
threadFactory,
executorServiceFactory);
return vertx.initClustered(options);
}
重点initClustered
Future<Vertx> initClustered(VertxOptions options) {
//初始化节点选择器
nodeSelector.init(this, clusterManager);
// 初始化集群管理器
clusterManager.init(this, nodeSelector);
// 创建一个用于加入集群的Promise
Promise<Void> initPromise = Promise.promise();
Promise<Void> joinPromise = Promise.promise();
joinPromise.future().onComplete(ar -> {
//创建回调加入集群后的操作
if (ar.succeeded()) {
// 创建高可用管理器(HA Manager)
createHaManager(options, initPromise);
} else {
initPromise.fail(ar.cause());
}
});
// 加入集群
clusterManager.join(joinPromise);
return initPromise
.future()
.transform(ar -> {
if (ar.succeeded()) {
if (metrics != null) {
metrics.vertxCreated(this);
}
return Future.succeededFuture(this);
} else {
log.error("Failed to initialize clustered Vert.x", ar.cause());
return close().transform(v -> Future.failedFuture(ar.cause()));
}
});
}
createHaManager
创建高可用以及开启EventBUs
private void createHaManager(VertxOptions options, Promise<Void> initPromise) {
//是否开启高可用
if (options.isHAEnabled()) {
this.<HAManager>executeBlocking(fut -> {
//创建HAManager(高可用,负责节点转移等)
haManager = new HAManager(this, deploymentManager, verticleManager, clusterManager, clusterManager.getSyncMap(CLUSTER_MAP_NAME), options.getQuorumSize(), options.getHAGroup());
fut.complete(haManager);
}, false).onComplete(ar -> {
if (ar.succeeded()) {
//成功后打开EvnentBus
startEventBus(true, initPromise);
} else {
initPromise.fail(ar.cause());
}
});
} else {
//打开EvnentBus
startEventBus(false, initPromise);
}
}
startEventBus
private void startEventBus(boolean haEnabled, Promise<Void> initPromise) {
Promise<Void> promise = Promise.promise();
//执行eventBus开始事件
eventBus.start(promise);
promise.future().onComplete(ar -> {
if (ar.succeeded()) {
if (haEnabled) {
//高可用初始化,里面主要就是把当前节点进行存储,以及使用clusterManager.nodeListener监听节点的加入和离开
initializeHaManager(initPromise);
} else {
initPromise.complete();
}
} else {
initPromise.fail(ar.cause());
}
});
}
eventBus.start(promise);
这里会启动一个服务,所以实际每个集群节点都会起来一个服务用来传输信息。
@Override
public void start(Promise<Void> promise) {
NetServerOptions serverOptions = getServerOptions();
// 创建网络服务器
server = vertx.createNetServer(serverOptions);
// 设置连接处理器,处理其他节点的连接请求
server.connectHandler(getServerHandler());
// 集群端口和主机地址
int port = getClusterPort();
String host = getClusterHost();
ebContext.runOnContext(v -> {
// 启动网络服务器并监听指定端口和主机
server.listen(port, host).flatMap(v2 -> {
// 获取集群公共端口和主机地址
int publicPort = getClusterPublicPort(server.actualPort());
String publicHost = getClusterPublicHost(host);
// 创建本节点的信息
nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata());
nodeId = clusterManager.getNodeId();
// 设置本节点信息到集群管理器
Promise<Void> setPromise = Promise.promise();
clusterManager.setNodeInfo(nodeInfo, setPromise);
return setPromise.future();
}).andThen(ar -> {
if (ar.succeeded()) {
started = true;
nodeSelector.eventBusStarted();
}
}).onComplete(promise);
});
}
4.3 getServerHandler(接受数据)
这里再分析一下这个方法,他里面包含了解析数据
它的头部四字节是长度,后面是它具体协议的分布。解析的方式就i是通过fixedSizeMode方法设置下次读取长度
private Handler<NetSocket> getServerHandler() {
return socket -> {
//创建解析器,四字节
RecordParser parser = RecordParser.newFixed(4);
Handler<Buffer> handler = new Handler<Buffer>() {
int size = -1;
public void handle(Buffer buff) {
if (size == -1) {
//如果是开始解析,则直接解析头四字节
size = buff.getInt(0);
//将读取出来的长度设置下次解析size长度的数据
parser.fixedSizeMode(size);
} else {
//创建信息
ClusteredMessage received = new ClusteredMessage(ClusteredEventBus.this);
//这边的buff相当于是上面通过size以及解析出来的数据
received.readFromWire(buff, codecManager);
// 如果有metrics则记录消息的读取事件
if (metrics != null) {
metrics.messageRead(received.address(), buff.length());
}
// 重置消息解析器的固定大小为 4(用于解析下一条消息的长度字段)
parser.fixedSizeMode(4);
size = -1;
// 如果消息包含错误信息
if (received.hasFailure()) {
received.internalError();
} else if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
// 如果消息是 Ping 消息,则直接回复 Pong 消息到连接的套接字
// Just send back pong directly on connection
socket.write(PONG);
} else {
//调用父类的方法,这里还是一样进行信息接受处理
deliverMessageLocally(received);
}
}
}
};
//设置处理器和接受信息处理
parser.setOutput(handler);
socket.handler(parser);
};
}
我这里就讲下readFromWire
接受数据的作用,其他比如deliverMessageLocally这些就是本地调用的处理方法
received.readFromWire(buff, codecManager);
数据编码协议
/**
* 前面四字节长度(整个数据的长度)+
* 读取数据 协议版本(1位)+系统编码器索引(1位)+自定义编码器名字长度(4位)+自定义编码器名字长度
* +发送还是发布(1位)
* +读取地址长度(4位)+地址长度
* +回复地址长度(4位)+地址长度
* +发送者长度(4位)+发送者名字长度
* +消息头的长度(4位)+头的长度
* @param buffer
* @param codecManager
*/
public void readFromWire(Buffer buffer, CodecManager codecManager) {
int pos = 0;
// 从缓冲区中读取协议版本号
// Overall Length already read when passed in here
byte protocolVersion = buffer.getByte(pos);
if (protocolVersion > WIRE_PROTOCOL_VERSION) {
setFailure("Invalid wire protocol version " + protocolVersion + " should be <= " + WIRE_PROTOCOL_VERSION);
}
pos++;
//读取系统编解码器的编码索引
byte systemCodecCode = buffer.getByte(pos);
pos++;
//如果等于-1则就是用户自定义编码器
if (systemCodecCode == -1) {
// 读取长度,占四个字节,所以+4
int length = buffer.getInt(pos);
pos += 4;
//根据读取到的长度和原先pos的位置获取自定义编码器名字
byte[] bytes = buffer.getBytes(pos, pos + length);
//
String codecName = new String(bytes, CharsetUtil.UTF_8);
messageCodec = codecManager.getCodec(codecName);
//如果获取不到则记录失败
if (messageCodec == null) {
setFailure("No message codec registered with name " + codecName);
}
pos += length;
} else {
//这里就是有索引就直接获取
messageCodec = codecManager.systemCodecs()[systemCodecCode];
}
byte bsend = buffer.getByte(pos);
//判断是发送还是发布
send = bsend == 0;
pos++;
//读取消息地址 4位
int length = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + length);
address = new String(bytes, CharsetUtil.UTF_8);
pos += length;
//读取回复地址
length = buffer.getInt(pos);
pos += 4;
//如果地址不等于0则获取回复地址以及pos上加上长度
if (length != 0) {
bytes = buffer.getBytes(pos, pos + length);
replyAddress = new String(bytes, CharsetUtil.UTF_8);
pos += length;
}
//发送者4字节
length = buffer.getInt(pos);
pos += 4;
bytes = buffer.getBytes(pos, pos + length);
sender = new String(bytes, CharsetUtil.UTF_8);
pos += length;
//前面全部长度赋值
headersPos = pos;
//获取消息头的长度
int headersLength = buffer.getInt(pos);
pos += headersLength;
//接下来就是body的起始位置
bodyPos = pos;
wireBuffer = buffer;
fromWire = true;
}
4.4 consumer(订阅主题)
这里的方法和本地的是一样的,就是创建一个MessageConsumerImpl,具体解析在上一篇有
@Override
public <T> MessageConsumer<T> consumer(String address) {
//检查是否开始,如果没开始则抛出异常
checkStarted();
//检查地址是否为空
Objects.requireNonNull(address, "address");
return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address, false);
}
4.5 handler(添加消费者)
这里与本地的EventBus一样
不一样的是里面register后续的执行步骤
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
//保证只有一个handler注册
synchronized (this) {
handler = h;
if (!registered) {
registered = true;
Promise<Void> p = result;
Promise<Void> registration = context.promise();
//调用父类注册方法
register(null, localOnly, registration);
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
} else {
p.tryFail(ar.cause());
}
});
}
}
} else {
unregister();
}
return this;
}
register
synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
if (registered != null) {
throw new IllegalStateException();
}
//将该MessageConsumer用户添加到bus
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
if (bus.metrics != null) {
metric = bus.metrics.handlerRegistered(address, repliedAddress);
}
}
addRegistration
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, replyHandler, localOnly);
//集群方法
onLocalRegistration(holder, promise);
return holder;
}
addLocalRegistration
private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
ContextInternal context = registration.context;
//创建HandlerHolder,作用就是把要执行(集群方法)
HandlerHolder<T> holder = createHandlerHolder(registration, replyHandler, localOnly, context);
//将新的holder添加到到的该地址的ConcurrentCyclicSequence里
ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
address,
handlers,
(old, prev) -> old.add(prev.first()));
//如果是部署的话增加关闭回调
if (context.isDeployment()) {
context.addCloseHook(registration);
}
return holder;
}
集群方法createHandlerHolder
这里唯一不同的就是会生成一个唯一码
private final AtomicLong handlerSequence = new AtomicLong(0);
protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, ContextInternal context) {
return new ClusteredHandlerHolder<>(registration, replyHandler, localOnly, context, handlerSequence.getAndIncrement());
}
public class ClusteredHandlerHolder<T> extends HandlerHolder<T> {
//生成唯一标识
private final long seq;
public ClusteredHandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly, ContextInternal context, long seq) {
super(handler, replyHandler, localOnly, context);
this.seq = seq;
}
@Override
public long getSeq() {
return seq;
}
}
集群方法onLocalRegistration
protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
//判断是否是回复处理器
if (!handlerHolder.isReplyHandler()) {
//如果是的话就创建RegistrationInfo,它内部就是节点信息nodeId,seq就是处理器序列号以及是否本地处理器
RegistrationInfo registrationInfo = new RegistrationInfo(
nodeId,
handlerHolder.getSeq(),
handlerHolder.isLocalOnly()
);
//接着把它添加进集群管理器进行注册
clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise));
} else if (promise != null) {
promise.complete();
}
}
4.6 send(发送生产信息和编码数据)
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
//createMessage创建信息这里调用的是集群方法
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
return this;
}
集群方法createMessage
这里与本地主要的区别是创建了clusteredMessage,这个方法构造函数多了一个参数就是nodeId,他是我们当前节点的唯一标识
// 本节点的唯一标识符
private String nodeId;
@Override
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
//选择信息解码器
MessageCodec codec = codecManager.lookupCodec(body, codecName, false);
@SuppressWarnings("unchecked")
//创建ClusteredMessage
ClusteredMessage msg = new ClusteredMessage(nodeId, address, headers, body, codec, send, this);
return msg;
}
public ClusteredMessage(String sender, String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec, boolean send, EventBusImpl bus) {
super(address, headers, sentBody, messageCodec, send, bus);
this.sender = sender;
}
sendOrPubInternal
这里都是一样的就是判断是否开始,接着开始调用OutboundDeliveryContext方法,执行监听器和下一步操作
public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
//判断是否开始
checkStarted();
senderCtx.bus = this;
senderCtx.metrics = metrics;
//就是调用刚刚的context执行next方法,主要作用就是执行拦截器以及发送方法
senderCtx.next();
}
senderCtx.next()
这边都是一样的就是执行监听器以及execute方法
@Override
public void next() {
//判断现在是否正在执行
if (invoking) {
invokeNext = true;
} else {
//当前拦截器id是否小于拦截器数量
while (interceptorIdx < interceptors.length) {
Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
invoking = true;
interceptorIdx++;
//判断当前执行线程是否与总线线程相同
if (context.inThread()) {
//是的话则直接执行
context.dispatch(this, interceptor);
} else {
try {
//如果线程不同,则直接执行拦截器方法
interceptor.handle(this);
} catch (Throwable t) {
context.reportException(t);
}
}
//设置false并检查是否继续调用下一个拦截器
invoking = false;
if (!invokeNext) {
return;
}
invokeNext = false;
}
//所有拦截器执行完后将id设置为0
interceptorIdx = 0;
//调用execute方法,该方法实在子类里进行实现的
execute();
}
}
}
execute
该方法也是一样的
接下来才是重点了
bus.sendOrPub(this)
则是调用的集群方法
@Override
protected void execute() {
VertxTracer tracer = ctx.tracer();
//确认是否有追踪器
if (tracer != null) {
//如果信息为为被追踪,则将标为信息起点
if (message.trace == null) {
src = true;
BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
TracingPolicy tracingPolicy = options.getTracingPolicy();
//没有指定则使用默认追踪策略
if (tracingPolicy == null) {
tracingPolicy = TracingPolicy.PROPAGATE;
}
//创建发送追踪请求
message.trace = tracer.sendRequest(ctx, SpanKind.RPC, tracingPolicy, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
} else {
//如果消息存在就直接发送
// Handle failure here
tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
}
}
// 实际执行消息的发送
bus.sendOrPub(this);
}
集群方法 bus.sendOrPub(this);
这里也会区分是回复信息还是本地信息,还是发送和发布
这里重点讲
serializer
:消息处理器
nodeSelector::selectForSend
节点选择器
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
//如果消息回复信息不为空则调用clusteredSendReply方法回复
if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) {
clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext);
} else if (sendContext.options.isLocalOnly()) {
//如果是本地,则直接本地回复
super.sendOrPub(sendContext);
} else {
//没有回复信息
Serializer serializer = Serializer.get(sendContext.ctx);
//点对点发送
if (sendContext.message.isSend()) {
Promise<String> promise = sendContext.ctx.promise();
//选择目标节点
serializer.queue(sendContext.message, nodeSelector::selectForSend, promise);
promise.future().onComplete(ar -> {
// 如果选择目标节点成功,发送消息到目标节点
if (ar.succeeded()) {
sendToNode(sendContext, ar.result());
} else {
// 如果选择目标节点失败,处理发送失败
sendOrPublishFailed(sendContext, ar.cause());
}
});
} else {
// 如果消息是发布消息
Promise<Iterable<String>> promise = sendContext.ctx.promise();
// 使用序列化器排队消息并选择多个目标节点
serializer.queue(sendContext.message, nodeSelector::selectForPublish, promise);
promise.future().onComplete(ar -> {
if (ar.succeeded()) {
// 如果选择目标节点成功,发送消息到多个目标节点
sendToNodes(sendContext, ar.result());
} else {
sendOrPublishFailed(sendContext, ar.cause());
}
});
}
}
}
Serializer serializer = Serializer.get(sendContext.ctx);
public static Serializer get(ContextInternal context) {
// 从上下文中获取上下文数据
ConcurrentMap<Object, Object> contextData = context.contextData();
// 尝试从上下文数据中获取 Serializer 实例
Serializer serializer = (Serializer) contextData.get(Serializer.class);
if (serializer == null) {
// 如果未找到 Serializer 实例,则创建一个新的 Serializer 实例
Serializer candidate = new Serializer(context);
//将Serializer添加到contextData
Serializer previous = (Serializer) contextData.putIfAbsent(Serializer.class, candidate);
//返回Serializer
if (previous == null) {
serializer = candidate;
} else {
serializer = previous;
}
}
return serializer;
}
new Serializer
这里context
一定是eventloop
,目的就是为了执行数据的时候是顺序执行
private Serializer(ContextInternal context) {
ContextInternal unwrapped = context.unwrap();
//判断当前是否是EventLoopContext,如果不是的话则通过Vertx直接创建
if (unwrapped.isEventLoopContext()) {
ctx = unwrapped;
} else {
VertxInternal vertx = unwrapped.owner();
ctx = vertx.createEventLoopContext(unwrapped.nettyEventLoop(), unwrapped.workerPool(), unwrapped.classLoader());
}
//创建一个queues
queues = new HashMap<>();
if (unwrapped.isDeployment()) {
unwrapped.addCloseHook(this);
}
}
serializer.queue
public <T> void queue(Message<?> message, BiConsumer<Message<?>, Promise<T>> selectHandler, Promise<T> promise) {
ctx.emit(v -> {
String address = message.address();
//根据地址创建一个SerializerQueue队列
SerializerQueue queue = queues.computeIfAbsent(address, SerializerQueue::new);
//将消息、选择处理器和 Promise 添加到队列中
queue.add(message, selectHandler, promise);
});
}
queue.add
这里创建了一个SerializedTask
,内部就是消息和选择执行器。task则是一个linkedlist为了保证数据顺序
private final Queue<SerializedTask<?>> tasks;
//SerializerQueue构造方法
SerializerQueue(String address) {
this.address = address;
//linkedlist
this.tasks = new LinkedList<>();
}
<U> void add(Message<?> msg, BiConsumer<Message<?>, Promise<U>> selectHandler, Promise<U> promise) {
// 创建一个 SerializedTask 对象,将消息、选择处理器和 Promise 关联
SerializedTask<U> serializedTask = new SerializedTask<>(ctx, msg, selectHandler);
Future<U> fut = serializedTask.internalPromise.future();
//在 Future 完成时执行与 Promise 关联的操作
fut.onComplete(promise);
//在 Future 完成时执行 SerializedTask
fut.onComplete(serializedTask);
// 将任务添加到队列中
tasks.add(serializedTask);
// 检查是否有待处理的任务
checkPending();
}
checkPending
void checkPending() {
if (!running) {
running = true;
while (true) {
SerializedTask<?> task = tasks.peek();
// 循环处理队列中的任务
if (task != null) {
task.process();
if (tasks.peek() == task) {
// Task will be completed later
break;
}
} else {
queues.remove(address);
break;
}
}
running = false;
}
}
task.process()
这里就是最开始传进来的节点选择方法,由她来做具体操作
void process() {
selectHandler.accept(msg, internalPromise);
}
nodeSelector::selectForSend
这里主要是调用selectors
的withselector
方法,那我们继续跟下去
private Selectors selectors;
public void selectForSend(Message<?> message, Promise<String> promise) {
// 确保方法仅用于发送消息
Arguments.require(message.isSend(), "selectForSend used for publishing");
// 目标节点选择
selectors.withSelector(message, promise, (prom, selector) -> {
prom.tryComplete(selector.selectForSend());
});
}
withSelector
public <T> void withSelector(Message<?> message, Promise<T> promise, BiConsumer<Promise<T>, RoundRobinSelector> task) {
//获取消息地址
String address = message.address();
//如果地址不存在 则创建新的SelectorEntry
//已经存在但不是就绪状态则增加计数
SelectorEntry entry = map.compute(address, (addr, curr) -> {
return curr == null ? new SelectorEntry() : (curr.isNotReady() ? curr.increment() : curr);
});
if (entry.isNotReady()) {
//是否初始化,节点数量0则要初始化
if (entry.shouldInitialize()) {
//初始化
initialize(address);
}
// 在 SelectorPromise 完成后执行任务
entry.selectorPromise.future().onComplete(ar -> {
if (ar.succeeded()) {
task.accept(promise, ar.result());
} else {
promise.fail(ar.cause());
}
});
} else {
// 如果地址已经就绪,直接执行任务,并将 Promise 和选择器传递给任务处理函数
task.accept(promise, entry.selector);
}
}
previous.data(accessible)
选择相应的选择器,这里选择器具体实现我后面会单独讲
到这里位置就把前面的初始化方法initialize(address)
;讲完了,重新回去讲withSelector
方法
SelectorEntry data(List<String> nodeIds) {
if (nodeIds == null || nodeIds.isEmpty()) {
return null;
}
//根据节点计算权重
Map<String, Weight> weights = computeWeights(nodeIds);
RoundRobinSelector selector;
//判断是否均匀分布
if (isEvenlyDistributed(weights)) {
//如果是均匀分布则创建SimpleRoundRobinSelector(因为均匀分布吗,所以他是按照顺序)
selector = new SimpleRoundRobinSelector(new ArrayList<>(weights.keySet()));
} else {
//否则是WeightedRoundRobinSelector(它按照权重来)
selector = new WeightedRoundRobinSelector(weights);
}
return new SelectorEntry(selector, selectorPromise, counter);
}
task.accept(promise, ar.result());
withSelector
方法最后还是执行task.accept
task
就是我们这里withSelector方法的第三个参数
prom.tryComplete(selector.selectForSend());
这个方法就是选取相应的节点
selectors.withSelector(message, promise, (prom, selector) -> {
prom.tryComplete(selector.selectForSend());
});
接着重新回到sendOrPub方法内部
当节点获取成功后执行的是sendToNode
方法
sendToNode(sendContext, ar.result());
private <T> void sendToNode(OutboundDeliveryContext<T> sendContext, String nodeId) {
if (nodeId != null && !nodeId.equals(this.nodeId)) {
sendRemote(sendContext, nodeId, sendContext.message);
} else {
super.sendOrPub(sendContext);
}
}
private void sendRemote(OutboundDeliveryContext<?> sendContext, String remoteNodeId, MessageImpl message) {
// We need to deal with the fact that connecting can take some time and is async, and we cannot
// block to wait for it. So we add any sends to a pending list if not connected yet.
// Once we connect we send them.
// This can also be invoked concurrently from different threads, so it gets a little
// tricky
//获取远程连接
ConnectionHolder holder = connections.get(remoteNodeId);
//如果为空就创建一个远程连接
if (holder == null) {
// When process is creating a lot of connections this can take some time
// so increase the timeout
holder = new ConnectionHolder(this, remoteNodeId);
ConnectionHolder prevHolder = connections.putIfAbsent(remoteNodeId, holder);
if (prevHolder != null) {
// Another one sneaked in
holder = prevHolder;
} else {
holder.connect();
}
}
//开始发送信息
holder.writeMessage(sendContext);
}
holder.connect();
void connect() {
Promise<NodeInfo> promise = Promise.promise();
//获取该节点相关信息
eventBus.vertx().getClusterManager().getNodeInfo(remoteNodeId, promise);
//接着通过client连接,client在ClusteredEventBus初始化的时候就已经创建好了
promise.future()
.flatMap(info -> eventBus.client().connect(info.port(), info.host()))
.onComplete(ar -> {
if (ar.succeeded()) {
//成功连接后的信息处理
connected(ar.result());
} else {
log.warn("Connecting to server " + remoteNodeId + " failed", ar.cause());
close(ar.cause());
}
});
}
connected
private synchronized void connected(NetSocket socket) {
// 获取当前socket
this.socket = socket;
//设置为已连接
connected = true;
//异常处理
socket.exceptionHandler(err -> {
close(err);
});
//关闭处理
socket.closeHandler(v -> close());
//处理从socket内接受到的数据
socket.handler(data -> {
// Got a pong back
vertx.cancelTimer(timeoutID);
schedulePing();
});
schedulePing
这里会发送一个ping
的消息,就关乎到EventBus
的协议组成,我放在真正发送信息的地方讲
private void schedulePing() {
EventBusOptions options = eventBus.options();
pingTimeoutID = vertx.setTimer(options.getClusterPingInterval(), id1 -> {
// If we don't get a pong back in time we close the connection
timeoutID = vertx.setTimer(options.getClusterPingReplyInterval(), id2 -> {
// Didn't get pong in time - consider connection dead
log.warn("No pong from server " + remoteNodeId + " - will consider it dead");
close();
});
ClusteredMessage pingMessage =
new ClusteredMessage<>(remoteNodeId, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus);
Buffer data = pingMessage.encodeToWire();
socket.write(data);
});
}
holder.writeMessage(sendContext);
这里重新讲回sendRemote
方法里的writeMessage
方法,这里进行数据编码之后直接调用socket.write直接发送
synchronized void writeMessage(OutboundDeliveryContext<?> ctx) {
//判断是否连接
if (connected) {
//组成协议
Buffer data = ((ClusteredMessage) ctx.message).encodeToWire();
if (metrics != null) {
metrics.messageWritten(ctx.message.address(), data.length());
}
//发送数据
socket.write(data).onComplete(ctx);
} else {
//如果没连接先将信息存储起来
if (pending == null) {
if (log.isDebugEnabled()) {
log.debug("Not connected to server " + remoteNodeId + " - starting queuing");
}
pending = new ArrayDeque<>();
}
pending.add(ctx);
}
}
encodeToWire
这个就是协议组成部分编码好协议后调用**socket.write(data).onComplete(ctx);**进行数据发送
public Buffer encodeToWire() {
toWire = true;
//定义buffer长度
int length = 1024; // TODO make this configurable
Buffer buffer = Buffer.buffer(length);
// 先占位 4 字节,表示整个消息的长度,后面会修改
buffer.appendInt(0);
//协议版本
buffer.appendByte(WIRE_PROTOCOL_VERSION);
//解析器id
byte systemCodecID = messageCodec.systemCodecID();
buffer.appendByte(systemCodecID);
//-1的话就是自定义协议名字
if (systemCodecID == -1) {
// User codec
writeString(buffer, messageCodec.name());
}
//发送还是发布
buffer.appendByte(send ? (byte) 0 : (byte) 1);
//发送地址
writeString(buffer, address);
//回复地址不为空则加上
if (replyAddress != null) {
writeString(buffer, replyAddress);
} else {
buffer.appendInt(0);
}
//发送者
writeString(buffer, sender);
//头部数据编码
encodeHeaders(buffer);
//body数据编码
writeBody(buffer);
//修改最开始占位的数据
buffer.setInt(0, buffer.length() - 4);
return buffer;
}
encodeHeaders
private void encodeHeaders(Buffer buffer) {
//如果headers不为空
if (headers != null && !headers.isEmpty()) {
int headersLengthPos = buffer.length();
//先缓存占位
buffer.appendInt(0);
//获取头部数据的长度写入
buffer.appendInt(headers.entries().size());
List<Map.Entry<String, String>> entries = headers.entries();
//把map数据循环写入
for (Map.Entry<String, String> entry: entries) {
writeString(buffer, entry.getKey());
writeString(buffer, entry.getValue());
}
//设置原先占位的长度
int headersEndPos = buffer.length();
buffer.setInt(headersLengthPos, headersEndPos - headersLengthPos);
} else {
//为空就一个长度
buffer.appendInt(4);
}
}
4.7 节点算法
4.7.1 说明
它有一个接口内部方法就是选择单个节点和获取全部节点
一共默认两种实现方式,
第一种就是平均分配。
第二种就是根据权重分配
都是实现RoundRobinSelector
接口
public interface RoundRobinSelector {
String selectForSend();
Iterable<String> selectForPublish();java
}
//这个类是用来取数的
class Index implements IntUnaryOperator {
private final int max;
private final AtomicInteger idx = new AtomicInteger(0);
Index(int max) {
this.max = max;
}
int nextVal() {
//继承了IntUnaryOperator所以根据applyAsInt来计算值
//如果等于max-1则从0开始不然就是+1
return idx.getAndUpdate(this);
}
//当到达max-1就直接归0
@Override
public int applyAsInt(int i) {
return i == max - 1 ? 0 : i + 1;
}
}
4.7.2 轮询节点选择
class SimpleRoundRobinSelector implements RoundRobinSelector {
private final List<String> nodeIds;
private final Index index;
SimpleRoundRobinSelector(List<String> nodeIds) {
if (nodeIds.size() > 1) {
this.nodeIds = Collections.unmodifiableList(nodeIds);
//就是Index里面包含一个最大值和顺序增加的值
index = new Index(nodeIds.size());
} else {
this.nodeIds = Collections.singletonList(nodeIds.get(0));
index = null;
}
}
@Override
public String selectForSend() {
//获取就是根据顺序来
if (index == null) return nodeIds.get(0);
return nodeIds.get(index.nextVal());
}
@Override
public Iterable<String> selectForPublish() {
return nodeIds;
}
}
4.7.3 权重选择器
它的原理就是先按照权重小到大进行排序,根据你的权重构造执行范围区间,在区间内来循环目前所在的节点
class WeightedRoundRobinSelector implements RoundRobinSelector {
//存储唯一值nodeId
private final List<String> uniqueIds;
private final TreeMap<Integer, Integer> offsets = new TreeMap<>();
private final Index index;
WeightedRoundRobinSelector(Map<String, Weight> weights) {
List<String> uniqueIds = new ArrayList<>(weights.size());
// 创建一个用于排序节点的列表,按照权重值排序
List<Map.Entry<String, Weight>> sorted = new ArrayList<>(weights.entrySet());
sorted.sort(Map.Entry.comparingByValue());
// 计算节点的总权重
int totalWeight = 0;
int increment, limit;
for (int i = 0; i < sorted.size(); i++) {
Map.Entry<String, Weight> current = sorted.get(i);
uniqueIds.add(current.getKey());
//获取当前节点的权重
int weight = current.getValue().value();
totalWeight += weight;
if (i < sorted.size() - 1) {
//如果i==0则前一个权重为0,如果不为0则获取前一个权重的值,并且将当前权重减去前一个权重=权重增加值
increment = weight - (i == 0 ? 0 : sorted.get(i - 1).getValue().value());
//上一个节点的偏移量,如果位第一个则为0 + 为计算权重数量乘以权重增量,这是因为要确定范围值
limit = (i == 0 ? 0 : offsets.lastKey()) + (weights.size() - i) * increment;
//将计算出来的偏移量进行存储
offsets.put(limit, i + 1);
}
}
//构造不可改变list
this.uniqueIds = Collections.unmodifiableList(uniqueIds);
//初始化,上线用全部的权重值相加,为了当总权重次数用完了哦,处于
index = new Index(totalWeight);
}
@Override
public String selectForSend() {
//从index顺序获取
int idx = index.nextVal();
//获取小于idx最小的键值对
Map.Entry<Integer, Integer> entry = offsets.floorEntry(idx);
//如果找不到,则用idx的余数。保证在循环内
if (entry == null) return uniqueIds.get(idx % uniqueIds.size());
//获取偏移量值,获取相应的值
int offset = entry.getValue();
//如果偏移量等于节点列表的大小减一,那么这表示它是最后一个节点,因此选择最后一个节点。
if (offset == uniqueIds.size() - 1)
return uniqueIds.get(offset);
//uniqueIds.size() - offset) 偏移量位置到列表末尾的节点数
//idx % (uniqueIds.size() - offset)轮询索引除以剩余节点数的余数,这里的idx决定了我们在这三个剩余节点中选择哪一个
//举例子,如果idx为0,那么offset + idx % (uniqueIds.size() - offset)就等于1 + 0% 3,也就是选择节点B。
return uniqueIds.get(offset + idx % (uniqueIds.size() - offset));
}
@Override
public Iterable<String> selectForPublish() {
return uniqueIds;
}
}
5. 总结
整体流程:
- 节点会起来一个接受信息的SocketServer服务,以及一个用于发送信息的SocketClient客户端
- 并且会把当前节点通过ClusteredMessage注册到集群管理器中
- 当要发送信息的时候根据选择选择相应的节点
- 再根据是发送还是发布,选择所有的注册点来进行处理。
- 如果是发送选择一个节点,被选中的节点选择其中一个处理器进行处理。
- 如果是发布则选择所有的节点进行信息发送
如果哪里有错误,欢迎指正。
如果哪里有不明白的地方,欢迎讨论