Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端)

news2024/11/24 20:30:32

前言

在真正测试 Divide 插件时,想要知道后端服务(以下称为 Client)是如何将自己的信息注册到管理台(以下称为 Client)。这里后端服务用的是 shenyu 自带的 http 的例子,项目名字为 shenyu-examples-http。

下图描述了本文研究的内容——服务注册时 Client端向 Admin 注册的数据同步——在 shenyu 架构中处于什么位置。红色部分都是我自己加的,在官网的图中没有。
在这里插入图片描述

阅读准备

Disruptor入门及应用

正文

Client事件监听器监听本地的 Context 的刷新事件

当 Client (Spring 应用)依赖注入后,Spring 框架会刷新上下文 Context,这时,shenyu 自定义的一个监听 ContextRefreshedEvent 的监听器 SpringMvcClientEventListener (AbstractContextRefreshedEventListener 的子类)会触发 onApplicationEvent 方法。

  • AbstractContextRefreshedEventListener.onApplicationEvent()
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {
	
	// ...
   	
   	@Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
        context = event.getApplicationContext();
        // 1. 拿到 beans
        Map<String, T> beans = getBeans(context);
        if (MapUtils.isEmpty(beans)) {
            return;
        }
        // 2. 原子地设置 registered 为 true
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        if (isDiscoveryLocalMode) {
            // 3. 如果是“本地发现”模式,发布用于注册 URI 的 DTO
            publisher.publishEvent(buildURIRegisterDTO(context, beans));
        }
        // 4. 处理每个 bean,具体是发布 bean 的注册信息给 Disruptor 的 QueueConsumer
        beans.forEach(this::handle);
        // 5. apiModules 的 key 是 beanName,value 是 bean 的成员变量
        Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
        // 6. 处理每个 apiModules,具体是发布 apiModules 的注册信息给 Disruptor 的 QueueConsumer
        apiModules.forEach((k, v) -> handleApiDoc(v, beans));
    }
    
    protected void handle(final String beanName, final T bean) {
    	// ...
    }
    
    private void handleApiDoc(final Object bean, final Map<String, T> beans) {
    	// ...
    }
}

从 SpringMvcClientEventListener.getBeans() 拿到 Beans

  • SpringMvcClientEventListener.java
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

	// ...
	
	private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();
	
    @Override
    protected Map<String, Object> getBeans(final ApplicationContext context) {
        // Filter out
        // isFull 这个 Boolean 值代表的是:是否代理整个服务,目前适用于 SpringMvc/SpringCould
        if (Boolean.TRUE.equals(isFull)) {
            // 在全代理模式下,发布一个事件,这个事件包含了服务的元数据,用于注册服务
            getPublisher().publishEvent(MetaDataRegisterDTO.builder()
                    .contextPath(getContextPath()) // 设置服务的上下文路径
                    .addPrefixed(addPrefixed) // 设置是否添加前缀
                    .appName(getAppName()) // 设置应用名称
                    .path(UriComponentsBuilder.fromUriString(PathUtils.decoratorPathWithSlash(getContextPath()) + EVERY_PATH).build().encode().toUriString())
                    // 设置服务的路径,这里使用了 UriComponentsBuilder 来构建URI,将上下文路径装饰后加上一个通配符,代表匹配所有路径
                    .rpcType(RpcTypeEnum.HTTP.getName()) // 设置远程调用类型为 HTTP
                    .enabled(true) // 设置服务为启用状态
                    .ruleName(getContextPath()) // 使用上下文路径作为规则名称
                    .build());
            LOG.info("init spring mvc client success with isFull mode");
            // 发布一个 URI 注册的事件,传入空的映射作为参数
            publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
            return Collections.emptyMap();
        }
        // shenyu-examples-http 用的不是全代理模式,因为 isFull 为 false,此时直接返回带 Controller 注解的 bean
        return context.getBeansWithAnnotation(Controller.class);
    }
}

publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap())); 发布一个 URI 注册的事件,传入空的映射作为参数。

ShenyuClientRegisterEventPublisher 给 Client 端的 Disruptor 的 QueueConsumer 发布要向 Admin 注册的数据(是的,此时还没传给 Admin,还停留在 Client 端)
  • ShenyuClientRegisterEventPublisher.publishEvent() 调用 DisruptorProvider.onData() 传递数据
public class ShenyuClientRegisterEventPublisher {
    
    // ...
    
    private DisruptorProviderManage<DataTypeParent> providerManage;
    
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        // data 传给 Disruptor provider 
        provider.onData(data);
    }
}
  • DisruptorProvider 传递给 RingBuffer.publishEvent(),最终将注册的信息发布给 Diruptor 的 QueueConsumer。

    ps: Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。

public class DisruptorProvider<T> {
    
    // ...
    
    private final RingBuffer<DataEvent<T>> ringBuffer;
    
    private final boolean isOrderly;
    
    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
    
    // ...
    
    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            // 由  ringBuffer 发布事件
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
}
由 QueueConsumer.onEvent() 接收 RingBuffer.publishEvent() 发布的事件,并进行处理
  • 从 DisruptorProviderManage.startup 的源码中可以看到,在创建 Disruptor 时,线程池 OrderlyExecutor 被传进了 QueueConsumer,
public class DisruptorProviderManage<T> {
	
	// ...
   
    private final Integer consumerSize;
    
    private final QueueConsumerFactory<T> consumerFactory;
 	
 	// ...
    
    public void startup(final boolean isOrderly) {
        // 创建一个定制的线程池,用于消费者
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        // 根据是否有序来调整消费者数量和选择事件工厂
        if (isOrderly) {
            // 有序模式下,消费者数量设为1,使用有序的事件工厂
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            // 无序模式下,使用默认的事件工厂
            eventFactory = new DisruptorEventFactory<>();
        }
        // 创建Disruptor实例,配置其基本参数
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        // 创建消费者数组,根据newConsumerSize指定的大小
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        // 将消费者注册到Disruptor,使用工作池模式
        disruptor.handleEventsWithWorkerPool(consumers);
        // 设置默认的异常处理器,这里选择忽略异常
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        // 启动Disruptor
        disruptor.start();
        // 获取Disruptor的环形缓冲区,用于发布事件
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        // 创建并存储DisruptorProvider实例,用于向Disruptor发布事件
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
}
  • 当接收到一个事件时,QueueConsumer 将任务交给线程池去处理事件,处理事件的 Runnable 接口由工厂 factory 产生。
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
	
	// ...
	
	private final QueueConsumerFactory<T> factory;
	
	// ...
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (Objects.nonNull(t)) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }
}
  • QueueConsumerExecutor 在 Client 端的消费者执行器 RegisterClientConsumerExecutor
/**
 * The type Consumer executor.
 */
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
    
    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
    
    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new EnumMap<>(executorSubscriberMap);
    }

    @Override
    // run 接口继承自 QueueConsumerExecutor,而 QueueConsumerExecutor 继承自 Runnable
    public void run() {
        final T data = getData();
       	// subscribers 拿到 ExecutorTypeSubscriber 去处理数据 data
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }
    
    /**
     * The type Register client executor factory.
     */
    public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
        
        @Override
        public RegisterClientConsumerExecutor<T> create() {
            Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                    .stream()
                    // 将 AbstractQueueConsumerFactory.getSubscribers()
                    // 接口返回的 ExecutorSubscriber<T> 转为 ExecutorTypeSubscriber<T>,
                    // 其带有 getType 接口
                    .map(e -> (ExecutorTypeSubscriber<T>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
            return new RegisterClientConsumerExecutor<>(map);
        }

        @Override
        public String fixName() {
            return "shenyu_register_client";
        }
    }
}

ExecutorTypeSubscriber 继承自 ExecutorSubscriber :

public interface ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> {`

从下图的 ExecutorTypeSubscriber 接口的实现类可以看到,在 Client 端有 3 个 Subscriber
在这里插入图片描述

我们这个例子看的是URI,所以就以 ShenyuClientURIExecutorSubscriber 举例。

数据交由 ShenyuClientURIExecutorSubscriber 执行处理

  • ShenyuClientURIExecutorSubscriber.execute()
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
	
	// ...
    
    private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        for (URIRegisterDTO uriRegisterDTO : dataList) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            while (true) {
                // 连得上就跳出死循环
                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
                    break;
                } catch (IOException e) {
                    long sleepTime = 1000;
                    // maybe the port is delay exposed
                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
                        LOG.error("host:{}, port:{} connection failed, will retry",
                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
                        // If the connection fails for a long time, Increase sleep time
                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
                            sleepTime = 10000;
                        }
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException ex) {
                        LOG.error("interrupted when sleep", ex);
                    }
                }
            }
            // 1. 延迟应用关闭时的其他钩子
            ShenyuClientShutdownHook.delayOtherHooks();
            // 2. 给 Admin 端发送 DTO 注册信息
            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
            // 3. 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
            ShutdownHookManager.get().addShutdownHook(new Thread(() -> {
                final URIRegisterDTO offlineDTO = new URIRegisterDTO();
                BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);
                offlineDTO.setEventType(EventType.OFFLINE);
                // 给 Admin 端发送下线 DTO
                shenyuClientRegisterRepository.offline(offlineDTO);
            }), 2);
        }
    }
}

有三个方法需要说明:

  1. ShenyuClientShutdownHook.delayOtherHooks() 延迟应用关闭时的其他钩子
  2. ShenyuClientRegisterRepository.persistURI() 给 Admin 端发送 DTO 注册信息
  3. ShutdownHookManager.get().addShutdownHook() 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
延迟应用关闭时的其他钩子
  • ShenyuClientShutdownHook.delayOtherHooks()

    1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次
    2. 一个接管其他钩子的线程
public class ShenyuClientShutdownHook {
	
	// ...
	
	private static final AtomicBoolean DELAY = new AtomicBoolean(false);

    private static String hookNamePrefix = "ShenyuClientShutdownHook";

    private static AtomicInteger hookId = new AtomicInteger(0);

    private static Properties props;

    private static IdentityHashMap<Thread, Thread> delayHooks = new IdentityHashMap<>();

    private static IdentityHashMap<Thread, Thread> delayedHooks = new IdentityHashMap<>();
	
	// ....
	
    /**
     * Delay other shutdown hooks.
     */
    public static void delayOtherHooks() {
    	// 1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次
        if (!DELAY.compareAndSet(false, true)) {
            return;
        }
        // 2. 一个接管其他钩子的线程
        TakeoverOtherHooksThread thread = new TakeoverOtherHooksThread();
        thread.start();
    }

    /**
     * Delay other shutdown hooks thread.
     */
    private static class TakeoverOtherHooksThread extends Thread {
        @Override
        // 1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
        public void run() {
            int shutdownWaitTime = Integer.parseInt(props.getProperty("shutdownWaitTime", "3000"));
            int delayOtherHooksExecTime = Integer.parseInt(props.getProperty("delayOtherHooksExecTime", "2000"));
            IdentityHashMap<Thread, Thread> hooks = null;
            try {
                // 2. 通过反射拿到应用关闭时的所有钩子
                Class<?> clazz = Class.forName(props.getProperty("applicationShutdownHooksClassName", "java.lang.ApplicationShutdownHooks"));
                Field field = clazz.getDeclaredField(props.getProperty("applicationShutdownHooksFieldName", "hooks"));
                field.setAccessible(true);
                hooks = (IdentityHashMap<Thread, Thread>) field.get(clazz);
            } catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) {
                LOG.error(ex.getMessage(), ex);
            }
            long s = System.currentTimeMillis();
            // 3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
            // GPT:
            // 答:1. 避免死锁或长时间阻塞
            //    2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
            //    3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。
            //       在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。
            //    确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。
            while (System.currentTimeMillis() - s < delayOtherHooksExecTime) {
                for (Iterator<Thread> iterator = Objects.requireNonNull(hooks).keySet().iterator(); iterator.hasNext();) {
                    Thread hook = iterator.next();
                    // 4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
                    if (hook.getName().startsWith(hookNamePrefix)) {
                        continue;
                    }
                    // 5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
                    if (delayHooks.containsKey(hook) || delayedHooks.containsKey(hook)) {
                        continue;
                    }
                    Thread delayHook = new Thread(() -> {
                        LOG.info("sleep {}ms", shutdownWaitTime);
                        try {
                            // 6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
                            TimeUnit.MILLISECONDS.sleep(shutdownWaitTime);
                        } catch (InterruptedException ex) {
                            LOG.error(ex.getMessage(), ex);
                        }
                        hook.run();
                    }, hook.getName());
                    delayHooks.put(delayHook, delayHook);
                    // 7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHook
                    iterator.remove();
                }

                for (Iterator<Thread> iterator = delayHooks.keySet().iterator(); iterator.hasNext();) {
                    Thread delayHook = iterator.next();
                    // 8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooks
                    Runtime.getRuntime().addShutdownHook(delayHook);
                    // 9. 加入已处理过的钩子 map,
                    delayedHooks.put(delayHook, delayHook);
                    iterator.remove();
                    LOG.info("hook {} will sleep {}ms when it start", delayHook.getName(), shutdownWaitTime);
                }
                try {
                    // 10. 睡眠 100ms,目的是?
                    // GPT:
                    // 答:1. 减少CPU使用率
                    //    2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException ex) {
                    LOG.error(ex.getMessage(), ex);
                }
            }
            // 帮助 GC
            hookNamePrefix = null;
            hookId = new AtomicInteger(0);
            props = null;
            delayHooks = null;
            delayedHooks = null;
        }
    }	
}
  • TakeoverOtherHooksThread.run()

    代码如上面给出的:

    1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
    2. 通过反射拿到应用关闭时的所有钩子
    3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
      GPT:
      答:
      1. 避免死锁或长时间阻塞
      2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
      3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。
    4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
    5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
    6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
    7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHook
    8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooks
    9. 加入已处理过的钩子 map
    10. 睡眠 100ms,目的是?
      GPT:
      答:
      1. 减少CPU使用率
      2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
给 Admin 端发送 DTO 注册信息
  • ShenyuClientRegisterRepository.persistURI()

    ShenyuClientRegisterRepositoryFailbackRegistryRepositoryHttpClientRegisterRepository继承关系如下图 在这里插入图片描述

  • ShenyuClientRegisterRepository.persistURI()

/**
 * Shenyu client register repository.
 */
@SPI
public interface ShenyuClientRegisterRepository {

    /**
     * Init.
     *
     * @param config the config
     */
    default void init(ShenyuRegisterCenterConfig config) {
    }
    
    /**
     * Persist metadata.
     *
     * @param metadata metadata
     */
    void persistInterface(MetaDataRegisterDTO metadata);
    
    /**
     * Persist uri.
     *
     * @param registerDTO the register dto
     */
    default void persistURI(URIRegisterDTO registerDTO) {
    }

    /**
     * Node active offline when shutdown.
     *
     * @param offlineDTO the offline dto
     */
    default void offline(URIRegisterDTO offlineDTO) {
    }

    /**
     * persistApiDoc.
     * @param apiDocRegisterDTO apiDocRegisterDTO
     */
    default void persistApiDoc(ApiDocRegisterDTO apiDocRegisterDTO) {
    }
    
    /**
     * closeRepository.
     * If the close method is used, Spring will call it by default when the bean is destroyed,
     * So its method name is closeRepository to avoid being called by default when the bean is destroyed.
     */
    default void closeRepository() {
    }
}
  • FailbackRegistryRepository.persistURI()

    这里同样用到了模板方法,doPersistURI 交由子类 HttpClientRegisterRepository 实现

public abstract class FailbackRegistryRepository implements ShenyuClientRegisterRepository {
	
	// ... 
	
    @Override
    public void persistURI(final URIRegisterDTO registerDTO) {
        try {
        	// 1. 同样是模板方法,交由子类 HttpClientRegisterRepository 实现
            this.doPersistURI(registerDTO);
        } catch (Exception ex) {
            //If a failure occurs, it needs to be added to the retry list.
            logger.warn("Failed to persistURI {}, cause:{}", registerDTO, ex.getMessage());
            this.addFailureUriDataRegister(registerDTO);
        }
    }
}
  • HttpClientRegisterRepository.doPersistURI()

    1. 如果端口已被其他进程监听,则直接返回,不需要再注册
    2. 否则注册
public class HttpClientRegisterRepository extends FailbackRegistryRepository {
    
    // ...
    
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

    private static URIRegisterDTO uriRegisterDTO;

    private static ApiDocRegisterDTO apiDocRegisterDTO;

    private String username;
    
    private String password;
    
    private List<String> serverList;
    
    /**
     * server -> accessToken.
     */
    private LoadingCache<String, String> accessToken;
    
    // ...
    
    @Override
    public void doPersistURI(final URIRegisterDTO registerDTO) {
        if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
        	// 1. 如果端口已被其他进程监听,则直接返回,不需要再注册
            return;
        }
        // 2. 否则注册
        doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
        uriRegisterDTO = registerDTO;
    }
    
    private <T> void doRegister(final T t, final String path, final String type) {
        int i = 0;
        for (String server : serverList) {
            i++;
            String concat = server.concat(path);
            try {
                String accessToken = this.accessToken.get(server);
                if (StringUtils.isBlank(accessToken)) {
                    throw new NullPointerException("accessToken is null");
                }
                // 1. 调用注册工具类进行注册
                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
                // considering the situation of multiple clusters, we should continue to execute here
            } catch (Exception e) {
                LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
                if (i == serverList.size()) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
  • HttpClientRegisterRepository.doRegister()

    1. 调用注册工具类进行注册(代码如上)
  • RegisterUtils.doRegister()

    1. 构建 http 的 heade
    2. 在此通过 http 调用 Admin 的服务进行注册,
      url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
      json 为要传输的注册信息
    3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
public final class RegisterUtils {

	// ...
	
    public static void doRegister(final String json, final String url, final String type, final String accessToken) throws IOException {
        if (StringUtils.isBlank(accessToken)) {
            LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
            return;
        }
        // 1. 构建 http 的 header
        Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
        // 2. 在此通过 http 调用 Admin 的服务进行注册,
        //    url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
        //    json 为要传输的注册信息
        // 3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
        String result = OkHttpTools.getInstance().post(url, json, headers);
        if (Objects.equals(SUCCESS, result)) {
            LOGGER.info("{} client register success: {} ", type, json);
        } else {
            LOGGER.error("{} client register error: {} ", type, json);
        }
    }
}
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
  • ShutdownHookManager.addShutdownHook()

    1. 向运行时添加一个关机钩子,这个钩子是一个新线程,新线程去执行 ShutdownHookManager 管理的要在关机时执行的钩子
    2. 添加关闭应用时要执行的注销注册的钩子
public final class ShutdownHookManager {
	
	// ...
	
	private static final ShutdownHookManager MGR = new ShutdownHookManager();
   
    private final Set<HookEntry> hooks =
            Collections.synchronizedSet(new HashSet<HookEntry>());	
    
    static {
    	// 1. 向运行时添加一个关机钩子,这个钩子是一个新线程,
    	// 新线程去执行 ShutdownHookManager  管理的要在关机的钩子
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> {
                    MGR.shutdownInProgress.set(true);
                    for (Runnable hook : MGR.getShutdownHooksInOrder()) {
                        try {
                            hook.run();
                        } catch (Throwable ex) {
                            LOG.error(ex.getMessage(), ex);
                        }
                    }
                })
        );
    }

	// ...
	
    public void addShutdownHook(final Runnable shutdownHook, final int priority) {
        if (shutdownHook == null) {
            throw new IllegalArgumentException("shutdownHook cannot be NULL");
        }
        if (shutdownInProgress.get()) {
            throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
        }
        // 2. 添加关闭应用时要执行的注销注册的钩子
        hooks.add(new HookEntry(shutdownHook, priority));
    }
}

一张图总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1449685.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QlikSense财务聚合函数:IRR/NPV/XIRR/XNPV

IRR - 脚本函数 IRR() 函数用于返回聚合内部回报率&#xff0c;以揭示迭代于 group by 子句定义的大量记录上的表达式的数值表示的现金流系列。 这些现金流不必是均值&#xff0c;因为它们可用于年金。但是&#xff0c;现金流必须定期出现&#xff0c;例如每月或每年。内部收…

BUGKU-WEB 你必须让他停下

题目描述 题目截图如下&#xff1a; 进入场景看看&#xff1a; 解题思路 图片会消失,那应该是使用了js来控制根据提示,那就是要停止js才会看到flag (也就是要抓包,不要陷入停止js的思维) 相关工具 F12大法Burp Suit抓包工具 解题步骤 出现图片的时候,源码中确实出现…

CISA知识点

审计流程21%&#xff1b;运营和业务恢复23%&#xff1b;保护资产27%&#xff1b;IT治理17%&#xff1b;开发12%。 领域1-信息系统审计流程 规划-现场工作-报告 &#xff08;1&#xff09;审计规划 了解业务使命、目标、目的和流程 找到相关规定 实施风险分析&#xff08;…

权限提升:利用Linux错配提权

目录 Linux权限基础 Linux用户权限 Linux文件权限 特殊的Linux文件权限 Linux本机信息收集 Linux错配提权 crontab计划任务提权 SUID提权 Linux权限基础 Linux用户权限 在Linux中&#xff0c;根据权限的不同&#xff0c;大致可以分为三种&#xff1a;超级用户&#x…

linux内核原理--用户态线性地址空间,mmap,malloc,缺页异常

1.概述 前面我们介绍了内核态线性地址空间划分&#xff0c;及在内核态运行时&#xff0c;如何利用伙伴系统完成连续可用物理页框申请和释放。如何利用小块内存分配器实现高效的动态内存分配和释放。如何利用vmalloc&#xff0c;vfree完成线性地址连续但物理地址不连续的多个页框…

山西电力市场日前价格预测【2024-02-14】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2024-02-14&#xff09;山西电力市场全天平均日前电价为203.58元/MWh。其中&#xff0c;最高日前电价为348.00元/MWh&#xff0c;预计出现在19:00。最低日前电价为0.00元/MWh&#xff0c;预计出…

DarkSide针对VMware EXSI系统进行加密

前言 最近黑客组织利用DarkSide勒索病毒对Colonial Pipeline 发起勒索攻击&#xff0c;国内外各大安全厂商和安全媒体也都有相关报道&#xff0c;DarkSide勒索软件是从2020年8月出现&#xff0c;并以(RAAS)勒索即服务的商业模式进行运作&#xff0c;此勒索病毒不仅可以部署基于…

片上网络NoC(6)——路由算法

目录 一、概述 二、路由算法的类型 三、避免死锁 四、实现 4.1 源路由实现 4.2 基于节点查找表的路由实现 4.3 组合电路实现 五、总结 一、概述 路由算法&#xff08;routing algorithm&#xff09;&#xff0c;即决定数据包在网络拓扑中从起点到终点路径的算法。路由算…

作为国产大模型之光的智谱AI,究竟推出了多少模型?一篇文章带你详细了解!

虽然OpenAI发布了一系列基于GPT模型的产品&#xff0c;在不同领域取得了很高的成就。但是作为LLM领域绝对的领头羊&#xff0c;OpenAI没有按照其最初的Open初衷行事。无论是ChatGPT早期采用的GPT3&#xff0c;还是后来推出的GPT3.5和GPT4模型&#xff0c;OpenAI都因为担心被滥用…

人工智能时代

一、人工智能发展历史:从概念到现实 人工智能(Artificial Intelligence,简称AI)是计算机科学领域中一门旨在构建能够执行人类智能任务的系统的分支。其发展历程充满曲折,从概念的提出到如今的广泛应用,是技术、理论和实践相互交织的产物。 1. 起源(20世纪中期) 人工智…

OWASP TOP10

OWASP TOP10 OWASP网址&#xff1a;http://ww.owasp.org.cn A01&#xff1a;失效的访问控制 例如&#xff1a;越权漏洞 案例1&#xff1a; 正常&#xff1a;每个人登录教务系统&#xff0c;只能查询自己的成绩信息 漏洞&#xff1a;张三登录后可以查看自己的成绩 例如&…

WebSocket 通信流程,注解和Spring实现WebSocket ,实战多人聊天室系统

一、前言 实现即时通信常见的有四种方式-分别是&#xff1a;轮询、长轮询(comet)、长连接(SSE)、WebSocket。 ①短轮询 很多网站为了实现推送技术&#xff0c;所用的技术都是轮询。轮询是在特定的的时间间隔&#xff08;如每1秒&#xff09;&#xff0c;由客户端浏览器对服务…

JavaScript中的querySelector()方法是什么,它是如何工作的?

在JavaScript中&#xff0c;有时您需要访问HTML元素。querySelector方法是一个Web API&#xff0c;它选择与传入的指定CSS选择器匹配的第一个元素。 但是&#xff0c;更详细地说&#xff0c;这是如何工作的呢&#xff1f;在本文中&#xff0c;我们将看一些如何使用querySelect…

2022年12月电子学会青少年软件编程 中小学生Python编程等级考试二级真题解析(判断题)

2022年12月Python编程等级考试二级真题解析 判断题&#xff08;共10题&#xff0c;每题2分&#xff0c;共20分&#xff09; 26、字典的元素可以通过键来访问&#xff0c;也可以通过索引(下标)来访问 答案&#xff1a;错 考点分析&#xff1a;考查字典相关知识&#xff0c;字…

Java与JavaScript的区别与联系

Java是目前编程领域使用非常广泛的编程语言&#xff0c;相较于JavaScript&#xff0c;Java更被人们熟知。很多Java程序员想学门脚本语言&#xff0c;一看JavaScript和Java这么像&#xff0c;很有亲切感&#xff0c;那干脆就学它了&#xff0c;这也间接的帮助了JavaScript的发展…

vivado Shift Registers、Dynamic Shift Registers

移位寄存器是一个触发器链&#xff0c;允许数据在固定&#xff08;静态&#xff09;数字上传播延迟阶段。相反&#xff0c;在动态移位寄存器中&#xff0c;传播链的长度在电路操作期间动态变化。从“coding”下载编码示例文件示例。 静态移位寄存器元件 静态移位寄存器通常包…

收藏:不错的讲座《拆解成功领导者的三重底层思维逻辑》

在B 站看到个不错的讲座《拆解成功领导者的三重底层思维逻辑》&#xff0c;地址&#xff1a;第145期-拆解成功领导者的三重底层思维逻辑_哔哩哔哩_bilibili 演讲内容文章摘要在这里&#xff1a;《直播精华 | 拆解成功领导者的思维逻辑》&#xff08;直播精华 | 拆解成功领导者的…

揭秘 2024 春晚刘谦魔术——代码还原

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、魔术大概流程 二、代码实现各个步骤 2.1 partition&#xff08;对半撕牌&#xff09; 2.2 bottom&#xff08;将 n 张牌置底…

力扣1732. 找到最高海拔(前缀和)

Problem: 1732. 找到最高海拔 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.求取数组gain的大小 n n n; 2.定义一个大小为 n 1 n 1 n1的数组preSum; 3.先求取前 n n n个元素的前缀和&#xff0c;再最后单独处理preSum[n];其中preSum[n] preSum[n - 1] gai…

使用playwright进行自动化端到端测试

项目希望能接入自动化端到端测试提高可靠性&#xff0c;发现微软的 playwright 还挺好用的&#xff0c;推荐一下&#xff0c;顺便说下遇到的一些难点以及最佳实践。 难点 登录 项目不能帐号密码登录&#xff0c;只能扫二维码 临时方案是先自己扫码保存 cookie 用于测试&#…