Sentinel工作主流程
在Sentinel里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个Entry对象。Entry可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用SphU API显式创建。Entry创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
- NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用链路来限流降级;
- ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的RT、QPS、thread count等等,这些信息将用作为多维度限流,降级的依据;
- StatisticSlot:则用于记录、统计不同维度的runtime指标监控信息;
- AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
- SystemSlot:则通过系统的状态,例如load等,来控制总的入口流量;
- ParamFlowSlot:统计热点参数的调用量,并依据这些统计信息进行热点参数限流;
- FlowSlot:则用于根据预设的限流规则以及前面slot统计的状态,来进行流量控制;
- DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级;
Sentinel将ProcessorSlot作为SPI接口进行扩展,使得SlotChain具备了扩展的能力。您可以自行加入自定义的slot并编排 slot间的顺序,从而可以给Sentinel添加自定义的功能。
几种Node的区别与联系
DefaultNode
DefaultNode的实例位于NodeSelectorSlot中,意味着一个资源对应一个NodeSelectorSlot,一个NodeSelectorSlot有多个DefaultNode。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* {@link DefaultNode}s of the same resource in different context.
*/
// key为contextName
// 如果web-context-unify=false,contextName为sentinel_spring_web_context
// 如果web-context-unify=true,contextName为url
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
DefaultNode用于流控规则中的根据调用链路限流。
ClusterNode
ClusterNode是在ClusterBuilderSlot创建的,创建完后会放入DefaultNode中,这样当DefaultNode的统计数据更新时,ClusterNode的统计数据也跟着更新了。
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// key为资源,跟资源名直接绑定
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
clusterNodeMap是一个静态的Map结构,这样可以根据资源名查找到这个ClusterNode。
ClusterNode主要用于流控规则中的根据关联关系限流。
OriginNode
OriginNode对应的类型为StatisticNode,位于ClusterNode内部,当origin不为空时,OriginNode就会被创建,主要用于根据流控规则中的根据调用方限流。
public class ClusterNode extends StatisticNode {
... ...
// key为orgin的名称
private Map<String, StatisticNode> originCountMap = new HashMap<>();
EntranceNode
EntranceNode的类型为ClusterNode,统计的是整个实例的QPS和线程数等信息,主要用于系统规则的限流。
public final class Constants {
... ...
/**
* Global statistic node for inbound traffic. Usually used for {@code SystemRule} checking.
*/
public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);
流控规则源码分析
程序的入口
不管sentinel与哪个框架整合,在外面包了多少层封装,程序的入口都在SphU#entry,都会调用到这个核心方法来,这是一个静态方法,所以只有在第一次调用的时候才会进行初始化,dashboard才会有数据展示,其他监控和api才会有数据展示。
com.alibaba.csp.sentinel.SphU#entry(java.lang.String, int, com.alibaba.csp.sentinel.EntryType)
public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
}
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
return entryWithPriority(resource, count, prioritized, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
// 可在外部手动调用InternalContextUtil.internalEnter创建context,并指定origin
if (context == null) {
// Using default context.
// 创建Context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 根据资源名查询 处理器插槽责任链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 调用责任链
/**
* @see DefaultProcessorSlotChain#entry(com.alibaba.csp.sentinel.context.Context, com.alibaba.csp.sentinel.slotchain.ResourceWrapper, java.lang.Object, int, boolean, java.lang.Object...)
*/
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
责任链的构建
lookProcessChain()会根据资源从缓存中查责任链是否已经存在了,不存在就调用SlotChainProvider.newSlotChain()进行创建。
com.alibaba.csp.sentinel.CtSph#lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 初始化责任链
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
newSlotChain()使用SPI机制加载SlotChainBuilder,我们可以自己注入SlotChainBuilder来实现自己的责任链,例如可以改变责任链的顺序等。
com.alibaba.csp.sentinel.slotchain.SlotChainProvider#newSlotChain
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
/**
* sentinel-core DefaultSlotChainBuilder
* @see com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
*/
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
// 构建责任链
return slotChainBuilder.build();
}
DefaultSlotChainBuilder#build()负责整个责任链的构建,也是通过SPI机制加载所有的ProcessorSlot,我们如果要增加ProcessorSlot,直接利用SPI进行注入即可,ProcessorSlot的执行顺序根据ProcessorSlot上的@Spi注解来指定,@Spi中的顺序值越小越先执行。
com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// SPI加载所有的ProcessorSlot
// sentinel-core中配置的
// 从这里可以看出每个资源对应一个ProcessorSlotChain,
// ProcessorSlotChain中有多个ProcessorSlot,这些实例都跟资源绑定,不是单例
/**
*
* # Sentinel default ProcessorSlots
*
* public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
* public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
* public static final int ORDER_LOG_SLOT = -8000;
* public static final int ORDER_STATISTIC_SLOT = -7000;
* public static final int ORDER_AUTHORITY_SLOT = -6000;
* public static final int ORDER_SYSTEM_SLOT = -5000;
* public static final int ORDER_FLOW_SLOT = -2000;
* public static final int ORDER_DEGRADE_SLOT = -1000;
*
* ParamFlowSlot由sentinel-parameter-flow-control因引入
*
* @see com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
* @see com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
* @see com.alibaba.csp.sentinel.slots.logger.LogSlot
* @see com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
* @see com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
* @see com.alibaba.csp.sentinel.slots.system.SystemSlot
* @see com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
* @see com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
* @see com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
*/
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
默认的责任链如下图:
责任链的执行
责任链的第一个Slot是一个AbstractLinkedProcessorSlot类型的匿名内部类,调用父类的fireEntry()向后传递。
com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
// 默认逻辑是向后传递
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// 默认逻辑是向后传递
super.fireExit(context, resourceWrapper, count, args);
}
};
fireEntry()就是调用下一个Slot的entry()方法,所以Slot主要关注entry()方法即可。
com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot#fireEntry
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
NodeSelectorSlot
NodeSelectorSlot主要负责收集调用链路的路径,后续可用于流控规则的调用链路限流。
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 这里是根据context name获取DefaultNode,而不是根据resource name获取
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 根据资源创建DefaultNode
node = new DefaultNode(resourceWrapper, null);
// 将DefaultNode放入map中,采用CopyOnWrite保证线程安全
// 相当于 map.put(context.getName(), node);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// 构建调用链树
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// 设置当前Node
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ClusterBuilderSlot
ClusterBuilderSlot主要负责存储资源的统计信息,后续将用于流控规则。
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 根据资源创建ClusterNode
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
// 将ClusterNode放入clusterNodeMap中,采用CopyOnWrite保证线程安全
// 相当于 clusterNodeMap.put(node.getId(), node);
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
// 注意clusterNodeMap是一个静态变量,整个实例共享
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
if (!"".equals(context.getOrigin())) {
// origin从哪来的?
// 在外部手动调用InternalContextUtil.internalEnter创建context,并指定origin
// 创建OriginNode
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
StatisticSlot
StatisticSlot主要负责统计数据,数据存储在之前的Slot中创建的DefaultNode和ClusterNode中。
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 目标方法调用完成之后进行统计
// Request passed, add thread count and pass count.
// DefaultNode和ClusterNode 线程数+1
node.increaseThreadNum();
// DefaultNode和ClusterNode QPS+count
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
// OriginNode 线程数+1
// 为什么不是context.getOriginNode()
// context.getCurEntry().getOriginNode() == context.getOriginNode()
context.getCurEntry().getOriginNode().increaseThreadNum();
// OriginNode QPS+count
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// TODO IN 用来干啥
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
// 记录整个系统的线程数和QPS,可用于系统规则
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 扩展点,回调
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
... ...
} catch (BlockException e) {
// Blocked, set block exception to current entry.
// 设置限流异常,后面的slot将会使用
context.getCurEntry().setBlockError(e);
// DefaultNode和ClusterNode blockQPS +1
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
// OriginNode blockQPS +1
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
// 整个系统的blockQPS +1
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// 扩展点,回调
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
// 设置限流异常,后面的slot将会使用
context.getCurEntry().setError(e);
throw e;
}
}
AuthoritySlot、SystemSlot、ParamFlowSlot、DegradeSlot前面的文章已分析,这里不再赘述。
FlowSlot
FlowSlot#entry()会在目标方法执行之前进行流控规则的校验。
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 校验流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 校验
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
一个资源可以配置多个流控规则,所以需要遍历所有的规则逐一校验,校验不通过就会抛出FlowException。
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 遍历所有的流控规则,逐一校验
if (!canPassCheck(rule, context, node, count, prioritized)) {
// 校验不通过,抛出FlowException
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 单机模式
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 根据流控模式选择不同的Node
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
/** 根据流控效果来校验
* DefaultController为直接拒绝
* RateLimiterController为排队等待
* WarmUpController为预热
*
* @see RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
* @see WarmUpController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
* @see DefaultController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
*/
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
selectNodeByRequesterAndStrategy()主要是根据不同的流控模式选择不同的Node,这里总结下:
- 根据针对来源orgin限流,使用OriginNode
- 根据资源名直接限流,使用ClusterNode
- 根据关联关系限流,根据关联的资源名称从ClusterBuilderSlot中查找ClusterNode
- 根据调用链路限流,使用DefaultNode
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#selectNodeByRequesterAndStrategy
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
// limitApp和origin相同,表示要根据来源限流
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 流控模式为直接
// Matches limit origin, return origin statistic node.
// 在中通过context.getCurEntry().setOriginNode(originNode)设置的
/**
* 相当于context.getCurEntry().getOriginNode();
* @see ClusterBuilderSlot#entry(com.alibaba.csp.sentinel.context.Context, com.alibaba.csp.sentinel.slotchain.ResourceWrapper, com.alibaba.csp.sentinel.node.DefaultNode, int, boolean, java.lang.Object...)
*/
return context.getOriginNode();
}
// 到这里origin将失效,与针对来源设置为default逻辑一致
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
// 直接限流
return node.getClusterNode();
}
// 根据关联关系和链路限流
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
// 流控模式为关联关系,根据refResource找到对应的ClusterBuilderSlot
return ClusterBuilderSlot.getClusterNode(refResource);
}
// 流控模式为调用链路
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
// 如果refResource不等于context.getName(),返回null,不进行限流
// 如果web-context-unify=false,contextName为sentinel_spring_web_context
// refResource肯定不等于context.getName()[sentinel_spring_web_context],所以不会限流
// 如果web-context-unify=true,contextName为url
return null;
}
// web-context-unify=true才会到这里
return node;
}
// No node.
return null;
}
各种流控效果的实现类:
DefaultController是处理流控效果为直接拒绝,也就是直接抛出异常,其他流控效果涉及到具体的算法,故另开文章分析。
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 拿到当前时间窗口的线程数或QPS
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// prioritized默认为false,不会进入
... ...
}
}
// 请求数大于阈值直接返回false,抛出FlowException
return false;
}
return true;
}