【Nacos】Nacos服务注册源码分析(二)

news2025/1/8 5:12:04

在上篇文章中,我们主要聚焦于Nacos服务注册在服务端grpc设计层面的一些代码。本篇文章将深入探讨服务注册的相关逻辑,通过细致的分析来更全面地理解这一过程。


NamingGrpcClientProxy.registerService

我们从NamingGrpcClientProxyregisterService方法看起。不清楚为什么从这里看起的,请看上篇文章。

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);
}

1.cacheInstanceForRedo

看下redoService.cacheInstanceForRedo(serviceName, groupName, instance);的实现:

private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();

public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}

可以看到这其实就是将当前服务实例放到一个map中,这个map的含义是:已注册实例。

2.doRegisterService

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
	//构建请求对象request
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}

requestToServer很显然就是实际请求server端去创建客户端实例的代码,下一行的redoService.instanceRegistered(serviceName, groupName) 这里的redoService是不是有些眼熟?上边cacheInstanceForRedo做的事情是在一个map中添加当前客户端实例,instanceRegistered是把这个实例标记为已注册(this.registered = true)。

从一致性的角度来讲,requestToServer如果出现了意外case,那么就一定会抛出异常,继而redoService.instanceRegistered就不会执行,也就不会出现一致性问题。

OK,接下来我们只需要看requestToServer就好。

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));

		//使用rpcClient去请求服务端
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (NacosException e) {
        throw e;
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}

我们着重看下请求服务端的代码rpcClient.request(request)

/**
 * send request.
 *
 * @param request request.
 * @return response from server.
 */
public Response request(Request request, long timeoutMills) throws NacosException {
    //用来做请求重试
    int retryTimes = 0;
    Response response;
    Throwable exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            //发起请求的代码
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    } 
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
            
        } catch (Throwable e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            
            LoggerUtils.printIfErrorEnabled(LOGGER,
                    "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes,
                    e.getMessage());
            
            exceptionThrow = e;
        }
        retryTimes++;  
    }
    
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}

代码比较长,主要还是添加了重试逻辑。我们只要关注实际发起请求的代码this.currentConnection.request(request, timeoutMills)


currentConnection是怎么来的?

/**
 * Start this client.
 */
public final void start() throws NacosException {
    
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }
    
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });
    
    // connection event consumer.
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });
    
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Server healthy check fail, currentConnection = {}",
                                    rpcClientConfig.name(), currentConnection.getConnectionId());
                            
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
                            
                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) {
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                continue;
                            }
                            
                        } else {
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }
                    
                }
                
                if (reconnectContext.serverInfo != null) {
                    // clear recommend server if server is not in server list.
                    boolean serverExist = false;
                    for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Recommend server is not in server list, ignore recommend server {}",
                                rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                        
                        reconnectContext.serverInfo = null;
                        
                    }
                }
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // Do nothing
            }
        }
    });
    
    // connect to server, try to connect to server sync retryTimes times, async starting if failed.
    Connection connectToServer = null;
    rpcClientStatus.set(RpcClientStatus.STARTING);
    
    int startUpRetryTimes = rpcClientConfig.retryTimes();
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
            
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
                    rpcClientConfig.name(), serverInfo);
            
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                    rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
        }
        
    }
    
    if (connectToServer != null) {
        LoggerUtils
                .printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                        rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
                        connectToServer.getConnectionId());
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        switchServerAsync();
    }
    
    registerServerRequestHandler(new ConnectResetRequestHandler());
    
    // register client detection request.
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        
        return null;
    });
    
}

在这里插入图片描述
观察到currentConnection是在start方法中赋值的,继续追踪代码我们发现这一切还是源于NamingGrpcClientProxy的构造方法中调用。
在这里插入图片描述
回到connectToServer(serverInfo)方法,看一下是如何去拿到链接的。

//抽象方法,子类去实现
public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;

//GrpcClient的实现
@Override
public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
        	//创建一个线程池
            this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
		
		//根据ip和端口创建ManagedChannel -> io.grpc中的类
        ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
        if (newChannelStubTemp != null) {

            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel(managedChannel);
                return null;
            }

            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
                    newChannelStubTemp.getChannel());
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

            //create stream request and bind connection event to this connection.
            StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel(managedChannel);
            //send a  setup request.
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            //wait to register connection setup
            Thread.sleep(100L);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {
        LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
    }
    return null;
}

总结

本篇算是填了第一篇客户端源码分析的坑,着重看了发起grpc请求链路上的源码,整体上还是比较清晰易懂的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1018952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue项目中使用element-plus的el-table组件-组件使用-样式修改

项目配置 <div class"table-wrap"><el-tableclass"table-card-container":data"tableData"style"width: 100%"><template v-for"column in tableColumn"><el-table-columnv-if"column.isShow&qu…

前端笔试2

1.下面哪一个是检验对象是否有一个以自身定义的属性? foo.hasOwnProperty("bar")bar in foo foo["bar"] ! undefinedfoo.bar ! null 解析&#xff1a; bar in foo 检查 foo 对象是否包含名为 bar 的属性&#xff0c;但是这个属性可以是从原型链继承来的&a…

华为云云耀云服务器L实例评测|轻量级应用服务器对决:基于 fio 深度测评华为云云耀云服务器L实例的磁盘性能

本文收录在专栏&#xff1a;#云计算入门与实践 - 华为云 专栏中&#xff0c;本系列博文还在更新中 相关华为云云耀云服务器L实例评测文章列表如下&#xff1a; 华为云云耀云服务器L实例评测 | 从零开始&#xff1a;云耀云服务器L实例的全面使用解析指南华为云云耀云服务器L实…

JVM:如何判断对象已死?

对象已死&#xff1f; 在堆里面存放着Java世界中几乎所有的对象实例&#xff0c;垃圾收集器在对堆进行回收前&#xff0c;第一件事情就是要确定这些对象之中哪些还“存活”着&#xff0c;哪些已经“死去”&#xff08;“死去”即不可能再被任何途径使用的对象&#xff09;了。…

2023/09/17

文章目录 1. vscode展开所有代码快捷键ctrl k j2. git删除所有stash或指定stash git stash drop [可选stash名]3. vue在函数默认参数后增加新参数4. git push 添加“-u”参数5. vscode快捷输入符号$的使用6. WebGL之什么是GLB&GLTF文件&#xff1f;7. WebGL之什么是HDR&a…

python之pyQt5实例:Matplotlib的应用

1、显示逻辑 1.1MatplotlibWidget.py import sys import random import matplotlibmatplotlib.use("Qt5Agg") from PyQt5 import QtCore from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QSizePolicy, QWidget from numpy import arange, si…

nginx日志分割

nginx日志分割 有的用nginx做代理的&#xff0c;日志产生的特别多&#xff0c;而nginx的日志又是一整个&#xff0c;所以需要我们自己来写分割脚本了 小白教程&#xff0c;一看就会&#xff0c;一做就成。 1.写脚本 #!/bin/bash #nginx日志分割 LOGPATH/home/oldlogs CURLOG…

多目标优化算法:基于非支配排序的小龙虾优化算法(NSCOA)MATLAB

一、小龙虾优化算法COA 小龙虾优化算法&#xff08;Crayfsh optimization algorithm&#xff0c;COA&#xff09;由Jia Heming 等人于2023年提出&#xff0c;该算法模拟小龙虾的避暑、竞争和觅食行为&#xff0c;具有搜索速度快&#xff0c;搜索能力强&#xff0c;能够有效平衡…

前端用JavaScript实现桑基图(Sankey图)

前端用JavaScript实现桑基图&#xff08;Sankey图&#xff09; 桑基图&#xff08;Sankey图&#xff09;&#xff0c;是流图的一种&#xff0c;常用来展示事物的数量、发展方向、数据量大小等&#xff0c;在可视化分析中经常使用。 本文&#xff0c;演示如何在前端用JavaScri…

Spring Boot - Rest VS GraphQL

文章目录 概述图解CodeSpring Boot RestSpring Boot GraphQL 概述 REST&#xff08;Representational State Transfer&#xff09;和GraphQL都是用于构建Web服务的API设计和交互方式&#xff0c;它们有不同的特点和优劣势。 REST&#xff08;Representational State Transf…

PHP自己的框架2.0加载控制器并运行(重构篇四)

目录 1、加载控制器并运行 2、创建admin和index模块控制器 3、自动加载控制器文件 4、运行控制器中方法 5、运行模块下控制器方法 1、加载控制器并运行 2、创建admin和index模块控制器 <?php namespace app\admin\controller; class index{public function index(){ech…

【pytest】生成测试报告

0. 脚本&#xff1a; fixture/test_fixtures_02.py # 功能函数 import pytestdef multiply(a, b):return a * bclass TestMultiply:# fixturesclassmethoddef setup_class(cls):print("setup_class>")classmethoddef teardown_class(cls):print("teardown_c…

在PHP8中对数组进行计算-PHP8知识详解

在php8中&#xff0c;提供了丰富的计算函数&#xff0c;可以对数组进行计算操作。常见的计算函数如下几个&#xff1a;array_sum()函数、array_merge()函数、array_diff()函数、array_diff_assoc()函数、array_intersect()函数、array_intersect_assoc()函数。 1、array_sum()函…

MySQL数据库——MySQL8.0.26-Linux版安装

目录 准备Linux服务器 下载安装包 上传安装包 创建目录,并解压 安装mysql的安装包 启动MySQL服务 查询自动生成的root用户密码 修改root用户密码 创建用户 并给root用户分配权限 通过DataGrip远程连接MySQL 准备Linux服务器 云服务器或者虚拟机&#xff0c; Linux…

P2P协议的传输艺术

TP 采用两个 TCP 连接来传输一个文件。 控制连接&#xff1a;服务器以被动的方式&#xff0c;打开众所周知用于 FTP 的端口 21&#xff0c;客户端则主动发起连接。该连接将命令从客户端传给服务器&#xff0c;并传回服务器的应答。常用的命令有&#xff1a;list——获取文件目…

图解曲面积分的对称性

1.图解曲面积分的对称性 1.1 第一类曲面积分的一般对称性 二重积分、三重积分、第一类曲线积分、第一类曲面积分的一般对称性其原理都类似 平面和空间曲面的原理一样&#xff0c;以下内容以空间曲面为例 图中所示为积分区域 Σ \Sigma Σ&#xff0c;函数 f ( x , y , z ) f(…

PostgreSQL缓存管理

缓冲区管理器、存储和后端进程之间的关系 缓存管理结构 PostgreSQL 缓冲区管理器由buffer table、buffer descriptors和buffer pool组成。buffer pool层存储表和索引等数据文件页&#xff0c;以及空闲空间映射和可见性映射。buffer pool是一个数组&#xff0c;每个槽存储数据文…

Redis RedLock算法和底层源码分析

Redlock红锁算法 官网地址&#xff1a;Distributed Locks with Redis | Redis 为什么要使用RedLock&#xff1f; 解释&#xff1a; 线程 1 首先获取锁成功&#xff0c;将键值对写入 redis 的 master 节点&#xff0c;在 redis 将该键值对同步到 slave 节点之前&#xff0c;mas…

vue+express、gitee pm2部署轻量服务器

一、代码配置 前后端接口都保持 127.0.0.1:3000 vue创建文件 pm2.config.cjs module.exports {apps: [{name: xin-web, // 应用程序的名称script: npm, // 启动脚本args: run dev, // 启动脚本的参数cwd: /home/vue/xin_web, // Vite 项目的根目录interpreter: none, // 告诉…

java基础-并发编程-CountDownLatch(JDK1.8)源码学习

CountDownLatch方法调用与类关系图 一、初始化&#xff1a;public CountDownLatch(int count) public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync new Sync(count);}Sync(int count) {// 将参数…