软件架构从当初的单机,演变到后来的集群,再到后来的分布式应用。原本看似可以信任的服务调用,加上了网络因素就变得不再可靠。再考虑到一些调用链路的特殊性,又要保证性能,又要尽可能增加成功率,所以调用方必须肩负起重试的责任。
自己写,怎样实现?
重试并不复杂,首先来分析下重试的调用场景,可以想到业务当中不止一处会需要重试能力,并且业务其实更关乎自己的代码块被重试就可以了,而不在乎如何实现的重试。
所以变化的是一段可以重复执行的代码块,以及重试次数等。
① 代码块可以用Java8支持的函数式编程解决
② 次数可以用入参/配置实现
首先定义一个执行的模版,结合上面我们的分析,这个模版需要一个待实行的 **方法块** ,以及 **配置。** (次数等区分场景,用枚举定义,方便全局管控):
@AllArgsConstructor
@Getter
public enum RetrySceneEnums {
QUERY_USER_INFO("查询用户信息", 2),
;
private String desc;
private int retryTimes;
}
abstract class MyRetryTemplate<Req, Resp> {
/** 配置 */
private IntegrationRetryEnums retryEnum;
/** 方法块 */
private Function<Req, Resp> fuction;
MyRetryTemplate(IntegrationRetryEnums retryEnum, Function<Req, Resp> fuction) {
this.retryEnum = retryEnum;
this.fuction = fuction;
}
/**
* 构建函数
*/
private Supplier<Function<Req, Resp>> retry = () -> (param) -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
return execute(param, 1, stopWatch);
};
private Resp execute(Req param, int index, StopWatch stopWatch) {
if (index > retryEnum.getRetryTimes()) {
stopWatch.split();
// 最大的重试次数后仍失败,可以打印摘要日志
return null;
}
try {
// 执行方法块
Resp result = fuction.apply(param);
stopWatch.split();
// 认为成功
return result;
} catch (Throwable e) {
// 递归重试
return execute(param, ++index, stopWatch);
}
}
Function<Req, Resp> getRetryService() {
return retry.get();
}
}
逻辑很简单,利用递归思想(当然也可以用循环),默认调用方法抛出异常才会重试,成功则返回,否则递归直到最大次数,使用 StopWatch 统计总耗时,业务可以打印摘要日志并配置监控等,代码简短,但基本实现了功能。
对 抛出异常才会重试 稍微解释下,为什么不支持对业务成功/失败进行重试。首先重试框架感知不到具体的业务响应结构,进而也无法判断业务的成功与失败,当然重试框架也可以自定义一个响应体并强制使用,但这便于业务代码形成耦合。再退一步讲,也没有必要对业务失败进行重试,因为框架毕竟是框架,只需要做通用的事情,业务的失败到底需不需要重试,需要看具体的场景。
模板完工了,看下具体怎样改造一个已有的方法,使其支持重试功能,假设当前有一个查询用户信息的方法 *UserService#queryById(String id)* 。需要在 Spring 启动后将其包装以支持重试,通过事件机制监听 ContextRefreshedEvent 事件。
@Component
public class RetryServiceFactory implements ApplicationListener<ContextRefreshedEvent> {
private static AtomicBoolean INIT = new AtomicBoolean(false);
@Autowired
private UserService userService;
private Function<String, WorkOrderDTO> uerQueryFunction;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if (INIT.compareAndSet(false, true)) {
this.uerQueryFunction = new MyRetryTemplate<String, WorkOrderDTO>(QUERY_USER_INFO, userService::queryById) {
}.getRetryService();
}
}
public Function<String, WorkOrderDTO> getUserQueryService() {
return uerQueryFunction;
}
}
需要使用此能力的地方,这么调用即可:
@Autowired
private RetryServiceFactory retryServiceFactory;
public void businessLogic(String userId){
// 调用工厂获取自带重试功能的服务,并执行
UserInfo userInfo = retryServiceFactory.getUserQueryService().apply(userId);
}
到此,自己手写的重试工具已经完成,如果执行时遇到网络抖动,超时等,都会进行最大2次的重试(次数由 RetrySceneEnums 设置)。
这时候再回头审视下代码,可以发现诸多局限性:
- 比如 Function 的使用,就限定了入参只能是一个,且必须有返回(即不支持 void);
- 每次接入一个服务,就要在 RetryServiceFactory 扩充;
- 考虑网络抖动可能有区间性,盲目连续请求可能全部失败,如要实现等待一段实现再重试,怎么办?
- 假使底层服务真的挂了,但上层业务系统依旧重试,重试每次都以超时告终(占用线程资源),流量上来以后,必然会牵连业务系统也不可用。自然想到熔断,那重试如何加入断路器(Circuit Breaker)模式?
遇到共性问题,首先要想到有没有现成的框架可以使用,而不是造轮子。目前我已知的重试框架,有 spring-retry 以及 guava-retrying ,接下来将对 spring-retry 的使用和原理进行讲解。
Spring Retry 使用
同spring事务框架一样,retry框架同样支持 编程式 和 声明式 两种。仅需要一个maven依赖。如果使用声明式,需要额外引入 aspectj 。
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.2</version>
<scope>runtime</scope>
</dependency>
编程式
先来介绍下编程式使用到的几个主要组件:
-
重试模板类:org.springframework.retry.support.RetryTemplate
-
重试策略类:org.springframework.retry.RetryPolicy
-
重试回退策略(两次重试间等待策略):org.springframework.retry.backoff.BackOffPolicy
-
重试上下文:org.springframework.retry.RetryContext
-
监听器:org.springframework.retry.RetryListener
接下来来看下如何使用,然后穿插讲下不同策略的区别。
首先是开启重试,并声明一个模板bean。因为只是模板,所以可以借助 Spring IOC 声明为单例。
@Configuration
@EnableRetry(proxyTargetClass = true)
public class RetryConfig {
@Bean
public RetryTemplate simpleRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 最大重试次数策略
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
// 每隔1s后再重试
fixedBackOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
return retryTemplate;
}
}
例子代码使用了 SimpleRetryPolicy 和 FixedBackOffPolicy,当然还有其他重试策略和回退策略。枚举如下:
重试策略
策略类 | 效果 | 关键参数 |
---|---|---|
MaxAttemptsRetryPolicy |
设置最大的重试次数,超过之后执行recover | maxAttempts:最大重试次数 |
BinaryExceptionClassifierRetryPolicy |
可以指定哪些异常需要重试,哪些异常不需要重试 | exceptionClassifier:BinaryExceptionClassifier,异常识别类,其实就是存在一个Map映射,Map<Class<? extends Throwable>, Boolean>,value为true的话就说明需要重试。 |
SimpleRetryPolicy |
上面两者的结合,支持次数和自定义异常重试 | maxAttempts 和 exceptionClassifier |
TimeoutRetryPolicy |
在指定的一段时间内重试 | timeout:单位ms |
ExceptionClassifierRetryPolicy |
支持异常和重试策略的映射,比如异常A对应重试策略A,异常B对应重试策略B | exceptionClassifier:本质就是异常和重试策略的映射 |
CompositeRetryPolicy |
策略类的组合类,支持两种模式,乐观模式:只要一个重试策略满足即执行,悲观模式:另一种是所有策略模式满足再执行 | policies:策略数组 |
optimistic:true-乐观;false-悲观 | ||
CircuitBreakerRetryPolicy |
熔断器模式 | delegate:策略代理类openTimeout:[0,openTimeout]会一直执行代理类策略,(openTimeout, resetTimeout] 会半打开,如果代理类判断不可以重试,就会熔断,执行recover逻辑,如果代理类判断还可以重试且重试成功,开关会闭合。resetTimeout:超过指定时间,开关重置闭合 |
ExpressionRetryPolicy |
符合表达式就重试 | expression:表达式,见 org.springframework.expression.Expression实现 |
AlwaysRetryPolicy |
一直重试 | |
NeverRetryPolicy |
从不重试 |
回退策略
策略类 | 效果 | 关键参数 |
---|---|---|
FixedBackOffPolicy |
间隔固定时间重试 | sleeper:支持线程sleep和Object#wait,区别在于是否释放锁 |
backOffPeriod:等待间隔 | ||
NoBackOffPolicy |
无等待直接重试 | |
UniformRandomBackOffPolicy |
在一个设置的时间区间内。随机等待后重试 | minBackOffPeriod:区间下限 maxBackOffPeriod:区间上限 sleeper:同上 |
ExponentialBackOffPolicy |
在一个设置的时间区间内,等待时长为上一次时长的递增 | initialInterval:默认起始等待时间 maxInterval:最大等待时间 multiplier:递增倍数 sleeper:同上 |
ExponentialRandomBackOffPolicy |
在 ExponentialBackOffPolicy 基础上,乘数随机 |
至此,完成了重试模板的定义。接下来是需要在具体的业务场景下调用模板方法即可。同样的,为了保证复用性,仍然借助函数式编程定义:
@Service
public class RetryTemplateService {
@Autowired
private RetryTemplate simpleRetryTemplate;
public <R, T> R retry(Function<T, R> method, T param) throws Throwable {
return simpleRetryTemplate.execute(new RetryCallback<R, Throwable>() {
@Override
public R doWithRetry(RetryContext context) throws Throwable {
return method.apply(param);
}
}, new RecoveryCallback<R>() {
@Override
public R recover(RetryContext context) throws Exception {
return null;
}
});
}
}
@Service
public class UserService {
@Autowired
private RetryTemplateService retryTemplateService;
public User queryById(String id) {
// TODO 业务逻辑
}
public User queryByIdWithRetry(String id) throws Throwable {
return retryTemplateService.retry(this::queryById, id);
}
}
通过调用 *org.springframework.retry.support.RetryTemplate#execute* ,指定需要支持重试的业务代码回调 *org.springframework.retry.RetryCallback* ,以及全部重试失败的兜底回调 *org.springframework.retry.RecoveryCallback* 。
RetryTemplate 支持四种重载的 execute 方法,这里不全部展开分析,大体十分相似,变化的是策略(见上面表格),以及 RetryState(有无状态,下面分析)。
声明式
通过上面的编码式,还需要针对不同场景编写一到多套模板方法,那有没有什么可以简化这一步呢,毕竟业务没必要关注这些具体的模板代码。就像事务一样,只需要方法上加 *@Transactional* 就可以了。当然,spring-retry 也支持,直接上代码:
@Service
public class UserService {
@Retryable(include = RetryException.class, exclude = IllegalArgumentException.class,
listeners = {"defaultRetryListener"}, backoff = @Backoff(delay = 1000, maxDelay = 2000))
public User queryById(String id) {
// TODO 业务逻辑
}
@Recover
public User recover(RetryException e, String id) {
User user = new User();
user.setName("兜底");
return user;
}
}
@Service
public class DefaultRetryListener implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("==前置回调==");
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println("==后置回调==");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println("==执行报错==");
}
}
这样,直接调用 *UserService#queryById* 就支持重试了,比编程式节省了不少工作量。
我这里一股脑设置了很多注解参数,其实都对应上面编程式的策略,毕竟编程式有的,我声明式也得要。使用时按需即可,接下来对照着 *@Retryable* 中定义的所有属性,解释下含义:
- recover:指定兜底/补偿的方法名。如果不指定,默认对照 @Recover 标识的,第一入参为重试异常,其余入参和出参一致的方法;
- interceptor:指定方法切面 bean, org.aopalliance.intercept.MethodInterceptor 实现类
- value / include:两者用途一致,指出哪些类型需要重试;
- exclude:和 include 相反,指出哪些异常不需要重试;
- label:可以指定唯一标签,用于统计;
- stateful:默认false。重试是否是有状态的;
- maxAttempts:最大的重试次数;
- backoff:指定 @Backoff ,回退策略;
- listeners:指定 org.springframework.retry.RetryListener 实现 bean。
对照着上面编程式,可以看出回退策略用注解 *@Backoff* 支持了。这里还补充了上面编程式没有用到的 *RetryListener* ,它定义了3个方法:
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
open 和 close 分别在重试整体的前置和后置被回调一次,onError 则是重试整体过程中,每次异常都会被回调。(具体细节见下面 **源码分析** )
@Backoff 的属性如下,基本都对应上面编程式的几种策略,仅简单解释下
- value / delay:两者都标识延迟时间,为 0则对应 NoBackOffPolicy 策略。
- maxDelay:最大延迟时间
- multiplier:递增乘数
- random:递增乘数是否随机
重试模式中的断路器,由于其场景的特殊性以及属性的复杂性,则被单独定义成了一个注解 *@CircuitBreaker* ,从注释的定义可以看出,它是在 *@Retryable* 基础之上的扩展。
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Retryable(stateful = true)
public @interface CircuitBreaker {
....
}
其中属性的定义,也是在 *@Retryable* 的属性基础上,额外扩充了断路器特有的几个属性,对应上面重试策略表格中—— *CircuitBreakerRetryPolicy* 的关键参数:
- resetTimeout:重置闭合超时时间
- openTImeout:半打开状态超时时间
源码解析
那具体 spring-retry 的原理是怎样的呢?是递归还是循环,注解又是怎样实现的?带着这些问题开始源码分析,顺带着可以看下上面没有讲到的 org.springframework.retry.RetryContext 组件在业务中如何使用。
首先从模板方法源码开始:
public class RetryTemplate implements RetryOperations {
@Override
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback) throws E {
return doExecute(retryCallback, recoveryCallback, null);
}
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
// 通过 set方法设置的重试策略,默认 SimpleRetryPolicy
RetryPolicy retryPolicy = this.retryPolicy;
// 通过 set方法设置的回退策略,默认 NoBackOffPolicy
BackOffPolicy backOffPolicy = this.backOffPolicy;
// 无状态的:重试策略自定义 org.springframework.retry.RetryPolicy.open
// 有状态的:根据策略,每次新建/从缓存获取
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
// 绑定到 ThreadLocal 以便线程内全局获取
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
// 回调所有的 org.springframework.retry.RetryListener.open
boolean running = doOpenInterceptors(retryCallback, context);
// 任意一个监听器的open返回 false则抛出异常
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// Get or Start the backoff context...
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
if (backOffContext == null) {
// 回调重试策略的 start 方法
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
// 回调 org.springframework.retry.RetryPolicy.canRetry 判断是否可以重试
// 同时可以调用 org.springframework.retry.RetryContext.setExhaustedOnly进行干预,不再进行重试
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
lastException = null;
// 回调业务代码块
return retryCallback.doWithRetry(context);
} catch (Throwable e) {
lastException = e;
try {
// 回调 org.springframework.retry.RetryPolicy.registerThrowable
registerThrowable(retryPolicy, state, context, e);
} catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
} finally {
// 回调 org.springframework.retry.RetryListener.onError
doOnErrorInterceptors(retryCallback, context, e);
}
// 同上面的判断一样,允许重试的话,会回调回退策略
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 这一步会根据策略进行等待/立即执行
backOffPolicy.backOff(backOffContext);
} catch (BackOffInterruptedException ex) {
lastException = e;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw ex;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
}
// 如果指定了 org.springframework.retry.RetryState,会判断是否针对该异常进行抛出,即进行重试阻断
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
// 回调兜底/补偿 org.springframework.retry.RecoveryCallback.recover
return handleRetryExhausted(recoveryCallback, context, state);
} catch (Throwable e) {
throw org.springframework.retry.support.RetryTemplate.<E>wrapIfNecessary(e);
} finally {
// 回调 org.springframework.retry.RetryPolicy.close
close(retryPolicy, context, state, lastException == null || exhausted);
// 回调 org.springframework.retry.RetryListener.close
doCloseInterceptors(retryCallback, context, lastException);
// 清除 ThreadLoacal中存储的 org.springframework.retry.RetryContext
RetrySynchronizationManager.clear();
}
}
}
整体模板方法不是很复杂,上面注释也标明具体回调哪些方法。大体逻辑就是:
初始化上下文并绑定 → 回调监听器open判断是否可以重试 → 回调重试策略start方法 → 通过重试策略判断是否可以重试 → 可以的话执行业务代码块 → 下面两个分支 成功/异常
成功:回调重试策略的close → 回调监听器的close → 清除上下文 链路 ①
异常:回调重试策略的registerThrowable → 回调监听器的onError → 回调回退策略的backoff → 如果设置了 Retrystate,判断是否需要抛异常阻断重试 → 兜底/补偿逻辑 → 链路 ①
大体流程了解后,先来看下上下文接口的定义:
public interface RetryContext extends AttributeAccessor {
String NAME = "context.name"; // 上下文的自定义名称
String STATE_KEY = "context.state"; // RetryState定义的key
String CLOSED = "context.closed"; // 标识重试是否close
String RECOVERED = "context.recovered"; // 标识是否执行了兜底/补偿
String EXHAUSTED = "context.exhausted"; // 标识重试最大次数仍失败
void setExhaustedOnly();
boolean isExhaustedOnly();
RetryContext getParent();
int getRetryCount();
Throwable getLastThrowable();
}
public interface AttributeAccessor {
void setAttribute(String name, @Nullable Object value);
@Nullable
Object getAttribute(String name);
@Nullable
Object removeAttribute(String name);
boolean hasAttribute(String name);
String[] attributeNames();
}
加上继承类 *AttributeAccessor* 支持的key-value存储,作为上下文,贯穿重试框架的一次调用。框架定义的key常量接口内定义了部分,还有一些分散在实现类以及策略类中。如果业务所需,也可以放置数据到上下文,在一次重试策略调用周期内共享。
那就来看下上下文的初始化逻辑:
public class RetryTemplate implements RetryOperations {
protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
// 如果是无状态的,每次都调用 org.springframework.retry.RetryPolicy.open创建
if (state == null) {
return doOpenInternal(retryPolicy);
}
Object key = state.getKey();
// 如果有状态,但设置了强制刷新,同样 org.springframework.retry.RetryPolicy.open创建,并写入 state.key
if (state.isForceRefresh()) {
return doOpenInternal(retryPolicy, state);
}
// 尝试从缓存获取,不存在走新建
if (!this.retryContextCache.containsKey(key)) {
// The cache is only used if there is a failure.
return doOpenInternal(retryPolicy, state);
}
// 缓存获取
RetryContext context = this.retryContextCache.get(key);
if (context == null) {
// 异常场景
if (this.retryContextCache.containsKey(key)) {
throw new RetryException("Inconsistent state for failed item: no history found. "
+ "Consider whether equals() or hashCode() for the item might be inconsistent, "
+ "or if you need to supply a better ItemKeyGenerator");
}
// 因为整体没有锁,所以有这一步作为补偿(没仔细琢磨,会不会有并发逻辑)
return doOpenInternal(retryPolicy, state);
}
// 因为是缓存,所以同一个 state.key 会共享这个上下文,清除会影响其他正在校验这几个key的地方
context.removeAttribute(RetryContext.CLOSED);
context.removeAttribute(RetryContext.EXHAUSTED);
context.removeAttribute(RetryContext.RECOVERED);
return context;
}
}
从这里可以看出,如果个别场景想要在多次调用间共享 *RetryContext* ,就需要定义 state.key,且设置 forceRefresh=false。每次重试独占上下文的话,要么就使用无状态的重试,要么就设置 forceRefresh=true。
到此为止,基本线索都集中到 *RetryPolicy* 的实现上了,基本都是回调 *RetryPolicy* 定义的方法。话不多说,来看两个重试策略的实现。
public class SimpleRetryPolicy implements RetryPolicy {
private volatile int maxAttempts;
private BinaryExceptionClassifier retryableClassifier = new BinaryExceptionClassifier(false);
public SimpleRetryPolicy(int maxAttempts, BinaryExceptionClassifier classifier) {
super();
this.maxAttempts = maxAttempts;
this.retryableClassifier = classifier;
}
@Override
public boolean canRetry(RetryContext context) {
// 获取最新一次重试的异常
Throwable t = context.getLastThrowable();
// 允许重试的异常 并且 次数<最大重试次数
return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;
}
@Override
public void close(RetryContext status) {
}
@Override
public void registerThrowable(RetryContext context, Throwable throwable) {
SimpleRetryContext simpleContext = ((SimpleRetryContext) context);
// 记录最近一次异常,并自增重试次数
simpleContext.registerThrowable(throwable);
}
@Override
public RetryContext open(RetryContext parent) {
return new SimpleRetryContext(parent);
}
private static class SimpleRetryContext extends RetryContextSupport {
public SimpleRetryContext(RetryContext parent) {
super(parent);
}
}
private boolean retryForException(Throwable ex) {
return this.retryableClassifier.classify(ex);
}
}
public class CircuitBreakerRetryPolicy implements RetryPolicy {
public static final String CIRCUIT_OPEN = "circuit.open";
public static final String CIRCUIT_SHORT_COUNT = "circuit.shortCount";
private static Log logger = LogFactory.getLog(CircuitBreakerRetryPolicy.class);
private final RetryPolicy delegate;
private long resetTimeout = 20000;
private long openTimeout = 5000;
public CircuitBreakerRetryPolicy(RetryPolicy delegate) {
this.delegate = delegate;
}
public void setResetTimeout(long timeout) {
this.resetTimeout = timeout;
}
public void setOpenTimeout(long timeout) {
this.openTimeout = timeout;
}
@Override
public boolean canRetry(RetryContext context) {
CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
// 判断断路器开关是否打开
if (circuit.isOpen()) {
// 打开则不允许重试
circuit.incrementShortCircuitCount();
return false;
} else {
circuit.reset();
}
// 断路器开关闭合 or 半打开,是否允许重试交给代理实现
return this.delegate.canRetry(circuit.context);
}
@Override
public RetryContext open(RetryContext parent) {
return new CircuitBreakerRetryContext(parent, this.delegate, this.resetTimeout, this.openTimeout);
}
@Override
public void close(RetryContext context) {
CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
// 代理
this.delegate.close(circuit.context);
}
@Override
public void registerThrowable(RetryContext context, Throwable throwable) {
CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
circuit.registerThrowable(throwable);
// 代理
this.delegate.registerThrowable(circuit.context, throwable);
}
static class CircuitBreakerRetryContext extends RetryContextSupport {
private volatile RetryContext context;
private final RetryPolicy policy;
private volatile long start = System.currentTimeMillis();
private final long timeout;
private final long openWindow;
private final AtomicInteger shortCircuitCount = new AtomicInteger();
public CircuitBreakerRetryContext(RetryContext parent, RetryPolicy policy, long timeout, long openWindow) {
super(parent);
this.policy = policy;
this.timeout = timeout;
this.openWindow = openWindow;
this.context = createDelegateContext(policy, parent);
setAttribute("state.global", true);
}
public void reset() {
shortCircuitCount.set(0);
setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
}
public void incrementShortCircuitCount() {
shortCircuitCount.incrementAndGet();
setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
}
private RetryContext createDelegateContext(RetryPolicy policy, RetryContext parent) {
RetryContext context = policy.open(parent);
reset();
return context;
}
/* 判断断路器开关是否打开 **/
public boolean isOpen() {
// 计算时间间隔
long time = System.currentTimeMillis() - this.start;
// 判断是否允许重试
boolean retryable = this.policy.canRetry(this.context);
if (!retryable) {
// 闭合重置开关超时时间
if (time > this.timeout) {
logger.trace("Closing");
// 重建上下文
this.context = createDelegateContext(policy, getParent());
this.start = System.currentTimeMillis();
// 判断是否允许重试
retryable = this.policy.canRetry(this.context);
}
// [0,openTimeout)
else if (time < this.openWindow) {
// 指定开关打开
if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
logger.trace("Opening circuit");
setAttribute(CIRCUIT_OPEN, true);
}
this.start = System.currentTimeMillis();
return true;
}
}
// 允许重试
else {
// (openWindow,resetTimeout]
if (time > this.openWindow) {
logger.trace("Resetting context");
// 重建上下文
this.start = System.currentTimeMillis();
this.context = createDelegateContext(policy, getParent());
}
}
if (logger.isTraceEnabled()) {
logger.trace("Open: " + !retryable);
}
// 设置开关打开的标志位,不能重试=开关打开
setAttribute(CIRCUIT_OPEN, !retryable);
return !retryable;
}
@Override
public int getRetryCount() {
return this.context.getRetryCount();
}
@Override
public String toString() {
return this.context.toString();
}
}
}
除了断路器策略( *CircuitBreakerRetryPolicy* )稍微复杂点,其他基本都像 *SimpleRetryPolicy* 一样,逻辑简单。
首先 *canRetry* 在模板方法内是判断是否要重试的实现。 *SimpleRetryPolicy* 实现就是判断了次数和异常,是否满足要求。 *CircuitBreakerRetryPolicy* ,主要逻辑集中在 *org.springframework.retry.policy.CircuitBreakerRetryPolicy.CircuitBreakerRetryContext#isOpen* ,开关打开不允许重试,否则根据代理类判断是否允许重试。
重试策略看完,再来看回退策略代码就更简单了,看一个固定时长的回退策略实现:
public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {
// 默认使用 Thread.sleep等待
private Sleeper sleeper = new ThreadWaitSleeper();
// 可以指定 org.springframework.retry.backoff.ObjectWaitSleeper
public void setSleeper(Sleeper sleeper) {
this.sleeper = sleeper;
}
protected void doBackOff() throws BackOffInterruptedException {
try {
sleeper.sleep(backOffPeriod);
}
catch (InterruptedException e) {
throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
}
}
}
默认的就是 *Thread.sleep* 一段时间。
至此,编程式的源码简单的流程已经了解了。接下来分析下注解的实现原理,可预见的,注解也借助了编程式这套模板。
大体逻辑主要分为应用启动时,对全局内 @Retryable方法的收集,以及运行期的动态代理。
@Configuration
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware {
@PostConstruct
public void init() {
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
retryableAnnotationTypes.add(Retryable.class);
// 切点定义,也就是所有被 @Retryable标识的方法
this.pointcut = buildPointcut(retryableAnnotationTypes);
// 切面构建
this.advice = buildAdvice();
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
protected Advice buildAdvice() {
// 看这个实现
AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
if (retryContextCache != null) {
interceptor.setRetryContextCache(retryContextCache);
}
if (retryListeners != null) {
interceptor.setListeners(retryListeners);
}
if (methodArgumentsKeyGenerator != null) {
interceptor.setKeyGenerator(methodArgumentsKeyGenerator);
}
if (newMethodArgumentsIdentifier != null) {
interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);
}
if (sleeper != null) {
interceptor.setSleeper(sleeper);
}
return interceptor;
}
}
上面配置类,init 生命中期内定义了切点和切面,切点很简单,就是所有 *@Retryable* 标识的方法;切面的话需要移步 *AnnotationAwareRetryOperationsInterceptor* 。
public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取切面代理
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
// 执行代理方法
return delegate.invoke(invocation);
}
else {
return invocation.proceed();
}
}
}
这里切面的构建并不是我们关心的,所以 getDelegate 感兴趣的可以自行研究,里面加了一道缓存,保证代理对象只会创建一次。
继续跟源码,可以找到代理切面实现类根据有无状态有两种(根据 *@Retryable#stateful()* 设置):
- org.springframework.retry.interceptor.RetryOperationsInterceptor
- org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor (@CircuitBreaker 强制是这种 )
public class RetryOperationsInterceptor implements MethodInterceptor {
private RetryOperations retryOperations = new RetryTemplate();
public Object invoke(final MethodInvocation invocation) throws Throwable {
String name;
if (StringUtils.hasText(label)) {
name = label;
} else {
name = invocation.getMethod().toGenericString();
}
final String label = name;
// 构建 org.springframework.retry.RetryCallback
RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, label);
if (invocation instanceof ProxyMethodInvocation) {
try {
// 回调业务代码块
return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
} catch (Exception e) {
throw e;
} catch (Error e) {
throw e;
} catch (Throwable e) {
throw new IllegalStateException(e);
}
} else {
throw new IllegalStateException(
"MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "
+ "so please raise an issue if you see this exception");
}
}
};
// 根据是否有 @Recover 兜底/补偿方法,调用模板的不同方法
if (recoverer != null) {
RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
return this.retryOperations.execute(retryCallback, recoveryCallback);
}
return this.retryOperations.execute(retryCallback);
}
}
public class StatefulRetryOperationsInterceptor implements MethodInterceptor {
private RetryOperations retryOperations;
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Executing proxied method in stateful retry: " + invocation.getStaticPart() + "("
+ ObjectUtils.getIdentityHexString(invocation) + ")");
}
Object[] args = invocation.getArguments();
Object defaultKey = Arrays.asList(args);
if (args.length == 1) {
defaultKey = args[0];
}
Object key = createKey(invocation, defaultKey);
RetryState retryState = new DefaultRetryState(key,
this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
this.rollbackClassifier);
Object result = this.retryOperations.execute(new StatefulRetryOperationsInterceptor.StatefulMethodInvocationRetryCallback(invocation, label),
this.recoverer != null ? new StatefulRetryOperationsInterceptor.ItemRecovererCallback(args, this.recoverer) : null, retryState);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Exiting proxied method in stateful retry with result: (" + result + ")");
}
return result;
}
}
最终 *invoke* 的实现里面,可以看到调用模板方法。利用切面,省得每个方法都写一遍 *execute* 方法。
总结
总之,重试框架并不复杂,已经有现成的工具,就不要重复造轮子。本文也是通过阅读源码后写出来的,如果有理解不正确的地方,欢迎各位指正。
原文:https://my.oschina.net/marvelcode/blog/4563352
作者: MarvelCode