文章概览
- 续接上文,讲述admin后台编写代码直接执行是怎么实现的
- 相关技术点的讲解
admin对java/其他脚本的支持
- 先看看admin调度任务支持的类型
- 总共支持了7种模式(bean模式是提前写好的任务类,随着执行器启动/GLUE_GROOVY是在admin上编写的代码)
public enum GlueTypeEnum {
BEAN("BEAN", false, null, null),
GLUE_GROOVY("GLUE(Java)", false, null, null),
GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"),
GLUE_PYTHON("GLUE(Python)", true, "python", ".py"),
GLUE_PHP("GLUE(PHP)", true, "php", ".php"),
GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js"),
GLUE_POWERSHELL("GLUE(PowerShell)", true, "powershell", ".ps1");
}
- 执行器如何实现携带GLUE_GROOVY的请求
- 定位到com.xxl.job.core.biz.impl.ExecutorBizImpl#run
if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid handler
if (jobHandler == null) {
try {
// triggerParam.getGlueSource()获取到的字符串是admin发送过来的源码
// 这里首先要对胶水语言的工厂进行初始化,获取一个单例实例
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
// 构造完jobHandler,后续执行跟上一篇文章介绍的Bean执行是一致的
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
}
- GlueFactory
public class GlueFactory {
private static GlueFactory glueFactory = new GlueFactory();
public static GlueFactory getInstance() {
return glueFactory;
}
// 会根据运行环境决定构造的工厂是原生的还是spring的
// SpringGlueFactory相比GlueFactory仅仅是实现了com.xxl.job.core.glue.impl.SpringGlueFactory#injectService
// injectService的方法主要是用来处理依赖注入的
public static void refreshInstance(int type) {
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
// 类加载器
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
// 缓存,key是源码的md5
private ConcurrentMap<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>();
}
- 加载实例loadNewInstance
public IJobHandler loadNewInstance(String codeSource) throws Exception{
if (codeSource!=null && codeSource.trim().length()>0) {
// 把源码进行加载转换成class类
Class<?> clazz = getCodeSourceClass(codeSource);
if (clazz != null) {
// 调用class的方法转换成实例
Object instance = clazz.newInstance();
if (instance!=null) {
// admin编写的类要求必须实现IJobHandler
if (instance instanceof IJobHandler) {
// 这里要进行字段注入,比如@Autowired
this.injectService(instance);
// 返回的这个实例也就跟我们自己编写的执行器bean任务类无异了
return (IJobHandler) instance;
} else {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
}
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
// 对string类型的源码转换成class,也就是要进行类加载
private Class<?> getCodeSourceClass(String codeSource){
try {
// 把源码转换为md5的字符串
byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes());
String md5Str = new BigInteger(1, md5).toString(16);
// 从缓存中检查是否存在对应的class
Class<?> clazz = CLASS_CACHE.get(md5Str);
if(clazz == null){
// 使用GroovyClassLoader进行类加载
clazz = groovyClassLoader.parseClass(codeSource);
CLASS_CACHE.putIfAbsent(md5Str, clazz);
}
return clazz;
} catch (Exception e) {
return groovyClassLoader.parseClass(codeSource);
}
}
- 看看注入的代码实现 injectService
@Override
public void injectService(Object instance){
// 判断当先传入的实例
// 空返回
if (instance==null) {
return;
}
// 检查spring上下文是否存在
// spring环境中上下文携带的beanFactory是用来存放bean实例的
if (XxlJobSpringExecutor.getApplicationContext() == null) {
return;
}
// 获取实例的字段
// 这里是没办法获取父类字段,所以编写的代码最好按它的规范来,直接实现IJobHandler
// 当然你可以修改源码来达到父类的字段也可以被注入
Field[] fields = instance.getClass().getDeclaredFields();
for (Field field : fields) {
// 排除静态字段
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
Object fieldBean = null;
// 接下来就是处理注入的逻辑了
if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
try {
// 先判断Resource字段注入
// Resource注入的逻辑是先获取注解的value,如果value不存在这取字段的名字
Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
if (resource.name()!=null && resource.name().length()>0){
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
}
} catch (Exception e) {
}
if (fieldBean==null ) {
// 这里补充一段兜底逻辑,按类型进行获取
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
// Autowired注入可以搭配Qualifier来指定bean的名字,如果不存在这个注解,默认按bean类型来
Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
} else {
fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
}
}
if (fieldBean!=null) {
field.setAccessible(true);
try {
field.set(instance, fieldBean);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
}
}
- GroovyClassLoader这个类加载器的具体原理会放在下个文章去讲!
- 接着看xxl-job执行器是如何支持其他脚本语言?
- ScriptJobHandler表示它是一个脚本文件执行器
// 代码会进行删减
// 只留下关键部分进行讲解
public class ScriptJobHandler extends IJobHandler {
// 构造函数
public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){
// 省略了N行代码
// 这里的代码主要意图就是遍历胶水代码存放路径
// 检查到如果有存在相同jobid的胶水代码文件,要进行删除,防止脏逻辑
File glueSrcPath = new File(XxlJobFileAppender.getGlueSrcPath());
if (glueSrcPath.exists()) {
File[] glueSrcFileList = glueSrcPath.listFiles();
if (glueSrcFileList!=null && glueSrcFileList.length>0) {
for (File glueSrcFileItem : glueSrcFileList) {
if (glueSrcFileItem.getName().startsWith(String.valueOf(jobId)+"_")) {
glueSrcFileItem.delete();
}
}
}
}
}
// 真正执行的代码
@Override
public void execute() throws Exception {
// 执行命令
// 这些都是枚举定义好的
String cmd = glueType.getCmd();
// 这部分创建脚本文件
String scriptFileName = XxlJobFileAppender.getGlueSrcPath()
.concat(File.separator)
.concat(String.valueOf(jobId))
.concat("_")
.concat(String.valueOf(glueUpdatetime))
.concat(glueType.getSuffix());
File scriptFile = new File(scriptFileName);
if (!scriptFile.exists()) {
ScriptUtil.markScriptFile(scriptFileName, gluesource);
}
// 获取日志文件名
// 后续可以写执行日志
String logFileName = XxlJobContext.getXxlJobContext().getJobLogFileName();
// script params:0=param、1=分片序号、2=分片总数
String[] scriptParams = new String[3];
scriptParams[0] = XxlJobHelper.getJobParam();
scriptParams[1] = String.valueOf(XxlJobContext.getXxlJobContext().getShardIndex());
scriptParams[2] = String.valueOf(XxlJobContext.getXxlJobContext().getShardTotal());
// invoke
XxlJobHelper.log("----------- script file:"+ scriptFileName +" -----------");
// 执行脚本代码
int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams);
if (exitValue == 0) {
XxlJobHelper.handleSuccess();
return;
} else {
XxlJobHelper.handleFail("script exit value("+exitValue+") is failed");
return ;
}
}
}
- ScriptUtil.execToFile
public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException {
FileOutputStream fileOutputStream = null;
Thread inputThread = null;
Thread errThread = null;
try {
// 打开文件输出流
fileOutputStream = new FileOutputStream(logFile, true);
// 把命令和参数组合成数组
List<String> cmdarray = new ArrayList<>();
cmdarray.add(command);
cmdarray.add(scriptFile);
if (params!=null && params.length>0) {
for (String param:params) {
cmdarray.add(param);
}
}
String[] cmdarrayFinal = cmdarray.toArray(new String[cmdarray.size()]);
// java调用系统命令开启进程的api
final Process process = Runtime.getRuntime().exec(cmdarrayFinal);
// 把启动后进程的输出流读取到文件中去
// copy方法就是流的复制
final FileOutputStream finalFileOutputStream = fileOutputStream;
inputThread = new Thread(new Runnable() {
@Override
public void run() {
try {
copy(process.getInputStream(), finalFileOutputStream, new byte[1024]);
} catch (IOException e) {
XxlJobHelper.log(e);
}
}
});
errThread = new Thread(new Runnable() {
@Override
public void run() {
try {
copy(process.getErrorStream(), finalFileOutputStream, new byte[1024]);
} catch (IOException e) {
XxlJobHelper.log(e);
}
}
});
inputThread.start();
errThread.start();
// process-wait
int exitValue = process.waitFor(); // exit code: 0=success, 1=error
// log-thread join
inputThread.join();
errThread.join();
return exitValue;
} catch (Exception e) {
XxlJobHelper.log(e);
return -1;
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
XxlJobHelper.log(e);
}
}
if (inputThread != null && inputThread.isAlive()) {
inputThread.interrupt();
}
if (errThread != null && errThread.isAlive()) {
errThread.interrupt();
}
}
}
结尾
- 后续会在下篇文章中分析GroovyClassLoader,然后我们尝试来做一个简单的Java做题平台!类似leetcode,但是又不足以那么强大
作者:SAM2021
链接:xxl-job如何实现在admin后台直接编写任务代码执行 - 掘金