Xxl-Job调度器原理解析

本文最后更新于:2022年9月19日 早上

项目解析源码地址:xxl-job
xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。

image.png

调度器初始化过程步骤如下

1 国际化相关

配置参数: xxl.job.i18n=zh_CN, 这里设置为中文简体

2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool

配置参数:xxl.job.triggerpool.fast.max=200, 这里设置为fastTriggerPool的最大线程数=200, 不能小于200
xxl.job.triggerpool.slow.max=100, 这里设置为slowTriggerPool的最大线程数=100, 不能小于100

3 启动注册监听线程

3.1 初始化registryOrRemoveThreadPool线程池:用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:清除心跳超过90s的注册信息,并且刷新分组注册信息

4 启动失败任务监听线程(重试、告警)

配置参数:spring.mail.from=[email protected], 告警邮箱

5 启动监控线程

5.1 初始化callbackThreadPool线程池:用于callback回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败

6 启动日志统计和清除线程logrThread

-- 日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除

7 启动任务调度(**很重要!!**主要靠这两个线程进行塞数据到时间轮,然后时间轮取数调度任务)

7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
-— preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-——- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-——- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-——- 1、直接触发任务执行器
-——- 2、刷新上一次触发 和 下一次待触发时间
-——- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
-————— 1、求当前任务下一次触发时间所处一分钟的第N秒
-————— 2、将当前任务ID和ringSecond放进时间轮里面
-————— 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-——- 1、求当前任务下一次触发时间所处一分钟的第N秒
-——- 2、将当前任务ID和ringSecond放进时间轮里面
-——- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time

-- 第五步:提交数据库事务,释放数据库select for update排它锁

7.2 ringThread线程-根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-——- 阻塞策略:串行、废弃后面、覆盖前面
-——- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询

初始化的入口代码为 XxlJobAdminConfig如下

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }
    // ---------------------- XxlJobScheduler ----------------------
    private XxlJobScheduler xxlJobScheduler;
    @Override
    public void afterPropertiesSet() throws Exception {// 生命周期中的属性注入来对xxlJobScheduler初始化
        adminConfig = this;
        // 初始化xxl-job定时任务
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
    @Override
    public void destroy() throws Exception { // 生命周期中的销毁来对xxlJobScheduler销毁
        xxlJobScheduler.destroy();
    }
    ..............省略..............
}
     

xxlJobScheduler.init()进行初始化会执行如下过程:

public class XxlJobScheduler {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
    public void init() throws Exception {
        // 1 国际化相关 init i18n
        initI18n();
        // 2 初始化快线程池fastTriggerPool、慢线程池slowTriggerPool admin trigger pool start
        JobTriggerPoolHelper.toStart();
        // 3 启动注册监听线程 admin registry monitor run
        JobRegistryHelper.getInstance().start();
        // 4 启动失败任务监听线程(重试、告警) admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();
        // 5 启动监控线程(调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败)admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();
        // 6 启动日志统计和清除线程(日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等);每天清除一次失效过期的日志数据)admin log report start
        JobLogReportHelper.getInstance().start();
        // 7 启动任务调度(scheduleThread-取待执行任务数据入时间轮;ringThread-根据时间轮执行job任务) start-schedule ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();
        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
    ................省略........................
}
     

上面初始化的7个步骤拆分如下

1 国际化相关

private void initI18n(){// 根据环境设置title为中文、英文等
    for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
        item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
    }
}
     

2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool

这个步骤初始化了两个线程池fastTriggerPool和slowTriggerPool
在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下:

ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job在一分钟内超过超过10次,就用slowTriggerPool来处理 job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}
triggerPool_.execute(new Runnable() {.........省略............}
     

3 启动注册监听线程

3.1 初始化registryOrRemoveThreadPool线程池:用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:清除心跳超过90s的注册信息,并且刷新分组注册信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public void start(){
// 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});

// 启动监听注册的线程 for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {// group组集合不为空
// 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳) remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {// 移除挂掉的注册地址信息
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 找出所有正常没死掉的注册地址
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
// 确保是 EXECUTOR 执行器类型
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}



if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}



// 刷新分组注册地址信息 fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());



XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
}

4 启动失败任务监听线程(重试、告警)

这部分逻辑比较简单,就是重试 + 告警,核心代码如下

// 获取执行失败的job信息
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
   for (long failLogId: failLogIds) {
      // lock log
      int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
      if (lockRet < 1) {
         continue;
      }
      XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
      XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

      // 1、失败塞回重试       fail retry monitor
      if (log.getExecutorFailRetryCount() > 0) {
         JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
         String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";
         log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
      }

      // 2、进行失败告警       fail alarm monitor
      int newAlarmStatus = 0;       // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
      if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
         boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
         newAlarmStatus = alarmResult?2:3;
      } else {
         newAlarmStatus = 1;
      }
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
   }
}
     

5 启动监控线程

5.1 初始化callbackThreadPool线程池:用于callback回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败

6 启动日志统计和清除线程logrThread

-- 日志记录刷新,刷新最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除

7 启动任务调度(重点!!

7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
-— preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-——- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-——- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-——- 1、直接触发任务执行器
-——- 2、刷新上一次触发 和 下一次待触发时间
-——- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
-————— 1、求当前任务下一次触发时间所处一分钟的第N秒
-————— 2、将当前任务ID和ringSecond放进时间轮里面
-————— 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-——- 1、求当前任务下一次触发时间所处一分钟的第N秒
-——- 2、将当前任务ID和ringSecond放进时间轮里面
-——- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time

-- 第五步:提交数据库事务,释放数据库select for update排它锁

7.2 ringThread线程-根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-——- 阻塞策略:串行、废弃后面、覆盖前面
-——- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询

  • 启动两个线程解析的核心源码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    public void start(){

    // 启动调度线程,这些线程是用来取数据的 schedule thread
    scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {
    try {// 不知道为啥要休眠 4-5秒 时间,然后再启动
    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
    } catch (InterruptedException e) {
    if (!scheduleThreadToStop) {
    logger.error(e.getMessage(), e);
    }
    }
    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

    // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

    while (!scheduleThreadToStop) {
    // 扫描任务 Scan Job
    long start = System.currentTimeMillis();
    Connection conn = null;
    Boolean connAutoCommit = null;
    PreparedStatement preparedStatement = null
    boolean preReadSuc = true;
    try {
    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
    connAutoCommit = conn.getAutoCommit();
    conn.setAutoCommit(false);
    // 采用select for update,是排它锁。说白了xxl-job用一张数据库表来当分布式锁了,确保多个xxl-job admin节点下,依旧只能同时执行一个调度线程任务
    preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
    preparedStatement.execute();

    // tx start

    // 1、预读数据 pre read
    long nowTime = System.currentTimeMillis();
    // -- 从数据库中读取截止到五秒后未执行的job,并且读取preReadCount=6000条
    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
    if (scheduleList!=null && scheduleList.size()>0) {
    // 2、push压进 时间轮 push time-ring
    for (XxlJobInfo jobInfo: scheduleList) {

    // time-ring jump
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
    // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)),可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    // 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 misfire match
    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
    // FIRE_ONCE_NOW 》 trigger
    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    }
    // 2、刷新上一次触发 和 下一次待触发时间 fresh next
    refreshNextValidTime(jobInfo, new Date());
    } else if (nowTime > jobInfo.getTriggerNextTime()) {
    // 当前时间 大于 任务的下一次触发时间 并且是没有过期的
    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    // 1、直接触发任务执行器 trigger
    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    // 2、刷新上一次触发 和 下一次待触发时间 fresh next
    refreshNextValidTime(jobInfo, new Date());

    // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again
    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    // 1、求当前任务下一次触发时间所处一分钟的第N秒 make ring second
    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    // 2、将当前任务ID和ringSecond放进时间轮里面 push time ring
    pushTimeRing(ringSecond, jobInfo.getId());
    // 3、刷新上一次触发 和 下一次待触发时间 fresh next
    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    }

    } else {
    // 当前时间 小于 下一次触发时间
    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    // 1、求当前任务下一次触发时间所处一分钟的第N秒 make ring second
    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    // 2、将当前任务ID和ringSecond放进时间轮里面 push time ring
    pushTimeRing(ringSecond, jobInfo.getId());
    // 3、刷新上一次触发 和 下一次待触发时间 fresh next
    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    }
    }

    // 3、更新数据库执行器信息,如trigger_last_time、trigger_next_time update trigger info
    for (XxlJobInfo jobInfo: scheduleList) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
    }

    } else {
    preReadSuc = false;
    }
    // tx stop
    } catch (Exception e) {
    if (!scheduleThreadToStop) {
    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
    }
    } finally {
    // 提交事务,释放数据库select for update的锁 commit
    .......................省略.............
    }
    long cost = System.currentTimeMillis()-start;

    // 如果执行太快了,就稍微sleep等待一下 Wait seconds, align second
    if (cost < 1000) { // scan-overtime, not wait
    try {
    // pre-read period: success > scan each second; fail > skip this period;
    TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
    } catch (InterruptedException e) {
    if (!scheduleThreadToStop) {
    logger.error(e.getMessage(), e);
    }
    }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();


    // 时间轮线程,用于取出每秒的数据,然后处理 ring thread
    ringThread = new Thread(new Runnable() {
    @Override
    public void run() {
    while (!ringThreadToStop) {
    // align second
    try {
    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
    } catch (InterruptedException e) {
    if (!ringThreadToStop) {
    logger.error(e.getMessage(), e);
    }
    }
    try {
    // second data
    List<Integer> ringItemData = new ArrayList<>();
    // 获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
    int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
    for (int i = 0; i < 2; i++) {
    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
    if (tmpData != null) {
    ringItemData.addAll(tmpData);
    }
    }

    // ring trigger
    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
    if (ringItemData.size() > 0) {
    // do trigger
    for (int jobId: ringItemData) {
    // 执行触发器 do trigger
    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
    }
    // 清除当前刻度列表的数据 clear
    ringItemData.clear();
    }
    } catch (Exception e) {
    if (!ringThreadToStop) {
    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
    }
    }
    }
    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
    }

    References


Xxl-Job调度器原理解析
https://baymax55.github.io/2022/09/19/java/Xxl-Job调度器原理解析/
作者
baymax55
发布于
2022年9月19日
许可协议