前言
Sentinel 的规则持久化有 pull(拉)模式和 push(推)模式。本文使用 Redis 实现 pull 模式。
通过SPI机制引入自己的类
在项目的 resources > META-INF > services 目录下创建新文件,文件名为所实现接口的全限定名 com.alibaba.csp.sentinel.init.InitFunc,文件内容是自己实现类的全限定名:com.xx.sentinel.RedisDataSourceInit
创建实现类
package com.xx.sentinel;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xx.schedule.Utils.SpringUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
/**
 * Sentinel {@link InitFunc} that configures rule persistence in Redis.
 *
 * <p>For the flow rules and the degrade rules it registers a readable data source
 * (Redis subscription plus periodic refresh), performs one initial load from Redis,
 * and registers a writable data source so rules pushed by the Sentinel console are
 * stored back into Redis.
 *
 * @author wangyn85
 */
public class RedisDataSourceInit implements InitFunc {

    /** Pattern of the Redis key holding the rules, e.g. {@code sentinel:common:flow}. */
    private static final String SENTINEL_REDIS_KEY = "sentinel:%s:%s";

    /** Pattern of the pub/sub channel announcing rule changes, e.g. {@code chanel_sentinel_common_flow}. */
    private static final String SENTINEL_REDIS_CHANEL = "chanel_sentinel_%s_%s";

    private static final String RULE_FLOW = "flow";
    private static final String RULE_DEGRADE = "degrade";

    /**
     * Lazily resolved template. Declared {@code volatile} so that the check performed
     * outside the {@code synchronized} block in {@link #getRedisTemplate()} always
     * observes a fully published reference.
     */
    private volatile RedisTemplate<String, Object> redisTemplate;

    /**
     * Resolves the {@code functionDomainRedisTemplate} bean on first use and caches it.
     *
     * @return the cached template, or whatever {@code SpringUtil.getBean} returned
     */
    private RedisTemplate<String, Object> getRedisTemplate() {
        RedisTemplate<String, Object> template = redisTemplate;
        if (template == null) {
            synchronized (this) {
                template = redisTemplate;
                if (template == null) {
                    template = SpringUtil.getBean("functionDomainRedisTemplate");
                    redisTemplate = template;
                }
            }
        }
        return template;
    }

    /**
     * Builds the Redis key under which the rules of the given type are stored.
     *
     * @param ruleType rule type segment, e.g. {@code flow} or {@code degrade}
     * @return the formatted key for the current project
     */
    private String getSentinelRedisKey(String ruleType) {
        String projectName = SentinelRedisHelper.getProjectName();
        return String.format(SENTINEL_REDIS_KEY, projectName, ruleType);
    }

    /**
     * Builds the pub/sub channel name used to announce changes of the given rule type.
     *
     * @param ruleType rule type segment, e.g. {@code flow} or {@code degrade}
     * @return the formatted channel name for the current project
     */
    private String getSentinelRedisChanel(String ruleType) {
        String projectName = SentinelRedisHelper.getProjectName();
        return String.format(SENTINEL_REDIS_CHANEL, projectName, ruleType);
    }

    /**
     * Registers the Redis-backed data sources for flow and degrade rules.
     * Does nothing when no Redis template or no project name is available.
     *
     * @throws Exception declared by {@link InitFunc#init()}
     */
    @Override
    public void init() throws Exception {
        // Without Redis or a project name there is nothing to persist to.
        if (getRedisTemplate() == null || StringUtils.isEmpty(SentinelRedisHelper.getProjectName())) {
            return;
        }
        // 1. flow rules
        this.dealFlowRules();
        // 2. degrade (circuit breaking) rules
        this.dealDegradeRules();
    }

    /**
     * Wires the flow rules: readable source, initial load, writable source.
     */
    private void dealFlowRules() {
        String redisFlowKey = getSentinelRedisKey(RULE_FLOW);
        String redisFlowChanel = getSentinelRedisChanel(RULE_FLOW);

        // Readable source: combines a Redis subscription with a periodic refresh.
        Converter<String, List<FlowRule>> parser =
                source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
                });
        ReadableDataSource<String, List<FlowRule>> redisFlowDataSource =
                new RedisDataSource2<>(parser, getRedisTemplate(), redisFlowKey, redisFlowChanel);
        FlowRuleManager.register2Property(redisFlowDataSource.getProperty());

        // Load all flow rules once at start-up.
        String flowRulesStr = (String) getRedisTemplate().opsForValue().get(redisFlowKey);
        List<FlowRule> flowRuleList = parser.convert(flowRulesStr);
        redisFlowDataSource.getProperty().updateValue(flowRuleList);

        // Writable source: rules pushed by the console are written to Redis through it.
        WritableDataSource<List<FlowRule>> flowRuleWds =
                new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisFlowKey, redisFlowChanel);
        WritableDataSourceRegistry.registerFlowDataSource(flowRuleWds);
    }

    /**
     * Wires the degrade rules: readable source, initial load, writable source.
     */
    public void dealDegradeRules() {
        String redisDegradeKey = getSentinelRedisKey(RULE_DEGRADE);
        String redisDegradeChanel = getSentinelRedisChanel(RULE_DEGRADE);

        Converter<String, List<DegradeRule>> parser =
                source -> JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
                });
        ReadableDataSource<String, List<DegradeRule>> redisDegradeDataSource =
                new RedisDataSource2<>(parser, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);
        DegradeRuleManager.register2Property(redisDegradeDataSource.getProperty());

        // Load all degrade rules once at start-up.
        String degradeRulesStr = (String) getRedisTemplate().opsForValue().get(redisDegradeKey);
        List<DegradeRule> degradeRuleList = parser.convert(degradeRulesStr);
        redisDegradeDataSource.getProperty().updateValue(degradeRuleList);

        // Writable source: rules pushed by the console are written to Redis through it.
        WritableDataSource<List<DegradeRule>> degradeRuleWds =
                new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);
        WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWds);
    }
}
创建读取
package com.midea.sentinel;
import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.nio.charset.StandardCharsets;
/**
 * Readable Sentinel data source backed by Redis.
 *
 * <p>Rules are refreshed in two ways: a message on the configured pub/sub channel
 * triggers an immediate reload of the rule key, and the periodic refresh inherited
 * from {@link AutoRefreshDataSource} reloads the rule key whenever the stored
 * update-time value differs from the one seen last.
 *
 * @param <T> type of the parsed rule configuration
 * @author wangyn85
 */
public class RedisDataSource2<T> extends AutoRefreshDataSource<String, T> {

    private static final Logger logger = LoggerFactory.getLogger(RedisDataSource2.class);

    /** Refresh interval, in milliseconds, handed to the parent data source. */
    private static final long DEFAULT_REFRESH_MS = 300000L;

    private final RedisTemplate redisTemplate;

    /** Redis key holding the serialized rules. */
    private final String ruleKey;

    /** Redis pub/sub channel announcing rule changes. */
    private final String channel;

    /** Redis key holding the time of the last rule update. */
    private final String ruleUpdateKey;

    /**
     * Container owning the subscription. A connection borrowed through
     * {@code redisTemplate.execute} is released as soon as the callback returns and
     * therefore cannot carry a long-lived subscription, so a dedicated container is used.
     */
    private final RedisMessageListenerContainer listenerContainer;

    /** Update-time value observed by the most recent {@link #isModified()} call that reported a change. */
    private String lastModified = "-1";

    /**
     * Creates the data source and subscribes to the change channel.
     *
     * @param parser        converts the stored string into the rule configuration
     * @param redisTemplate template used to read the rule and update-time keys; must not be null
     * @param ruleKey       Redis key holding the rules; must not be empty
     * @param channel       pub/sub channel to subscribe to; must not be empty
     */
    public RedisDataSource2(Converter<String, T> parser, RedisTemplate redisTemplate, String ruleKey, String channel) {
        // The parent constructor receives the periodic refresh interval.
        super(parser, DEFAULT_REFRESH_MS);
        AssertUtil.notNull(redisTemplate, "redisTemplate can not be null");
        AssertUtil.notEmpty(ruleKey, "redis ruleKey can not be empty");
        AssertUtil.notEmpty(channel, "redis subscribe channel can not be empty");
        this.redisTemplate = redisTemplate;
        this.ruleKey = ruleKey;
        this.channel = channel;
        this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(ruleKey);
        this.listenerContainer = subscribeFromChannel();
    }

    /**
     * Reads the raw rule string stored under the rule key.
     *
     * @return the stored value, or {@code null} when the key does not exist
     */
    @Override
    public String readSource() throws Exception {
        return (String) redisTemplate.opsForValue().get(ruleKey);
    }

    /**
     * Stops the periodic refresh and releases the channel subscription.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Destroying the container stops it and releases its subscription connection.
            listenerContainer.destroy();
        }
    }

    /**
     * Subscribes to the change channel; every message triggers a full reload of the rules.
     *
     * @return the started listener container that owns the subscription
     */
    private RedisMessageListenerContainer subscribeFromChannel() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        container.addMessageListener((message, pattern) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("{},接收到sentinel规则更新消息: {} ", channel, msg);
            try {
                // On notification, reload the complete rule set from Redis into memory.
                getProperty().updateValue(parser.convert(readSource()));
            } catch (Exception e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                logger.error(channel + ",接收到sentinel规则更新消息:{},更新出错:{}", msg, e.getMessage(), e);
            }
        }, new ChannelTopic(channel));
        // The container is created outside the Spring context, so drive its lifecycle manually.
        container.afterPropertiesSet();
        container.start();
        return container;
    }

    /**
     * Reports whether the stored update time changed since the last reported change,
     * so the periodic refresh can skip reloading unchanged rules.
     *
     * @return {@code false} when no update time is stored or it equals the last seen value
     */
    @Override
    protected boolean isModified() {
        String updateTimeStr = (String) redisTemplate.opsForValue().get(ruleUpdateKey);
        if (StringUtils.isEmpty(updateTimeStr) || updateTimeStr.equals(lastModified)) {
            return false;
        }
        this.lastModified = updateTimeStr;
        return true;
    }
}
创建写入
package com.midea.sentinel;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Writable Sentinel data source that stores rules received from the Sentinel console
 * in Redis and then publishes a change notification on a pub/sub channel.
 *
 * @param <T> type of the rule configuration being written
 * @author wangyn85
 */
public class RedisWritableDataSource<T> implements WritableDataSource<T> {

    private static final Logger logger = LoggerFactory.getLogger(RedisWritableDataSource.class);

    /** Payload of the change notification. */
    private static final String SENTINEL_RULE_CHANGE = "CHANGE";

    private final String redisRuleKey;
    private final Converter<T, String> configEncoder;
    private final RedisTemplate redisTemplate;
    private final String redisFlowChanel;
    private final Lock lock;

    /** Redis key holding the time of the last rule update. */
    private final String ruleUpdateKey;

    /**
     * @param configEncoder   converts the rule configuration into the string stored in Redis
     * @param redisTemplate   template used for writing and publishing
     * @param redisRuleKey    Redis key under which the rules are stored
     * @param redisFlowChanel pub/sub channel on which the change is announced
     */
    public RedisWritableDataSource(Converter<T, String> configEncoder, RedisTemplate redisTemplate, String redisRuleKey, String redisFlowChanel) {
        this.redisRuleKey = redisRuleKey;
        this.configEncoder = configEncoder;
        this.redisTemplate = redisTemplate;
        this.redisFlowChanel = redisFlowChanel;
        this.lock = new ReentrantLock(true);
        this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(redisRuleKey);
    }

    /**
     * Stores the rules, records the update time and publishes the change notification.
     *
     * @param value rule configuration to persist
     * @throws Exception any failure of the conversion or of a Redis operation, rethrown after logging
     */
    @Override
    public void write(T value) throws Exception {
        this.lock.lock();
        try {
            logger.info("收到sentinel控制台规则写入信息,并准备持久化:{}", value);
            String convertResult = this.configEncoder.convert(value);
            // Store the rules BEFORE the update time: a reader that notices the new
            // timestamp must never find the previous rule payload behind it.
            redisTemplate.opsForValue().set(redisRuleKey, convertResult);
            redisTemplate.opsForValue().set(ruleUpdateKey, String.valueOf(System.currentTimeMillis()));
            logger.info("收到sentinel控制台规则写入信息,持久化后发布redis通知:{},信息:{}", this.redisFlowChanel, SENTINEL_RULE_CHANGE);
            redisTemplate.convertAndSend(this.redisFlowChanel, SENTINEL_RULE_CHANGE);
        } catch (Exception e) {
            // Failures are errors; pass the exception itself so the stack trace is logged.
            logger.error("收到sentinel控制台规则写入信息,持久化出错", e);
            throw e;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * No resources are held by this data source, so there is nothing to release.
     */
    @Override
    public void close() throws Exception {
    }
}
创建需要的配置类
package com.midea.sentinel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Holds the settings used by the Redis persistence of Sentinel rules and exposes
 * them through static accessors, so classes not managed by Spring can read them.
 *
 * @author wangyn85
 */
@Component
public class SentinelRedisHelper {

    /** Pattern of the key holding the update time of a rule key. */
    private static final String SENTINEL_REDIS_UPDATE_TIME = "%s:updateTime";

    /**
     * Bean instance published by {@link #init()}. Declared {@code volatile} because it is
     * written in {@link #init()} and read from static callers that may be on other threads.
     */
    private static volatile SentinelRedisHelper self;

    /**
     * Project name; resolves to an empty string when {@code project.name} is not set,
     * which callers treat as "persistence not configured".
     */
    @Value("${project.name:}")
    private String projectName;

    /**
     * Publishes this bean for the static accessors once its values are injected.
     */
    @PostConstruct
    public void init() {
        self = this;
    }

    /**
     * Returns the configured project name.
     *
     * @return the project name, or {@code null} when this bean has not been initialized yet
     */
    public static String getProjectName() {
        SentinelRedisHelper helper = self;
        return helper == null ? null : helper.projectName;
    }

    /**
     * Builds the Redis key that stores the update time of the given rule key.
     *
     * @param redisKey Redis key of the rules
     * @return {@code redisKey} followed by {@code :updateTime}
     */
    public static String getRedisUpdateTimeKey(String redisKey) {
        return String.format(SENTINEL_REDIS_UPDATE_TIME, redisKey);
    }
}