【Nacos源码分析01-服务注册与集群间数据是同步】

news2025/1/20 19:25:50

文章目录

  • 了解CAP
  • BASE理论
  • Nacos支持CP还是AP
  • 集群数据同步
  • 实现集群数据一致性源码

了解CAP

在这里插入图片描述
CAP理论的核心观点是,一个分布式系统无法同时完全满足一致性、可用性和分区容错性这三个特性。具体而言,当发生网络分区时,系统必须在一致性和可用性之间做出选择:
CA(一致性和可用性):
系统在正常情况下能够保证一致性和可用性,但无法应对网络分区故障。一旦发生网络分区,系统必须放弃一致性或可用性中的一个。
CP(一致性和分区容错性):
系统在网络分区情况下能够保证一致性和分区容错性,但可能会牺牲部分可用性,即某些请求可能不会得到响应。
AP(可用性和分区容错性):
系统在网络分区情况下能够保证可用性和分区容错性,但可能会牺牲一致性,即不同节点可能会看到不一致的数据。
所谓的CAP定理,就是指在一个分布式系统中,CAP这三个指标,最多同时只能满足其中的两个,不可能三个都同时满足
实际应用中的选择
在实际应用中,不同的系统设计会根据具体需求在CAP三者之间进行权衡。
例如:
金融系统:通常更重视一致性(CP),确保数据的一致性和准确性,即使这可能会牺牲部分可用性。
社交媒体平台:通常更重视可用性和分区容错性(AP),确保用户始终可以访问和发布内容,即使这可能会导致短时间内的数据不一致。

BASE理论

BASE理论主要是包括以下三点:
基本可用(Basically Available):系统出现故障还是能够对外提供服务,不至于直接无法用了
软状态(Soft State):允许各个节点的数据不一致
最终一致性,(Eventually Consistent):虽然允许各个节点的数据不一致,但是在一定时间之后,各个节点的数据最终需要一致的
BASE理论其实就是妥协之后的产物。

Nacos支持CP还是AP

Nacos其实目前是同时支持AP和CP的
具体使用AP还是CP得取决于Nacos内部的具体功能,并不是有的文章说的可以通过一个配置自由切换。
就以服务注册举例来说,对于临时实例来说,Nacos会优先保证可用性,也就是AP
对于永久实例,Nacos会优先保证数据的一致性,也就是CP

集群数据同步

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。
具体的代码分析:
1.先开始从服务注册接口开始:
地址:com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2.java
服务注册

@CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public Result<String> register(InstanceForm instanceForm) throws NacosException {
        // check param
        instanceForm.validate();
        checkWeight(instanceForm.getWeight());
        // build instance
        Instance instance = buildInstance(instanceForm);
        instanceServiceV2.registerInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, instanceForm.getNamespaceId(), instanceForm.getGroupName(), instanceForm.getServiceName(),
                instance.getIp(), instance.getPort()));
        return Result.success("ok");
    }

2.添加实例

 @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //参数校验
        NamingUtils.checkInstanceIsLegal(instance);
        //判断是临时节点还是永久节点
        boolean ephemeral = instance.isEphemeral();
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        createIpPortClientIfAbsent(clientId);
        Service service = getService(namespaceId, serviceName, ephemeral);
        clientOperationService.registerInstance(service, instance, clientId);
    }
 @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
    
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

3.添加并且同步

/**
     * Request publisher publish event Publishers load lazily, calling publisher.
     *
     * @param eventType class Instances type of the event type.
     * @param event     event instance.
     */
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }
        
        final String topic = ClassUtils.getCanonicalName(eventType);
        
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            return publisher.publish(event);
        }
        if (event.isPluginEvent()) {
            return true;
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
  public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);
            return true;
        }
        return true;
    }
void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();
        
        if (!hasSubscriber()) {
            LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
            return;
        }
        
        // Notification single event listener
        for (Subscriber subscriber : subscribers) {
            if (!subscriber.scopeMatches(event)) {
                continue;
            }
            
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }
            
            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.
            notifySubscriber(subscriber, event);
        }
    }

通知集群节点

 public void notifySubscriber(final Subscriber subscriber, final Event event) {
        
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        
        final Runnable job = () -> subscriber.onEvent(event);
        final Executor executor = subscriber.executor();
        
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception: ", e);
            }
        }
    }

实现集群数据一致性源码

进入DistroClientDataProcessor的实现类来执行onEvent

  @Override
    public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }

通过syncToAllServer()方法同步数据给集群中的其它节点:
临时实例,使用distro协议同步; 持久实例:使用raft协议同步

  
    private void syncToAllServer(ClientEvent event) {
     /**
     * 判断客户端是否为空,是否是临时实例,判断是否是负责节点
     * 
     * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
**/
        Client client = event.getClient();
        // Only ephemeral data sync by Distro, persist client should sync by raft.
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
        //客户端断开连接
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
        //客户端变更 新增或者修改
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }

同步时,会涉及到一个负责节点和非负责节点。
负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。
DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。
DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。
对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

   public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

核心逻辑在syncToTarget

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法
    // distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,
    // 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/**
 * 任务队列
 * key:对应的服务
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
 
protected final ReentrantLock lock = new ReentrantLock();
 
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // 初始化任务队列
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 创建定时任务的线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
    // 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
 
public void addTask(Object key, AbstractDelayTask newTask) {
    // 加锁防并发处理,key就是对应的服务
    lock.lock();
    try {
        // ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);
        // 通过key判断是否已存在map中
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            // 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行
            newTask.merge(existTask);
        }
        // 将任务放入到map中,等待处理
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
 
/**
 * 任务处理类
 */
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}
 
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // 从队列中移除这个任务
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
        // 找到处理类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                // 处理失败的话,重新入队(即重试)
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
    DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
    // 设置默认的处理器
    delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        // unregister注册的是DELETE事件
        case DELETE:
            // 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。
可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {
    // 获取处理类
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        // 不为空,就用对应的processor处理
        processor.process(task);
        return;
    }
    // 没有找到处理类的话, 就用公共的TaskExecuteWorker执行
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}
 
 
/**
 * 阻塞队列, 类型为Runnable,说明存入的是一个线程
 */
private final BlockingQueue<Runnable> queue;
 
// TaskExecuteWorker#process
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        // 添加任务到阻塞队列中
        putTask((Runnable) task);
    }
    return true;
}
 
private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    // 阻塞队列
    this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    // 内部执行worker,实际上是一个线程
    realWorker = new InnerWorker(this.name);
    // 启动worker
    realWorker.start();
}

在TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:

private class InnerWorker extends Thread {
    
    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }
    
    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // 执行任务
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e, e);
            }
        }
    }
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {
    String type = getDistroKey().getResourceType();
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    if (null == transportAgent) {
        Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
        return;
    }
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    if (transportAgent.supportCallbackTransport()) {
        doExecuteWithCallback(new DistroExecuteCallback());
    } else {
        executeDistroTask();
    }
}
 
private void executeDistroTask() {
    try {
        boolean result = doExecute();
        if (!result) {
            // 失败重试
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}
 
// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    // 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    // 将得到的数据同步给其他服务节点
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer());
}

八、Nacos源码系列:Nacos集群数据同步

负责节点(发起同步)

DistroProtocol

获取同步数据getDistroData

执行同步数据syncData

非负责节点(接收请求)

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

ClientEvent.ClientChangedEvent事件的真正处理类是在DistroClientDataProcessor#onEvent方法:



public void onEvent(Event event) {
    // 只有集群模式才有效,单机模式启动的Nacos,不会执行同步操作
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // 同步数据给其它节点
        syncToAllServer((ClientEvent) event);
    }
}

通过syncToAllServer()方法同步数据给集群中的其它节点:

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    /**
     * 判断客户端是否为空,是否是临时实例,判断是否是负责节点
     * 
     * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
     */
    if (isInvalidClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // 客户端断开连接事件
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // 客户端变更事件(新增或修改)
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步时,会涉及到一个负责节点和非负责节点。

负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。

private boolean isInvalidClient(Client client) {
    // 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
    return null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client);
}

DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。

对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

public void sync(DistroKey distroKey, DataOperation action) {
    // 配置同步延迟的时间:默认为1s
    sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
 
// com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync()
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // 遍历每个除自己以外的其它成员
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}
核心逻辑在syncToTarget()方法:
```java
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法
    // distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,
    // 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/**
 * 任务队列
 * key:对应的服务
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
 
protected final ReentrantLock lock = new ReentrantLock();
 
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // 初始化任务队列
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 创建定时任务的线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
    // 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
 
public void addTask(Object key, AbstractDelayTask newTask) {
    // 加锁防并发处理,key就是对应的服务
    lock.lock();
    try {
        // ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);
        // 通过key判断是否已存在map中
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            // 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行
            newTask.merge(existTask);
        }
        // 将任务放入到map中,等待处理
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
 
/**
 * 任务处理类
 */
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}
 
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // 从队列中移除这个任务
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
        // 找到处理类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                // 处理失败的话,重新入队(即重试)
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
// 设置默认的处理器
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
也就是说,在调用syncToTarget()方法后,会触发任务DistroDelayTaskProcessor处理任务。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于新增、修改类型的任务:
DistroSyncChangeTask。

// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        // unregister注册的是DELETE事件
        case DELETE:
            // 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。

可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {
    // 获取处理类
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        // 不为空,就用对应的processor处理
        processor.process(task);
        return;
    }
    // 没有找到处理类的话, 就用公共的TaskExecuteWorker执行
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}

 
/**
 * 阻塞队列, 类型为Runnable,说明存入的是一个线程
 */
private final BlockingQueue<Runnable> queue;
 
// TaskExecuteWorker#process
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        // 添加任务到阻塞队列中
        putTask((Runnable) task);
    }
    return true;
}
 
private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    // 阻塞队列
    this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    // 内部执行worker,实际上是一个线程
    realWorker = new InnerWorker(this.name);
    // 启动worker
    realWorker.start();
}TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:

private class InnerWorker extends Thread {
    
    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }
    
    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // 执行任务
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e, e);
            }
        }
    }
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {
    String type = getDistroKey().getResourceType();
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    if (null == transportAgent) {
        Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
        return;
    }
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    if (transportAgent.supportCallbackTransport()) {
        doExecuteWithCallback(new DistroExecuteCallback());
    } else {
        executeDistroTask();
    }
}
 
private void executeDistroTask() {
    try {
        boolean result = doExecute();
        if (!result) {
            // 失败重试
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    // 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    // 将得到的数据同步给其他服务节点
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer());
}

获取同步数据getDistroData
这里获取同步数据其实是从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client。

private DistroData getDistroData(String type) {
    // 其实是从ClientManager实时获取Client
    DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
    if (null != result) {
        result.setType(OPERATION);
    }
    return result;
}
 
// DistroClientDataProcessor#getDistroData
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // 把生成的同步数据放入到数组中
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
 
    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();
 
    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData  batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
    data.getAttributes().addClientAttribute(REVISION, getRevision());
    return data;
}

执行同步数据syncData
这里的同步实际是由DistroClientTransportAgent来负责的,将数据封装成DistroDataRequest,然后获取目标节点Member,然后调用sendRequest异步方法执行同步:

public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // 获取目标节点
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
                .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                        data.getDistroKey());
        return false;
    }
    try {
        // 同步发送 DistroDataRequest 请求
        // 真正处理请求是在:com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler.handle方法
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}

这时我们主要关注非负责节点收到同步请求后如何处理。
非负责节点(接收请求)
当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。

// DistroDataRequestHandler#handle
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // 变更操作: 维护注册表数据,然后发布事件
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}
 
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
	// 调用DistroProtocol.onReceive方法
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
 
// DistroProtocol#onReceive
public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
            distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    // 通过处理器处理接收到的数据
    return dataProcessor.processData(distroData);
}

这里我主要关注ADD/CHANGE,所以主要关注handleSyncData()方法。

public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // 反序列化同步数据为ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            // 处理同步数据
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

首先反序列化接收到的同步数据,封装成ClientSyncData,然后处理同步数据:

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
            .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
                    clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // 同步客户端连接,生成client:不存在时创建client(IpPortBasedClient)
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 获取Client
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client数据
    upgradeClient(client, clientSyncData);
}

同步客户端连接,然后获取到客户端对象,更新客户端的注册表信息,并发布一些事件:

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // 添加注册表信息,并发布ClientRegisterServiceEvent
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // 删除注册表信息,并发布ClientDeregisterServiceEvent
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
    client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1719475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Vue】v-for中的key

文章目录 一、引入问题二、分析问题 一、引入问题 语法&#xff1a; key属性 "唯一值" 作用&#xff1a;给列表项添加的唯一标识。便于Vue进行列表项的正确排序复用。 为什么加key&#xff1a;Vue 的默认行为会尝试原地修改元素&#xff08;就地复用&#xff09;…

华媒舍:10种欧洲地区媒体发稿推广技巧

1.了解欧洲地区媒体自然环境必须掌握欧洲地区媒体的发稿推广方法&#xff0c;首先要对欧洲地区媒体自然环境有一定的了解。包含不一样国家的主力媒体&#xff0c;他的阅读者人群、销售市场遮盖及其报导风格等。仅有熟悉媒体自然环境&#xff0c;才能更好的制订营销推广策略。 …

【Unity Shader入门精要 第11章】让画面动起来(一)

1. Unity Shader中的时间变量 Shader控制这物体的显示&#xff0c;当向Shader中引入时间变量后&#xff0c;就可以让物体的显示效果随时间发生变化&#xff0c;以实现动画效果。 Unity中常见的时间变量如下表&#xff1a; 变量类型描述_Timefloat4(t/20, t, 2t, 3t)&#xf…

Visual Studio 2022创建dll并调用

需求&#xff1a; 创建A项目&#xff0c;有函数和类&#xff0c;将A项目生成DLL动态链接库 创建B项目&#xff0c;使用A项目生成的dll和lib相关文件 正常项目开发.h用于函数声明&#xff0c;.cpp用于函数实现&#xff0c;但是项目开发往往不喜欢将.cpp函数实现的代码发给别人&…

git使用流程与规范

原文网址&#xff1a;git代码提交流程与规范-CSDN博客 简介 本文git提交流程与规范是宝贵靠谱的经验&#xff0c;它能解决如下问题&#xff1a; 分支差距过大&#xff0c;导致合代码无数的冲突合完代码后发现代码丢失分支不清晰&#xff0c;无法追溯问题合代码耗时很长&…

计算机视觉与模式识别实验1-1 图像的直方图平衡

文章目录 &#x1f9e1;&#x1f9e1;实验流程&#x1f9e1;&#x1f9e1;1.读入图像‘rice.png’&#xff0c;在一个窗口中显示灰度级n64&#xff0c;128和256的图像直方图。2.调解图像灰度范围&#xff0c;观察变换后的图像及其直方图的变化。3.分别对图像‘pout.tif’和‘ti…

unity2D跑酷游戏

项目成果 项目网盘 导入资源包 放入Assets文件Assets资源文件 游戏流程分析 摄像机size调小&#xff0c;让图片占满屏幕 人跑本质&#xff0c;相对运动&#xff0c;图片无限向右滚动 图片720&#xff0c;缩小100倍第二个图片x为7.2每unit px100两张图片刚好挨着连贯 空对象Bg…

(奇幻森林)POLYGON - Enchanted Forest - Nature Biomes - 3D Environment Art by Synty

各种雄伟的树木,装饰着优雅简化的树叶,在头顶形成了一个天堂般的树冠,在苔藓覆盖的森林地面上投下了宁静的咒语。 每一项资产,从引人入胜的环境材料到平缓的波浪状山丘,都经过精心制作,将您带到魔法和自然融合的地方。POLYGON-魔法森林-自然生物技术为数字领域注入真正魔…

搭载算能 BM1684 芯片,面向AI推理计算加速卡

搭载算能 BM1684 芯片&#xff0c;是面向AI推理的算力卡。可集成于服务器、工控机中&#xff0c;高效适配市场上所有AI算法&#xff0c;实现视频结构化、人脸识别、行为分析、状态监测等应用&#xff0c;为智慧城市、智慧交通、智慧能源、智慧金融、智慧电信、智慧工业等领域进…

FreeRtos进阶——软件定时器内部逻辑

在FreeRtos软件定时器&#xff0c;是根据Systick来判断定时是否到达&#xff0c;可以是单次定时器也可以是循环定时器。在创建定时器任务后&#xff0c;在每一次SysTick中断中&#xff0c;会将定时器时钟到的任务写入定时器任务队列。在prvTimerTask任务&#xff08;守护任务&a…

JVM之【运行时数据区1】

JVM简图 运行时数据区简图 一、程序计数器&#xff08;Program Counter Register&#xff09; 1.程序计数器是什么&#xff1f; 程序计数器是JVM内存模型中的一部分&#xff0c;它可以看作是一个指针&#xff0c;指向当前线程所执行的字节码指令的地址。每个线程在执行过程中…

基础—SQL—DQL(数据查询语言)排序查询

一、引言 排序查询这里面涉及的关键字&#xff1a;ORDER BY。在我们日常的开发中&#xff0c;这个是很常见的&#xff0c;比如打开一个网购的商城&#xff0c;这里面可以找到一个销量的排序、综合的排序、价格的排序&#xff08;升序、降序&#xff09;等等。接下来就学习这一部…

前端传String字符串 后端使用enun枚举类出现错误

情况 前端 String 后端 enum 前端 后端 报错 2024-05-31T21:47:40.61808:00 WARN 21360 --- [nio-8080-exec-6] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.method.annotation.MethodArgumentTypeMismatchException: Failed to con…

《QT实用小工具·六十九》基于QT开发的五子棋AI游戏

1、概述 源码放在文章末尾 该项目实现了五子棋对战AI&#xff0c;可以享受和AI下棋的快乐&#xff0c;项目实现思路如下&#xff1a; 博弈树 ●Alpha-Beta剪枝(性能提高较大) ●启发式搜索(性能提高较大) ●落子区域限制(性能提高较大) ●Zobrist哈希(性能小幅提升) ●Qt…

【再探】设计模式—访问者模式、策略模式及状态模式

访问者模式是用于访问复杂数据结构的元素&#xff0c;对不同的元素执行不同的操作。策略模式是对于具有多种实现的算法&#xff0c;在运行过程中可动态选择使用哪种具体的实现。状态模式是用于具有不同状态的对象&#xff0c;状态之间可以转换&#xff0c;且不同状态下对象的行…

记mapboxGL实现鼠标经过高亮时的一个问题

概述 mapboxGL实现鼠标经过高亮可通过注册图层的mousemove和moveout事件来实现&#xff0c;在mousemove事件中可以拿到当前经过的要素&#xff0c;但是当使用该要素时&#xff0c;发现在某个地图级别下会有线和面数据展示不全的情况。究其原因&#xff0c;发现是mapboxGL在绘图…

2024Dragon Knight CTF复现web

穿梭隐藏的密钥 首先看看页面的源代码&#xff0c;但是发现f12和鼠标右键都被禁用了 用ctrlu查看&#xff0c;发现一个可疑页面 访问看看&#xff0c;发现还是只有一张图&#xff0c;查看源代码发现提示 扩展&#xff1a; Fuzz&#xff1a;Fuzz是一种基于黑盒的自动化软件模糊…

数据结构与算法笔记:基础篇 - 栈:如何实现浏览器的前进和后退功能?

概述 浏览器的前进、后退功能&#xff0c;你肯定很熟悉吧&#xff1f; 当依次访问完一串页面 a-b-c 之后&#xff0c;点击浏览器的后退按钮&#xff0c;就可以查看之前浏览过的页面 b 和 a。当后退到页面 a&#xff0c;点击前进按钮&#xff0c;就可以重新查看页面 b 和 c。但…

C/S模型测试

1 1.1代码示例 #include<stdio.h> #include<stdio.h>#include <sys/types.h> /* See NOTES */ #include <sys/socket.h>#include <netinet/in.h> #include <netinet/ip.h> /* superset of previous */ #include <arpa/inet.…

004 仿muduo实现高性能服务器组件_Buffer模块与Socket模块的实现

​&#x1f308;个人主页&#xff1a;Fan_558 &#x1f525; 系列专栏&#xff1a;仿muduo &#x1f339;关注我&#x1f4aa;&#x1f3fb;带你学更多知识 文章目录 前言Buffer模块Socket模块 小结 前言 这章将会向你介绍仿muduo高性能服务器组件的buffer模块与socket模块的实…