前提
最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过Quartz
、XXL-Job
、Easy Scheduler
等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring
内置的Scheduling
模块。而原生的Scheduling
模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled
注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling
模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC数据源。
Scheduling模块
Scheduling
模块是spring-context
依赖下的一个包org.springframework.scheduling
:
这个模块的类并不多,有四个子包:
- 顶层包的定义了一些通用接口和异常。
- org.springframework.scheduling.annotation:定义了调度、异步任务相关的注解和解析类,常用的注解如@Async、@EnableAsync、@EnableScheduling和@Scheduled。
- org.springframework.scheduling.concurrent:定义了调度任务执行器和相对应的FactoryBean。
- org.springframework.scheduling.config:定义了配置解析、任务具体实现类、调度任务XML配置文件解析相关的解析类。
- org.springframework.scheduling.support:定义了反射支持类、Cron表达式解析器等工具类。
如果想单独使用Scheduling,只需要引入spring-context这个依赖。但是现在流行使用SpringBoot,引入spring-boot-starter-web已经集成了spring-context,可以直接使用Scheduling模块,笔者编写本文的时候(2020-03-14)SpringBoot的最新版本为2.2.5.RELEASE,可以选用此版本进行源码分析或者生产应用:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
开启Scheduling模块支持只需要在某一个配置类中添加@EnableScheduling注解即可,一般为了明确模块的引入,建议在启动类中使用此注解,如:
@EnableScheduling
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Scheduling模块的工作流程
这个图描述了Scheduling模块的工作流程,这里分析一下非XML配置下的流程(右边的分支):
- 通过注解@EnableScheduling中的@Import引入了SchedulingConfiguration,而SchedulingConfiguration中配置了一个类型为ScheduledAnnotationBeanPostProcessor名称为org.springframework.context.annotation.internalScheduledAnnotationProcessor的Bean,这里有个常见的技巧,Spring内部加载的Bean一般会定义名称为internalXXX,Bean的role会定义为ROLE_INFRASTRUCTURE = 2。
- Bean后置处理器ScheduledAnnotationBeanPostProcessor会解析和处理每一个符合特定类型的Bean中的@Scheduled注解(注意@Scheduled只能使用在方法或者注解上),并且把解析完成的方法封装为不同类型的Task实例,缓存在ScheduledTaskRegistrar中的。
- ScheduledAnnotationBeanPostProcessor中的钩子接口方法afterSingletonsInstantiated()在所有单例初始化完成之后回调触发,在此方法中设置了ScheduledTaskRegistrar中的任务调度器(TaskScheduler或者ScheduledExecutorService类型)实例,并且调用ScheduledTaskRegistrar#afterPropertiesSet()方法添加所有缓存的Task实例到任务调度器中执行。
任务调度器
Scheduling模块支持TaskScheduler或者ScheduledExecutorService类型的任务调度器,而ScheduledExecutorService其实是JDK并发包java.util.concurrent的接口,一般实现类就是调度线程池ScheduledThreadPoolExecutor。实际上,ScheduledExecutorService类型的实例最终会通过适配器模式转变为ConcurrentTaskScheduler,所以这里只需要分析TaskScheduler类型的执行器。
- ThreadPoolTaskScheduler:基于线程池实现的任务执行器,这个是最常用的实现,底层依赖于ScheduledThreadPoolExecutor实现。
- ConcurrentTaskScheduler:TaskScheduler接口和ScheduledExecutorService接口的适配器,如果自定义一个ScheduledThreadPoolExecutor类型的Bean,那么任务执行器就会适配为ConcurrentTaskScheduler。
- DefaultManagedTaskScheduler:JDK7引入的JSR-236的支持,可以通过JNDI配置此调度执行器,一般很少用到,底层也是依赖于ScheduledThreadPoolExecutor实现。
也就是说,内置的三个调度器类型底层都依赖于JUC调度线程池ScheduledThreadPoolExecutor。这里分析一下顶层接口org.springframework.scheduling.TaskScheduler提供的功能(笔者已经把功能一致的default方法暂时移除):
// 省略一些功能一致的default方法
public interface TaskScheduler {
// 调度一个任务,通过触发器实例指定触发时间周期
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
// 指定起始时间调度一个任务 - 单次执行
ScheduledFuture<?> schedule(Runnable task, Date startTime);
// 指定固定频率调度一个任务,period的单位是毫秒
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
// 指定起始时间和固定频率调度一个任务,period的单位是毫秒
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
// 指定固定延迟间隔调度一个任务,delay的单位是毫秒
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
// 指定起始时间和固定延迟间隔调度一个任务,delay的单位是毫秒
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
}
Task的分类
Scheduling模块中支持不同类型的任务,主要包括下面的3种(解析的优先顺序也是如下):
- Cron表达式任务,支持通过Cron表达式配置执行的周期,对应的任务类型为org.springframework.scheduling.config.CronTask。
- 固定延迟间隔任务,也就是上一轮执行完毕后间隔固定周期再执行本轮,依次类推,对应的的任务类型为org.springframework.scheduling.config.FixedDelayTask。
- 固定频率任务,基于固定的间隔时间执行,不会理会上一轮是否执行完毕本轮会照样执行,对应的的任务类型为org.springframework.scheduling.config.FixedRateTask。
关于这几类Task,举几个简单的例子。CronTask是通过cron表达式指定执行周期的,并且不支持延迟执行,可以使用特殊字符-禁用任务执行:
// 注解声明式使用 - 每五秒执行一次,不支持initialDelay
@Scheduled(cron = "*/5 * * * * ?")
public void processTask(){
}
// 注解声明式使用 - 禁止任务执行
@Scheduled(cron = "-")
public void processTask(){
}
// 编程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
CronTask cronTask = new CronTask(() -> {
System.out.println(String.format("[%s] - CronTask触发...", F.format(LocalDateTime.now())));
}, "*/5 * * * * ?");
taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次执行输出结果
[2020-03-16 01:07:00] - CronTask触发...
[2020-03-16 01:07:05] - CronTask触发...
......
FixedDelayTask需要配置延迟间隔值(fixedDelay或者fixedDelayString)和可选的起始延迟执行时间(initialDelay或者initialDelayString),这里注意一点是fixedDelayString和initialDelayString都支持从EmbeddedValueResolver(简单理解为配置文件的属性处理器)读取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:
// 注解声明式使用 - 延迟一秒开始执行,延迟间隔为5秒
@Scheduled(fixedDelay = 5000, initialDelay = 1000)
public void process(){
}
// 注解声明式使用 - spring-boot配置文件中process.task.fixedDelay=5000 process.task.initialDelay=1000
@Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 编程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {
System.out.println(String.format("[%s] - FixedDelayTask触发...", F.format(LocalDateTime.now())));
}, 5000, 1000);
Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());
taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次执行输出结果
[2020-03-16 01:06:12] - FixedDelayTask触发...
[2020-03-16 01:06:17] - FixedDelayTask触发...
......
FixedRateTask需要配置固定间隔值(fixedRate或者fixedRateString)和可选的起始延迟执行时间(initialDelay或者initialDelayString),这里注意一点是fixedRateString和initialDelayString都支持从EmbeddedValueResolver(简单理解为配置文件的属性处理器)读取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:
// 注解声明式使用 - 延迟一秒开始执行,每隔5秒执行一次
@Scheduled(fixedRate = 5000, initialDelay = 1000)
public void processTask(){
}
// 注解声明式使用 - spring-boot配置文件中process.task.fixedRate=5000 process.task.initialDelay=1000
@Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 编程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
FixedRateTask fixedRateTask = new FixedRateTask(() -> {
System.out.println(String.format("[%s] - FixedRateTask触发...", F.format(LocalDateTime.now())));
}, 5000, 1000);
Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());
taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次执行输出结果
[2020-03-16 23:58:25] - FixedRateTask触发...
[2020-03-16 23:58:30] - FixedRateTask触发...
......
简单分析核心流程的源代码
在SpringBoot注解体系下,Scheduling模块的所有逻辑基本在ScheduledAnnotationBeanPostProcessor和ScheduledTaskRegistrar中。一般来说,一个类实现的接口代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor实现的接口:
- ScheduledTaskHolder接口:返回Set,表示持有的所有任务实例。
- MergedBeanDefinitionPostProcessor接口:Bean定义合并时回调,预留空实现,暂时不做任何处理。
- BeanPostProcessor接口:也就是MergedBeanDefinitionPostProcessor的父接口,Bean实例初始化前后分别回调,其中,后回调的postProcessAfterInitialization()方法就是用于解析@Scheduled和装载ScheduledTask,需要重点关注此方法的逻辑。
- DestructionAwareBeanPostProcessor接口:具体的Bean实例销毁的时候回调,用于Bean实例销毁的时候移除和取消对应的任务实例。
- Ordered接口:用于Bean加载时候的排序,主要是改变ScheduledAnnotationBeanPostProcessor在BeanPostProcessor执行链中的顺序。
- EmbeddedValueResolverAware接口:回调StringValueResolver实例,用于解析带占位符的环境变量属性值。
- BeanNameAware接口:回调BeanName。
- BeanFactoryAware接口:回调BeanFactory实例,具体是DefaultListableBeanFactory,也就是熟知的IOC容器。
- ApplicationContextAware接口:回调ApplicationContext实例,也就是熟知的Spring上下文,它是IOC容器的门面,同时是事件广播器、资源加载器的实现等等。
- SmartInitializingSingleton接口:所有单例实例化完毕之后回调,作用是在持有的applicationContext为NULL的时候开始调度所有加载完成的任务,这个钩子接口十分有用,笔者常用它做一些资源初始化工作。
- ApplicationListener接口:监听Spring应用的事件,具体是ApplicationListener,监听上下文刷新的事件,如果事件中携带的ApplicationContext实例和ApplicationContextAware回调的ApplicationContext实例一致,那么在此监听回调方法中开始调度所有加载完成的任务,也就是在ScheduledAnnotationBeanPostProcessor这个类中,SmartInitializingSingleton接口的实现和ApplicationListener接口的实现逻辑是互斥的。
- DisposableBean接口:当前Bean实例销毁时候回调,也就是ScheduledAnnotationBeanPostProcessor自身被销毁的时候回调,用于取消和清理所有的ScheduledTask。
上面分析的钩子接口在SpringBoot体系中可以按需使用,了解回调不同钩子接口的回调时机,可以在特定时机完成达到理想的效果。
@Scheduled
注解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三种类型的Bean
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 获取Bean的用户态类型,例如Bean有可能被CGLIB增强,这个时候要取其父类
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// nonAnnotatedClasses存放着不存在@Scheduled注解的类型,缓存起来避免重复判断它是否携带@Scheduled注解的方法
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 因为JDK8之后支持重复注解,因此获取具体类型中Method -> @Scheduled的集合,也就是有可能一个方法使用多个@Scheduled注解,最终会封装为多个Task
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
// 解析到类型中不存在@Scheduled注解的方法添加到nonAnnotatedClasses缓存
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Method -> @Scheduled的集合遍历processScheduled()方法进行登记
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
processScheduled(Scheduled scheduled, Method method, Object bean)就是具体的注解解析和Task封装的方法:
// Runnable适配器 - 用于反射调用具体的方法,触发任务方法执行
public class ScheduledMethodRunnable implements Runnable {
private final Object target;
private final Method method;
public ScheduledMethodRunnable(Object target, Method method) {
this.target = target;
this.method = method;
}
....// 省略无关代码
// 这个就是最终的任务方法执行的核心方法,抑制修饰符,然后反射调用
@Override
public void run() {
try {
ReflectionUtils.makeAccessible(this.method);
this.method.invoke(this.target);
}
catch (InvocationTargetException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new UndeclaredThrowableException(ex);
}
}
}
// 通过方法所在Bean实例和方法封装Runnable适配器ScheduledMethodRunnable实例
protected Runnable createRunnable(Object target, Method method) {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
// 这个方法十分长,不过逻辑并不复杂,它只做了四件事
// 0. 解析@Scheduled中的initialDelay、initialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行
// 1. 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存
// 2. 解析@Scheduled中的fixedDelay、fixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存
// 3. 解析@Scheduled中的fixedRate、fixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
// 通过方法宿主Bean和目标方法封装Runnable适配器ScheduledMethodRunnable实例
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
// 缓存已经装载的任务
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
// 解析初始化延迟执行时间,initialDelayString支持占位符配置,如果initialDelayString配置了,会覆盖initialDelay的值
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
// 解析时区zone的值,支持支持占位符配置,判断cron是否存在,存在则装载为CronTask
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
// 此方法虽然表面上是调度CronTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
// 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
// 修正小于0的初始化延迟执行时间值为0
if (initialDelay < 0) {
initialDelay = 0;
}
// 解析fixedDelay和fixedDelayString,如果同时配置,fixedDelayString最终解析出来的整数值会覆盖fixedDelay,封装为FixedDelayTask
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// 此方法虽然表面上是调度FixedDelayTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
// 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 解析fixedRate和fixedRateString,如果同时配置,fixedRateString最终解析出来的整数值会覆盖fixedRate,封装为FixedRateTask
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
// 此方法虽然表面上是调度FixedRateTask,实际上由于ScheduledTaskRegistrar不持有TaskScheduler,只是把任务添加到它的缓存中
// 返回的任务实例添加到宿主Bean的缓存中,然后最后会放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
// 注册所有任务实例,这个映射Key为宿主Bean实例,Value为List<ScheduledTask>,后面用于调度所有注册完成的任务
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
总的来说,这个方法做了四件事:
- 解析@Scheduled中的initialDelay、initialDelayString属性,适用于FixedDelayTask或者FixedRateTask的延迟执行。
- 优先解析@Scheduled中的cron属性,封装为CronTask,通过ScheduledTaskRegistrar进行缓存。
- 解析@Scheduled中的fixedDelay、fixedDelayString属性,封装为FixedDelayTask,通过ScheduledTaskRegistrar进行缓存。
- 解析@Scheduled中的fixedRate、fixedRateString属性,封装为FixedRateTask,通过ScheduledTaskRegistrar进行缓存。
@Scheduled
修饰的某个方法如果同时配置了cron、fixedDelay|fixedDelayString和fixedRate|fixedRateString属性,意味着此方法同时封装为三种任务CronTask、FixedDelayTask和FixedRateTask。解析xxString值的使用,用到了EmbeddedValueResolver解析字符串的值,支持占位符,这样可以直接获取环境配置中的占位符属性(基于SPEL的特性,甚至可以支持嵌套占位符)。解析成功的所有任务实例存放在ScheduledAnnotationBeanPostProcessor的一个映射scheduledTasks中:
// 宿主Bean实例 -> 解析完成的任务实例Set
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
解析和缓存工作完成之后,接着分析最终激活所有调度任务的逻辑,见互斥方法afterSingletonsInstantiated()和onApplicationEvent(),两者中一定只有一个方法能够调用finishRegistration():
// 所有单例实例化完毕之后回调
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
this.nonAnnotatedClasses.clear();
if (this.applicationContext == null) {
// Not running in an ApplicationContext -> register tasks early...
finishRegistration();
}
}
// 上下文刷新完成之后回调
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
//
private void finishRegistration() {
// 如果持有的scheduler对象不为null则设置ScheduledTaskRegistrar中的任务调度器
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// 这个判断一般会成立,得到的BeanFactory就是DefaultListableBeanFactory
if (this.beanFactory instanceof ListableBeanFactory) {
// 获取所有的调度配置器SchedulingConfigurer实例,并且都回调configureTasks()方法,这个很重要,它是用户动态装载调取任务的扩展钩子接口
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
// SchedulingConfigurer实例列表排序
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 下面这一大段逻辑都是为了从BeanFactory取出任务调度器实例,主要判断TaskScheduler或者ScheduledExecutorService类型的Bean,包括尝试通过类型或者名字获取
// 获取成功后设置到ScheduledTaskRegistrar中
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
logger.trace("Could not find unique TaskScheduler bean", ex);
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.trace("Could not find default TaskScheduler bean", ex);
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
logger.trace("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
// 调用ScheduledTaskRegistrar的afterPropertiesSet()方法,装载所有的调度任务
this.registrar.afterPropertiesSet();
}
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
// 省略其他代码.........
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
// 装载所有调度任务
@SuppressWarnings("deprecation")
protected void scheduleTasks() {
// 这里注意一点,如果找不到任务调度器实例,那么会用单个线程调度所有任务
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// 调度所有装载完毕的自定义触发器的任务实例
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
// 调度所有装载完毕的CronTask
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
// 调度所有装载完毕的FixedRateTask
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
// 调度所有装载完毕的FixedDelayTask
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
// 省略其他代码.........
}
注意两个个问题:
- 如果没有配置TaskScheduler或者ScheduledExecutorService类型的Bean,那么调度模块只会创建一个线程去调度所有装载完毕的任务,如果任务比较多,执行密度比较大,很有可能会造成大量任务饥饿,表现为存在部分任务不会触发调度的场景(这个是调度模块生产中经常遇到的故障,需要重点排查是否没有设置TaskScheduler或者ScheduledExecutorService)。
- SchedulingConfigurer是调度模块提供给使用的进行扩展的钩子接口,用于在激活所有调度任务之前回调ScheduledTaskRegistrar实例,只要拿到ScheduledTaskRegistrar实例,我们就可以使用它注册和装载新的Task。
调度任务动态装载
Scheduling模块本身已经支持基于NamespaceHandler支持通过XML文件配置调度任务,但是笔者一直认为XML给人的感觉太”重”,使用起来显得太笨重,这里打算扩展出JSON文件配置和基于JDBC数据源配置(也就是持久化任务,这里选用MySQL)。根据前文的源码分析,需要用到SchedulingConfigurer接口的实现,用于在所有调度任务触发之前从外部添加自定义的调度任务。先定义调度任务的一些配置属性类:
// 调度任务类型枚举
@Getter
@RequiredArgsConstructor
public enum ScheduleTaskType {
CRON("CRON"),
FIXED_DELAY("FIXED_DELAY"),
FIXED_RATE("FIXED_RATE"),
;
private final String type;
}
// 调度任务配置,enable属性为全局开关
@Data
public class ScheduleTaskProperties {
private Long version;
private Boolean enable;
private List<ScheduleTasks> tasks;
}
// 调度任务集合,笔者设计的时候采用一个宿主类中每个独立方法都是一个任务实例的模式
@Data
public class ScheduleTasks {
// 这里故意叫Klass代表Class,避免关键字冲突
private String taskHostKlass;
private Boolean enable;
private List<ScheduleTaskMethod> taskMethods;
}
// 调度任务方法 - enable为任务开关,没有配置会被ScheduleTaskProperties或者ScheduleTasks中的enable覆盖
@Data
public class ScheduleTaskMethod {
private Boolean enable;
private String taskDescription;
private String taskMethod;
// 时区,cron的计算需要用到
private String timeZone;
private String cronExpression;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
}
设计的时候,考虑到多个任务执行方法可以放在同一个宿主类,这样可以方便同一种类的任务进行统一管理,如:
public class TaskHostClass {
public void task1() {
}
public void task2() {
}
......
public void taskN() {
}
}
细节方面,intervalMilliseconds和initialDelayMilliseconds的单位设计为毫秒,使用字符串形式,方便可以基于StringValueResolver解析配置文件中的属性配置。添加一个抽象的SchedulingConfigurer:
@Slf4j
public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware,
EmbeddedValueResolverAware {
@Getter
private StringValueResolver embeddedValueResolver;
private ConfigurableBeanFactory configurableBeanFactory;
private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList();
private final Set<String> tasksLoaded = Sets.newHashSet();
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
}
@Override
public void afterPropertiesSet() throws Exception {
internalTasks.clear();
internalTasks.addAll(loadTaskProperties());
}
@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
embeddedValueResolver = resolver;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
for (InternalTaskProperties task : internalTasks) {
try {
synchronized (tasksLoaded) {
String key = task.taskHostKlass() + "#" + task.taskMethod();
// 避免重复加载
if (!tasksLoaded.contains(key)) {
if (task instanceof CronTaskProperties) {
loadCronTask((CronTaskProperties) task, taskRegistrar);
}
if (task instanceof FixedDelayTaskProperties) {
loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar);
}
if (task instanceof FixedRateTaskProperties) {
loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar);
}
tasksLoaded.add(key);
} else {
log.info("调度任务已经装载,任务宿主类:{},任务执行方法:{}", task.taskHostKlass(), task.taskMethod());
}
}
} catch (Exception e) {
throw new IllegalStateException(String.format("加载调度任务异常,任务宿主类:%s,任务执行方法:%s",
task.taskHostKlass(), task.taskMethod()), e);
}
}
}
private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception {
Class<?> klass = ClassUtils.forName(taskHostKlass, null);
Object target = configurableBeanFactory.getBean(klass);
Method method = ReflectionUtils.findMethod(klass, taskMethod);
if (null == method) {
throw new IllegalArgumentException(String.format("找不到目标方法,任务宿主类:%s,任务执行方法:%s", taskHostKlass, taskMethod));
}
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression());
if (null != cronExpression) {
String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone());
TimeZone timeZone;
if (null != timeZoneString) {
timeZone = TimeZone.getTimeZone(timeZoneString);
} else {
timeZone = TimeZone.getDefault();
}
CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone));
taskRegistrar.addCronTask(cronTask);
log.info("装载CronTask[{}#{}()]成功,cron表达式:{},任务描述:{}", cronExpression, pops.taskMethod(),
pops.cronExpression(), pops.taskDescription());
}
}
private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds);
taskRegistrar.addFixedDelayTask(fixedDelayTask);
log.info("装载FixedDelayTask[{}#{}()]成功,固定延迟间隔:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(),
pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription());
}
private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds);
taskRegistrar.addFixedRateTask(fixedRateTask);
log.info("装载FixedRateTask[{}#{}()]成功,固定执行频率:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(),
pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription());
}
private long parseDelayAsLong(String value) {
if (null == value) {
return 0L;
}
if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
return Duration.parse(value).toMillis();
}
return Long.parseLong(value);
}
private boolean isP(char ch) {
return (ch == 'P' || ch == 'p');
}
/**
* 加载任务配置,预留给子类实现
*/
protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception;
interface InternalTaskProperties {
String taskHostKlass();
String taskMethod();
String taskDescription();
}
@Builder
protected static class CronTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String cronExpression;
private String taskDescription;
private String timeZone;
@Override
public String taskDescription() {
return taskDescription;
}
public String cronExpression() {
return cronExpression;
}
public String timeZone() {
return timeZone;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedDelayTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedRateTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
}
loadTaskProperties()方法用于加载任务配置,留给子类实现。
JSON配置
JSON配置文件的格式如下(类路径下的scheduling/tasks.json文件):
{
"version": 1,
"tasks": [
{
"taskKlass": "club.throwable.schedule.Tasks",
"taskMethods": [
{
"taskType": "FIXED_DELAY",
"taskDescription": "processTask1任务",
"taskMethod": "processTask1",
"intervalMilliseconds": "5000"
}
]
}
]
}
每个层级都有一个enable属性,默认为true,只有强制指定为false的时候才不会装载对应的任务调度方法。这里就是简单继承AbstractSchedulingConfigurer,实现从类路径加载配置的逻辑,定义JsonSchedulingConfigurer:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {
// 这里把默认的任务配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通过配置项scheduling.json.config.location进行覆盖
@Value("${scheduling.json.config.location:scheduling/tasks.json}")
private String location;
@Autowired
private ObjectMapper objectMapper;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
ClassPathResource resource = new ClassPathResource(location);
String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class);
if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) {
return Lists.newArrayList();
}
List<InternalTaskProperties> target = Lists.newArrayList();
for (ScheduleTasks tasks : properties.getTasks()) {
if (null != tasks) {
List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods();
if (null != taskMethods) {
for (ScheduleTaskMethod taskMethod : taskMethods) {
if (!Boolean.FALSE.equals(taskMethod.getEnable())) {
if (ScheduleTaskType.CRON == taskMethod.getTaskType()) {
target.add(CronTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.cronExpression(taskMethod.getCronExpression())
.timeZone(taskMethod.getTimeZone())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
}
}
}
}
}
return target;
}
}
添加一个配置类和任务类:
@Configuration
public class SchedulingAutoConfiguration {
@Bean
public JsonSchedulingConfigurer jsonSchedulingConfigurer(){
return new JsonSchedulingConfigurer();
}
}
// club.throwable.schedule.Tasks
@Slf4j
@Component
public class Tasks {
public void processTask1() {
log.info("processTask1触发..........");
}
}
启动SpringBoot应用,某次执行的部分日志如下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 装载FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延迟间隔:5000 ms,初始延迟执行时间:0 ms,任务描述:processTask1任务
2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
......
这里有些细节没有完善,例如配置文件参数的一些非空判断、配置值是否合法等等校验逻辑没有做,如果要设计成一个工业级的类库,这些方面必须要考虑。
JDBC数据源配置
JDBC数据源这里用MySQL举例说明,先建一个调度任务配置表schedule_task:
CREATE TABLE `schedule_task`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '修改者',
creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '创建者',
deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '软删除标识',
task_host_class VARCHAR(256) NOT NULL COMMENT '任务宿主类全类名',
task_method VARCHAR(128) NOT NULL COMMENT '任务执行方法名',
task_type VARCHAR(16) NOT NULL COMMENT '任务类型',
task_description VARCHAR(64) NOT NULL COMMENT '任务描述',
cron_expression VARCHAR(128) COMMENT 'cron表达式',
time_zone VARCHAR(32) COMMENT '时区',
interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行间隔时间',
initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延迟执行时间',
UNIQUE uniq_class_method (task_host_class, task_method)
) COMMENT '调度任务配置表';
其实具体的做法和JSON配置差不多,先引入spring-boot-starter-jdbc,接着编写MysqlSchedulingConfigurer:
// DAO
@RequiredArgsConstructor
public class MysqlScheduleTaskDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> {
List<ScheduleTask> tasks = Lists.newArrayList();
while (r.next()) {
ScheduleTask task = new ScheduleTask();
tasks.add(task);
task.setId(r.getLong("id"));
task.setCronExpression(r.getString("cron_expression"));
task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds"));
task.setIntervalMilliseconds(r.getLong("interval_milliseconds"));
task.setTimeZone(r.getString("time_zone"));
task.setTaskDescription(r.getString("task_description"));
task.setTaskHostClass(r.getString("task_host_class"));
task.setTaskMethod(r.getString("task_method"));
task.setTaskType(r.getString("task_type"));
}
return tasks;
};
public List<ScheduleTask> selectAllTasks() {
return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI);
}
}
// MysqlSchedulingConfigurer
@RequiredArgsConstructor
public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {
private final MysqlScheduleTaskDao mysqlScheduleTaskDao;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
List<InternalTaskProperties> target = Lists.newArrayList();
List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks();
if (!tasks.isEmpty()) {
for (ScheduleTask task : tasks) {
ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType());
if (ScheduleTaskType.CRON == scheduleTaskType) {
target.add(CronTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.cronExpression(task.getCronExpression())
.timeZone(task.getTimeZone())
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
}
}
return target;
}
}
记得引入spring-boot-starter-jdbc和mysql-connector-java并且激活MysqlSchedulingConfigurer配置。插入一条记录:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '测试任务', NULL, NULL, 10000, 5000);
然后启动服务,某次执行的输出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
....
混合配置
有些时候我们希望可以JSON配置和JDBC数据源配置进行混合配置,或者动态二选一以便灵活应对多环境的场景(例如要在开发环境使用JSON配置而测试和生产环境使用JDBC数据源配置,甚至可以将JDBC数据源配置覆盖JSON配置,这样能保证总是倾向于使用JDBC数据源配置),这样需要对前面两小节的实现加多一层抽象。这里的设计可以参考SpringMVC中的控制器参数解析器的设计,具体是HandlerMethodArgumentResolverComposite,其实道理是相同的。
其他注意事项
在生产实践中,暂时不考虑生成任务执行日志和细粒度的监控,着重做了两件事:
- 并发控制,(多服务节点下)禁止任务并发执行。
- 跟踪任务的日志轨迹。
解决并发执行问题
一般情况下,我们需要禁止任务并发执行,考虑引入Redisson提供的分布式锁:
// 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>最新版本</version>
</dependency>
// 配置类
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedissonAutoConfiguration {
@Autowired
private RedisProperties redisProperties;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort()));
if (redisProperties.getDatabase() > 0) {
singleServerConfig.setDatabase(redisProperties.getDatabase());
}
if (null != redisProperties.getPassword()) {
singleServerConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
// 分布式锁工厂
@Component
public class DistributedLockFactory {
private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:";
@Autowired
private RedissonClient redissonClient;
public DistributedLock provideDistributedLock(String lockKey) {
String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey;
return new RedissonDistributedLock(redissonClient, lockPath);
}
}
这里考虑到项目依赖了spring-boot-starter-redis,直接复用了它的配置属性类(RedissonDistributedLock是RLock的轻量级封装,见附录)。使用方式如下:
@Autowired
private DistributedLockFactory distributedLockFactory;
public void task1() {
DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);
// 等待时间为20秒,持有锁的最大时间为60秒
boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS);
if (tryLock) {
try {
// 业务逻辑
}finally {
lock.unlock();
}
}
}
引入MDC跟踪任务的Trace
MDC其实是Mapped Diagnostic Context的缩写,也就是映射诊断上下文,一般用于日志框架里面同一个线程执行过程的跟踪(例如一个线程跑过了多个方法,各个方法里面都打印了日志,那么通过MDC可以对整个调用链通过一个唯一标识关联起来),例如这里选用slf4j提供的org.slf4j.MDC:
@Component
public class MappedDiagnosticContextAssistant {
/**
* 在MDC中执行
*
* @param runnable runnable
*/
public void processInMappedDiagnosticContext(Runnable runnable) {
String uuid = UUID.randomUUID().toString();
MDC.put("TRACE_ID", uuid);
try {
runnable.run();
} finally {
MDC.remove("TRACE_ID");
}
}
}
任务执行的时候需要包裹成一个Runnale实例:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("开始执行......");
// 业务逻辑
watch.stop();
log.info("执行完毕,耗时:{} ms......", watch.getTotalTimeMillis());
});
}
结合前面一节提到的并发控制,那么最终执行的任务方法如下:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("开始执行......");
scheduleTaskAssistant.executeInDistributedLock("任务分布式锁KEY", () -> {
// 真实的业务逻辑
});
watch.stop();
log.info("执行完毕,耗时:{} ms......", watch.getTotalTimeMillis());
});
}
这里的方法看起来比较别扭,其实可以直接在任务装载的时候基于分布式锁和MDC进行封装,方式类似于ScheduledMethodRunnable,这里不做展开,因为要详细展开篇幅可能比较大(ScheduleTaskAssistant见附录)。
小结
其实spring-context整个调度模块完全依赖于TaskScheduler实现,更底层的是JUC调度线程池ScheduledThreadPoolExecutor。如果想要从底层原理理解整个调度模块的运行原理,那么就一定要分析ScheduledThreadPoolExecutor的实现。整篇文章大致介绍了spring-context调度模块的加载调度任务的流程,并且基于扩展接口SchedulingConfigurer扩展出多种自定义配置调度任务的方式,但是考虑到需要在生产环境中运行,那么免不了需要考虑监控、并发控制、日志跟踪等等的功能,但是这样就会使得整个调度模块变重,慢慢地就会发现,这个轮子越造越大,越有主流调度框架Quartz或者Easy Scheduler的影子。笔者认为,软件工程,有些时候要权衡取舍,该抛弃的就应该果断抛弃,否则总是负重而行,还能走多远?
参考资料:
- SpringBoot源码
附录
ScheduleTaskAssistant:
@RequiredArgsConstructor
@Component
public class ScheduleTaskAssistant {
/**
* 5秒
*/
public static final long DEFAULT_WAIT_TIME = 5L;
/**
* 30秒
*/
public static final long DEFAULT_LEAVE_TIME = 30L;
private final DistributedLockFactory distributedLockFactory;
/**
* 在分布式锁中执行
*
* @param waitTime 锁等着时间
* @param leaveTime 锁持有时间
* @param timeUnit 时间单位
* @param lockKey 锁的key
* @param task 任务对象
*/
public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) {
DistributedLock lock = distributedLockFactory.dl(lockKey);
boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit);
if (tryLock) {
try {
long waitTimeMillis = timeUnit.toMillis(waitTime);
long start = System.currentTimeMillis();
task.run();
long end = System.currentTimeMillis();
long cost = end - start;
// 预防锁过早释放
if (cost < waitTimeMillis) {
Sleeper.X.sleep(waitTimeMillis - cost);
}
} finally {
lock.unlock();
}
}
}
/**
* 在分布式锁中执行 - 使用默认时间
*
* @param lockKey 锁的key
* @param task 任务对象
*/
public void executeInDistributedLock(String lockKey, Runnable task) {
executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task);
}
}
RedissonDistributedLock:
@Slf4j
public class RedissonDistributedLock implements DistributedLock {
private final RedissonClient redissonClient;
private final String lockPath;
private final RLock internalLock;
RedissonDistributedLock(RedissonClient redissonClient, String lockPath) {
this.redissonClient = redissonClient;
this.lockPath = lockPath;
this.internalLock = initInternalLock();
}
private RLock initInternalLock() {
return redissonClient.getLock(lockPath);
}
@Override
public boolean isLock() {
return internalLock.isLocked();
}
@Override
public boolean isHeldByCurrentThread() {
return internalLock.isHeldByCurrentThread();
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
internalLock.lock(leaseTime, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
try {
return internalLock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e);
}
}
@Override
public void unlock() {
try {
internalLock.unlock();
} catch (IllegalMonitorStateException ex) {
log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex);
}
}
}
(本文完 c-7-d e-a-20200324 真是有点滑稽,笔者发现任务持久化最好还是用现成的工业级调度器,于是基于Quartz做了轻量级封装,写了个后台管理界面,且听下回分解)