Xxl-Job执行器原理解析

本文最后更新于:2022年9月19日 早上

项目解析源码地址:xxl-job
xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。

image.png

执行器初始化过程步骤如下

1 通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=xxlJobExecutor的Bean
2 注册的BeanName=XxlJobConfig,会进行初始化,步骤如下:

  • 扫描所有bean,加载加了@XxlJob注解类,并记录在jobHandlerRepository
  • 选择工厂类:GlueFactory 或 SpringGlueFactory

-- SpringGlueFactory会拦截当前实例属性 判断是否加了@Resource或@Autowired注解,如果存在就获取当前对应的bean实例,然后通过反射注入成员中

  • 启动,其步骤如下:

-– 1 初始化存放执行日志目录文件 — 2 初始化执行者,管理客户端 — 3 初始化日志清除线程,一天执行一次,默认清除N天(可配置)前数据 — 4 初始化回调触发器线程,线程执行完会把数据回调调度器接口告诉他结果 — 5 初始化执行服务器,初始化netty服务器,并发客户端信息注册到调度器上


接下来对各个源码进行一定的解析

1 通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=xxlJobExecutor的Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class XxlJobConfig {// 注入XxlJob相关的配置信息,并生成Bean xxlJobExecutor
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
//省略
@Bean

public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//省略
return xxlJobSpringExecutor;
}
}

2 注册的BeanName=XxlJobConfig,会进行初始化,步骤如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);

// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// 扫描所有bean,加载加了@XxlJob注解类,并记录在jobHandlerRepository init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// 选择工厂类:GlueFactory 或 SpringGlueFactory 选择工厂类:GlueFactory 或 SpringGlueFactory refresh GlueFactory
GlueFactory.refreshInstance(1);
// 启动,其步骤如下:
//--- 1 初始化存放执行日志目录文件
//--- 2 初始化执行者,管理客户端
//--- 3 初始化日志清除线程
//--- 4 初始化回调触发器线程
//--- 5 初始化执行服务器
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

上面我们对启动super.start()步骤来做具体的分析,其分析代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void start() throws Exception {
// 1 初始化存放执行日志目录文件 init logpath
XxlJobFileAppender.initLogPath(logPath);
// 2 初始化执行者,管理客户端 init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// 3 初始化日志清除线程 init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 4 初始化回调触发器线程 init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// 5 初始化执行服务器 init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}

启动super.start() -> 初始化存放执行日志目录文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");

public static void initLogPath(String logPath) {
// init 判断有没有自定义目录
if (logPath != null && logPath.trim().length() > 0) {
logBasePath = logPath;
}
// mk base dir 如果当前目录为空,就创建一个
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();// 获取创建的目录路径

// mk glue dir 创建glue目录路径,没有就创建然后获取
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}

启动super.start() -> 初始化执行者,管理客户端

-- 把调度管理器的地址写入adminBizList中
-- 启动super.start() -> 初始化日志清除线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void start(final long logRetentionDays){

// 日志最多也只能清除三天前的 limit min value
if (logRetentionDays < 3 ) {
return;
}
// 启动一个本地线程,用于处理日志清除
localThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 清除日志目录中超过logRetentionDays天的日志文件 clean log dir, over logRetentionDays
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs!=null && childDirs.length>0) {
// today
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY,0);
todayCal.set(Calendar.MINUTE,0);
todayCal.set(Calendar.SECOND,0);
todayCal.set(Calendar.MILLISECOND,0);
Date todayDate = todayCal.getTime();
for (File childFile: childDirs) {
// valid
if (!childFile.isDirectory()) {
continue;
}
if (childFile.getName().indexOf("-") == -1) {
continue;
}
// file create date
Date logFileCreateDate = null;
try {
// 将日志文件名转为时间
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
// 如果文件时间超过logRetentionDays天,就进行删除
if ((todayDate.getTime(

启动super.start() -> 初始化回调触发器线程

回调job执行结果给调度器,告诉调度器已执行完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 启动执行结果回调线程
*/
public void start() {
// 判断是否合法的admin地址 valid
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
// 启动回调触发线程 callback
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// 进入回调循环处理中 normal callback
while(!toStop){
try {
// 任务执行完,会把执行结果塞入 LinkedBlockingQueue 中,然后LinkedBlockingQueue.tack()是阻塞形的,会阻塞等待执行结果
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {

// 把执行结果全部搞出来,然后塞入callbackParamList,然后批量回调处理 callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);// 回调执行结果,告诉admin,请求的结果是 addressUrl+"api/callback"
}
}
} catch (Exception e) {......}
// 进行回到后的回归操作 last callback
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {..........}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
}

启动super.start() ->初始化执行服务器

初始化netty服务器,并发客户端信息注册到调度器上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 初始化执行服务器
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// 填充ip和端口号 fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// 地址为空,就根据ip:port生成新的地址 generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// 实例化一个网络server start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);// 启动server,基于netty
}
// embedServer.start 调用的就是以下方法,启动nettyServer服务器,然后请求注册到调度器上,调度器会通过netty通知来调度job
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0, 200, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() {xxxxx});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
............省略.................
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThre

上面启动了nettyServer,并且对应的实现handler为EmbedHttpServerHandler,通过监听,最终调用如下代码:

-- 路径为:EmbedHttpServerHandler->channelRead0->process()->executorBiz.run(triggerParam)->ExecutorBizImpl#run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 执行job任务最终会调用到这里
* @param triggerParam 执行参数
*/
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// 加载出对应的jobHandler + jobThread load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {// bean的方式调用,现在一般都是这个方式,其他方式是怎么处理的就不解析了
// new jobhandler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// 验证旧的任务处理器,如果不相等,就创建一个新的 valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;// 任务线程
jobHandler = null;// 任务处理器
}
// 验证处理器并再次赋值 valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {......省略多种其他方式......}

// 执行block策略 executor block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// 废弃后面来的
// discard when running
if (jobThread.isRunningOrHasQueue()) {// 如果有运行中线程,就终止当前调用
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
}

References


Xxl-Job执行器原理解析
https://baymax55.github.io/2022/09/19/java/Xxl-Job执行器原理解析/
作者
baymax55
发布于
2022年9月19日
许可协议