Sentinel规则持久化Push模式两种实现方式

news2025/1/16 1:02:22

文章目录

  • sentinel持久化push推模式
    • 微服务端的实现
      • 具体实现
      • 源码分析
        • 读数据源
        • 写数据源的实现
      • 微服务端解析读数据源流程
    • 修改源码的实现
      • 官方demo
      • 修改源码实现
        • 配置类
        • flow
        • authority
        • degread
        • param
        • system
        • gateway
        • 修改源码
      • 测试
      • 补充


前置知识 pull模式


sentinel持久化push推模式

pull拉模式的缺点,以保存本地文件举例:

  • 定时任务是每隔3s执行一次,去判断规则持久化文件的最后修改时间。这里有一定时间的延迟,但如果时间设置的太短,有影响服务器的性能
  • 我们的微服务是集群部署的,其他服务实例可读取不到我这台服务器的本地文件



所以还有一种push推送模式。我们一般会引入第三方中间件来实现,以Nacos为例。我们修改了nacos中的配置,它就会将更新后的数据推送给微服务。



push模式有两种实现方式:

  • 在微服务端添加读数据源,为dataId添加监听器,当规则配置文件更改之后我就获取到更改后的规则内存并更新内存中的数据;再添加一个写数据源,每当dashboard中更新了规则,我除了更新内存中的数据之外,我通过ConfigService.publishConfig()方法还往Nacos端进行写入

  • 在dashboard源码中进行更改,在获取规则内容、更新规则内容的接口中,不要和微服务端进行交互,直接去和Nacos通信,通过ConfigService.publishConfig()ConfigService.getConfig()来实现。这种方式主要注意dashboard端的规则实体对象和微服务端的规则实体对象不一致问题,需要经过转换相关的操作。sentinel默认情况下就直接把规则实体转换为json字符串推送给Nacos,Nacos配置文件更改了,又推送给微服务,微服务这边再把json字符串转换为规则实体对象这一步就会发现,转换失败了,某些属性对应不上。进而就导致了dashboard端设置的规则在微服务这边未生效。



微服务端的实现

具体实现

引入读数据源的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>



配置文件中添加规则持久化的dataId

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            dataId: ${spring.application.name}-flow-rules
            # 我这里在Nacos配置中心,单独使用了一个组
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



在Nacos配置中心中创建对应的配置文件

在这里插入图片描述



编写java类,定义写数据源

import com.alibaba.cloud.sentinel.SentinelProperties;
import com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 添加往Nacos的写数据源,只不过未使用InitFunc
 * 如果要使用就需要放开注解
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(SentinelAutoConfiguration.class)
public class SentinelNacosDataSourceConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public SentinelNacosDataSourceHandler sentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) {
        return new SentinelNacosDataSourceHandler(sentinelProperties);
    }
}
import com.alibaba.cloud.sentinel.SentinelProperties;
import com.alibaba.cloud.sentinel.datasource.RuleType;
import com.alibaba.cloud.sentinel.datasource.config.DataSourcePropertiesConfiguration;
import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;
import com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.SmartInitializingSingleton;

import java.util.List;

/**
 * sentinel 规则持久化到 nacos配置中心
 */
public class SentinelNacosDataSourceHandler implements SmartInitializingSingleton {

    private final SentinelProperties sentinelProperties;

    public SentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) {
        this.sentinelProperties = sentinelProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        // 遍历我们配置文件中指定的多个spring.cloud.sentinel.datasource的多个配置
        sentinelProperties.getDatasource().values().forEach(this::registryWriter);
    }

    private void registryWriter(DataSourcePropertiesConfiguration dataSourceProperties) {
        // 只获取application.yml文件中 nacos配置的数据源
        final NacosDataSourceProperties nacosDataSourceProperties = dataSourceProperties.getNacos();
        if (nacosDataSourceProperties == null) {
            return;
        }
        // 获取规则类型,然后根据各个类型创建相应的写数据源
        final RuleType ruleType = nacosDataSourceProperties.getRuleType();
        switch (ruleType) {
            case FLOW:
                WritableDataSource<List<FlowRule>> flowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter);
                break;
            case DEGRADE:
                WritableDataSource<List<DegradeRule>> degradeRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter);
                break;
            case PARAM_FLOW:
                WritableDataSource<List<ParamFlowRule>> paramFlowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter);
                break;
            case SYSTEM:
                WritableDataSource<List<SystemRule>> systemRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter);
                break;
            case AUTHORITY:
                WritableDataSource<List<AuthorityRule>> authRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter);
                break;
            default:
                break;
        }
    }
}
import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 将sentinel规则写入到nacos配置中心
 * @param <T>
 */
@Slf4j
public class NacosWritableDataSource<T> implements WritableDataSource<T> {

    private final Converter<T, String> configEncoder;
    private final NacosDataSourceProperties nacosDataSourceProperties;

    private final Lock lock = new ReentrantLock(true);
    private ConfigService configService = null;

    public NacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties, Converter<T, String> configEncoder) {
        if (configEncoder == null) {
            throw new IllegalArgumentException("Config encoder cannot be null");
        }
        if (nacosDataSourceProperties == null) {
            throw new IllegalArgumentException("Config nacosDataSourceProperties cannot be null");
        }
        this.configEncoder = configEncoder;
        this.nacosDataSourceProperties = nacosDataSourceProperties;
        final Properties properties = buildProperties(nacosDataSourceProperties);
        try {
            // 也可以直接注入NacosDataSource,然后反射获取其configService属性
            this.configService = NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            log.error("create configService failed.", e);
        }
    }

    private Properties buildProperties(NacosDataSourceProperties nacosDataSourceProperties) {
        Properties properties = new Properties();
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getServerAddr())) {
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDataSourceProperties.getServerAddr());
        } else {
            properties.setProperty(PropertyKeyConst.ACCESS_KEY, nacosDataSourceProperties.getAccessKey());
            properties.setProperty(PropertyKeyConst.SECRET_KEY, nacosDataSourceProperties.getSecretKey());
            properties.setProperty(PropertyKeyConst.ENDPOINT, nacosDataSourceProperties.getEndpoint());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getNamespace())) {
            properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDataSourceProperties.getNamespace());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getUsername())) {
            properties.setProperty(PropertyKeyConst.USERNAME, nacosDataSourceProperties.getUsername());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getPassword())) {
            properties.setProperty(PropertyKeyConst.PASSWORD, nacosDataSourceProperties.getPassword());
        }
        return properties;
    }

    @Override
    public void write(T value) throws Exception {
        lock.lock();
        // todo handle cluster concurrent problem
        try {
            String convertResult = configEncoder.convert(value);
            if (configService == null) {
                log.error("configServer is null, can not continue.");
                return;
            }
            // 规则配置数据推送到nacos配置中心
            final boolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), convertResult);
            if (!published) {
                log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType());
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() throws Exception {

    }
}



启动微服务进行测试。

dashboard中为某个接口定义一个流控规则

在这里插入图片描述


调用接口测试,发送三次请求

在这里插入图片描述



查看Nacos中的配置文件,就会发现也成功写入了

在这里插入图片描述



源码分析

读数据源

引入读数据源的依赖,我们来看看具体是怎么实现的

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>



实现思路:

  • 和文件的读数据源一样,继承了AbstractDataSource类,这样就不需要我们再去写一遍加载配置、更新内存中的配置

在源码中的这个扩展包下面,就有nacos读数据源的实现

在这里插入图片描述



我们先看看NacosDataSource类的父类的代码

  • 创建一个DynamicSentinelProperty对象,主要作用是更新内存中的规则配置
  • 加载配置、解析配置
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {

    protected final Converter<S, T> parser;
    protected final SentinelProperty<T> property;

    public AbstractDataSource(Converter<S, T> parser) {
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        // 子类传过来的解析器
        this.parser = parser;
        // 更新内存中的配置
        // 我们会经常看见 getProperty().updateValue(newValue); 这样的代码
        this.property = new DynamicSentinelProperty<T>();
    }

    @Override
    public T loadConfig() throws Exception {
        // 调用子类的readSource()方法,一般会得到一个String,
        // 在通过解析器Converter 并解析配置转换成对应的对象
        return loadConfig(readSource());
    }

    public T loadConfig(S conf) throws Exception {
        // 解析配置
        T value = parser.convert(conf);
        return value;
    }

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }
}



读配置源的具体实现:

  • 通过Nacos的serverAddr构建一个Properties对象,该对象会用于初始化ConfigService接口的对象
  • 利用线程池中唯一一个线程,创建一个监听器,监听dataId,当配置中心的配置更改后就会调用微服务客户端,微服务客户端这边有一个while+阻塞队列实现的轮询机制,它调用监听器的方法,监听器里面会更新内存中的规则配置
  • 初始化configService对象,并通过configService.addListener(…)为指定的dataId添加监听器
  • 微服务刚启动会调用父类的loadConfig()方法,父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析;再更新内存中的规则配置
public class NacosDataSource<T> extends AbstractDataSource<String, T> {

    private static final int DEFAULT_TIMEOUT = 3000;
	// 创建一个只有一个线程的线程池,用来执行dataId的监听器
    private final ExecutorService pool = new ThreadPoolExecutor(...);

    private final Listener configListener;
    private final String groupId;
    private final String dataId;
    private final Properties properties;

    private ConfigService configService = null;


    public NacosDataSource(final String serverAddr, final String groupId, final String dataId,Converter<String, T> parser) {
        this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser);
    }


    public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter<String, T> parser) {
        super(parser);
        if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
            throw new IllegalArgumentException(...);
        }
        AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
        this.groupId = groupId;
        this.dataId = dataId;
        this.properties = properties;
        // 创建一个监听器
        this.configListener = new Listener() {
            @Override
            public Executor getExecutor() {
                return pool;
            }

            @Override
            public void receiveConfigInfo(final String configInfo) {
                RecordLog.info(...);
                // 通过转换器进行转换
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                // 调用父类的SentinelProperty对象,更新内存中的规则配置
                getProperty().updateValue(newValue);
            }
        };
        // 初始化configService对象,并通过configService.addListener(..)为指定的dataId添加监听器
        initNacosListener();
        // 微服务刚启动,会从Nacos配置中心加载一次配置
        loadInitialConfig();
    }

    private void loadInitialConfig() {
        try {
            // 调用父类的loadConfig()  父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析
            T newValue = loadConfig();
            if (newValue == null) {
                RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
            }
            // 调用父类的SentinelProperty对象,更新内存中的规则配置
            getProperty().updateValue(newValue);
        } catch (Exception ex) {
            RecordLog.warn("[NacosDataSource] Error when loading initial config", ex);
        }
    }

    private void initNacosListener() {
        try {
            // 初始化configService对象
            this.configService = NacosFactory.createConfigService(this.properties);
            // Add config listener.
            // 通过configService.addListener(..)为指定的dataId添加监听器
            configService.addListener(dataId, groupId, configListener);
        } catch (Exception e) {
            RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e);
            e.printStackTrace();
        }
    }

    @Override
    public String readSource() throws Exception {
        if (configService == null) {
            throw new IllegalStateException("Nacos config service has not been initialized or error occurred");
        }
        // 通过ConfigService接口中的getConfig()方法,从Nacos配置中心获取配置
        return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT);
    }

    @Override
    public void close() {
        if (configService != null) {
            configService.removeListener(dataId, groupId, configListener);
        }
        pool.shutdownNow();
    }

    private static Properties buildProperties(String serverAddr) {
        // 构建一个Properties对象,该对象会在初始化ConfigService时会用上
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return properties;
    }
}



写数据源的实现

写数据源源码实现流程相对简单。我们知道dashboard更新配置后调用微服务端,微服务这边的ModifyRulesCommandHandler类会处理规则更改的请求。这里会有一个写数据源相关的操作

// 注意name = "setRules",这就是控制台请求服务端的url路径
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {

    public CommandResponse<String> handle(CommandRequest request) {

        //......

        // 处理流控规则
        if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
            List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
            FlowRuleManager.loadRules(flowRules);
            // 关键一步,这里会有一个写数据源的操作。默认情况下是没有WritableDataSource,我们可以在这里进行扩展
            if (!writeToDataSource(getFlowDataSource(), flowRules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);

            // 处理权限规则
        } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
            ...

            // 处理熔断规则
        } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
            ...

            // 处理系统规则
        } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
            ...
        }
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }
}



所以我们要做的事情就是创建一个写数据源,并进行注册写数据源WritableDataSourceRegistry。我们先来看看源码中的Demo,通过读写文件的方式实现的读写数据源。

public void init() throws Exception {
    // 文件保存路径
    String flowRuleDir = System.getProperty("user.home") + File.separator + "sentinel" + File.separator + "rules";
    String flowRuleFile = "flowRule.json";
    String flowRulePath = flowRuleDir + File.separator + flowRuleFile;

    // 添加读数据源
    ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>(
        flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})
    );
    FlowRuleManager.register2Property(ds.getProperty());

    // 添加写数据源
    WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson);
    WritableDataSourceRegistry.registerFlowDataSource(wds);
}



我在定义一个往Nacos的写数据源,一个简单的实现,具体项目中能用的请参考上面 <具体实现>一章 。这里只是用更少的代码来理解nacos的写数据源

import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;

import java.util.List;


public class NacosDataSourceInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        //流控规则
        WritableDataSource<List<FlowRule>> writableDataSource = new NacosWritableDataSource<>(
                "127.0.0.1:8848", "DEFAULT_GROUP", "mall-user-sentinel-rule-push-demo-flow", JSON::toJSONString);
        WritableDataSourceRegistry.registerFlowDataSource(writableDataSource);
    }
}
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.exception.NacosException;

import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class NacosWritableDataSource<T> implements WritableDataSource<T> {

    private final String serverAddr;
    private final String groupId;
    private final String dataId;
    private final Properties properties;
    private ConfigService configService;
    private final Converter<T, String> configEncoder;
    private final Lock lock = new ReentrantLock(true);

    public NacosWritableDataSource(String serverAddr, String groupId, String dataId, Converter<T, String> configEncoder) {
        this.serverAddr = serverAddr;
        this.groupId = groupId;
        this.dataId = dataId;
        // 通过serverAddr构建一个properties对象
        this.properties = NacosWritableDataSource.buildProperties(serverAddr);
        this.configEncoder = configEncoder;
        initConfigService();
    }

    private void initConfigService() {
        try {
            // 通过properties对象初始化ConfigService
            this.configService = NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }

    private static Properties buildProperties(String serverAddr) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return properties;
    }

    @Override
    public void write(T t) throws Exception {
        lock.lock();
        try {
            // 通过ConfigService往Nacos配置中心写入数据
            configService.publishConfig(dataId, groupId, this.configEncoder.convert(t), ConfigType.JSON.getType());
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() throws Exception {

    }

}



微服务端解析读数据源流程

我们引入了下面的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

并在配置文件中指定了多个读数据源。这些数据源是如何创建的嘞?

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        # 每一个都是一个读数据源
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            dataId: ${spring.application.name}-flow-rules
            # 我这里在Nacos配置中心,单独使用了一个组
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        # 读数据源
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



源码的入口是SentinelDataSourceHandler类,它实现了SmartInitializingSingleton接口,这是Spring中的接口,所有非懒加载单例bean创建完成之后会调用这个接口的实现类:

  • 在构造函数中依赖注入SentinelProperties对象,该对象中保存了我们配置文件中所有读数据源的配置
  • 遍历SentinelProperties对象中的读数据源,并为每一个读数据源生成一个beanName
  • 为每一个读数据源对象 + beanName 创建一个BeanDefinition
  • 将BeanDefinition添加进BeanFactory中
  • BeanFactory.getBean(beanName) 创建读数据源对象。该对象其实是FactoryBean类型的
  • 上方的getBean()方法最终会调用至NacosDataSourceFactoryBean.getObject()方法,在这里创建NacosDataSource对象。该对象就是上方引入maven依赖中的读数据源对象。
public class SentinelDataSourceHandler implements SmartInitializingSingleton {

    //......
    // SentinelProperties中保存着Map<String, DataSourcePropertiesConfiguration> datasource
    // 也就是我们上方yml文件中定义的多个数据源,我们自定义的名字就是String
    private final SentinelProperties sentinelProperties;

    // 构造方法中进行依赖注入 sentinelProperties对象
    public SentinelDataSourceHandler(DefaultListableBeanFactory beanFactory,SentinelProperties sentinelProperties,...) {
        //...
        this.sentinelProperties = sentinelProperties;
    }

    // 遍历Map<String, DataSourcePropertiesConfiguration>集合,最终取出我们的每一个配置的数据源
    @Override
    public void afterSingletonsInstantiated() {
        sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> {
            try {
                List<String> validFields = dataSourceProperties.getValidField();
                // ...

                // AbstractDataSourceProperties就是我们在配置文件中具体的每一个配置对象的公共父类
                AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties
                    .getValidDataSourceProperties();
                abstractDataSourceProperties.setEnv(env);
                abstractDataSourceProperties.preCheck(dataSourceName);
                // 把我们配置的每一个数据源,还有这里字符串凭借的一个beanName。调用下面的registerBean()方法
                // beanName为   flow-rules + "-sentinel-" + nacos + "-datasource"
                // flow-rules是我们在yml文件中自定义的名字,nacos就是下面的validFields.get(0)值
                registerBean(abstractDataSourceProperties, dataSourceName+ "-sentinel-" + validFields.get(0) + "-datasource");
            }
            catch (Exception e) {
                log.error(...);
            }
        });
    }


    private void registerBean(final AbstractDataSourceProperties dataSourceProperties,String dataSourceName) {
        // 对我们的数据源生成一个BeanDefinition
        BeanDefinitionBuilder builder = parseBeanDefinition(dataSourceProperties,dataSourceName);
        // 将BeanDefinition添加进BeanFactory中
        this.beanFactory.registerBeanDefinition(dataSourceName,builder.getBeanDefinition());
        // 通过beanFactory.getBean(dataSourceName)方法,创建bean对象
        // 我们配置文件中定义的每一个读数据源就变为了一个一个的bean
        // 注意,我们的读数据源它是一个FactoryBean,这里的getBean()方法最终会去到NacosDataSourceFactoryBean.getObject()
        AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory.getBean(dataSourceName);

        // 将读数据源添加进对应的规则管理器中
        dataSourceProperties.postRegister(newDataSource);
    }

public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource> {

  //......

   @Override
   public NacosDataSource getObject() throws Exception {
       // 为properties对象赋值
      Properties properties = new Properties();
      if (!StringUtils.isEmpty(this.serverAddr)) {
         properties.setProperty(PropertyKeyConst.SERVER_ADDR, this.serverAddr);
      }
      else {
         properties.setProperty(PropertyKeyConst.ENDPOINT, this.endpoint);
      }

      if (!StringUtils.isEmpty(this.contextPath)) {
         properties.setProperty(PropertyKeyConst.CONTEXT_PATH, this.contextPath);
      }
      if (!StringUtils.isEmpty(this.accessKey)) {
         properties.setProperty(PropertyKeyConst.ACCESS_KEY, this.accessKey);
      }
      if (!StringUtils.isEmpty(this.secretKey)) {
         properties.setProperty(PropertyKeyConst.SECRET_KEY, this.secretKey);
      }
      if (!StringUtils.isEmpty(this.namespace)) {
         properties.setProperty(PropertyKeyConst.NAMESPACE, this.namespace);
      }
      if (!StringUtils.isEmpty(this.username)) {
         properties.setProperty(PropertyKeyConst.USERNAME, this.username);
      }
      if (!StringUtils.isEmpty(this.password)) {
         properties.setProperty(PropertyKeyConst.PASSWORD, this.password);
      }
       // 创建一个Nacos读数据源对象,这里也就是上方:<源码分析> —— <读数据源> 的那一个对象
      return new NacosDataSource(properties, groupId, dataId, converter);
   }
 
    // ......
}



修改源码的实现

我们需要在Sentinel源码中进行修改,将dashboard和微服务之间的通信,改为dashboard和nacos的通信。在通过Nacos配置中心的推送机制去更新微服务内存中的规则配置。

从 Sentinel 1.4.0 开始,Sentinel 控制台提供 DynamicRulePublisher DynamicRuleProvider 接口用于实现应用维度的规则推送和拉取:

  • DynamicRuleProvider: 拉取规则

  • DynamicRulePublisher: 推送规则

在dashboard工程下的com.alibaba.csp.sentinel.dashboard.rule包下创建nacos包,然后把各种场景的配置规则拉取和推送的实现类写到此包下

可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑

在这里插入图片描述



官方demo

我们看看官方的demo是如何实现的

首先创建一个NacosConfigUtil类,用来定义常量

public final class NacosConfigUtil {

    // 其实demo中也就用到了上面两个常量
    // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置
    public static final String GROUP_ID = "SENTINEL_GROUP";
    
    // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样
    // 避免你写一个dataId,微服务从另一个dataId去读
    public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules";
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules";
    public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";
    public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config";
    public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config";
    public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config";
    public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set";

    private NacosConfigUtil() {}
}



创建一个NacosConfig配置类,这里就定义了流控规则相关的转换器

@Configuration
public class NacosConfig {

    // 流控规则相关 定义 List<FlowRuleEntity> 到 String的转换器
    @Bean
    public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    // 流控规则相关  定义 String 到 List<FlowRuleEntity>的转换器
    @Bean
    public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, FlowRuleEntity.class);
    }

    // 根据一个Nacos的serverAddr,创建ConfigService对象。推送配置/拉取配置都是通过该对象来完成的
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        return ConfigFactory.createConfigService("localhost");
    }
}



接下来我们来看看dashboard推送规则配置的实现代码,它实现了DynamicRulePublisher接口

@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    // 注入上面配置类的中定义的ConfigService和Converter转换器
    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        // 调用Nacos的configService.publishConfig(..)方法 推送配置
        // dataId为 appName + 最上方的常量文件后缀-flow-rules  , 分组为最上方定义的常量SENTINEL_GROUP , 并对规则配置集合转换为json字符串
        configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
                                    NacosConfigUtil.GROUP_ID, converter.convert(rules));
    }
}



接下来我们来看看dashboard拉取规则配置的实现代码,它实现了DynamicRuleProvider接口

@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    // 注入上面配置类的中定义的ConfigService和Converter转换器
    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;

    @Override
    public List<FlowRuleEntity> getRules(String appName) throws Exception {
        // 调用Nacos的configService.getConfig(dataId, group, timeoutMs)方法 拉取配置
        String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
            NacosConfigUtil.GROUP_ID, 3000);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        // 将json字符串转换为 List<FlowRuleEntity> 规则实体对象集合
        return converter.convert(rules);
    }
}



官方Demo这种方式功能上的确是实现了与Nacos通信,对Nacos配置中心进行读写。但存在一个小问题。那就是dashboard这边规则实体对象是FlowRuleEntity,但是微服务端规则实体对象是FlowRule。Nacos把配置推送给微服务端时,微服务端把json字符串转换为实体对象时可能就会出现不匹配的情况 —> 微服务规则实体对象没有相应的值 ----> 内存中的规则也就不完善 ----> 出现了dashboard端更新的规则微服务端未生效情况。

当然,流控规则都还好,如下图所示,这两个之间的实体对象成员属性基本上都能对应上

在这里插入图片描述



但热点规则这边的实体就不行了,他们之间的层级关系就不同了

public class ParamFlowRuleEntity extends AbstractRuleEntity<ParamFlowRule> {

    public ParamFlowRuleEntity() {
    }

    // ParamFlowRule为客户端的规则实体,但是这里将一整个实体对象变为了ParamFlowRuleEntity的其中一个属性
    // 所以这里转json之后的层级关系就发生了改变
    public ParamFlowRuleEntity(ParamFlowRule rule) {
        AssertUtil.notNull(rule, "Authority rule should not be null");
        // 父类中的属性
        this.rule = rule;
    }
    
    ...
}

// 父类
public abstract class AbstractRuleEntity<T extends AbstractRule> implements RuleEntity {

    protected Long id;

    protected String app;
    protected String ip;
    protected Integer port;

    // ParamFlowRule为客户端的规则实体,成为了ParamFlowRuleEntity实体的一个成员属性
    protected T rule;

    private Date gmtCreate;
    private Date gmtModified;
    ...
}



为了解决这种情况,那么就需要定义一个规范,存入Nacos配置中心的数据只能是微服务那边的规则实体对象,不能是dashboard这边的规则实体对象



修改源码实现

naocs配置中心保存的是微服务端的规则实体对象

各个规则都先在dashboard端将规则实体转换为微服务能用的规则实体在推送至Nacos配置中心

从Nacos配置中心获取配置后,都先将json字符串转换为dashboard端的规则实体对象



项目结构如下

在这里插入图片描述



配置类
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.RuleEntity;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


public final class NacosConfigUtil {

    // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置
    public static final String GROUP_ID = "SENTINEL_GROUP";
    
    // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样
    // 避免你写一个dataId,微服务从另一个dataId去读
    public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules";
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-flow-rules";
    public static final String DEGRADE_DATA_ID_POSTFIX = "-degrade-rules";
    public static final String SYSTEM_DATA_ID_POSTFIX = "-system-rules";
    public static final String AUTHORITY_DATA_ID_POSTFIX = "-authority-rules";
    public static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-gateway-flow-rules";
    public static final String GATEWAY_API_DATA_ID_POSTFIX = "-gateway-api-rules";

    public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";

    /**
     * cc for `cluster-client`
     */
    public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config";
    /**
     * cs for `cluster-server`
     */
    public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config";
    public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config";
    public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set";
    
    //超时时间
    public static final int READ_TIMEOUT = 3000;

    private NacosConfigUtil() {}
    
    
    /**
     * RuleEntity----->Rule
     * 控制台这边的规则实体都是RuleEntity类型的,这里就调用各个规则实体对象中的toRule()方法,转换为微服务端的规则实体对象
     * 例如 FlowRuleEntity#toRule ----> FlowRule          ParamFlowRuleEntity#toRule ----> ParamFlowRule
     * @param entities
     * @return
     */
    public static String convertToRule(List<? extends RuleEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toRule())
                        .collect(Collectors.toList()));
    }
    
    /**
     * ApiDefinitionEntity----->ApiDefinition
     * @param entities
     * @return
     */
    public static String convertToApiDefinition(List<? extends ApiDefinitionEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toApiDefinition())
                        .collect(Collectors.toList()));
    }
    
    /**
     * GatewayFlowRuleEntity----->GatewayFlowRule
     * @param entities
     * @return
     */
    public static String convertToGatewayFlowRule(List<? extends GatewayFlowRuleEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toGatewayFlowRule())
                        .collect(Collectors.toList()));
    }



}



通过Nacos配置中心的地址,创建对应的ConfigService对象,并存入Spring容器中

@Configuration
public class NacosConfig {

    @Value("${sentinel.nacos.config.serverAddr}")
    private String serverAddr="localhost:8848";
    
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        return ConfigFactory.createConfigService(serverAddr);
    } 
    
    
    /*
    
    对于Nacos开启了认证,那么就需要添加Naocs的用户名和密码了
    
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        properties.put(PropertyKeyConst.USERNAME, "nacos");
        properties.put(PropertyKeyConst.PASSWORD, "nacos");

        return ConfigFactory.createConfigService(properties);
    }*/
}



flow

拉取配置,实现DynamicRuleProvider接口

@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    // 注入我们上面创建的ConfigService对象
    @Autowired
    private ConfigService configService;

    @Override
    public List<FlowRuleEntity> getRules(String appName,String ip,Integer port) throws NacosException {
        // 从Nacos配置中心拉取配置
        String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        
        // 解析json获取到 List<FlowRule>
        List<FlowRule> list = JSON.parseArray(rules, FlowRule.class);
        
        // 通过FlowRuleEntity.fromFlowRule(..) 方法实现 FlowRule------->FlowRuleEntity
        return list.stream().map(rule ->
                FlowRuleEntity.fromFlowRule(appName,ip,port,rule))
                .collect(Collectors.toList());
    }
}

/*

FlowRuleEntity.fromFlowRule(..) 方法如下所示,Sentinel的dashboard端的规则实体对象内其实都自己写了对应的fromFlowRule()方法

public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) {
    FlowRuleEntity entity = new FlowRuleEntity();
    entity.setApp(app);
    entity.setIp(ip);
    entity.setPort(port);
    entity.setLimitApp(rule.getLimitApp());
    entity.setResource(rule.getResource());
    entity.setGrade(rule.getGrade());
    entity.setCount(rule.getCount());
    entity.setStrategy(rule.getStrategy());
    entity.setRefResource(rule.getRefResource());
    entity.setControlBehavior(rule.getControlBehavior());
    entity.setWarmUpPeriodSec(rule.getWarmUpPeriodSec());
    entity.setMaxQueueingTimeMs(rule.getMaxQueueingTimeMs());
    entity.setClusterMode(rule.isClusterMode());
    entity.setClusterConfig(rule.getClusterConfig());
    return entity;
}


*/



推送配置,实现DynamicRulePublisher接口

@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    // 注入我们上面创建的ConfigService对象
    @Autowired
    private ConfigService configService;
    
    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        //发布配置到Nacos配置中心,这里会调用我们工具类中编写的方法NacosConfigUtil.convertToRule(rules)
        configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
            NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



authority
@Component("authorityRuleNacosProvider")
public class AuthorityRuleNacosProvider implements DynamicRuleProvider<List<AuthorityRuleEntity>> {
    @Autowired
    private ConfigService configService;

    @Override
    public List<AuthorityRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<AuthorityRule> list = JSON.parseArray(rules, AuthorityRule.class);
        return list.stream().map(rule ->
                AuthorityRuleEntity.fromAuthorityRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }


}
@Component("authorityRuleNacosPublisher")
public class AuthorityRuleNacosPublisher implements DynamicRulePublisher<List<AuthorityRuleEntity>> {
    @Autowired
    private ConfigService configService;
    

    @Override
    public void publish(String app, List<AuthorityRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



degread
@Component("degradeRuleNacosProvider")
public class DegradeRuleNacosProvider implements DynamicRuleProvider<List<DegradeRuleEntity>> {
    @Autowired
    private ConfigService configService;

    
    @Override
    public List<DegradeRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<DegradeRule> list = JSON.parseArray(rules, DegradeRule.class);
        return list.stream().map(rule ->
                DegradeRuleEntity.fromDegradeRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("degradeRuleNacosPublisher")
public class DegradeRuleNacosPublisher implements DynamicRulePublisher<List<DegradeRuleEntity>> {
    @Autowired
    private ConfigService configService;
    

    @Override
    public void publish(String app, List<DegradeRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



param
@Component("paramFlowRuleNacosProvider")
public class ParamFlowRuleNacosProvider implements DynamicRuleProvider<List<ParamFlowRuleEntity>> {
    @Autowired
    private ConfigService configService;


    @Override
    public List<ParamFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<ParamFlowRule> list = JSON.parseArray(rules, ParamFlowRule.class);
        return list.stream().map(rule ->
                ParamFlowRuleEntity.fromParamFlowRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("paramFlowRuleNacosPublisher")
public class ParamFlowRuleNacosPublisher implements DynamicRulePublisher<List<ParamFlowRuleEntity>> {
    @Autowired
    private ConfigService configService;
    
    @Override
    public void publish(String app, List<ParamFlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



system
@Component("systemRuleNacosProvider")
public class SystemRuleNacosProvider implements DynamicRuleProvider<List<SystemRuleEntity>> {
    @Autowired
    private ConfigService configService;

    @Override
    public List<SystemRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<SystemRule> list = JSON.parseArray(rules, SystemRule.class);
        return list.stream().map(rule ->
                SystemRuleEntity.fromSystemRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }


}
@Component("systemRuleNacosPublisher")
public class SystemRuleNacosPublisher implements DynamicRulePublisher<List<SystemRuleEntity>> {
    @Autowired
    private ConfigService configService;


    @Override
    public void publish(String app, List<SystemRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



gateway
public class ApiDefinition2 {
    private String apiName;
    private Set<ApiPathPredicateItem> predicateItems;
    
    public ApiDefinition2() {
    }
    
    public String getApiName() {
        return apiName;
    }
    
    public void setApiName(String apiName) {
        this.apiName = apiName;
    }
    
    public Set<ApiPathPredicateItem> getPredicateItems() {
        return predicateItems;
    }
    
    public void setPredicateItems(Set<ApiPathPredicateItem> predicateItems) {
        this.predicateItems = predicateItems;
    }
    
    @Override
    public String toString() {
        return "ApiDefinition2{" + "apiName='" + apiName + '\'' + ", predicateItems=" + predicateItems + '}';
    }
    
    
    public ApiDefinition toApiDefinition() {
        ApiDefinition apiDefinition = new ApiDefinition();
        apiDefinition.setApiName(apiName);
        
        Set<ApiPredicateItem> apiPredicateItems = new LinkedHashSet<>();
        apiDefinition.setPredicateItems(apiPredicateItems);
        
        if (predicateItems != null) {
            for (ApiPathPredicateItem predicateItem : predicateItems) {
                apiPredicateItems.add(predicateItem);
            }
        }
        
        return apiDefinition;
    }
    
}
@Component("gatewayApiRuleNacosProvider")
public class GatewayApiRuleNacosProvider implements DynamicRuleProvider<List<ApiDefinitionEntity>> {

    @Autowired
    private ConfigService configService;


    @Override
    public List<ApiDefinitionEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        
        // 注意 ApiDefinition的属性Set<ApiPredicateItem> predicateItems中元素 是接口类型,JSON解析丢失数据
        // 重写实体类ApiDefinition2,再转换为ApiDefinition
        List<ApiDefinition2> list = JSON.parseArray(rules, ApiDefinition2.class);
        
        return list.stream().map(rule ->
                ApiDefinitionEntity.fromApiDefinition(appName, ip, port, rule.toApiDefinition()))
                .collect(Collectors.toList());
    }
    
    public static void main(String[] args) {
        String rules = "[{\"apiName\":\"/pms/productInfo/${id}\",\"predicateItems\":[{\"matchStrategy\":1,\"pattern\":\"/pms/productInfo/\"}]}]";
        
        
        List<ApiDefinition> list = JSON.parseArray(rules, ApiDefinition.class);
        System.out.println(list);
    
        List<ApiDefinition2> list2 = JSON.parseArray(rules, ApiDefinition2.class);
        System.out.println(list2);
    
        System.out.println(list2.get(0).toApiDefinition());
    
    }
    
}
@Component("gatewayApiRuleNacosPublisher")
public class GatewayApiRuleNacosPublisher implements DynamicRulePublisher<List<ApiDefinitionEntity>> {

    @Autowired
    private ConfigService configService;


    @Override
    public void publish(String app, List<ApiDefinitionEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToApiDefinition(rules));
    }
}
@Component("gatewayFlowRuleNacosProvider")
public class GatewayFlowRuleNacosProvider implements DynamicRuleProvider<List<GatewayFlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public List<GatewayFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<GatewayFlowRule> list = JSON.parseArray(rules, GatewayFlowRule.class);
        return list.stream().map(rule ->
                GatewayFlowRuleEntity.fromGatewayFlowRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("gatewayFlowRuleNacosPublisher")
public class GatewayFlowRuleNacosPublisher implements DynamicRulePublisher<List<GatewayFlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public void publish(String app, List<GatewayFlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToGatewayFlowRule(rules));
    }
}



修改源码

我们现在需要在controller层,将原本dashboard从微服务获取规则配置、dashboard更新规则后调用微服务,这一过程改为Nacos。

以流控规则举例,在FlowControllerV1层中注入我们写的类

/** 从远程配置中心拉取规则*/
@Autowired
@Qualifier("flowRuleNacosProvider")
private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;

/** 推送规则到远程配置中心*/
@Autowired
@Qualifier("flowRuleNacosPublisher")
private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;

原本dashboard从微服务获取规则配置改为通过flowRuleNacosProvider从Nacos拉取配置

@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
                                                         @RequestParam String ip,
                                                         @RequestParam Integer port) {

    if (StringUtil.isEmpty(app)) {
        return Result.ofFail(-1, "app can't be null or empty");
    }
    if (StringUtil.isEmpty(ip)) {
        return Result.ofFail(-1, "ip can't be null or empty");
    }
    if (port == null) {
        return Result.ofFail(-1, "port can't be null");
    }
    try {
        //从客户端内存获取规则配置
        //List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);

        //从远程配置中心获取规则配置
        List<FlowRuleEntity> rules = ruleProvider.getRules(app,ip,port);
        if (rules != null && !rules.isEmpty()) {
            for (FlowRuleEntity entity : rules) {
                entity.setApp(app);
                if (entity.getClusterConfig() != null && entity.getClusterConfig().getFlowId() != null) {
                    entity.setId(entity.getClusterConfig().getFlowId());
                }
            }
        }

        rules = repository.saveAll(rules);
        return Result.ofSuccess(rules);
    } catch (Throwable throwable) {
        logger.error("Error when querying flow rules", throwable);
        return Result.ofThrowable(-1, throwable);
    }
}

规则更改后推送至Nacos

@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
    Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
    if (checkResult != null) {
        return checkResult;
    }
    entity.setId(null);
    Date date = new Date();
    entity.setGmtCreate(date);
    entity.setGmtModified(date);
    entity.setLimitApp(entity.getLimitApp().trim());
    entity.setResource(entity.getResource().trim());
    try {
        // 规则写入dashboard的内存中,会写入三个map中
        entity = repository.save(entity);
        //发布规则到客户端内存中
        //publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
        //发布规则到远程配置中心
        publishRules(entity.getApp());
        return Result.ofSuccess(entity);
    } catch (Throwable t) {
        Throwable e = t instanceof ExecutionException ? t.getCause() : t;
        logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
        return Result.ofFail(-1, e.getMessage());
    }
}

/**
* 发布规则到远程配置中心
* @param app
* @throws Exception
*/
private void publishRules(/*@NonNull*/ String app) throws Exception {
    // 从三个Map中的其中一个获取规则实体集合
    List<FlowRuleEntity> rules = repository.findAllByApp(app);
    // 推送Nacos
    rulePublisher.publish(app, rules);
}



请添加图片描述



在配置文件中指定NacosConfig的地址,因为在最上方的配置类中使用到了该配置项

#接入nacos配置中心用于规则数据持久化
sentinel.nacos.config.serverAddr=localhost:8848



测试

微服务端引入Nacos的读数据源

还是需要它监听dataId的更改,并更新内存中的规则数据

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

配置文件中添加相应的配置

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            # 注意需要和配置类中常量定义的一致
            dataId: ${spring.application.name}-flow-rules
            # 这里的组名需要和配置类中常量定义的一致
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



在Sentinel中进行了两个限流规则的配置

在这里插入图片描述



Naocs的配置中心也有相应的更改

在这里插入图片描述



微服务中也会生效

在这里插入图片描述



补充

如果在工作中sentinel的持久化这一块已经被其他项目组的人完成了,但他们是直接把dashboard端的规则实体转json,存入了Nacos配置中心。进而导致了热点参数规则不生效,并且不允许我们修改源码。

当出现了上面这种情况,那我们应该怎么处理嘞?

解决方案:

自定义一个解析热点规则配置的解析器FlowParamJsonConverter,继承JsonConverter,重写convert方法。

利用BeanPostProcessor机制替换beanName为param-flow-rules-sentinel-nacos-datasourceconverter属性,注入FlowParamJsonConverter

@Configuration
public class ConverterConfig {
 
    @Bean("sentinel-json-param-flow-converter2")
    @Primary
    public JsonConverter jsonParamFlowConverter() {
        return new FlowParamJsonConverter(new ObjectMapper(), ParamFlowRule.class);
    }
}
 
@Component
public class FlowParamConverterBeanPostProcessor implements BeanPostProcessor {
 
    @Autowired
    private JsonConverter jsonParamFlowConverter;
 
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (beanName.equals("param-flow-rules-sentinel-nacos-datasource")) {
            NacosDataSourceFactoryBean nacosDataSourceFactoryBean = (NacosDataSourceFactoryBean) bean;
            nacosDataSourceFactoryBean.setConverter(jsonParamFlowConverter);
            return bean;
        }
        return bean;
    }
}


public class FlowParamJsonConverter extends JsonConverter {
    Class ruleClass;
 
    public FlowParamJsonConverter(ObjectMapper objectMapper, Class ruleClass) {
        super(objectMapper, ruleClass);
        this.ruleClass = ruleClass;
    }
 
    @Override
    public Collection<Object> convert(String source) {
        List<Object> list = new ArrayList<>();
        JSONArray jsonArray = JSON.parseArray(source);
        for (int i = 0; i < jsonArray.size(); i++) {
            //解析rule属性
            JSONObject jsonObject = (JSONObject) jsonArray.getJSONObject(i).get("rule");
            Object object = JSON.toJavaObject(jsonObject, ruleClass);
            list.add(object);
        }
        return list;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

效率飙升!用升级版思维导图搞定测试用例

Xmind思维导图&#xff1c;转&#xff1e;测试用例_如何将xmind改成测试用例-CSDN博客https://weiyv.blog.csdn.net/article/details/135920569 上一次的【xmind思维导图转测试用例】的文章浏览量飙升&#xff0c;这一次把工具又进行升级啦&#xff0c;是在线版的免费工具哦&am…

JRebelXRebel在线激活(亲测可用)

包含所有新旧版本&#xff0c;包括2023.4.2、2023.4.1、2023.4.0、2023.3.2、2023.3.1、2023.3.0、2023.2.2、2023.2.1、2023.2.0、2023.1.2、2023.1.1 等以及所有2022版本 JRebel&XRebel激活服务器地址 激活服务器地址&#xff08;路线1,推荐&#xff09;&#xff0c;可…

文本到 3D AI 生成——Meta 3D Gen、OpenAI Shap-E工作原理与算法解析

概述 根据文本提示生成 3D 数字资产的能力代表了人工智能和计算机图形学领域最近最令人兴奋的发展之一。随着 3D 数字资产市场规模预计将从2024 年的 28.3 亿美元增加到 2029 亿美元&#xff0c;文本转 3D 人工智能模型将在游戏、电影、电子商务等行业的内容创作革命中发挥重要…

【论文阅读】(StemGNN)多元时间序列预测的谱时间图神经网络

&#xff08;StemGNN&#xff09;Spectral Temporal Graph Neural Network for Multivariate Time-series Forecasting 引用&#xff1a; Cao D , Wang Y , Duan J ,et al.Spectral Temporal Graph Neural Network for Multivariate Time-series Forecasting[J]. 2021.DOI:10.…

揭秘“循环乐购”:消费赠礼,每日返利

大家好&#xff0c;我是吴军&#xff0c;今天作为您的电商策略顾问&#xff0c;将带您深入探索一种前所未有的商业模式——“循环乐购”。在这个模式中&#xff0c;消费不再是单向支出&#xff0c;而是成为了开启财富增值的钥匙。您是否好奇&#xff0c;为何有人能在享受购物乐…

ESP32部署TensorFlow Lite

本来是想找一篇中文教程&#xff0c;不过只看到一个英文官方的&#xff0c;也行吧&#xff0c;虽然效率会慢丢丢。 GitHub - espressif/esp-tflite-micro: TensorFlow Lite Micro for Espressif Chipsets 看了一圈&#xff0c;有个中文的&#xff1a; esp-dl/README_cn.md a…

C语言之大小端理解

目录 1前言2 大小端理解与区分3 大小端的识别和基本切换操作4 总结 1前言 在汽车CAN通讯报文中往往会接触到Intel类型和motorola类型&#xff0c;实际项目中涉及到多机通讯也会接触到大小端问题 2 大小端理解与区分 大端(Big_Endian) :低字节放在高地址小端(Little_Endian):…

STM32 BootLoader 刷新项目 (三) 程序框架搭建及刷新演示

STM32 Customer BootLoader 刷新项目 (三) 程序框架搭建 文章目录 STM32 Customer BootLoader 刷新项目 (三) 程序框架搭建典型工作流程 1. 硬件原理图介绍1.1 USART硬件介绍1.2 LED和按键介绍 2. STM32 CubeMX工程搭建2.1 创建工程2.2 系统配置2.3 USART串口配置2.4 配置按键G…

SSE(Server Sent Event)实战(2)- Spring MVC 实现

一、服务端实现 使用 RestController 注解创建一个控制器类&#xff08;Controller&#xff09; 创建一个方法来创建一个客户端连接&#xff0c;它返回一个 SseEmitter&#xff0c;处理 GET 请求并产生&#xff08;produces&#xff09;文本/事件流 (text/event-stream) 创建…

QT小细节

QT小细节 1 QTextToSpeech1.1 cmake1.2 qmake QT6 6.7.2 1 QTextToSpeech 从下图可以看到&#xff0c;分别使用qmake或者cmake编译情况下的&#xff0c;QTextToSpeech的使用方法 QTextToSpeech官方链接&#xff0c;也可以直接在QT Creator的帮助中搜索 1.1 cmake 将上图中的…

无人机之机型区别与应用领域

一、多旋翼无人机 特点&#xff1a;多旋翼无人机依靠产生升力以平衡飞行器的重力&#xff0c;通过改变每个旋翼的转速来控制飞行姿态&#xff0c;能够悬停和垂直起降。他们具备体积小、重量轻、噪音小、隐蔽性好的特点&#xff0c;操作灵活且易于维护。 应用&#xff1a;多旋…

django踩坑(四):终端输入脚本可正常执行,而加入crontab中无任何输出

使用crontab执行python脚本时&#xff0c;有时会遇到脚本无法执行的问题。这是因为crontab在执行任务时使用的环境变量与我们在终端中使用的环境变量不同。具体来说&#xff0c;crontab使用的环境变量是非交互式(non-interactive)环境变量&#xff0c;而终端则使用交互式(inter…

补充.IDEA的使用

首先我们要了解在idea中Java工程由项目&#xff08;project&#xff09;、模块&#xff08;module&#xff09;包&#xff08;package&#xff09;、类&#xff08;class&#xff09;组成。 他们之间的关系是project包含module包含package包含class。 所以我们要按照先建一个pr…

启智畅想火车类集装箱号码识别技术,软硬件解决方案

集装箱号码识别需求&#xff1a; 实时检测车皮号、火车底盘号码、集装箱号码&#xff0c;根据火车类型分为以下三种情况&#xff1a; 1、纯车皮&#xff0c;只检测车皮号&#xff1b; 2、火车拉货箱&#xff08;半车皮&#xff09;&#xff0c;检测车皮号集装箱号码&#xff1b…

巧用通义灵码助力护网面试

前言 前几年护网还算是一个比较敏感的话题&#xff0c;但是随着近段时间的常态化开始&#xff0c;护网行动也是逐渐走进了大众的视野&#xff0c;成为了社会各界共同关注的安全盛事。本篇也是受通义灵码备战求职季活动的启发&#xff0c;结合近期要开始的护网行动&#xff0c…

监控系统怎样做?

监控类型自底向上分为资源监控、服务监控和业务监控。希望打造公司级的监控系统最好的时机是系统规划时&#xff0c;如果把监控设计往后放&#xff0c;将会面临一个巨大的难题&#xff1a;推行和现有不兼容的规范。 三种监控类型 资源监控 这个相对简单&#xff0c;随着k8s的兴…

Python 如何使用列表推导式(list comprehensions)?

列表推导式&#xff08;List Comprehensions&#xff09;是 Python 中一种简洁且强大的创建列表的方式。通过使用列表推导式&#xff0c;可以用一行代码来生成列表&#xff0c;而不是通过多行代码的循环或其他方法。 一、列表推导式的基本语法 列表推导式的基本语法如下&…

QT开发笔记:信号和槽

乱码问题&#xff1a; 出现乱码问题原因只有一个&#xff1a;就是编码方式不匹配&#xff01;&#xff01;&#xff01; 中文常见汉字4K,算上各种生僻字差不多六万字 仍然使用一个大表格&#xff0c;给每个汉字&#xff0c;分配一个整数即可。 字符集~~表示汉字的字符集&#…

基于若依的ruoyi-nbcio流程管理系统修正自定义业务表单的回写bug

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

差异分析的结果各种热图,火山图,箱式图可视化作图教程

1. 基因表达的热图绘制 1.1 根据所有差异基因绘制基因表达的聚类热图 视频教程: https://www.bilibili.com/video/BV13m421g7wv/ 1.2 绘制top差异基因表达的聚类热图 视频教程: https://www.bilibili.com/video/BV1jZ42147KP/ 1.3 绘制感兴趣基因的聚类热图 视频教程: http…