网关启动
在ShenyuConfiguration
注入ShenyuWebHandler
。
/**
 * Builds the gateway web handler from every available plugin, sorted by plugin order.
 *
 * @param plugins             provider of the plugin list; an empty list is used when none is available
 * @param config              the ShenYu configuration handed to the handler
 * @param shenyuLoaderService the lazily injected loader service handed to the handler
 * @return the web handler bean named {@code webHandler}
 */
@Bean("webHandler")
public ShenyuWebHandler shenyuWebHandler(final ObjectProvider<List<ShenyuPlugin>> plugins, final ShenyuConfig config, @Lazy final ShenyuLoaderService shenyuLoaderService) {
    final List<ShenyuPlugin> available = plugins.getIfAvailable(Collections::emptyList);
    final List<ShenyuPlugin> ordered = available.stream()
            .sorted(Comparator.comparingInt(ShenyuPlugin::getOrder))
            .collect(Collectors.toList());
    // Log each plugin in the order it will sit in the chain.
    for (ShenyuPlugin plugin : ordered) {
        LOG.info("load plugin:[{}] [{}]", plugin.named(), plugin.getClass().getName());
    }
    return new ShenyuWebHandler(ordered, shenyuLoaderService, config);
}
在NacosSyncDataConfiguration
注入用nacos的数据同步服务。
/**
 * Creates the sync data service that is backed by Nacos.
 *
 * @param configService    provider of the Nacos config service
 * @param pluginSubscriber provider of the plugin data subscriber
 * @param metaSubscribers  provider of the meta data subscribers; empty list when none is available
 * @param authSubscribers  provider of the auth data subscribers; empty list when none is available
 * @return the Nacos sync data service
 */
@Bean
public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                            final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    LOGGER.info("you use nacos sync shenyu data.......");
    final ConfigService nacosConfigService = configService.getIfAvailable();
    final PluginDataSubscriber pluginDataSubscriber = pluginSubscriber.getIfAvailable();
    final List<MetaDataSubscriber> metaDataSubscribers = metaSubscribers.getIfAvailable(Collections::emptyList);
    final List<AuthDataSubscriber> authDataSubscribers = authSubscribers.getIfAvailable(Collections::emptyList);
    return new NacosSyncDataService(nacosConfigService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
}
该实例的构造方法中会执行NacosSyncDataService#start
/**
 * Registers one watcher per Nacos data id (plugin, selector, rule, meta data, auth),
 * pairing each data id with the update callback for that kind of data.
 */
public void start() {
watcherData(NacosPathConstants.PLUGIN_DATA_ID, this::updatePluginMap);
watcherData(NacosPathConstants.SELECTOR_DATA_ID, this::updateSelectorMap);
watcherData(NacosPathConstants.RULE_DATA_ID, this::updateRuleMap);
watcherData(NacosPathConstants.META_DATA_ID, this::updateMetaDataMap);
watcherData(NacosPathConstants.AUTH_DATA_ID, this::updateAuthMap);
}
NacosCacheHandler#updatePluginMap
,监听到数据,执行
/**
 * Parses the plugin configuration and, for every plugin in it, unsubscribes and then
 * re-subscribes the plugin on the plugin data subscriber (when a subscriber is set).
 * A JSON parse failure is logged and otherwise ignored.
 *
 * @param configInfo the plugin configuration text
 */
protected void updatePluginMap(final String configInfo) {
    try {
        // Fix bug #656(https://github.com/apache/shenyu/issues/656)
        final List<PluginData> parsedPlugins = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
        for (PluginData pluginData : parsedPlugins) {
            final PluginDataSubscriber subscriber = pluginDataSubscriber;
            if (subscriber == null) {
                continue;
            }
            subscriber.unSubscribe(pluginData);
            subscriber.onSubscribe(pluginData);
        }
    } catch (JsonParseException e) {
        LOG.error("sync plugin data have error:", e);
    }
}
AbstractDubboPluginDataHandler#handlerPlugin
,处理器,把数据存储到ApacheDubboConfigCache
/**
 * Base plugin data handler for the Dubbo plugins. It keeps the latest
 * {@link DubboRegisterConfig} in {@code Singleton.INST} and asks the subclass to
 * (re)initialize its config cache whenever that config is new or has changed.
 */
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler {

    public static final Supplier<CommonHandleCache<String, DubboRuleHandle>> RULE_CACHED_HANDLE = new BeanHolder<>(CommonHandleCache::new);

    public static final Supplier<CommonHandleCache<String, List<DubboUpstream>>> SELECTOR_CACHED_HANDLE = new BeanHolder<>(CommonHandleCache::new);

    /**
     * Initializes the subclass-specific config cache.
     *
     * @param dubboRegisterConfig the register config to initialize from
     */
    protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);

    /**
     * Handles plugin data for an enabled plugin; null or disabled plugin data is ignored.
     *
     * @param pluginData the plugin data
     */
    @Override
    public void handlerPlugin(final PluginData pluginData) {
        if (Objects.isNull(pluginData) || !Boolean.TRUE.equals(pluginData.getEnabled())) {
            return;
        }
        final DubboRegisterConfig incoming = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
        final DubboRegisterConfig current = Singleton.INST.get(DubboRegisterConfig.class);
        if (Objects.isNull(incoming)) {
            return;
        }
        // Initialize when nothing is registered yet or the config differs from the registered one.
        final boolean needsInit = Objects.isNull(current) || !incoming.equals(current);
        if (needsInit) {
            this.initConfigCache(incoming);
        }
        Singleton.INST.single(DubboRegisterConfig.class, incoming);
    }
}
ApacheDubboPluginDataHandler
/**
 * Plugin data handler for Apache Dubbo, backed by {@code ApacheDubboConfigCache}.
 */
public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {

    /**
     * Initializes the Apache Dubbo config cache with the given config and then invalidates all of its entries.
     *
     * @param dubboRegisterConfig the register config
     */
    @Override
    protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) {
        final ApacheDubboConfigCache configCache = ApacheDubboConfigCache.getInstance();
        configCache.init(dubboRegisterConfig);
        configCache.invalidateAll();
    }
}
服务调用
ShenyuWebHandler.DefaultShenyuPluginChain#execute
，依次执行每个插件。
/**
 * Runs the next plugin in the chain. A plugin whose {@code skip} returns true is
 * bypassed; once every plugin has been visited the chain completes empty.
 *
 * @param exchange the current server exchange
 * @return a deferred {@code Mono} for the rest of the chain
 */
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
    return Mono.defer(() -> {
        if (this.index >= plugins.size()) {
            return Mono.empty();
        }
        final ShenyuPlugin current = plugins.get(this.index++);
        return current.skip(exchange)
                ? this.execute(exchange)
                : current.execute(exchange, this);
    });
}
AbstractShenyuPlugin#execute
，匹配选择器和规则。
/**
 * Runs this plugin for the request: resolves the plugin data, then the matching selector,
 * then the matching rule, and finally delegates to {@code doExecute}. When the plugin is
 * missing or not enabled the request is passed straight to the next plugin; when no selector
 * or rule matches, the corresponding {@code handleSelectorIfNull} / {@code handleRuleIfNull}
 * hook decides the outcome.
 *
 * @param exchange the current server exchange
 * @param chain    the plugin chain
 * @return the result of the plugin execution or of the fallback hook
 */
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
    initCacheConfig();
    final String pluginName = named();
    PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    // early exit; Boolean.TRUE.equals treats an unset (null) enabled flag as disabled
    // instead of failing with a NullPointerException while unboxing it
    if (Objects.isNull(pluginData) || !Boolean.TRUE.equals(pluginData.getEnabled())) {
        return chain.execute(exchange);
    }
    final String path = exchange.getRequest().getURI().getPath();
    List<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
    SelectorData selectorData = obtainSelectorDataCacheIfEnabled(path);
    // handle Selector
    // a cached selector without an id is the placeholder stored below for "no selector matched this path"
    if (Objects.nonNull(selectorData) && StringUtils.isBlank(selectorData.getId())) {
        return handleSelectorIfNull(pluginName, exchange, chain);
    }
    if (Objects.isNull(selectorData)) {
        if (CollectionUtils.isEmpty(selectors)) {
            return handleSelectorIfNull(pluginName, exchange, chain);
        }
        Pair<Boolean, SelectorData> matchSelectorData = matchSelector(exchange, selectors);
        selectorData = matchSelectorData.getRight();
        if (Objects.isNull(selectorData)) {
            if (matchCacheConfig.getSelector().getSelectorEnabled() && matchSelectorData.getLeft()) {
                // cache an id-less placeholder so the next request for this path exits early
                selectorData = new SelectorData();
                selectorData.setPluginName(pluginName);
                cacheSelectorData(path, selectorData);
            }
            return handleSelectorIfNull(pluginName, exchange, chain);
        } else {
            if (matchCacheConfig.getSelector().getSelectorEnabled() && matchSelectorData.getLeft()) {
                cacheSelectorData(path, selectorData);
            }
        }
    }
    printLog(selectorData, pluginName);
    if (Objects.nonNull(selectorData.getContinued()) && !selectorData.getContinued()) {
        // the selector is explicitly marked as not continued: skip rule matching and run with the default rule
        return doExecute(exchange, chain, selectorData, defaultRuleData(selectorData));
    }
    List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
    if (CollectionUtils.isEmpty(rules)) {
        return handleRuleIfNull(pluginName, exchange, chain);
    }
    RuleData ruleData = obtainRuleDataCache(path);
    // a cached rule without an id is the placeholder stored below for "no rule matched this path"
    if (Objects.nonNull(ruleData) && Objects.isNull(ruleData.getId())) {
        return handleRuleIfNull(pluginName, exchange, chain);
    }
    if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
        //get last
        RuleData rule = rules.get(rules.size() - 1);
        printLog(rule, pluginName);
        return doExecute(exchange, chain, selectorData, rule);
    } else {
        // lru map as L1 cache,the cache is enabled by default.
        // if the L1 cache fails to hit, using L2 cache based on trie cache.
        // if the L2 cache fails to hit, execute default strategy.
        if (Objects.isNull(ruleData)) {
            // L1 cache not exist data, try to get data through trie cache
            ruleData = trieMatchRule(exchange, selectorData, path);
            // trie cache fails to hit, execute default strategy
            if (Objects.isNull(ruleData)) {
                LOG.info("{} rule match path from default strategy", named());
                Pair<Boolean, RuleData> matchRuleData = matchRule(exchange, rules);
                ruleData = matchRuleData.getRight();
                if (matchRuleData.getLeft()) {
                    ruleData = Optional.ofNullable(ruleData)
                            .orElse(RuleData.builder().pluginName(pluginName).matchRestful(false).build());
                    cacheRuleData(path, ruleData);
                }
            }
        }
    }
    if (Objects.isNull(ruleData) || Objects.isNull(ruleData.getId())) {
        return handleRuleIfNull(pluginName, exchange, chain);
    }
    printLog(ruleData, pluginName);
    return doExecute(exchange, chain, selectorData, ruleData);
}
GlobalPlugin
构建上下文信息
/**
 * Plugin that builds the {@code ShenyuContext} for a request and stores it on the exchange.
 */
public class GlobalPlugin implements ShenyuPlugin {

    private final ShenyuContextBuilder builder;

    /**
     * Instantiates a new Global plugin.
     *
     * @param builder the builder used to create the context for each request
     */
    public GlobalPlugin(final ShenyuContextBuilder builder) {
        this.builder = builder;
    }

    /**
     * Builds the context, saves it under {@code Constants.CONTEXT} in the exchange
     * attributes and continues with the next plugin.
     *
     * @param exchange the current server exchange
     * @param chain    the plugin chain
     * @return the result of the remaining chain
     */
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        final ShenyuContext context = builder.build(exchange);
        exchange.getAttributes().put(Constants.CONTEXT, context);
        return chain.execute(exchange);
    }
}
DefaultShenyuContextBuilder#build
,构建上下文
/**
 * Builds the context for the request: resolves the rpc type and meta data, then lets the
 * decorator registered for that rpc type decorate the default context.
 *
 * @param exchange the current server exchange
 * @return the decorated context
 */
@Override
public ShenyuContext build(final ServerWebExchange exchange) {
    final Pair<String, MetaData> typeAndMetaData = buildData(exchange);
    final String rpcType = typeAndMetaData.getLeft();
    final MetaData metaData = typeAndMetaData.getRight();
    // NOTE(review): no decorator registered for rpcType would fail here with a NullPointerException - confirm every type is registered
    return decoratorMap.get(rpcType).decorator(buildDefaultContext(exchange.getRequest()), metaData);
}
/**
 * Resolves the rpc type and meta data of a request, in this order of precedence:
 * the {@code RPC_TYPE} header, a web socket {@code UPGRADE} header, enabled meta data
 * cached for the request path, and finally plain HTTP.
 *
 * @param exchange the current server exchange
 * @return the rpc type paired with the meta data (an empty {@code MetaData} unless cached meta data was found)
 */
private Pair<String, MetaData> buildData(final ServerWebExchange exchange) {
    final ServerHttpRequest request = exchange.getRequest();
    final HttpHeaders headers = request.getHeaders();

    final String rpcType = headers.getFirst(RPC_TYPE);
    if (StringUtils.isNotEmpty(rpcType)) {
        return Pair.of(rpcType, new MetaData());
    }

    final String upgrade = headers.getFirst(UPGRADE);
    final boolean webSocketUpgrade = StringUtils.isNotEmpty(upgrade) && RpcTypeEnum.WEB_SOCKET.getName().equals(upgrade);
    if (webSocketUpgrade) {
        return Pair.of(RpcTypeEnum.WEB_SOCKET.getName(), new MetaData());
    }

    final MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath());
    final boolean usableMetaData = Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled());
    if (!usableMetaData) {
        return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData());
    }
    // Expose the meta data to later plugins through the exchange attributes.
    exchange.getAttributes().put(Constants.META_DATA, metaData);
    return Pair.of(metaData.getRpcType(), metaData);
}
RpcParamTransformPlugin
用于获取参数信息
/**
 * Plugin that picks the request-parameter handling ({@code body}, {@code formData} or
 * {@code query}) based on the request content type.
 */
public class RpcParamTransformPlugin implements ShenyuPlugin {

    /**
     * Dispatches on the content type when a context is present on the exchange;
     * without a context the request goes straight to the next plugin.
     *
     * @param exchange the current server exchange
     * @param chain    the plugin chain
     * @return the result of the selected handling or of the remaining chain
     */
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
        final ServerHttpRequest request = exchange.getRequest();
        final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
        if (Objects.isNull(shenyuContext)) {
            return chain.execute(exchange);
        }
        final MediaType contentType = request.getHeaders().getContentType();
        if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
            return body(exchange, request, chain);
        }
        if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
            return formData(exchange, request, chain);
        }
        // Any other (or missing) content type falls back to the query handling.
        return query(exchange, request, chain);
    }
}
AbstractDubboPlugin#doExecute
,检查元数据和参数,进行方法调用
/**
 * Validates the meta data and the transformed parameter, then performs the Dubbo invocation.
 * Responds with a 500 status and an error result when the meta data check fails, or when
 * the meta data declares parameter types but no parameter is present.
 *
 * @param exchange the current server exchange
 * @param chain    the plugin chain
 * @param selector the matched selector
 * @param rule     the matched rule
 * @return the error response or the result of the Dubbo invocation
 */
@Override
public Mono<Void> doExecute(final ServerWebExchange exchange,
                            final ShenyuPluginChain chain,
                            final SelectorData selector,
                            final RuleData rule) {
    final String param = exchange.getAttribute(Constants.PARAM_TRANSFORM);
    final ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
    assert shenyuContext != null;
    final MetaData metaData = exchange.getAttribute(Constants.META_DATA);

    if (!checkMetaData(metaData)) {
        LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData);
        exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        return WebFluxResultUtils.result(exchange, ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR));
    }

    final boolean parameterMissing = Objects.nonNull(metaData)
            && StringUtils.isNoneBlank(metaData.getParameterTypes())
            && StringUtils.isBlank(param);
    if (parameterMissing) {
        exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        return WebFluxResultUtils.result(exchange, ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM));
    }

    this.rpcContext(exchange);
    return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param);
}
ApacheDubboPlugin#doDubboInvoker
,开始dubbo的泛化调用。
/**
 * Attaches the selector id, rule id and caller address to the Dubbo {@code RpcContext},
 * performs the generic invocation and then continues the plugin chain.
 *
 * @param exchange the current server exchange
 * @param chain    the plugin chain
 * @param selector the matched selector
 * @param rule     the matched rule
 * @param metaData the meta data of the target method
 * @param param    the request parameter text
 * @return the remaining chain, run after the invocation completes
 */
@Override
protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange,
                                    final ShenyuPluginChain chain,
                                    final SelectorData selector,
                                    final RuleData rule,
                                    final MetaData metaData,
                                    final String param) {
    final RpcContext rpcContext = RpcContext.getContext();
    rpcContext.setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId());
    rpcContext.setAttachment(Constants.DUBBO_RULE_ID, rule.getId());
    final String remoteHost = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    rpcContext.setAttachment(Constants.DUBBO_REMOTE_ADDRESS, remoteHost);
    final Mono<Object> invocation = dubboProxyService.genericInvoker(param, metaData, exchange);
    return invocation.then(chain.execute(exchange));
}
ApacheDubboProxyService#genericInvoker
- 获取
ReferenceConfig
对象; - 获取泛化服务
GenericService
对象; - 构造请求参数
pair
对象; - 发起异步的泛化调用。
/**
 * Performs a generic Dubbo invocation for the method described by the meta data.
 * The reference is looked up by the meta data path, prefixed with the namespace when a
 * namespace header is present, and is re-initialized when it has no interface set.
 * The result (or the empty-result constant when it is null) is stored on the exchange.
 *
 * @param body     the request parameter text
 * @param metaData the meta data of the target method
 * @param exchange the current server exchange
 * @return a {@code Mono} of the invocation result; failures are mapped to {@code ShenyuException}
 * @throws ShenyuException declared by the method signature
 */
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException {
    String namespace = "";
    String referenceKey = metaData.getPath();
    if (CollectionUtils.isNotEmpty(exchange.getRequest().getHeaders().get(Constants.NAMESPACE))) {
        namespace = exchange.getRequest().getHeaders().get(Constants.NAMESPACE).get(0);
        referenceKey = namespace + ":" + referenceKey;
    }

    ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(referenceKey);
    if (StringUtils.isEmpty(reference.getInterface())) {
        ApacheDubboConfigCache.getInstance().invalidate(referenceKey);
        reference = ApacheDubboConfigCache.getInstance().initRefN(metaData, namespace);
    }
    final GenericService genericService = reference.get();

    final Pair<String[], Object[]> arguments;
    final boolean hasArguments = !(StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.bodyIsEmpty(body));
    if (hasArguments) {
        arguments = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
    } else {
        arguments = new ImmutablePair<>(new String[]{}, new Object[]{});
    }

    final CompletableFuture<Object> future = invokeAsync(genericService, metaData.getMethodName(), arguments.getLeft(), arguments.getRight())
            .thenApply(ret -> {
                final Object result = Objects.isNull(ret) ? Constants.DUBBO_RPC_RESULT_EMPTY : ret;
                exchange.getAttributes().put(Constants.RPC_RESULT, result);
                exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
                return result;
            });
    return Mono.fromFuture(future).onErrorMap(exception -> {
        if (exception instanceof GenericException) {
            return new ShenyuException(((GenericException) exception).getExceptionMessage());
        }
        return new ShenyuException(exception);
    });
}
/**
 * Invokes the generic service and returns the future taken from the Dubbo {@code RpcContext}.
 * When the context value is not a {@code CompletableFuture}, it is wrapped in an already
 * completed future.
 *
 * @param genericService the generic service to call
 * @param method         the method name
 * @param parameterTypes the parameter type names
 * @param args           the argument values
 * @return the future carrying the invocation result
 * @throws GenericException declared by the method signature
 */
@SuppressWarnings("unchecked")
private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException {
    //Compatible with asynchronous calls of lower Dubbo versions
    genericService.$invoke(method, parameterTypes, args);
    final Object contextFuture = RpcContext.getContext().getFuture();
    if (contextFuture instanceof CompletableFuture) {
        return (CompletableFuture<Object>) contextFuture;
    }
    return CompletableFuture.completedFuture(contextFuture);
}
ResponsePlugin#execute
,处理响应结果
/**
 * Writes the response using the writer registered for the rpc type of the request context.
 *
 * @param exchange the current server exchange
 * @param chain    the plugin chain
 * @return the result of the selected writer
 */
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) {
    final ShenyuContext context = exchange.getAttribute(Constants.CONTEXT);
    assert context != null;
    final String rpcType = context.getRpcType();
    return writerMap.get(rpcType).writeWith(exchange, chain);
}
服务注册
启动shenyu-examples
里面的shenyu-examples-apache-dubbo-service-annotation
。
AbstractContextRefreshedEventListener
- 读取属性配置
- 开启线程池
- 启动注册中心,用于向
shenyu-admin
注册
public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
Properties props = clientConfig.getProps();
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "client register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = props.getProperty(ShenyuClientConstants.PORT);
publisher.start(shenyuClientRegisterRepository);
}
AbstractContextRefreshedEventListener#onApplicationEvent
,获取所有的ServiceBean
的Bean,处理元数据对象和处理URI对象。
/**
 * On context refresh, publishes the URI registration and handles every collected bean and
 * every {@code ApiModule}-annotated bean. Does nothing when no beans are found; the
 * {@code registered} flag makes sure the work runs at most once.
 *
 * @param event the context refreshed event
 */
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
    final ApplicationContext context = event.getApplicationContext();
    final Map<String, T> beans = getBeans(context);
    // The flag is only flipped when there is something to register.
    if (MapUtils.isEmpty(beans) || !registered.compareAndSet(false, true)) {
        return;
    }
    publisher.publishEvent(buildURIRegisterDTO(context, beans));
    for (Map.Entry<String, T> entry : beans.entrySet()) {
        handle(entry.getKey(), entry.getValue());
    }
    final Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
    for (Object apiModule : apiModules.values()) {
        handleApiDoc(apiModule, beans);
    }
}
ShenyuClientRegisterEventPublisher#publishEvent
,使用disruptor来处理事件。
/**
 * Hands the data to the disruptor provider obtained from the provider manager.
 *
 * @param data the data to publish
 */
public void publishEvent(final DataTypeParent data) {
    providerManage.getProvider().onData(data);
}
在RegisterCenterConfiguration
注入ShenyuClientServerRegisterRepository
/**
 * Creates the server-side register repository for the configured register type, starts the
 * disruptor publisher with the register services keyed by rpc type, and initializes the
 * repository with that publisher.
 *
 * @param shenyuRegisterCenterConfig  the register center configuration
 * @param shenyuClientRegisterService the available client register services
 * @return the initialized register repository
 */
@Bean(destroyMethod = "close")
public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
                                                                               final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
    final ShenyuClientServerRegisterRepository repository = ExtensionLoader
            .getExtensionLoader(ShenyuClientServerRegisterRepository.class)
            .getJoin(shenyuRegisterCenterConfig.getRegisterType());
    final RegisterClientServerDisruptorPublisher disruptorPublisher = RegisterClientServerDisruptorPublisher.getInstance();
    final Map<String, ShenyuClientRegisterService> servicesByRpcType = shenyuClientRegisterService.stream()
            .collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));
    disruptorPublisher.start(servicesByRpcType);
    repository.init(disruptorPublisher, shenyuRegisterCenterConfig);
    return repository;
}
DisruptorProviderManage#startup(boolean)
,启动DisruptorProviderManage
会创建Disruptor
/**
 * Creates and starts the Disruptor and stores a {@code DisruptorProvider} built on its ring buffer.
 *
 * @param isOrderly whether events are handled in orderly mode; when true a single consumer and
 *                  the orderly event factory are used
 */
public void startup(final boolean isOrderly) {
// Executor shared by all consumers; consumerSize is passed twice, presumably as core and max pool size - confirm against OrderlyExecutor.
OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
int newConsumerSize = this.consumerSize;
EventFactory<DataEvent<T>> eventFactory;
if (isOrderly) {
// Orderly mode uses exactly one queue consumer.
newConsumerSize = 1;
eventFactory = new OrderlyDisruptorEventFactory<>();
} else {
eventFactory = new DisruptorEventFactory<>();
}
// Multi-producer ring buffer of the configured size with a blocking wait strategy.
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
size,
DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
ProducerType.MULTI,
new BlockingWaitStrategy());
@SuppressWarnings("all")
QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
for (int i = 0; i < newConsumerSize; i++) {
consumers[i] = new QueueConsumer<>(executor, consumerFactory);
}
// Consumers and the exception handler are registered before the disruptor is started.
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
}
RegisterServerConsumerExecutor
用来处理Disruptor
中的数据,选择执行器
/**
 * Filters the received data down to the valid entries and, when any remain, passes them to
 * the executor selected for them.
 */
@Override
public void run() {
    final Collection<DataTypeParent> validData = getData()
            .stream()
            .filter(this::isValidData)
            .collect(Collectors.toList());
    if (!CollectionUtils.isEmpty(validData)) {
        selectExecutor(validData).executor(validData);
    }
}
MetadataExecutorSubscriber#executor
,根据rpcType
获取shenyuClientRegisterService
。
/**
 * Registers each meta data entry with the register service that matches its rpc type.
 * Entries whose rpc type has no register service are skipped; each registration runs
 * while holding the monitor of its register service.
 *
 * @param metaDataRegisterDTOList the meta data entries to register
 */
@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
    for (MetaDataRegisterDTO meta : metaDataRegisterDTOList) {
        final ShenyuClientRegisterService registerService = this.shenyuClientRegisterService.get(meta.getRpcType());
        if (registerService == null) {
            continue;
        }
        synchronized (registerService) {
            registerService.register(meta);
        }
    }
}
AbstractShenyuClientRegisterServiceImpl#register
,注册selector
,ruleHandler
,metadata
和contextPath
/**
 * Registers, in order, the default selector, the default rule, the meta data and
 * (when the DTO carries a non-empty context path) the context path.
 *
 * @param dto the meta data registration request
 * @return {@code ShenyuResultMessage.SUCCESS}
 */
@Override
public String register(final MetaDataRegisterDTO dto) {
    // Step 1: plugin selector.
    final String handlerOfSelector = selectorHandler(dto);
    final String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), handlerOfSelector);

    // Step 2: default rule under that selector.
    final String handlerOfRule = ruleHandler();
    final RuleDTO defaultRule = buildRpcDefaultRuleDTO(selectorId, dto, handlerOfRule);
    ruleService.registerDefault(defaultRule);

    // Step 3: meta data.
    registerMetadata(dto);

    // Step 4: context path, only when one is configured.
    if (StringUtils.isNotEmpty(dto.getContextPath())) {
        registerContextPath(dto);
    }
    return ShenyuResultMessage.SUCCESS;
}
RuleServiceImpl#registerDefault
,判断数据库中是否存在
/**
 * Registers a default rule unless a rule with the same selector id and name already exists.
 * A rule without an id is inserted together with its conditions; the register event is
 * published in either case.
 *
 * @param ruleDTO the rule to register
 * @return an empty string when the rule already exists, otherwise the id of the built rule
 */
@Override
public String registerDefault(final RuleDTO ruleDTO) {
    final boolean alreadyExists = Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()));
    if (alreadyExists) {
        return "";
    }
    final RuleDO rule = RuleDO.buildRuleDO(ruleDTO);
    final boolean isNewRule = StringUtils.isEmpty(ruleDTO.getId());
    if (isNewRule) {
        ruleMapper.insertSelective(rule);
        addCondition(rule, ruleDTO.getRuleConditions());
    }
    ruleEventPublisher.onRegister(rule, ruleDTO.getRuleConditions());
    return rule.getId();
}
DataChangedEventDispatcher
用于监听DataChangedEvent
/**
 * Forwards a data changed event to every registered listener, calling the listener method
 * that matches the event's group key with the event source (cast to the matching list type)
 * and the event type.
 *
 * @param event the data changed event
 * @throws IllegalStateException when the group key is none of the handled values
 */
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
// NOTE(review): this call sits inside the listener loop, so it runs once per registered listener - confirm that is intended.
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
AbstractListDataChangedListener#onRuleChanged
,规则改变执行该方法。
/**
 * Applies a rule change to {@code RULE_MAP} (rules grouped by selector id) and publishes the
 * whole map under the rule data id.
 *
 * @param changed   the rules that changed
 * @param eventType the kind of change; DELETE removes rules, REFRESH and MYSELF replace the
 *                  rule lists of the affected selectors, any other type inserts or replaces by id
 */
@Override
public void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
// Presumably reloads RULE_MAP from the currently published config before applying the change - confirm against updateRuleMap.
updateRuleMap(getConfig(changeData.getRuleDataId()));
switch (eventType) {
case DELETE:
// Rebuild each affected selector's list without the deleted rule id.
changed.forEach(rule -> {
List<RuleData> ls = RULE_MAP
.getOrDefault(rule.getSelectorId(), new ArrayList<>())
.stream()
.filter(s -> !s.getId().equals(rule.getId()))
.sorted(RULE_DATA_COMPARATOR)
.collect(Collectors.toList());
RULE_MAP.put(rule.getSelectorId(), ls);
});
break;
case REFRESH:
case MYSELF:
// Drop the existing lists of every affected selector, then rebuild them from the changed rules only.
Set<String> selectIdSet = changed
.stream()
.map(RuleData::getSelectorId)
.collect(Collectors.toSet());
RULE_MAP.keySet().removeAll(selectIdSet);
changed.forEach(rule -> {
List<RuleData> ls = new ArrayList<>(RULE_MAP.getOrDefault(rule.getSelectorId(),
new ArrayList<>()));
ls.add(rule);
ls.sort(RULE_DATA_COMPARATOR);
RULE_MAP.put(rule.getSelectorId(), ls);
});
break;
default:
// Remove any rule with the same id, then add the changed rule and re-sort.
changed.forEach(rule -> {
List<RuleData> ls = RULE_MAP
.getOrDefault(rule.getSelectorId(), new ArrayList<>())
.stream()
.filter(s -> !s.getId().equals(rule.getId()))
.collect(Collectors.toList());
ls.add(rule);
ls.sort(RULE_DATA_COMPARATOR);
RULE_MAP.put(rule.getSelectorId(), ls);
});
break;
}
publishConfig(changeData.getRuleDataId(), RULE_MAP);
LOG.debug("[DataChangedListener] RuleChanged {}", changeData.getRuleDataId());
}
NacosDataChangedListener#publishConfig
,使用nacos发布配置。
/**
 * Publishes the data to Nacos as JSON under the given data id in {@code NacosPathConstants.GROUP}.
 *
 * @param dataId the Nacos data id
 * @param data   the object to serialize and publish
 * @throws ShenyuException when Nacos rejects the publish; the original {@code NacosException} is kept as the cause
 */
@Override
public void publishConfig(final String dataId, final Object data) {
    try {
        configService.publishConfig(
                dataId,
                NacosPathConstants.GROUP,
                GsonUtils.getInstance().toJson(data),
                ConfigType.JSON.getType());
    } catch (NacosException e) {
        LOG.error("Publish data to nacos error.", e);
        // Wrap the exception itself rather than only its message, so the cause and its stack trace are not lost.
        throw new ShenyuException(e);
    }
}
总结一下
- 应用服务往注册中心发布信息,网关服务从注册中心上拉取信息。
- 网关服务根据请求匹配选择器和规则,经过一系列的执行链,调用应用服务。相比于gateway,省去了配置服务的工作。