Xxl-Job调度器原理解析
本文最后更新于:2022年9月19日 早上
项目解析源码地址:xxl-job
xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。
调度器初始化过程步骤如下
1 国际化相关
配置参数: xxl.job.i18n=zh_CN, 这里设置为中文简体
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
配置参数:xxl.job.triggerpool.fast.max=200, 这里设置为fastTriggerPool的最大线程数=200, 不能小于200
xxl.job.triggerpool.slow.max=100, 这里设置为slowTriggerPool的最大线程数=100, 不能小于1003 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:清除心跳超过90s的注册信息,并且刷新分组注册信息4 启动失败任务监听线程(重试、告警)
配置参数:spring.mail.from=[email protected], 告警邮箱
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除7 启动任务调度(**很重要!!**主要靠这两个线程进行塞数据到时间轮,然后时间轮取数调度任务)
7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
-— preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-——- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-——- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-——- 1、直接触发任务执行器
-——- 2、刷新上一次触发 和 下一次待触发时间
-——- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
-————— 1、求当前任务下一次触发时间所处一分钟的第N秒
-————— 2、将当前任务ID和ringSecond放进时间轮里面
-————— 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-——- 1、求当前任务下一次触发时间所处一分钟的第N秒
-——- 2、将当前任务ID和ringSecond放进时间轮里面
-——- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time-- 第五步:提交数据库事务,释放数据库select for update排它锁
7.2 ringThread线程-根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-——- 阻塞策略:串行、废弃后面、覆盖前面
-——- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
初始化的入口代码为 XxlJobAdminConfig如下
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {// 生命周期中的属性注入来对xxlJobScheduler初始化
adminConfig = this;
// 初始化xxl-job定时任务
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
@Override
public void destroy() throws Exception { // 生命周期中的销毁来对xxlJobScheduler销毁
xxlJobScheduler.destroy();
}
..............省略..............
}
xxlJobScheduler.init()进行初始化会执行如下过程:
public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// 1 国际化相关 init i18n
initI18n();
// 2 初始化快线程池fastTriggerPool、慢线程池slowTriggerPool admin trigger pool start
JobTriggerPoolHelper.toStart();
// 3 启动注册监听线程 admin registry monitor run
JobRegistryHelper.getInstance().start();
// 4 启动失败任务监听线程(重试、告警) admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// 5 启动监控线程(调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败)admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();
// 6 启动日志统计和清除线程(日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等);每天清除一次失效过期的日志数据)admin log report start
JobLogReportHelper.getInstance().start();
// 7 启动任务调度(scheduleThread-取待执行任务数据入时间轮;ringThread-根据时间轮执行job任务) start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
................省略........................
}
上面初始化的7个步骤拆分如下
1 国际化相关
private void initI18n(){// 根据环境设置title为中文、英文等
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
这个步骤初始化了两个线程池fastTriggerPool和slowTriggerPool
在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下:
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job在一分钟内超过超过10次,就用slowTriggerPool来处理 job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
triggerPool_.execute(new Runnable() {.........省略............}
3 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:清除心跳超过90s的注册信息,并且刷新分组注册信息
1 |
|
4 启动失败任务监听线程(重试、告警)
这部分逻辑比较简单,就是重试 + 告警,核心代码如下
// 获取执行失败的job信息
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、失败塞回重试 fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、进行失败告警 fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败
6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除
7 启动任务调度(重点!!)
7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
-— preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-——- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-——- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-——- 1、直接触发任务执行器
-——- 2、刷新上一次触发 和 下一次待触发时间
-——- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
-————— 1、求当前任务下一次触发时间所处一分钟的第N秒
-————— 2、将当前任务ID和ringSecond放进时间轮里面
-————— 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-——- 1、求当前任务下一次触发时间所处一分钟的第N秒
-——- 2、将当前任务ID和ringSecond放进时间轮里面
-——- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time-- 第五步:提交数据库事务,释放数据库select for update排它锁
7.2 ringThread线程-根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-——- 阻塞策略:串行、废弃后面、覆盖前面
-——- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
- 启动两个线程解析的核心源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173public void start(){
// 启动调度线程,这些线程是用来取数据的 schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {// 不知道为啥要休眠 4-5秒 时间,然后再启动
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// 扫描任务 Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// 采用select for update,是排它锁。说白了xxl-job用一张数据库表来当分布式锁了,确保多个xxl-job admin节点下,依旧只能同时执行一个调度线程任务
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、预读数据 pre read
long nowTime = System.currentTimeMillis();
// -- 从数据库中读取截止到五秒后未执行的job,并且读取preReadCount=6000条
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push压进 时间轮 push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)),可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、刷新上一次触发 和 下一次待触发时间 fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 当前时间 大于 任务的下一次触发时间 并且是没有过期的
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、直接触发任务执行器 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、刷新上一次触发 和 下一次待触发时间 fresh next
refreshNextValidTime(jobInfo, new Date());
// 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、求当前任务下一次触发时间所处一分钟的第N秒 make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、将当前任务ID和ringSecond放进时间轮里面 push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、刷新上一次触发 和 下一次待触发时间 fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 当前时间 小于 下一次触发时间
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、求当前任务下一次触发时间所处一分钟的第N秒 make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、将当前任务ID和ringSecond放进时间轮里面 push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、刷新上一次触发 和 下一次待触发时间 fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、更新数据库执行器信息,如trigger_last_time、trigger_next_time update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// 提交事务,释放数据库select for update的锁 commit
.......................省略.............
}
long cost = System.currentTimeMillis()-start;
// 如果执行太快了,就稍微sleep等待一下 Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// 时间轮线程,用于取出每秒的数据,然后处理 ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
// 获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// 执行触发器 do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// 清除当前刻度列表的数据 clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}References