xxl-job如何实现在admin后台直接编写任务代码执行

文章概览

  • 续接上文,讲述admin后台编写代码直接执行是怎么实现的
  • 相关技术点的讲解

admin对java/其他脚本的支持

  • 先看看admin调度任务支持的类型
    • 总共支持了7种模式(bean模式是提前写好的任务类,随着执行器启动/GLUE_GROOVY是在admin上编写的代码)
public enum GlueTypeEnum {
    BEAN("BEAN", false, null, null),
    GLUE_GROOVY("GLUE(Java)", false, null, null),
    GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"),
    GLUE_PYTHON("GLUE(Python)", true, "python", ".py"),
    GLUE_PHP("GLUE(PHP)", true, "php", ".php"),
    GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js"),
    GLUE_POWERSHELL("GLUE(PowerShell)", true, "powershell", ".ps1");
}    
  • 执行器如何实现携带GLUE_GROOVY的请求
    • 定位到com.xxl.job.core.biz.impl.ExecutorBizImpl#run
if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
      // valid handler
  if (jobHandler == null) {
    try {
      // triggerParam.getGlueSource()获取到的字符串是admin发送过来的源码
      // 这里首先要对胶水语言的工厂进行初始化,获取一个单例实例  
      IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
      // 构造完jobHandler,后续执行跟上一篇文章介绍的Bean执行是一致的
      jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
    } catch (Exception e) {
      logger.error(e.getMessage(), e);
      return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
    }
  }
}
  • GlueFactory
public class GlueFactory {
  private static GlueFactory glueFactory = new GlueFactory();
  public static GlueFactory getInstance() {
    return glueFactory;
  }
  // 会根据运行环境决定构造的工厂是原生的还是spring的
  // SpringGlueFactory相比GlueFactory仅仅是实现了com.xxl.job.core.glue.impl.SpringGlueFactory#injectService
  // injectService的方法主要是用来处理依赖注入的
  public static void refreshInstance(int type) {
    if (type == 0) {
      glueFactory = new GlueFactory();
    } else if (type == 1) {
      glueFactory = new SpringGlueFactory();
    }
  }
  // 类加载器
  private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
  // 缓存,key是源码的md5
  private ConcurrentMap<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>();
}

  • 加载实例loadNewInstance
public IJobHandler loadNewInstance(String codeSource) throws Exception{
    if (codeSource!=null && codeSource.trim().length()>0) {
        // 把源码进行加载转换成class类
        Class<?> clazz = getCodeSourceClass(codeSource);
        if (clazz != null) {
            // 调用class的方法转换成实例
            Object instance = clazz.newInstance();
            if (instance!=null) {
                // admin编写的类要求必须实现IJobHandler
                if (instance instanceof IJobHandler) {
                    // 这里要进行字段注入,比如@Autowired
                    this.injectService(instance);
                    // 返回的这个实例也就跟我们自己编写的执行器bean任务类无异了
                    return (IJobHandler) instance;
                } else {
                    throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
                            + "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
                }
            }
        }
    }
    throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
// 对string类型的源码转换成class,也就是要进行类加载
private Class<?> getCodeSourceClass(String codeSource){
    try {
        // 把源码转换为md5的字符串
        byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes());
        String md5Str = new BigInteger(1, md5).toString(16);
        // 从缓存中检查是否存在对应的class
        Class<?> clazz = CLASS_CACHE.get(md5Str);
        if(clazz == null){
            // 使用GroovyClassLoader进行类加载
            clazz = groovyClassLoader.parseClass(codeSource);
            CLASS_CACHE.putIfAbsent(md5Str, clazz);
        }
        return clazz;
    } catch (Exception e) {
        return groovyClassLoader.parseClass(codeSource);
    }
}

  • 看看注入的代码实现 injectService
@Override
public void injectService(Object instance){
    // 判断当先传入的实例
    // 空返回    
    if (instance==null) {
        return;
    }
    // 检查spring上下文是否存在
    // spring环境中上下文携带的beanFactory是用来存放bean实例的    
    if (XxlJobSpringExecutor.getApplicationContext() == null) {
        return;
    }
    // 获取实例的字段
    // 这里是没办法获取父类字段,所以编写的代码最好按它的规范来,直接实现IJobHandler
    // 当然你可以修改源码来达到父类的字段也可以被注入    
    Field[] fields = instance.getClass().getDeclaredFields();
    for (Field field : fields) {
        // 排除静态字段
        if (Modifier.isStatic(field.getModifiers())) {
            continue;
        }

        Object fieldBean = null;
        
        // 接下来就是处理注入的逻辑了
        if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
            try {
                // 先判断Resource字段注入
                // Resource注入的逻辑是先获取注解的value,如果value不存在这取字段的名字
                Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
                if (resource.name()!=null && resource.name().length()>0){
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
                } else {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
                }
            } catch (Exception e) {
            }
            if (fieldBean==null ) {
                // 这里补充一段兜底逻辑,按类型进行获取
                fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
            }
        } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
            // Autowired注入可以搭配Qualifier来指定bean的名字,如果不存在这个注解,默认按bean类型来
            Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
            if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
                fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
            } else {
                fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
            }
        }

        if (fieldBean!=null) {
            field.setAccessible(true);
            try {
                field.set(instance, fieldBean);
            } catch (IllegalArgumentException e) {
                logger.error(e.getMessage(), e);
            } catch (IllegalAccessException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

  • GroovyClassLoader这个类加载器的具体原理会放在下个文章去讲!
  • 接着看xxl-job执行器是如何支持其他脚本语言?
    • ScriptJobHandler表示它是一个脚本文件执行器
// 代码会进行删减
// 只留下关键部分进行讲解
public class ScriptJobHandler extends IJobHandler {

    // 构造函数
    public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){
        // 省略了N行代码  
        // 这里的代码主要意图就是遍历胶水代码存放路径
        // 检查到如果有存在相同jobid的胶水代码文件,要进行删除,防止脏逻辑
        File glueSrcPath = new File(XxlJobFileAppender.getGlueSrcPath());
        if (glueSrcPath.exists()) {
            File[] glueSrcFileList = glueSrcPath.listFiles();
            if (glueSrcFileList!=null && glueSrcFileList.length>0) {
                for (File glueSrcFileItem : glueSrcFileList) {
                    if (glueSrcFileItem.getName().startsWith(String.valueOf(jobId)+"_")) {
                        glueSrcFileItem.delete();
                    }
                }
            }
        }

    }
    // 真正执行的代码
    @Override
    public void execute() throws Exception {
        // 执行命令
        // 这些都是枚举定义好的
        String cmd = glueType.getCmd();
        // 这部分创建脚本文件
        String scriptFileName = XxlJobFileAppender.getGlueSrcPath()
                .concat(File.separator)
                .concat(String.valueOf(jobId))
                .concat("_")
                .concat(String.valueOf(glueUpdatetime))
                .concat(glueType.getSuffix());
        File scriptFile = new File(scriptFileName);
        if (!scriptFile.exists()) {
            ScriptUtil.markScriptFile(scriptFileName, gluesource);
        }
        // 获取日志文件名
        // 后续可以写执行日志
        String logFileName = XxlJobContext.getXxlJobContext().getJobLogFileName();
        // script params:0=param、1=分片序号、2=分片总数
        String[] scriptParams = new String[3];
        scriptParams[0] = XxlJobHelper.getJobParam();
        scriptParams[1] = String.valueOf(XxlJobContext.getXxlJobContext().getShardIndex());
        scriptParams[2] = String.valueOf(XxlJobContext.getXxlJobContext().getShardTotal());
        // invoke
        XxlJobHelper.log("----------- script file:"+ scriptFileName +" -----------");
        // 执行脚本代码
        int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams);
        if (exitValue == 0) {
            XxlJobHelper.handleSuccess();
            return;
        } else {
            XxlJobHelper.handleFail("script exit value("+exitValue+") is failed");
            return ;
        }

    }

}

  • ScriptUtil.execToFile
public static int execToFile(String command, String scriptFile, String logFile, String... params) throws IOException {
    FileOutputStream fileOutputStream = null;
    Thread inputThread = null;
    Thread errThread = null;
    try {
        // 打开文件输出流
        fileOutputStream = new FileOutputStream(logFile, true);
        // 把命令和参数组合成数组
        List<String> cmdarray = new ArrayList<>();
        cmdarray.add(command);
        cmdarray.add(scriptFile);
        if (params!=null && params.length>0) {
            for (String param:params) {
                cmdarray.add(param);
            }
        }
        String[] cmdarrayFinal = cmdarray.toArray(new String[cmdarray.size()]);
        // java调用系统命令开启进程的api
        final Process process = Runtime.getRuntime().exec(cmdarrayFinal);
        // 把启动后进程的输出流读取到文件中去
        // copy方法就是流的复制
        final FileOutputStream finalFileOutputStream = fileOutputStream;
        inputThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    copy(process.getInputStream(), finalFileOutputStream, new byte[1024]);
                } catch (IOException e) {
                    XxlJobHelper.log(e);
                }
            }
        });
        errThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    copy(process.getErrorStream(), finalFileOutputStream, new byte[1024]);
                } catch (IOException e) {
                    XxlJobHelper.log(e);
                }
            }
        });
        inputThread.start();
        errThread.start();
        // process-wait
        int exitValue = process.waitFor();      // exit code: 0=success, 1=error
        // log-thread join
        inputThread.join();
        errThread.join();
        return exitValue;
    } catch (Exception e) {
        XxlJobHelper.log(e);
        return -1;
    } finally {
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                XxlJobHelper.log(e);
            }

        }
        if (inputThread != null && inputThread.isAlive()) {
            inputThread.interrupt();
        }
        if (errThread != null && errThread.isAlive()) {
            errThread.interrupt();
        }
    }
}

结尾

  • 后续会在下篇文章中分析GroovyClassLoader,然后我们尝试来做一个简单的Java做题平台!类似leetcode,但是又不足以那么强大

作者:SAM2021
链接:xxl-job如何实现在admin后台直接编写任务代码执行 - 掘金