SpringCloudAlibaba系列之Nacos服务注册与发现

news2024/11/17 4:48:57

目录

说明

认识注册中心

Nacos架构图

Nacos服务注册与发现实现原理总览

SpringCloud服务注册规范

服务注册

心跳机制与健康检查

服务发现

主流服务注册中心对比

小小收获


说明

本篇文章主要目的是从头到尾比较粗粒度的分析Nacos作为注册中心的一些实现,很多细节没有涉及,希望能给大家带来一定的启发。其中的源码是1.x版本的,虽然和2.x版本会有不同,但它们实现注册中心的思路都是类似。对于服务中心,当我们了解了一个的实现原理,知道了它的技术本质之后,再去了解和学习其他注册中心就会更加游刃有余,因为它们的设计思想是相通的,解决的问题是一样的。如果大家对其中更多的实现细节感兴趣,可以留言区留言大家一起讨论。下面就让我们一起开始它的探索之旅吧!

认识注册中心

如果没有注册中心,情况很可能是这样的:服务消费者需要在本地维护一个服务提供者的节点列表;如果服务提供者有新上线的节点或者有旧节点需要下线,服务消费者都需要及时去同步删除对应的节点信息。注册中心的出现,将所有的服务节点信息集中管理,并将前面提到的这些事情全部自动化。

在微服务架构下,注册中心的作用主要体现在下面几个方面:

  • 服务地址管理
  • 服务注册
  • 服务动态感知

Nacos架构图

学习任何技术,我们首先看下它官方的架构图,有个整体的认识。Nacos架构图如下:

核心内容就是:Nacos Server作为Nacos的服务端,其中的Naming Service模块提供了注册中心管理服务,然后对外提供了OpenAPI接口供客户端调用。实际应用当中,我们是通过Nacos客户端SDK来完成相关接口的调用的,SDK屏蔽了所有接口调用的细节,我们只需要完成相关的配置即可。 

核心Open API接口如下:

服务注册:/nacos/v1/ns/instance (POST)

服务实例获取:/nacos/v1/ns/instance/list (GET)

服务监听:/nacos/v1/ns/instance/list (GET)

Nacos服务注册与发现实现原理总览

  1.  服务提供者使用Open API发起服务注册;
  2. 客户端与服务端建立心跳机制,检测服务状态;
  3. 客户端(服务消费者)查询服务提供方实例列表;
  4. 定时任务定期(默认10s)拉取一次服务端数据到客户端(服务消费者);
  5. Nacos服务端检测到服务提供者异常,基于UDP协议推送更新到客户端(服务消费者)。

SpringCloud服务注册规范

 核心类ServiceRegistry,它是Spring Cloud提供的服务注册标准。集成到Spring Cloud中实现服务注册的组件,都会实现该接口。该接口定义如下:

package org.springframework.cloud.client.serviceregistry;
public interface ServiceRegistry<R extends Registration> {
    void register(R registration);
    void deregister(R registration);
    void close();
    void setStatus(R registration, String status);
    <T> T getStatus(R registration);
}

服务注册

Spring Cloud Alibaba Nacos作为注册中心,它在具体项目中是如何开始服务注册的呢?不论我们在项目中是通过什么样的方式集成Nacos,服务注册的开启方式是:应用程序启动之后,发布相关的事件,然后基于spring的事件监听,去调用ServiceReistry的register方法。因为ServiceRegistry是一个接口,所以当我们在应用中集成了Nacos,实际调用的时候会执行对应实现类的register方法,这里另一个核心类就出来了,它就是NacosServiceRegistry。NacosServiceRegistry主要做了如下两个事情:

  1. 通过Nacos客户端SDK调用Nacos服务端提供的Open API接口完成服务的注册,对应的接口为:nacos/v1/ns/instance。
  2. 向服务端定时发送心跳(服务端确保注册服务健康的手段)。

这里我们先分析第1点,第2点后面单独分析。服务注册的时候,Nacos客户端一些关键的实现源码如下:

public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        NamingService namingService = this.namingService();
        String serviceId = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();
        Instance instance = this.getNacosInstanceFromRegistration(registration);
        try {
            //核心方法(服务注册入口)
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var7) {
            log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
            ReflectionUtils.rethrowRuntimeException(var7);
        }
    }
}

通过反射构造NamingService,它是一个接口,该类封装了和Nacos服务端的各种交互,对应的实现类是NacosNamingService。

public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable var4) {
        throw new NacosException(-400, var4);
    }
}

 NacosNamingService构造方法中会调用一个init方法

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    this.initServerAddr(properties);
    InitUtils.initWebRootContext();
    this.initCacheDir();
    this.initLogName(properties);
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    //客户端心跳发送定时任务在BeatReactor中(BeatTask)
    this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties));
}

 大家应该注意到了,心跳发送的定时任务是在这里初始化的!!!

说了这么多,是哪里调用服务端的注册地址呢?

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        //创建心跳信息实现健康检查,Nacos Server必须要确保注册的服务实例是健康的,而心跳检查就是服务健康检测的手段。
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        //心跳发送定时任务BeatTask在这个方法中被运行...
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }

    //serverProxy.registerService实现服务注册(/nacos/v1/ns/instance)
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}

完成服务的注册,客户端的实现基本上是这样。那么客户端发出了服务注册请求之后,服务端会做哪些事情呢?对应到服务端,服务注册的实现代码在nacos-naming模块下的InstanceController类中。

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
    
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

这个controller方法做了两个事情:

  • 从请求参数中获取namespaceId、serviceName和实例信息Instance;
  • 调用registerInstance注册实例信息。

registerInstance方法具体实现如下: 

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    checkServiceIsNull(service, namespaceId, serviceName);
    
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

第一步:创建一个空服务(在Nacos控制台服务列表中展示的服务信息),实际上是初始化一个serviceMap,它是一个ConcurrentHashMap集合,一个双层Map结构。

/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

 第二步:getService,从serviceMap中根据namespaceId和serviceName得到一个服务对象。

第三步:调用addInstance添加服务实例。

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    //1.根据namespaceId和serviceName从缓存中获取Service实例。
    //2.如果Service实例为空,则创建并保存到缓存中。
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

这里我们重点看一下putServiceAndInit方法:

private void putServiceAndInit(Service service) throws NacosException {
    //1.通过putService将服务缓存到内存。
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    //2.service.init()建立心跳检测机制(ClientBeatCheckTask)。它主要是通过定时任务不断检测当前服务下所有实例最后发送心跳包的时间。
    //15s没有收到客户端发送的心跳,服务健康状态设置为false;30s没有收到客户端发送的心跳,服务实例移除。
    //如果超时,则设置healthy为false,表示服务不健康,并且发送服务变更事件。这里可以思考一下,服务实例的最后心跳包更新时间是由谁来触发的(nacos/vs/ns/beat中的service.processClientBeat(clientBeat) ClientBeatProcessor)。
    service.init();
    //3.consistencyService.listen实现数据一致性的监听。
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

service.init方法中会启动服务端的心跳检测机制ClientBeatCheckTask,具体实现见下面的心跳机制与健康检查。

最后,addInstance方法保存服务实例:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
    Service service = getService(namespaceId, serviceName);
    
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        
        consistencyService.put(key, instances);
    }
}

服务注册总结:

1.客户端通过调用OpenAPI的形式发起服务注册请求(POST请求发送请求/nacos/v1/ns/instance); 

2.服务端收到请求后会做下面几件事情:

  • 构建一个Service对象保存到ConcurrentHashMap集合中。
  • 使用定时任务对当前服务下的所有实例建立心跳检测机制(ClientBeatCheckTask)。
  • 基于数据一致性协议将服务数据进行同步(Raft一致性协议)。

心跳机制与健康检查

心跳机制是Nacos作为注册中心检测服务是否健康的重要手段,接下来我们就来详细看看客户端和服务端各自的实现。

前面我们已经知道了客户端发送心跳的时机,这里我们看看下客户端的核心实现代码:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }

    this.dom2Beat.put(key, beatInfo);
    //定时发送心跳包(默认period为5s)
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
package com.alibaba.nacos.client.naming.beat;

public class BeatReactor implements Closeable {
    private final ScheduledExecutorService executorService;
    private final NamingProxy serverProxy;
    private boolean lightBeatEnabled;
    public final Map<String, BeatInfo> dom2Beat;

    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();

                try {
                    //向Nacos Server发送心跳(/nacos/v1/ns/instance/beat),服务端收到客户端发送的心跳之后,会更新服务实例最后一次上报心跳的时间
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }

                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                            //如果请求资源Nacos服务端没有找到,返回20404;向Nacos Server重新发起服务注册
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var10) {
                        }
                    }
                } catch (NacosException var11) {
                    LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
                }
                //每一次心跳发送完之后,5s再次发送。
                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }
}

心跳机制就是客户端通过schedule定时向服务端发送一个数据包,然后启动一个线程不断检测服务端的回应,如果在设定时间内没有收到服务端的回应,则认为服务器出现了故障。Nacos服务端会根据客户端的心跳包不断更新服务的状态。

客户端发送完心跳,服务端又是如何对服务健康状态进行检查的呢?接下来我们一起看看Nacos服务端是如何实现服务健康检查的!从前面服务注册的分析中我们知道,服务端的心跳检查机制定时任务为:ClientBeatCheckTask(该任务是在服务注册的时候开启的),其具体代码实现如下:

package com.alibaba.nacos.naming.healthcheck;
public class ClientBeatCheckTask implements Runnable {
    @Override
    public void run() {
        try {
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
                //服务端超过15s没有收到心跳,设置服务健康状态为false,并发布事件InstanceHeartbeatTimeoutEvent
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                //服务端超过30s没有收到心跳,移除服务实例(/nacos/v1/ns/instance - DELETE)
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
    
    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
            
            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
            
            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG
                                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                        instance.toJson(), result.getMessage(), result.getCode());
                    }
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                                    throwable);
                }
    
                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}

 其核心逻辑是:不断检测当前服务下所有实例最后发送心跳包的时间,15s没有收到客户端发送的心跳,服务健康状态设置为false;30s没有收到客户端发送的心跳,服务实例移除。如果超时,则设置healthy为false,表示服务不健康,并且发送服务变更事件。这里有一个小小的问题,服务实例的最后心跳包更新时间是由谁来触发的?是在客户端向服务端发送心跳之后,服务端收到请求之后处理的时候会进行设置(ClientBeatProcessor)。

public class ClientBeatProcessor implements Runnable {
    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    private RsInfo rsInfo;
    private Service service;
    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }
    public RsInfo getRsInfo() {
        return rsInfo;
    }
    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }
    public Service getService() {
        return service;
    }
    public void setService(Service service) {
        this.service = service;
    }
    
    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);
        
        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                //更新心跳最后一次上报的时间
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked() && !instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

3.nacos服务端针对服务的健康检查(15s未收到心跳设置服务健康状态healthy=false,30s未收到心跳删除服务实例)

服务发现

1.客服端主动拉取

服务提供者注册到注册中心之后,服务消费者是如何获取服务提供者地址的呢?服务消费者完成对服务提供者的订阅之后,首先会有一个线程定期去获取服务列表,这种场景下是客户端主动去拉取服务提供者的相关信息。分析服务注册实现原理的时候,我们说到NacosNamingService的初始化,其中有一个很关键的类HostReactor,后面的服务动态发现的实现也有它的参与。客户端对服务端进行订阅之后,就会主动去获取服务提供者的信息。

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.eventDispatcher.addListener(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
    }

这里调用了HostReactor类的getServiceInfo方法:

public ServiceInfo getServiceInfo(String serviceName, String clusters) {
        LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (this.failoverReactor.isFailoverSwitch()) {
            return this.failoverReactor.getService(key);
        } else {
            ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                this.updatingMap.put(serviceName, new Object());
                this.updateServiceNow(serviceName, clusters);
                this.updatingMap.remove(serviceName);
            } else if (this.updatingMap.containsKey(serviceName)) {
                synchronized(serviceObj) {
                    try {
                        serviceObj.wait(5000L);
                    } catch (InterruptedException var8) {
                        LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8);
                    }
                }
            }

            this.scheduleUpdateIfAbsent(serviceName, clusters);
            return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
        }
    }

这个方法除了第一次获取服务提供者的信息,还会将UpdateTask定时任务启动,这个定时任务负责定期拉取服务提供者列表。 

public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (this.failCount != limit) {
                ++this.failCount;
            }
        }

        private void resetFailCount() {
            this.failCount = 0;
        }

        public void run() {
            long delayTime = 1000L;

            try {
                ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                if (serviceObj == null) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    return;
                }

                if (serviceObj.getLastRefTime() <= this.lastRefTime) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                } else {
                    HostReactor.this.refreshOnly(this.serviceName, this.clusters);
                }

                this.lastRefTime = serviceObj.getLastRefTime();
                if (!HostReactor.this.eventDispatcher.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
                    LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters);
                    return;
                }

                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    this.incFailCount();
                    return;
                }

                delayTime = serviceObj.getCacheMillis();
                this.resetFailCount();
            } catch (Throwable var7) {
                this.incFailCount();
                LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, var7);
            } finally {
                HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
            }

        }
    }

下面我们再回头来看看服务发现的具体实现,客户端请求如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
    Map<String, String> params = new HashMap(8);
    params.put("namespaceId", this.namespaceId);//命名空间ID
    params.put("serviceName", serviceName);//服务名称
    params.put("clusters", clusters);//集群
    params.put("udpPort", String.valueOf(udpPort));//端口
    params.put("clientIP", NetUtils.localIP());//IP
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");
}

Nacos服务端对应的Controller实现为:

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    //1.解析请求参数
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //2.通过doSrvIpxt返回服务列表参数
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //1.根据namespaceId和serviceName获取Service实例
    Service service = serviceManager.getService(namespaceId, serviceName);
    //2.获取指定服务下的所有实例IP
    List<Instance> srvedIPs;
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    //3.遍历,完成JSON字符串的组装
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        
        for (Instance instance : ips) {
            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }
            
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}

2.服务实例发生变更,服务端推送(基于UDP协议)

我们知道,定期拉取会存在时效性的问题。Nacos作为注册中心,设计思想和Nacos作为配置中心,一些思想上都是一致的,都采用了推拉结合的模式。下面我们来看看它的具体实现。

这里我们需要回忆一下前面的一些分析,服务端的心跳检测机制中,如果15s没有收到服务提供者发送的心跳,会发布一个ServiceChangeEvent事件。

package com.alibaba.nacos.naming.healthcheck;
public class ClientBeatCheckTask implements Runnable {
    private Service service;
    
    @Override
    public void run() {
        try {
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            //发布ServiceChangeEvent事件
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
    
    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
            
            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
            
            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG
                                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                        instance.toJson(), result.getMessage(), result.getCode());
                    }
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                                    throwable);
                }
    
                @Override
                public void onCancel() {
        
                }
            });
            
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap
            .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

 PushService实现了ApplicationListener,会监听ServiceChangeEvent事件。

@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                //基于UDP协议推送信息到客户端
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        udpSocket.send(ackEntry.origin);
        
        ackEntry.increaseRetryTime();
        
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        
        return null;
    }
}

服务消费者收到请求后,使用HostReactor中提供的processServiceJSON解析消息,并更新本地服务地址列表。

HostReactor的构造方法中会实例化一个PushReceiver类,它就是用来处理服务端推送的数据的。

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
    this.futureMap = new HashMap();
    this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        //这是一个后台线程,一直运行
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });
    this.eventDispatcher = eventDispatcher;
    this.beatReactor = beatReactor;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap(16);
    }

    this.updatingMap = new ConcurrentHashMap();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this);
}
public class PushReceiver implements Runnable, Closeable {
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final int UDP_MSS = 65536;
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;
    private volatile boolean closed = false;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            this.udpSocket = new DatagramSocket();
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            this.executorService.execute(this);
        } catch (Exception var3) {
            LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
        }

    }

    public void run() {
        while(!this.closed) {
            try {
                byte[] buffer = new byte[65536];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                //接收服务端的数据
                this.udpSocket.receive(packet);
                String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
                LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class);
                String ack;
                if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
                    if ("dump".equals(pushPacket.type)) {
                        ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
                    } else {
                        ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                    }
                } else {
                    //解析数据
                    this.hostReactor.processServiceJson(pushPacket.data);
                    ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                }

                //向服务端发送确认信息
                this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
            } catch (Exception var6) {
                LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
            }
        }
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
        this.closed = true;
        this.udpSocket.close();
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }

    public int getUdpPort() {
        return this.udpSocket.getLocalPort();
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;

        public PushPacket() {
        }
    }
}

主流服务注册中心对比

这里我们对比几个常用的注册中心:Nacos、Eureka、zookeeper和consul。下面是网上找的一张它们之间的对比内容,供大家参考:

不管是配置中心,还是这篇文章我们分析的服务注册中心,只要它们能实现我们的需求,在具体的选型上,不用太纠结。简单来说,跟着团队目前的技术栈走即可,大部分场景下,不论我们选择哪一个都能达到我们想要的效果。可能在极少数的情况下,我们才需要选择特定的注册中心,比如对一致性要求很高,那AP模式的注册中心我们就要排除掉。

小小收获

前面分析了这么多关于Nacos作为服务注册中心的实现,那我们能从中学习到一些什么样的知识呢?下面我会列出一些核心的内容,大家感兴趣可以再次去深入了解并学习一下。 

  • SpringBoot启动流程(熟悉启动流程,才能找到服务注册的入口)
  • 【重要】SpringBoot自动装配机制
  • Spring事件发布与监听机制(阅读一些开源中间件的时候,涉及比较多)
  • JDK反射机制(反射创建NamingService)
  • 线程池(定时发送心跳、定时拉取服务等)
  • 【重要】SpringCloud服务注册标准——ServiceRegistry
  • 服务异步注册(实现Nacos高性能手段之一)
  • 注册表更新机制(写时复制CopyOnWrite)
  • 服务变更实现主动推送(DatagramSocket的UDP协议)
  • 数据一致性算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AcWing 3. 完全背包问题 学习笔记

有 N&#xfffd; 种物品和一个容量是 V&#xfffd; 的背包&#xff0c;每种物品都有无限件可用。 第 i&#xfffd; 种物品的体积是 vi&#xfffd;&#xfffd;&#xff0c;价值是 wi&#xfffd;&#xfffd;。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不…

Elasticsearch中的语义检索

一、传统检索的背景痛点 和传统的基于关键词的匹配方式不同&#xff0c;语义检索&#xff0c;利用大模型&#xff0c;将文本内容映射到神经网络空间&#xff0c;最终记忆token做检索。 例如想要搜索中国首都&#xff0c;例如数据集中&#xff0c;只有一篇文章在描述北京&#x…

Zabbix实现故障自愈

一、简介 Zabbix agent 可以运行被动检查和主动检查。 在被动检查模式中 agent 应答数据请求。Zabbix server&#xff08;或 proxy&#xff09;询求数据&#xff0c;例如 CPU load&#xff0c;然后 Zabbix agent 返还结果。 主动检查处理过程将相对复杂。Agent 必须首先从 Z…

优卡特脸爱云一脸通智慧管理平台权限绕过漏洞复现(CVE-2023-6099)

0x01 产品简介 脸爱云一脸通智慧管理平台是一套功能强大&#xff0c;运行稳定&#xff0c;操作简单方便&#xff0c;用户界面美观&#xff0c;轻松统计数据的一脸通系统。无需安装&#xff0c;只需在后台配置即可在浏览器登录。 功能包括&#xff1a;系统管理中心、人员信息管理…

[qemu逃逸] XNUCA2019-vexx

前言 这题没有去符合, 题目本身不算难. 用户名: root 密码: goodluck 设备逆向 题目没有去符合, 所以其实没啥好讲了, 就列一些笔者认为关键的地方 这里的定义了两块 mmio 内存区. 然后看下设备实例结构体: 可以看到 QEMUTimer, 所以多半就是劫持 dma_timer 了. 漏洞点在…

使用Qt实现多人聊天工作室

目录 1、项目背景 2、技术分析 3、架构设计 3、1 服务器架构 3.1.1 模块划分 3.1.2 模块之间的交互 3、2 客户端架构 3.2.1 模块划分 3.2.2 模块之间交互 4、实现过程 4、1 功能实现 4.1.1 用户登录注册功能​编辑 4.1.2 用户主界面功能 4、2 设计实现 4.2.1 登录…

代码随想录算法训练营|五十六天

回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; dp含义&#xff1a;表示区间内[i,j]是否有回文子串&#xff0c;有true&#xff0c;没有false。 递推公式&#xff1a;当s[i]和s[j]不相等&#xff0c;false&#xff1b;相等时&#xff0c;情况一&#xff0c;…

Springboot框架中使用 Redis + Lua 脚本进行限流功能

Springboot框架中使用 Redis Lua 脚本进行限流功能 限流是一种用于控制系统资源利用率或确保服务质量的策略。在Web应用中&#xff0c;限流通常用于控制接口请求的频率&#xff0c;防止过多的请求导致系统负载过大或者防止恶意攻击。 什么是限流&#xff1f; 限流是一种通过…

Go——一、Go语言安装及介绍

Go 一、Windows下安装Go1、下载Go2、配置环境变量3、下载Jetbrain下的GoLang4、编写hello world5、编译和执行 二、Go语言介绍1、开发文档2、Go语言核心开发团队3、为什么要创建Go4、Go语言发展史5、Go语言特点6、Golang执行过程6.1 执行过程分析6.2 编译是什么 7、开发注意事项…

腾讯微服务平台TSF学习笔记(一)--如何使用TSF的Sidecar过滤器实现mesh应用的故障注入

Mesh应用的故障注入 故障注入前世今生Envoy设置故障注入-延迟类型设置故障注入-延迟类型并带有自定义状态码总结 故障注入前世今生 故障注入是一种系统测试方法&#xff0c;通过引入故障来找到系统的bug&#xff0c;验证系统的稳健性。istio支持延迟故障注入和异常故障注入。 …

中国制库:创新引领,效率突破,塑造行业新标准

制库是一家专注于企业知识应用的在线SAAS平台,主要构成部分包括制度、表单、流程、制问和集合。作为集合了各种管理制度的平台,制库不仅提供了丰富的制度资源,还通过SAAS版实现了知识集成、修订和应用的全流程。目标是打造中国全面的企业制度库,帮助企业快速建立核心管理系统,并…

Jenkins测完通知到人很麻烦?一个设置配置钉钉消息提醒!

Jenkins 作为最流行的开源持续集成平台&#xff0c;其强大的拓展功能一直备受测试人员及开发人员的青睐。大家都知道我们可以在 Jenkins 中安装 Email 插件支持构建之后通过邮件将结果及时通知到相关人员。但其实 Jenkins 还可以支持钉钉消息通知&#xff0c;其主要通过 DingTa…

Spark 平障录

Profile Profile 是最重要的第一环。 利用好 spark UI 和 yarn container log分析业务代码&#xff0c;对其计算代价进行预判建设基准&#xff0c;进行对比&#xff0c;比如application id 进行对比&#xff0c;精确到 job DAG 环节 充分利用 UI Stage 页面 页头 summary&…

ON1 Photo RAW MAX 2024 v18.0.4.14758

ON1 Photo RAW MAX 2024 for mac是一款专业的raw照片编辑软件&#xff0c;提供了各种各样的编辑工具&#xff0c;包括调整曝光、对比度、色彩、锐化、裁剪、旋转和去除红眼等功能&#xff0c;用户可以根据具体需求对照片进行精确的调整。ON1 Photo RAW MAX 2024还提供了智能修复…

Windows 的 WSL 中运行 EasyConnect

Windows 的 WSL 中运行 EasyConnect docker-easyconnect 安装 Docker Desktop 通过 Docker 的官网 Docker Desktop 下载并安装. 安装过程一直下一步即可, 默认推荐 WSL 模式 初始化过程需要梯子 安装完后在搜索框搜索 docker-easyconnect hagb/docker-easyconnect 就是需要…

nacos网关

目录 拉取docker镜像 环境配置 网关搭建架构 wemedia-gateway网关配置 依赖 启动类配置 网关yml配置 nacos配置中心配置网关 wdmedia服务配置 依赖 启动类配置 yml配置 nacos配置 nacos中的配置共享 nacos配置 wmedia模块中yml的配置 参考:https://blog.csdn.net/…

【【SOC设计之 数据回路从 DMA到 FIFO再到BRAM 再到FIFO 再写回DMA】】

SOC设计之 数据回路从 DMA到 FIFO再到BRAM 再到FIFO 再写回DMA 基本没问题的回路设计 从 DMA出发将数据传递到 FIFO 再 写到 自定义的 RTL文件中 再写到 BRAM 再到 自定义的RTL文件 再到 FIFO 再写回DMA block design 的 设计连接 可以参考我上一个文件的设计 下面介绍两个c…

线性变换概论

线性变换 定义 设 V V V 和 W W W 都是在域 K K K上定义的向量空间&#xff0c; T : V → W T :V \rightarrow W T:V→W 对任二向量 x , y ∈ V x,y \in V x,y∈V,与任何标量 a ∈ K a \in K a∈K&#xff0c;满足&#xff1a; T ( x y ) T ( x ) T ( y ) T(xy)T(x)T(…

Zookeeper实战案例(1)

前置知识&#xff1a; Zookeeper学习笔记&#xff08;1&#xff09;—— 基础知识-CSDN博客 Zookeeper学习笔记&#xff08;2&#xff09;—— Zookeeper API简单操作-CSDN博客 Zookeeper 服务器动态上下线监听案例 需求分析 某分布式系统中&#xff0c;主节点可以有多台&am…