何为热点?热点即经常访问的数据。很多时候我们希望统计某些热点数据中访问频次最高的Top K数据,并对其访问进行限制。
比如:
- 商品ID为参数,统计一段时间内最常购买的商品ID并进行限制
- 用户ID为参数,针对一段时间内频繁访问的用户ID进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
注意:
- 热点规则需要使用@SentinelResource(“resourceName”)注解,否则不生效
- 参数必须是7种基本数据类型才会生效
Sentinel利用LRU策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。
热点参数规则
热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):
属性 | 说明 | 默认值 |
---|---|---|
resource | 资源名,必填 | |
count | 限流阈值,必填 | |
grade | 限流模式 | QPS 模式 |
durationInSec | 统计窗口时间长度(单位为秒),1.6.0 版本开始支持 | 1s |
controlBehavior | 流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持 | 快速失败 |
maxQueueingTimeMs | 最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持 | 0ms |
paramIdx | 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置 | |
paramFlowItemList | 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型 | |
clusterMode | 是否是集群参数流控规则 | false |
clusterConfig | 集群流控相关配置 |
我们可以通过ParamFlowRuleManager的loadRules方法更新热点参数规则,下面是一个示例:
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(0)
.setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
热点规则的使用
要使用热点参数限流功能,需要引入以下依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>x.y.z</version>
</dependency>
然后为对应的资源配置热点参数限流规则,并在entry的时候传入相应的参数,即可使热点参数限流生效。
注:若自行扩展并注册了自己实现的SlotChainBuilder,并希望使用热点参数限流功能,则可以在chain里面合适的地方插入 ParamFlowSlot。
那么如何传入对应的参数以便Sentinel统计呢?我们可以通过SphU类里面几个entry重载方法来传入:
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException
其中最后的一串args就是要传入的参数,有多个就按照次序依次传入。比如要传入两个参数paramA和paramB,则可以:
// paramA in index 0, paramB in index 1.
// 若需要配置例外项或者使用集群维度流控,则传入的参数只支持基本类型。
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
注意:若entry的时候传入了热点参数,那么exit的时候也一定要带上对应的参数(exit(count, args)),否则可能会有统计错误。
正确的示例:
Entry entry = null;
try {
entry = SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
// Your logic here.
} catch (BlockException ex) {
// Handle request rejection.
} finally {
if (entry != null) {
entry.exit(1, paramA, paramB);
}
}
注意在Sentinel Dashboard中的簇点链路中根据链接直接配置热点规则是无效的,因为将链接标记为资源是在拦截器AbstractSentinelInterceptor的preHandle()方法中完成的,这个方法里并没有将方法的参数传入entry中。
com.alibaba.csp.sentinel.adapter.spring.webmvc.AbstractSentinelInterceptor#preHandle
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// 拦截所有的web请求
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
String origin = parseOrigin(request);
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
// sentinel的入口,注意没有传入方法的参数,无法实现热点规则限流
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
对于@SentinelResource注解方式定义的资源,若注解作用的方法上有参数,Sentinel会将它们作为参数传入SphU.entry(res, args)
。
比如以下的方法里面p1和p2会分别作为第一个和第二个参数传入Sentinel API,从而可以用于热点规则判断:
@RequestMapping("sentinel")
@RestController
public class ParamFlowController {
@RequestMapping("paramFlow")
@SentinelResource("paramFlow")
public String paramFlow(@RequestParam(value = "p1", required = false) String p1,
@RequestParam(value = "p2", required = false) String p2) {
return "paramFlow p1=" + p1 + ", p2=" + p2;
}
}
例如对上面的资源paramFlow
进行热点规则配置:
限流模式只支持QPS模式,也只有QPS模式下才叫热点。
配置的参数索引是@SentinelResource
注解的方法参数索引,0代表第一个参数,1代表第二个参数,以此类推;单机阀值以及统计窗口时长表示在此窗口时间超过阀值就限流。
上例中,我们将参数索引指定为0,所以当访问路径带上第一个参数p1时,在一秒(统计窗口时长)内访问超过一次(单机阈值)就可能发生限流,如果不带参数p1不会触发限流。
参数例外项的演示:
在前面的例子基础上,我们增加参数例外项,参数值为1,限流阈值为10,这样当访问路径上第一个参数p1的值为1时,在一秒(统计窗口时长)内访问超过10次(单机阈值)才会发生限流,如果第一个参数p1的值不是1时,限流的阈值还是1,如果不带参数p1不会触发限流,注意指定的参数类型要与方法的参数类型保持一致。
源码分析
ParamFlowSlot插槽
处理热点参数的Slot是ParamFlowSlot。
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 处理热点参数规则
// 判断资源名是否配置了规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// 校验热点规则
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlow
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
// 判断资源名是否配置了规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
// 根据资源名查询规则
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
// 对规则中参数的index进行处理,index可以为负数
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
// 初始化统计参数
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
// 校验规则
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
// 校验不通过,抛出ParamFlowException
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
ParamFlowChecker的数据结构
热点参数限流使用的算法为令牌桶算法,首先来看一下数据结构是如何存储的:
// timeRecorder
// 记录令牌桶的最后添加时间,用于QPS限流
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
// tokenCounter
// 记录令牌桶的令牌数量,用于QPS限流
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
每个Resource对应一个ParameterMetric对象,上述CacheMap<Object, AtomicLong>的Key代表热点参数的值,Value则是对应的计数器。
所以这里数据结构的关系是这样的:
- 一个Resource有一个ParameterMetric
- 一个ParameterMetric统计了多个Rule所需要的限流指标数据
- 每个Rule又可以配置多个热点参数
CacheMap的默认实现,包装了com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap,使用该类的主要原因是为了实现热点参数的LRU。
ParamFlowChecker的校验逻辑
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passCheck
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
// 如果参数不存在直接返回
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
//参数的个数小于规则的索引直接返回
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
// 如果参数为空,直接返回
if (value == null) {
return true;
}
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
// 校验规则
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
// 根据参数的类型来校验
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
// 参数一般是数组
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
// 热点规则的阈值类型只能配置QPS类型
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
// 流控效果只能是快速失败
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passDefaultLocalCheck
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
/**
* ParamFlowSlot#checkFlow中会调用ParameterMetricStorage.initParamMetricsFor()初始化统计数据
* @see ParamFlowSlot#checkFlow(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, java.lang.Object...)
*/
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
// tokenCount表示的是QPS的阈值,默认使用热点参数中配置的单机阈值
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
// 如果参数例外项中配置了,则使用参数例外项中配置的阈值
tokenCount = rule.getParsedHotItems().get(value);
}
// 阈值为0,直接返回false,不通过
if (tokenCount == 0) {
return false;
}
// burstCount表示应对突发流量额外允许的数量,默认为0
long maxCount = tokenCount + rule.getBurstCount();
// acquireCount表示请求需要的QPS数量,默认为1
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
// tokenCounters用来记录参数当前还能获取到的令牌数
// timeCounters用来记录时间段的开始时间。
// 获取当前统计的时间段的开始时间
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// 当前时间段还没有计算就初始化令牌数
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
// 计算已经过去了多长时间
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
// 当前时间不在这个窗口内
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
// 重新计算QPS
long restQps = oldQps.get();
// 每毫秒应该生成的 token = tokenCount / (rule.getDurationInSec() * 1000)
// 再 * passTime 即等于应该补充的 token
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
// 更新当前时间
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
// 当前时间还在还在这个窗口内
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
// CAS减少令牌数
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
// 令牌数不够直接返回false,限流
return false;
}
}
Thread.yield();
}
}
}