Sentinel核心源码和熔断降级原理分析
Sentinel核心源码分析
sentinel简介
sentinel是alibaba开源的流控组件,以流量为核心,提供系统流量控制,熔断降级,系统保护等功能,保证系统的平稳运行。目前主要的流量控制手段主要有两种,一种是以Hystrix为代表,基于线程池隔离的方式,另一种则是通过信号量的方式,sentinel就是此方式来实现流控的。
使用介绍
本文不会对其详细的使用深入介绍,具体的可以参考
简单使用步骤:
1.引入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.7.2</version>
</dependency>
2.在代码中定义资源和规则
资源 :在项目里的一段代码,一个方法等一切东西
规则 :你要对资源定义的流控规则,如qps等指标
public static void main(String[] args) {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*您的业务逻辑 - 开始*/
System.out.println("hello world");
/*您的业务逻辑 - 结束*/
} catch (BlockException e1) {
/*流控逻辑处理 - 开始*/
System.out.println("block!");
/*流控逻辑处理 - 结束*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}
上述代码中业务代码被定义名为"HelloWorld"的资源,该资源的qps为20。
原理分析
从上面的代码可以看出,限流的入口是从一个叫 Entry 的对象开始的,Entry顾名思义可以理解为通行入口的意思。通过入口来把我们的代码即资源保护起来,任何请求到来时都要先经过入口的检查,如果检查通过了才能放行,否则的话拒绝放行。而通行规则则有开发人员自己来定。下图来源于sentinel官方,此图大概描述了sentinel运行的原理图。对于初次接触sentinel的人来说看到此图可能会比较懵逼,这里先简单说下大概的工作流程
当我们有请求到来时, Entry 会为每一个资源创建一个处理链 ProcessorSlotChain ,系统默认提供了8个Handler构成了此处理链,每个handler各司其职完成相应的功能,当然我们的流量校验处理对象也在其中名为 FlowSlot ,如果请求到来时能够通过 ProcessorSlotChain 的校验的话,就放行此请求,如果不通过,就会抛出相应的异常。Sentinel是以流量控制为核心,底层的流量统计是以滑动窗口来完成qps统计的,具体实现是通过名为 LeapArray 的对象来完成的。处理链如下图所示:
核心概念和类
在分析具体的源码之前,先介绍几个比较核心的概念和对象,不然进入代码会比较生涩。
Entry :前面说过了,对于要保护的资源须用Entry包裹起来即:
Entry entry = SphU.entry("HelloWorld");
......//保护的资源
entry.exit();
Context :上下文,每个entry都是存在特定上下文中,它是一个ThreadLocal变量,其内部通过name字段来区分不同的上下文,一个上下文即代表一个EntranceNode;
ResourceWrapper :资源包装类,上面SphU.entry(“HelloWorld”)语句实际上创建了一个HelloWorld的ResourceWrapper,资源具有全局唯一性
public abstract class ResourceWrapper {
protected final String name; //资源名
......
}
ProcessorSlotChain : 这个是个核心组件,其各种限流,熔断等功能都是通过此对象来实现的。内部是一个个的Slot对象,每个Slot对象完成各自对应的功能,其Chain的构建是通过Spi的方式来构建的。
# Sentinel default ProcessorSlots
//为每个ProcessorSlotChain提供Node对象
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
//为每个ProcessorSlotChain提clusterNode对象和originNode
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
//记录日志
com.alibaba.csp.sentinel.slots.logger.LogSlot
//统计流量
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
//系统规则校验保护
com.alibaba.csp.sentinel.slots.system.SystemSlot
//认证校验
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
//流控检查
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
//熔断降级检测
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
除了上面系统默认提供的8个Slot对象外,开发者还可以自己定义相关的Slot对象,按照SPI的方式加入即可。
Node : 用于完成统计相关的功能, ProcessorSlotChain 中的第一个Slot对象NodeSelectorSlot就是用于创建或获取Node对象,后续的每个Slot都会透传此Node对象,用于统计相关功能。
核心源码分析
有了上面几个比较核心的概念后,下面正式进入源码的分析。
1. 入口创建 SphU
此类提供了创建Entry的api,类似的类还有一个叫做SphO,两个类的区别在于前者是通过抛出异常的方式来拒绝请求通过,而后者则是通过返回Bool类型的结果来表示结果。
SphU
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
SphO
public static boolean entry(String name) {
return entry(name, EntryType.OUT, 1, OBJECTS0);
}
后面我们已SphU为入口来分析,在SphU的entry方法中调用了Env类:
public class Env {
//创建CtSph对象,用于统计和规则校验
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
//初始化InitFunc相关接口,通过SPI定义
InitExecutor.doInit();
}
}
进入CtSph的entry方法,此类中entry方法有多个重载,我们只分析一个即可,其他的都是一样的
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
2.创建conext和processorSlotChain对象
在上面的方法中我们可以看出,每个entry都会关联一个资源,资源通过name和type来唯一关联。接着代码往下走,最后会进入一个entryWithPriority的方法,此方法是一个很重要的方法,在方法类会创建上下文对象Context,处理链对象ProcessorSlotChain,也是规则校验的入口
/**
* resourceWrapper:资源
* count:请求许可
* prioritized:优先级
* args:额外携带的参数
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//获取上下文对象,一个ThreadLocal变量
Context context = ContextUtil.getContext();
//如果创建的上下文数量达到上限(2000),会返回一个NullContext对象
if (context instanceof NullContext) {
//创建一个不执行规则校验的entry对象
return new CtEntry(resourceWrapper, null, context);
}
//第一次进入会走到走到这里
if (context == null) {
//创建上下文对象,并设置默认名称为:sentinel_default_context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
//全局控制开关,如果关闭则不执行规则校验
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//创建或获取ProcessorSlotChain对象,用于执行规则校验
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//如果走到这里,意味着创建了太多的资源,默认不能超过6000
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
//走到这里,意味着上下文对象创建和ProcessorSlotChain对象创建都ok
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//进入规则校验,如果不通过则会抛出异常
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
//限流触发的异常
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
上面的代码实际上就包含了整个流控规则的校验流程。下面来看看上下文对象Context的创建
1.ContextUtil.getContext()
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
......
public static Context getContext() {
return contextHolder.get();
}
可以看到context是一个线程本地变量,第一次进入的时候返回空,在ContextUtil类的trueEnter方法中会创建新的context对象
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
//获取上下文node缓存,key为context的name
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
//获取上下文入口EntranceNode
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
//二次校验
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
//创建入口EntranceNode,每个上下文都有唯一的一个入口
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 将创建的entranceNode挂载到根node下
Constants.ROOT.addChild(node);
//将新创建的node加入到缓冲中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
//创建上下文
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
上述代码就是context的创建的全过程,每个context都是线程本地变量,并且都会关联一个EntranceNode,并将其挂载根node节点下面。context对象创建完毕后就会创建ProcessorSlotChain对象,我们回到上面entryWithPriority方法中的lookProcessChain方法
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//先从缓存中查询获取对象
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//通过SPI的方式创建对象
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
ProcessorSlotChain对象的创建是通过spi的方式,其接口实现类定义在如下文件中
,将文件中的对象按照约定顺序组织起来就形成了ProcessorSlotChain对象,具体就不深入进去了。当相关的对象都创建好了以后就是具体的规则校验了,回到entryWithPriority方法中的chain.entry(context, resourceWrapper, null, count, prioritized, args)这行代码来,此方法就是规则校验的入口。进入处理链的一个对象是NodeSelectorSlot,来看看源码
@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
//DefaultNode缓存,key为context的名称
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
//创建node
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
这段代码还是比较简单,根据上下文的名字获取DefalutNode,如果没有则创建。这里先梳理一下context、entry、ProcesssorSlotChain、和DefaultNode的关系。
- context代表的是一个上下文,每个entry都是在某个具体的context下运行的,同时每个context都会有一个EntranceNode;
- 每个entry都关联一个具体resource;
- 每个resource都会有一个ProcesssorSlotChain来做规则校验;
- 每个ProcessSlotChain可以包含多个DefaultNode,但只会有一个clusterNode;
node的关系图如下(来自官网):
EntranceNode1、EntranceNode2分别代表两个上下文环境的入口node。处理链中第二个对象是ClusterBuilderSlot,此对象的作用维护ClusterNode和originNode,clusterNode的作用是针对resource的所有上下文来统计的,originNode的作用是针对具有origin属性的entry来说的,来看看主要代码
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
//创建clusterNode
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
//node为当前上下文的entranceNode
node.setClusterNode(clusterNode);
//如果上下文设置了origin属性
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
接下来的处理器就是LogSlot,此类是功能主要就是记录异常的日志,这里就不细说了。下一个处理器StatisticSlot是非常重要的一个对象,它维护着流量、线程、异常等统计信息。代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 后续流控、熔断降级等校验
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果校验通过会走到这里,增加线程和pass的计数统计
node.increaseThreadNum();
node.addPassRequest(count);
//如果设置了origin,增加originNode统计
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//增加全局计数统计
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 调用回调
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// 增加block计数统计
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
接下来的SystemSlot和AuthoritySlot这里就不做介绍了。重点说下FlowSlot这个类,整个流控的校验都是在这里面进行的。代码比较简单:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
进入checkFlow方法,最终会调用FlowRuleChecker的checkFlow方法:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//获取流控规则,从这里可以看出,如果设置了多个规则,会逐一校验每一个规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
进入canPassCheck方法,
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,进行集群模式校验
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//本地校验
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
//选取合适的node,根据其统计技术来判断是否通过
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
最后会调用流量整形控制器的canPass方法,这里看下默认的流量整形控制器DefaultController的实现
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取已使用的许可
int curCount = avgUsedTokens(node);
//如果 当前已使用许可 + 请求许可 > 设置的数量
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
//采用滑动窗口来统计,每个窗口分割成了多个小的窗体,通过判断后续窗体的容量来进行流控
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
上述的过程分析了sentinel限流的原理和主要工作流程。其核心在于ProcessorSlotChain,把握住它就可以抓住其主脉络。
sentinel熔断降级原理分析
上面介绍了sentinel的基本工作原理和限流原理。本文将从源码的角度对熔断降级的原理进行分析。熔断操作位于slot处理链的末尾,由名为DegradeSlot的类来处理的,来看看其源码
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
//熔断降级判断
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
源码很简单,委托DegradeRuleManager来处理,进入DegradeRuleManager的checkDegrade方法
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
//获取资源熔断规则
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
//遍历每个熔断规则,校验是否满足熔断条件
for (DegradeRule rule : rules) {
//如果达到了熔断条件,就会抛出DegradeException的异常
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
熔断的判断就是针对资源设置的规则,逐一判断处理。如果有一个条件不满足的话,就会抛出DegradeException异常。那么熔断判断具体是怎么做的呢?继续深入DegradeRule类中的passCheck方法,在分析passCheck方法之前,先介绍DegradeRule类几个比较重要的字段。
//慢请求或异常请求的计数
private double count;
//熔断窗口
private int timeWindow;
//熔断策略 (0: 慢调用, 1: 异常率, 2: 异常数)
private int grade = RuleConstant.DEGRADE_GRADE_RT;
/**
* 针对慢调用,如果慢调用数小于其值(默认为5),是不会触发熔断的
*
* @since 1.7.0
*/
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
/**
* 针对异常率,如果异常数小于其值(默认为5),是不会触发熔断的
*
* @since 1.7.0
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
熔断的实现原理简单说来就是在一个设定的窗口时间内,根据设置的具体熔断策略,判断相应的计数统计是否超过了门限值,如果超过了则会触发熔断机制。深入passCheck的源码
//满调用计数
private AtomicLong passCount = new AtomicLong(0);
//熔断降级标记位,如果为true,则表示触发了熔断
private final AtomicBoolean cut = new AtomicBoolean(false);
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//如果标记位为真,表示已触发熔断
if (cut.get()) {
return false;
}
//获取资源计数统计node
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//如果熔断降级策略为慢调用
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取慢调用平均响应时间
double rt = clusterNode.avgRt();
//如果调用平均响应时间小于设定的门限值,则重置慢调用计数统计
if (rt < this.count) {
passCount.set(0);
return true;
}
//如果满调用数小于默认的最小门限数(5),则不进行熔断降级
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
//如果熔断降级策略是异常率
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
//每秒的异常数
double exception = clusterNode.exceptionQps();
//每秒成功调用数
double success = clusterNode.successQps();
//每秒总调用数
double total = clusterNode.totalQps();
//如果总调用数小于默认的门限值(5),则不会触发熔断降级
if (total < minRequestAmount) {
return true;
}
//此句需要好好理解下,它表达的意思是:在异常数小于最小门限的条件是不进行熔断降级的,但前提是所用调用都不能全是异常调用
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
//异常率小于设置的门限,则不熔断降级
if (exception / success < count) {
return true;
}
//如果熔断降级策略是异常数
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
//注意,这个异常数是每分钟统计的
double exception = clusterNode.totalException();
//小于设置的门限值,则不熔断
if (exception < count) {
return true;
}
}
//如果走到了这里,则表示将要触发熔断降级了
//重置慢调用统计时间窗口,此处用了CAS的方法来设置标志位,防止并发。时间窗口的重置是依赖于定时任务来完成的,当timeWindow时间后,会重置熔断标志位和计数统计
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
//重置时间窗口
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
//重置慢调用计数
rule.passCount.set(0);
//熔断标志位
rule.cut.set(false);
}
}
上面的代码描述了熔断降级核心流程,针对上面代码需要注意的是:
- 慢调用是通过一个 时间窗口 来计数满调用的次数来实现的
- 异常率是针对 每秒 的异常数和成功数的比值来判断是否满足触发条件的
- 异常数是针对 每分钟 的异常数统计来实现的
当熔断被触发后,标志位会被设置为true,并会持续timeWindow长的时间,这个时间就是开发者在设置熔断降级规则时设置的。上述就是整个熔断降级的实现过程,从代码来看,熔断窗口通过一个定时任务来更新,设计的还是比较新颖的。