XXL-JOB EXECUTOR 源码解析

XXL-JOB EXECUTOR 源码解析

xxl-job 是一个分布式调度框架,同时具备GLUE特性。

xxl-job 分两部分,一为执行器,二为调度器,执行器的职责是具体的作业,而调度器的职责是接收原生配置的调度策略进行任务调度执行器作业。

此文专门讲解执行器部分,全文围绕spring boot环境讲解。

xxl-job version: 2.0.2-SNAPSHOT

官方地址:http://www.xuxueli.com/xxl-job/#/

解决了哪些问题:

  1. 统一管理分布式环境下的定时任务
  2. 界面化管理,清晰查看每一个任务的情况
  3. 监控任务执行情况及实时dashboard
  4. 集群调用、集群执行,具备HA及去除多余的分布式锁

调用顺序

  1. 加载配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
@ComponentScan(basePackages = "com.xxl.job.executor.service.jobhandler")
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appName;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

}
  1. 利用spring的初始化进入到com.xxl.job.core.executor.impl.XxlJobSpringExecutor#start方法中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* xxl-job executor (for spring)
*
* @author xuxueli 2018-11-01 09:24:52
*/
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {


@Override
public void start() throws Exception {

// 初始化JobHandler仓库
initJobHandlerRepository(applicationContext);

// 刷新Glue工厂
GlueFactory.refreshInstance(1);


// 继续调用核心库提供的加载方法
super.start();
}

private void initJobHandlerRepository(ApplicationContext applicationContext){
if (applicationContext == null) {
return;
}

// 开始构建JobHandler
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) { // 名称去重
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
registJobHandler(name, handler);
}
}
}
}

// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

}

可以看到,start方法很具体的分为三步:

  1. 加载Jobhandler
  2. 根据参数刷新GLUE工厂
  3. 调用核心库加载方法

先具体看第一步:

根据spring容器提供的ApplicationContext扫描并读取所有注解了@JobHandlerBean, 以注解@JobHandler的值作为JobHandler的名称,并且做了名称去重检查,然后再加入到由xxl-job框架维护的JobHandler仓库中,到此,JobHandler加载完毕。

由此,也可以得出扩展,只要你能办法把你的Bean并且带一个名称加入到这个由xxl-job框架维护的JobHandler仓库中,那么即使你没有经过前面的那些步骤,你这个Bean也是一个合法的JobHandler。而正好,这个JobHandler仓库提供了开放的访问方式,如下:

1
2
3
4
// 注入JobHandler
XxlJobExecutor.registJobHandler(String name, IJobHandler jobHandler);
// 获取JobHandler
XxlJobExecutor.loadJobHandler(String name);

再来看第二步:

第二步相对来说步骤比较简单,根据参数刷新GLUE工厂,其实就是初始化一个工厂,目前只接受01两个参数:

0:初始化一个普通的GLUE工厂,依赖反射生成对象,同时使用GROOVY类加载器编译。

1:初始化一个SpringGLUE工厂,依赖Spring获取对象。

最后再来看第三步:

这里开始使用前面提到的那些配置,其实前面的绝大多数配置都是有默认值可以不填的,甚至连ipport都是可以不提供的。真正需要填的只有两个参数xxl.job.executor.appnamexxl.job.admin.addresses,然而当接入了spring-cloud-common依赖后,你这两个参数也可以不填了,这部分将在后面讲解,spring cloud环境无缝集成xxl-job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Created by xuxueli on 2016/3/2 21:14.
*/
public class XxlJobExecutor {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);

// ---------------------- param ----------------------
private String adminAddresses;
private String appName;
private String ip;
private int port;
private String accessToken;
private String logPath;
private int logRetentionDays;

// ---------------------- start + stop ----------------------
public void start() throws Exception {

// 配置日志路径
XxlJobFileAppender.initLogPath(logPath);

// 构建调度器客户端
initAdminBizList(adminAddresses, accessToken);


// 配置日志清理线程(daemon)
JobLogFileCleanThread.getInstance().start(logRetentionDays);

// 构建触发回调线程
TriggerCallbackThread.getInstance().start();

// 构建执行器服务端(正式开启对外提供访问入口)
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}

public void destroy(){
// 销毁作业仓库线程
if (jobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
removeJobThread(item.getKey(), "web容器销毁然后终止作业.");
}
jobThreadRepository.clear();
}


// 销毁日志清理线程
JobLogFileCleanThread.getInstance().toStop();

// 销毁触发回调线程
TriggerCallbackThread.getInstance().toStop();

// 销毁执行器服务端(正式关闭对外访问入口)
stopRpcProvider();
}
}

start方法:

  1. 配置日志路径

xxl.job.executor.logpath参数定义日志路径,默认值为:/data/applogs/xxl-job/jobhandler/gluesource

  1. 构建调度器客户端

根据xxl.job.admin.addresses生成一个RPC调用对象,而最终RPC的调用对象的地址是拼接了一个/api,使用Jetty的网络包和Hessian的序列化。

  1. 配置日志清理线程

日志保留天数最少保留4天,否则永久保留。

  1. 构建触发回调线程

调度器调用执行器执行结束后,反馈结果给调度器

  1. 构建执行客户端

构建外部可以访问的web入口,向调度器注册自身,提供被调度的入口。