【ShenYu系列】ShenYu Dubbo插件全流程源码解析

news2024/9/9 0:12:17

网关启动

ShenyuConfiguration注入ShenyuWebHandler

    @Bean("webHandler")
    public ShenyuWebHandler shenyuWebHandler(final ObjectProvider<List<ShenyuPlugin>> plugins, final ShenyuConfig config, @Lazy final ShenyuLoaderService shenyuLoaderService) {
        List<ShenyuPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
        List<ShenyuPlugin> shenyuPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(ShenyuPlugin::getOrder)).collect(Collectors.toList());
        shenyuPlugins.forEach(shenyuPlugin -> LOG.info("load plugin:[{}] [{}]", shenyuPlugin.named(), shenyuPlugin.getClass().getName()));
        return new ShenyuWebHandler(shenyuPlugins, shenyuLoaderService, config);
    }

NacosSyncDataConfiguration注入用nacos的数据同步服务。

    @Bean
    public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use nacos sync shenyu data.......");
        return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

该实例的构造方法中会执行NacosSyncDataService#start

    public void start() {
        watcherData(NacosPathConstants.PLUGIN_DATA_ID, this::updatePluginMap);
        watcherData(NacosPathConstants.SELECTOR_DATA_ID, this::updateSelectorMap);
        watcherData(NacosPathConstants.RULE_DATA_ID, this::updateRuleMap);
        watcherData(NacosPathConstants.META_DATA_ID, this::updateMetaDataMap);
        watcherData(NacosPathConstants.AUTH_DATA_ID, this::updateAuthMap);
    }

NacosCacheHandler#updatePluginMap,监听到数据,执行

    protected void updatePluginMap(final String configInfo) {
        try {
            // Fix bug #656(https://github.com/apache/shenyu/issues/656)
            List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
            pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                subscriber.unSubscribe(pluginData);
                subscriber.onSubscribe(pluginData);
            }));
        } catch (JsonParseException e) {
            LOG.error("sync plugin data have error:", e);
        }
    }

AbstractDubboPluginDataHandler#handlerPlugin,处理器,把数据存储到ApacheDubboConfigCache

public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {

    public static final Supplier<CommonHandleCache<String, DubboRuleHandle>> RULE_CACHED_HANDLE = new BeanHolder<>(CommonHandleCache::new);

    public static final Supplier<CommonHandleCache<String, List<DubboUpstream>>> SELECTOR_CACHED_HANDLE = new BeanHolder<>(CommonHandleCache::new);

    protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

    @Override
    public void handlerPlugin(final PluginData pluginData) {
        if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
            DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
            DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
            if (Objects.isNull(dubboRegisterConfig)) {
                return;
            }
            if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
                // If it is null, initialize it
                this.initConfigCache(dubboRegisterConfig);
            }
            Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
        }
    }
}

ApacheDubboPluginDataHandler

public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

    @Override
    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
        ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig);
        ApacheDubboConfigCache.getInstance().invalidateAll();
    }
}

服务调用

ShenyuWebHandler.DefaultShenyuPluginChain#execute,执行每个插件.。

        @Override
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    ShenyuPlugin plugin = plugins.get(this.index++);
                    boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    }
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }

AbstractShenyuPlugin#execute,匹配选择器和插件。

    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        initCacheConfig();
        final String pluginName = named();
        PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        // early exit
        if (Objects.isNull(pluginData) || !pluginData.getEnabled()) {
            return chain.execute(exchange);
        }
        final String path = exchange.getRequest().getURI().getPath();
        List<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
        SelectorData selectorData = obtainSelectorDataCacheIfEnabled(path);
        // handle Selector
        if (Objects.nonNull(selectorData) && StringUtils.isBlank(selectorData.getId())) {
            return handleSelectorIfNull(pluginName, exchange, chain);
        }
        if (Objects.isNull(selectorData)) {
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIfNull(pluginName, exchange, chain);
            }
            Pair<Boolean, SelectorData> matchSelectorData = matchSelector(exchange, selectors);
            selectorData = matchSelectorData.getRight();
            if (Objects.isNull(selectorData)) {
                if (matchCacheConfig.getSelector().getSelectorEnabled() && matchSelectorData.getLeft()) {
                    selectorData = new SelectorData();
                    selectorData.setPluginName(pluginName);
                    cacheSelectorData(path, selectorData);
                }
                return handleSelectorIfNull(pluginName, exchange, chain);
            } else {
                if (matchCacheConfig.getSelector().getSelectorEnabled() && matchSelectorData.getLeft()) {
                    cacheSelectorData(path, selectorData);
                }
            }
        }
        printLog(selectorData, pluginName);
        if (Objects.nonNull(selectorData.getContinued()) && !selectorData.getContinued()) {
            // if continued, not match rules
            return doExecute(exchange, chain, selectorData, defaultRuleData(selectorData));
        }
        List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            return handleRuleIfNull(pluginName, exchange, chain);
        }
        RuleData ruleData = obtainRuleDataCache(path);
        if (Objects.nonNull(ruleData) && Objects.isNull(ruleData.getId())) {
            return handleRuleIfNull(pluginName, exchange, chain);
        }
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            //get last
            RuleData rule = rules.get(rules.size() - 1);
            printLog(rule, pluginName);
            return doExecute(exchange, chain, selectorData, rule);
        } else {
            // lru map as L1 cache,the cache is enabled by default.
            // if the L1 cache fails to hit, using L2 cache based on trie cache.
            // if the L2 cache fails to hit, execute default strategy.
            if (Objects.isNull(ruleData)) {
                // L1 cache not exist data, try to get data through trie cache
                ruleData = trieMatchRule(exchange, selectorData, path);
                // trie cache fails to hit, execute default strategy
                if (Objects.isNull(ruleData)) {
                    LOG.info("{} rule match path from default strategy", named());
                    Pair<Boolean, RuleData> matchRuleData = matchRule(exchange, rules);
                    ruleData = matchRuleData.getRight();
                    if (matchRuleData.getLeft()) {
                        ruleData = Optional.ofNullable(ruleData)
                                .orElse(RuleData.builder().pluginName(pluginName).matchRestful(false).build());
                        cacheRuleData(path, ruleData);
                    }
                }
            }
        }
        if (Objects.isNull(ruleData) || Objects.isNull(ruleData.getId())) {
            return handleRuleIfNull(pluginName, exchange, chain);
        }
        printLog(ruleData, pluginName);
        return doExecute(exchange, chain, selectorData, ruleData);
    }

GlobalPlugin构建上下文信息

public class GlobalPlugin implements ShenyuPlugin {
    
    private final ShenyuContextBuilder builder;
    
    /**
     * Instantiates a new Global plugin.
     *
     * @param builder the builder
     */
    public GlobalPlugin(final ShenyuContextBuilder builder) {
        this.builder = builder;
    }
    
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        ShenyuContext shenyuContext = builder.build(exchange);
        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
        return chain.execute(exchange);
    }
}

DefaultShenyuContextBuilder#build,构建上下文

    @Override
    public ShenyuContext build(final ServerWebExchange exchange) {
        Pair<String, MetaData> buildData = buildData(exchange);
        return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight());
    }

    private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
        ServerHttpRequest request = exchange.getRequest();
        HttpHeaders headers = request.getHeaders();
        String rpcType = headers.getFirst(RPC_TYPE);
        if (StringUtils.isNotEmpty(rpcType)) {
            return Pair.of(rpcType, new MetaData());
        }
        String upgrade = headers.getFirst(UPGRADE);
        if (StringUtils.isNotEmpty(upgrade) && RpcTypeEnum.WEB_SOCKET.getName().equals(upgrade)) {
            return Pair.of(RpcTypeEnum.WEB_SOCKET.getName(), new MetaData());
        }
        MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
        if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) {
            exchange.getAttributes().put(Constants.META_DATA, metaData);
            return Pair.of(metaData.getRpcType(), metaData);
        } else {
            return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
        }
    }

RpcParamTransformPlugin用于获取参数信息

public class RpcParamTransformPlugin implements ShenyuPlugin {

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        if (Objects.nonNull(shenyuContext)) {
            MediaType mediaType = request.getHeaders().getContentType();
            if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
                return body(exchange, request, chain);
            }
            if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
                return formData(exchange, request, chain);
            }
            return query(exchange, request, chain);
        }
        return chain.execute(exchange);
    }
}

AbstractDubboPlugin#doExecute,检查元数据和参数,进行方法调用

    @Override
    public Mono<Void> doExecute(final ServerWebExchange exchange,
                                   final ShenyuPluginChain chain,
                                   final SelectorData selector,
                                   final RuleData rule) {
        String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        MetaData metaData = exchange.getAttribute(Constants.META_DATA);
        if (!checkMetaData(metaData)) {
            LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR);
            return WebFluxResultUtils.result(exchange, error);
        }
        if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) {
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM);
            return WebFluxResultUtils.result(exchange, error);
        }
        this.rpcContext(exchange);
        return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
    }

ApacheDubboPlugin#doDubboInvoker,开始dubbo的泛化调用。

    @Override
    protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
                                        final ShenyuPluginChain chain,
                                        final SelectorData selector,
                                        final RuleData rule,
                                        final MetaData metaData,
                                        final String param) {
        RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
        RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
        final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange);
        return result.then(chain.execute(exchange));
    }

ApacheDubboProxyService#genericInvoker

  • 获取ReferenceConfig对象;
  • 获取泛化服务GenericService对象;
  • 构造请求参数pair对象;
  • 发起异步的泛化调用。
    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
        String referenceKey = metaData.getPath();
        String namespace = "";
        if (CollectionUtils.isNotEmpty(exchange.getRequest().getHeaders().get(Constants.NAMESPACE))) {
            namespace = exchange.getRequest().getHeaders().get(Constants.NAMESPACE).get(0);
            referenceKey = namespace + ":" + referenceKey;
        }
        ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(referenceKey);
        if (StringUtils.isEmpty(reference.getInterface())) {
            ApacheDubboConfigCache.getInstance().invalidate(referenceKey);
            reference = ApacheDubboConfigCache.getInstance().initRefN(metaData, namespace);
        }
        GenericService genericService = reference.get();
        Pair<String[], Object[]> pair;
        if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.bodyIsEmpty(body)) {
            pair = new ImmutablePair<>(new String[]{}, new Object[]{});
        } else {
            pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
        }
        return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> {
            if (Objects.isNull(ret)) {
                ret = Constants.DUBBO_RPC_RESULT_EMPTY;
            }
            exchange.getAttributes().put(Constants.RPC_RESULT, ret);
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
            return ret;
        })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));
    }

    @SuppressWarnings("unchecked")
    private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
        //Compatible with asynchronous calls of lower Dubbo versions
        genericService.$invoke(method, parameterTypes, args);
        Object resultFromFuture = RpcContext.getContext().getFuture();
        return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture);
    }
    

ResponsePlugin#execute,处理响应结果

    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        assert shenyuContext != null;
        return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain);
    }

在这里插入图片描述

服务注册

启动shenyu-examples里面的shenyu-examples-apache-dubbo-service-annotation

AbstractContextRefreshedEventListener

  • 读取属性配置
  • 开启线程池
  • 启动注册中心,用于向shenyu-admin注册
    public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
                                                 final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        Properties props = clientConfig.getProps();
        this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
        this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
        if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
            String errorMsg = "client register param must config the appName or contextPath";
            LOG.error(errorMsg);
            throw new ShenyuClientIllegalArgumentException(errorMsg);
        }
        this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
        this.host = props.getProperty(ShenyuClientConstants.HOST);
        this.port = props.getProperty(ShenyuClientConstants.PORT);
        publisher.start(shenyuClientRegisterRepository);
    }

AbstractContextRefreshedEventListener#onApplicationEvent,获取所有的ServiceBean的Bean,处理元数据对象和处理URI对象。

    @Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
        final ApplicationContext context = event.getApplicationContext();
        Map<String, T> beans = getBeans(context);
        if (MapUtils.isEmpty(beans)) {
            return;
        }
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        publisher.publishEvent(buildURIRegisterDTO(context, beans));
        beans.forEach(this::handle);
        Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
        apiModules.forEach((k, v) -> handleApiDoc(v, beans));
    }

ShenyuClientRegisterEventPublisher#publishEvent,使用disruptor来处理事件。

    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }

RegisterCenterConfiguration注入ShenyuClientServerRegisterRepository

    @Bean(destroyMethod = "close")
    public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
                                                                               final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
        String registerType = shenyuRegisterCenterConfig.getRegisterType();
        ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
        RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));
        publisher.start(registerServiceMap);
        registerRepository.init(publisher, shenyuRegisterCenterConfig);
        return registerRepository;
    }

DisruptorProviderManage#startup(boolean),启动DisruptorProviderManage会创建Disruptor

    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }

RegisterServerConsumerExecutor用来处理Disruptor中的数据,选择执行器

    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        selectExecutor(results).executor(results);
    }

MetadataExecutorSubscriber#executor,根据rpcType获取shenyuClientRegisterService

    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
        metaDataRegisterDTOList.forEach(meta -> {
            Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType()))
                    .ifPresent(shenyuClientRegisterService -> {
                        synchronized (shenyuClientRegisterService) {
                            shenyuClientRegisterService.register(meta);
                        }
                    });
        });
    }

AbstractShenyuClientRegisterServiceImpl#register,注册selectorruleHandlermetadatacontextPath

    @Override
    public String register(final MetaDataRegisterDTO dto) {
        //handler plugin selector
        String selectorHandler = selectorHandler(dto);
        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
        //handler selector rule
        String ruleHandler = ruleHandler();
        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
        ruleService.registerDefault(ruleDTO);
        //handler register metadata
        registerMetadata(dto);
        //handler context path
        String contextPath = dto.getContextPath();
        if (StringUtils.isNotEmpty(contextPath)) {
            registerContextPath(dto);
        }
        return ShenyuResultMessage.SUCCESS;
    }

RuleServiceImpl#registerDefault,判断数据库中是否存在

    @Override
    public String registerDefault(final RuleDTO ruleDTO) {
        if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {
            return "";
        }
        RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);
        if (StringUtils.isEmpty(ruleDTO.getId())) {
            ruleMapper.insertSelective(ruleDO);
            addCondition(ruleDO, ruleDTO.getRuleConditions());
        }
        ruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());
        return ruleDO.getId();
    }

DataChangedEventDispatcher用于监听DataChangedEvent

    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

AbstractListDataChangedListener#onRuleChanged,规则改变执行该方法。

    @Override
    public void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
        updateRuleMap(getConfig(changeData.getRuleDataId()));
        switch (eventType) {
            case DELETE:
                changed.forEach(rule -> {
                    List<RuleData> ls = RULE_MAP
                            .getOrDefault(rule.getSelectorId(), new ArrayList<>())
                            .stream()
                            .filter(s -> !s.getId().equals(rule.getId()))
                            .sorted(RULE_DATA_COMPARATOR)
                            .collect(Collectors.toList());
                    RULE_MAP.put(rule.getSelectorId(), ls);
                });
                break;
            case REFRESH:
            case MYSELF:
                Set<String> selectIdSet = changed
                        .stream()
                        .map(RuleData::getSelectorId)
                        .collect(Collectors.toSet());
                RULE_MAP.keySet().removeAll(selectIdSet);
                changed.forEach(rule -> {
                    List<RuleData> ls = new ArrayList<>(RULE_MAP.getOrDefault(rule.getSelectorId(),
                            new ArrayList<>()));
                    ls.add(rule);
                    ls.sort(RULE_DATA_COMPARATOR);
                    RULE_MAP.put(rule.getSelectorId(), ls);
                });
                break;
            default:
                changed.forEach(rule -> {
                    List<RuleData> ls = RULE_MAP
                            .getOrDefault(rule.getSelectorId(), new ArrayList<>())
                            .stream()
                            .filter(s -> !s.getId().equals(rule.getId()))
                            .collect(Collectors.toList());
                    ls.add(rule);
                    ls.sort(RULE_DATA_COMPARATOR);
                    RULE_MAP.put(rule.getSelectorId(), ls);
                });
                break;
        }
        publishConfig(changeData.getRuleDataId(), RULE_MAP);
        LOG.debug("[DataChangedListener] RuleChanged {}", changeData.getRuleDataId());
    }

NacosDataChangedListener#publishConfig,使用nacos发布配置。

    @Override
    public void publishConfig(final String dataId, final Object data) {
        try {
            configService.publishConfig(
                    dataId, 
                    NacosPathConstants.GROUP, 
                    GsonUtils.getInstance().toJson(data),
                    ConfigType.JSON.getType());
        } catch (NacosException e) {
            LOG.error("Publish data to nacos error.", e);
            throw new ShenyuException(e.getMessage());
        }
    }

总结一下

  1. 应用服务往注册中心发布信息,网关服务从注册中心上拉取信息。
  2. 网关服务根据请求匹配选择器和规则,经过一系列的执行链,调用应用服务。相比于gateway,省去了配置服务的工作。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/482061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初识Go语言20-包与工程化【用go mod管理工程、包引入规则、init调用链、可见性】

文章目录 包与工程化用go mod管理工程包引入规则init调用链可见性 包与工程化 用go mod管理工程 初始化项目: go mod init $module_name$module_name和目录名可以不一样。上述命令会生成go.mod文件&#xff0c;该文件内容形如&#xff1a; module go-coursego 1.17require (…

HarmonyOS服务卡片开发-文件组织与配置学习

HarmonyOS服务卡片开发-文件组织与配置学习 1.文件组织 目录结构 JS服务卡片(entry/src/main/js/Component)的典型开发目录结构如下&#xff1a; 目录结构中文件分类如下&#xff1a; .hml结尾的HML模板文件&#xff0c;这个文件用来描述卡片页面的模板布局结构。 .css结…

云计算(Cloud Computing)

一、从技术概念理解云计算 早期的云计算就是虚拟化主机上的分布式计算&#xff0c;现阶段的云计算&#xff0c;已经不单单是一种分布式计算&#xff0c;而是分布式计算、效用计算、负载均衡、并行计算、网络存储、热备份冗杂和虚拟化等计算机技术混合演进并跃升的结果。云计算…

飞书接入ChatGPT,打造属于自己的智能问答助手

文章目录 前言环境列表视频教程1.飞书设置2.克隆feishu-chatgpt项目3.配置config.yaml文件4.运行feishu-chatgpt项目5.安装cpolar内网穿透6.固定公网地址7.机器人权限配置8.创建版本9.创建测试企业10. 机器人测试 转载自远控源码文章&#xff1a;飞书接入ChatGPT - 将ChatGPT集…

太酷了,库昊

昨天晚上凌晨3点30&#xff0c;勇士和国王的第7场比赛开打。 在上一局在勇士主场干翻勇士后&#xff0c;国王队的信心倍增&#xff0c;他们用自己的节奏一次次击溃勇士&#xff0c;特别是今天的前两节&#xff0c;国王能能够回应勇士的进球&#xff0c;防守也更有侵略性。今天不…

Spring第九阶段:Spring的注解功能

注解功能 1、注解配置Dao、Service、Controller组件 通过注解分别创建Dao、Service、Controller Spring配置bean的常用注解有 Controller 配置web层的组件 Service 配置Service组件 Repository 配置Dao组件 Component 配置JavaBean( 除Service , Dao , Controller组件之外的…

台积电和三星都变脸了,美国图谋失败,外媒:拜登心都碎了

美国的图谋甚大&#xff0c;然而美国的图谋却又如此迅速的浮现&#xff0c;看穿美国图谋的台积电和三星两家芯片企业如今不干了&#xff0c;这将破坏美国的计划&#xff0c;让美国无可奈何。 一、美国精心编织的大网 从数年前开始&#xff0c;美国就要求台积电和三星上交机密数…

想学编程但没有好的资料 来这 Python 爬取信息并写入数据,最详细教程

提示一下&#xff1a;我这个中一些用import引用的软件包&#xff0c;你们小白记得要下载哟 不然的话会报错的哟&#xff01; 下载软件包很简单的! 话不多说&#xff0c;直接开始… from selenium.webdriver.common.by import By from selenium import webdriver import re, …

5月2日第壹简报,星期二,农历三月十三

5月2日第壹简报&#xff0c;星期二&#xff0c;农历三月十三坚持阅读&#xff0c;静待花开1. “港车北上”政策公布&#xff1a;6月1日起接受申请&#xff0c;7月1日起可驶入广东&#xff0c;将惠及45万香港车主。2. 全球女性第一人&#xff01;董红娟登顶全部14座8000米级高峰…

nvm 安装 node,配置 yarn,cnpm

nvm 安装 node&#xff0c;配置 yarn, cnpm nvmnodeyarncnpm nvm 是什么&#xff1f;nodejs 的版本管理工具&#xff0c;为了解决 node.js 各种版本存在不兼容现象可以通过它安装和切换不同版本的 node.js重要&#xff1a;完全卸载本地 node下载链接 卸载完成之后&#xff0c;…

初识C++之智能指针

目录 一、智能指针的概念 二、RAII 三、 智能指针的拷贝构造 1. 智能指针的拷贝构造问题 2. C库中的智能指针 2.1 auto_ptr 2.2 unique_ptr 2.3 shared_pt 2.4 weak_ptr 四、shared_ptr的循环引用问题 五、 定制删除器 一、智能指针的概念 在了解智能指针的概念前&…

Java连接与操作Perforce

对于源码控管的基本使用来说&#xff0c; 使用Perforce的客户端工具就可以了&#xff0c; 但是某些应用场景下可能需要使用代码来与Perforce服务器进行交互&#xff0c; 比如&#xff1a; 自动部署流程中的自动取代码&#xff08;该场景一般也可以使用P4命令行工具实现&#x…

Windows共享内存与死锁

实验一 一、实验内容或题目&#xff1a; 利用共享内存完成一个生产者进程和一个消费者进程的同步。 二、实验目的与要求&#xff1a; 1、编写程序&#xff0c;使生产者进程和消费者进程通过共享内存和mutex来完成工作同步。 2、了解通过操作系统接口调用&#xff0c;实现通…

Linux字符设备驱动

前言 代码结构简单&#xff0c;旨在用最简单的原理理解最主要的框架逻辑&#xff0c;细节需要自行延伸。 -----------------学习的基础底层逻辑 基础步骤 开发linux内核驱动需要以下4个步骤&#xff1a; 编写驱动代码编写makefile编译和加载驱动编写应用程序测试驱动 由于硬…

Android9.0 系统Framework发送通知流程分析

1.前言 在android 9.0的系统rom定制化开发中,在systemui中一个重要的内容就是系统通知的展示,在状态栏展示系统发送通知的图标,而在 系统下拉通知栏中展示接收到的系统发送过来的通知,所以说对系统framework中发送通知的流程分析很重要,接下来就来分析下系统 通知从frame…

开发攻城狮必备的Linux虚拟机搭建指南|原创

hi&#xff0c;我是阿笠&#xff01; 这篇文章主要面对的是不常搭建Linux操作系统环境的开发同学&#xff0c;文中介绍了基本操作步骤并且提供了相关云盘资源&#xff0c;都是为了节约时间&#xff01; 因为从我自身来讲&#xff0c;作为一名后端开发&#xff0c;经常需要练习一…

c#笔记-内置类型

内置类型 内置类型是一些有关键字表示的类型。关键字具有非常高的优先级&#xff0c;可以让你在没有别的配置的情况下&#xff0c; 只要用的是c#就可以使用。这也意味着这些类型是非常重要&#xff0c;或是基本的东西。 整数&#xff1a;byte, sbyte, short, ushort, int, ui…

【Python入门】搭建开发环境-安装Pycharm开发工具

前言 &#x1f4d5;作者简介&#xff1a;热爱跑步的恒川&#xff0c;致力于C/C、Java、Python等多编程语言&#xff0c;热爱跑步&#xff0c;喜爱音乐的一位博主。 &#x1f4d7;本文收录于Python零基础入门系列&#xff0c;本专栏主要内容为Python基础语法、判断、循环语句、函…

【数据结构】线性表之单链表(讲解实现——带动图理解)

文章目录 单链表单链表主体结构单链表操作函数介绍单链表操作函数实现单链表的初始化&#xff1a;打印函数单链表插入函数&#xff1a;头插尾插指定结点后插入和查找函数单链表结点之前插入数据 单链表删除函数头删尾删指定结点后删除指定结点删除 销毁单链表 文件分类test.cLi…

【STM32】基础知识 第十课 CubeMx

【STM32】基础知识 第十课 CubeMx STM32 CubeMX 简介安装 JAVACubeMX 安装新建 STM32 CubeMX 工程步骤新建工程时钟模块配置GPIO 配置生成源码 main.c STM32 CubeMX 简介 CubeMX (全称 STM32CubeMX) 是 ST 公司推出的一款用于 STM32 微控制器配置的图形化工具. 它能帮助开发者…