程序入口
XxlJobAdminConfig 是 admin 的核心配置类,此处开始初始化调度器。
// 1.实现了 InitializingBean 接口,初始化完成后会调用此方法
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
//2.初始化调度器
xxlJobScheduler.init();
}
初始化6大核心处理
XxlJobScheduler 是 xxl-job-admin 非常核心的一个类。
1.JobRegistryMonitorHelper-启动一个线程,定时扫描过期的执行器、扫描执行器绑定到 对应的 appname 上。
2.JobFailMonitorHelper-启动一个线程,定时扫描需要重试的任务、如果设置了告警 那么触发消息通知。
3.JobLosedMonitorHelper-启动一个线程,定时扫描将任务处理结果丢失且超过10分钟,执行器没有了心跳💗的调度记录 主动处理为失败。
4.JobTriggerPoolHelper-初始化 一个快速处理的线程池和一个慢处理的线程池 分别执行时间消耗不一样的任务,加快任务执行性能,比较好的一个设计。
5.JobLogReportHelper-启动一个线程,定时扫描任务执行和日志信息统计称报告信息,用于展示。
6.JobScheduleHelper-启动一个线程,定时扫描5s中内需要执行的任务,触发任务处理。
public void init() throws Exception {
// 初始化 执行器阻塞策略 的国际化
initI18n();
// 1.删除过期的 执行器 2. 绑定 appname 和 多个执行器
JobRegistryMonitorHelper.getInstance().start();
// 1.重试需要重试的任务 2.告警设置了告警的任务
JobFailMonitorHelper.getInstance().start();
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
JobLosedMonitorHelper.getInstance().start();
// 初始化了 一个 快速 和 一个慢 的 线程池
JobTriggerPoolHelper.toStart();
// 运行报告统计
JobLogReportHelper.getInstance().start();
// 定时扫描 需要执行的任务,并计算下一次执行的时间
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
核心-JobRegistryMonitorHelper
流程:
1.每30s 循环一次(所以执行器运行后,调度中心最慢将30s才会把它加入到执行器组) 查询出 执行器组列表(自动注册到 调度中心的执行器 会通过 appname 匹配到任务组,此定时任务将匹配的执行器的IP 挂载到 这个执行器组,相当于一个nginx 可以路由到多个 tomcat)
2.xxl_job_registry 是执行器运行后 注册到调度中心的记录表,记录执行器的相关信息,这里通过扫描 xxl_job_registry 的更新时间 确定执行器是否还存活(执行器会定时请求 调度中心的 /api/registry 接口 用户注册活更新心跳时间),超过 90s 没有更新的 执行器将被移除
3.获取到 存活的 执行器列表 通过appname 对比 绑定到 执行器组
public void start(){
// 1。删除过期的 执行器 2. 绑定 appname 和 多个执行器
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 查询出 执行器组列表(自动注册到 调度中心的执行器 会通过 appname 匹配到任务组,
// 此定时任务将匹配的执行器的IP 挂载到 这个执行器组,相当于一个nginx 可以路由到多个 tomcat)
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 查询出死亡的 执行器 从 xxl_job_registry 查询出 心跳时间超过90 秒没有更新的数据
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
//移除
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 新的在线地址
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
//查询 刷新时间大于 当前时间 - 90s的,即符合存活条件的,---90s 内没更新的都算存活
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
// 扫描 执行器 list,通过 HashMap 的特性把 相同 appname 的执行器整合到一条数据
for (XxlJobRegistry item: list) {
//跟新 执行器集群下 每个组 的 list
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 循环 执行器组,通过appname 匹配 上面处理好的 map 集合,把相同 appname 的执行器地址通过逗号分隔的方式写入 执行器组
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
addressListStr = "";
for (String item:registryList) {
addressListStr += item + ",";
}
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
//修改 xxl_job_group 一个 appname 绑定 多个地址
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
//设置为守护线程
registryThread.setDaemon(true);
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
registryThread.start();
}
核心-JobFailMonitorHelper
JobFailMonitorHelper 定时任务用于处理:
1.重试需要重试的任务
2.告警设置了告警的任务
流程:
1.查询调用失败的日志信息(执行器执行完成后会回调请求 调度中心告知是否成功)
2.查询任务信息,判断是否设置了重试,如果设置了重试那么执行调用操作并更新日志
3.判断是否设置了告警,如果有那么触发告警接口,并更新日志的状态
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
//查询出所有失败的任务日志
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log 通过改变 告警状态 加锁(字段状态来模拟锁操作)
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、如果重试的次数大于 0
if (log.getExecutorFailRetryCount() > 0) {
//重新调用
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
//更新执行器日志
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
//任务存在 且 设置了 通知的 email
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
//通知操作,默认提供了 email 的实现,可以扩展 叮叮 和短信
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
//设置告警状态
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//更新告警状态
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
//休息10秒,防止循环性能消耗过大
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
//设置为守护线程
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
核心-JobLosedMonitorHelper
JobLosedMonitorHelper 用于处理 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
流程:
1.查询 日志表调度日志 状态 10分钟没有更新的数据,并且对应的执行器处于不在线状态的 任务id
2.修改对应调度日志 状态为“失败”状态
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
//一分钟 检查一次
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
monitorThread.start();
}
核心-JobTriggerPoolHelper
JobTriggerPoolHelper 用于初始化一个 快速线程池和一个慢线程池。
流程:
1.初始化一个快速线程池执行器
2.初始化一个慢线程池执行器
public void start(){
//初始化一个快速线程池执行器
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
//声名一个队列
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
//初始化一个慢线程池执行器
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
核心-JobLogReportHelper
JobLogReportHelper 用于定时更新 日志信息、任务执行信息 的统计数据更新。
流程:
1.从日志信息从读取出相关统计数据并写入 日志统计表
public void start(){
logrThread = new Thread(new Runnable() {
@Override
public void run() {
// last clean log time
long lastCleanLogTime = 0;
while (!toStop) {
// 1、log-report refresh: refresh log report in 3 days
try {
for (int i = 0; i < 3; i++) {
// today
Calendar itemDay = Calendar.getInstance();
itemDay.add(Calendar.DAY_OF_MONTH, -i);
itemDay.set(Calendar.HOUR_OF_DAY, 0);
itemDay.set(Calendar.MINUTE, 0);
itemDay.set(Calendar.SECOND, 0);
itemDay.set(Calendar.MILLISECOND, 0);
Date todayFrom = itemDay.getTime();
itemDay.set(Calendar.HOUR_OF_DAY, 23);
itemDay.set(Calendar.MINUTE, 59);
itemDay.set(Calendar.SECOND, 59);
itemDay.set(Calendar.MILLISECOND, 999);
Date todayTo = itemDay.getTime();
// refresh log-report every minute 每分钟刷新日志报告
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// do refresh 更新到数据库
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}
// 2、log-clean: switch open & once each day
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// expire-time
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();
// clean expired log
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
try {
//一分钟一次
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
}
});
logrThread.setDaemon(true);
logrThread.setName("xxl-job, admin JobLogReportHelper");
logrThread.start();
}
核心-JobScheduleHelper
scheduleThread 扫描任务线程
流程:
1.启动一个线程,随机休眠 4-5s 防止 多个线程同时启动竞争(猜想)
2.计算出一次性读取的任务个数,给出的 计算公式为:pre-read count: treadpool-size * trigger-qps,(1s=1000ms;每个任务花费50ms,qps = 1000/50 = 20),一秒钟可以处理20个任务,默认值为 6000 个
3.查询 xxl_job_lock 表 只有一条数据 用做读取任务的锁,每次调度者读取任务之前要先获得这把锁,通过 mysql for update 悲观锁的方式实现,保证任务不会被重复执行。
4.查询任务状态为“运行中” 且下次执行时间在 5s中以内的任务。
5.循环任务判断,这里分三种情况:
- 任务的执行时间已经过期了 5s,那么直接忽略不执行,并用当前时间计算下次执行的时间。
- 任务的执行时间已经过期了 但没有大于 5s,可能是 2s、3s、4s,马上触发一次执行,并计算下一次执行时间, 判断计算出来的下一次执行时间是否在 5s以内,如果是 那么放入时间轮,并计算下一次执行的时间。
- 还没到任务执行时间的任务 放入时间轮,并计算下一次执行的时间。
6.批量把上面计算好的当前的执行时间 和 下一次的执行时间写入数据库 xxl_job_info 表
7.提交事务,释放获取到的锁🔒
8.休眠判断:如果处理上面的逻辑耗时小于1秒,那么进入休眠 判断有数据处理那么休眠一秒随机减0到1秒(为了对齐秒数),没有任务处理 多休眠些,5秒随机减0到1秒(为了对齐秒数),对齐秒数是为了 保证任务执行时间的准确性。
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// 随机休眠 4-5s 防止 多个 调度中心同时启动,减少竞争
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
//预 准备大小
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 预读计数:treadpool-size * trigger-qps(1s=1000ms;每个任务花费50ms,qps = 1000/50 = 20),一秒钟可以处理20个任务
// 默认 6000 个
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//使用数据库锁🔒 读取即将运行的任务,防止重复运行
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
//查询 运行中的任务 且 下一次执行时间 在5s 内的
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring 加入到时间轮中
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
//如果已经过期5s了,那么忽略这次
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
//如果已经过期5s了,那么忽略这次,用当前时间 计算下一次执行时间
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
//过期时间在5s 以内直接触发 && 处理下次触发时间
// 1、trigger 触发 加入到
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
//计算下一次的执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
// 下一次触发时间为5s内,再次预读,,,减少性能消耗
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second 计算还有多少秒
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 还没有到执行时间的 job
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 触发预读:时间环触发&&下一次触发时间
// 计算得到下次执行的 秒数
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、放入时间轮
pushTimeRing(ringSecond, jobInfo.getId());
// 3、刷新下一次执行时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、修改触发时间
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// 耗时 小于 1000 毫秒,则需要休眠,否则不休眠
if (cost < 1000) { //扫描-超时,而不是等待
try {
// 预读期:成功>每秒扫描;失败>跳过这段时间;
// 有任务处理 休眠 1000 毫秒减去(随机0到1秒),没有任务 5000 毫秒减去(随机0到1秒)
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
什么是时间轮
为了解决定时触发任务的场景需求,引入了生活中时钟 ⏰ 的概念,用一个环形数组存放槽位,每一个槽位对应当前时间轮的最小精度,如果出现超过当前时间轮最大表示范围的情况,可以上升到上层时间轮,上层时间轮的最小精度为下层时间轮的最大值。
这里只用到了一层时间轮,拆分为60个槽位,分别对应0到59秒,使用 ConcurrentHashMap 来实现,key为0到59秒,值为 list 放入对应秒需要执行的任务。
ringThread 时间轮线程
流程:
1.通过休眠 对齐秒数
2.获取当前这一秒和前一秒的任务放入ringItemData 循环触发任务,这里获取前一秒的数据是为了防止比较繁忙的情况下 前序流程耗时较长,前一秒有任务未执行完成✅,做一个补充执行,只在比较繁忙的情况下才会出现。
3.做秒数校对
// 时间轮线程
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// 对齐秒数
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// 秒 数据
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免处理前序流程耗时太长,导致上一秒的任务没有执行,向前获取一秒,补充执行
// 取当前这一秒 需要执行的任务,从时间轮中移除 放入 ringItemData,供下面调用执行
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 时间轮触发任务
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
//如果时间轮有数据 那么循环执行
// do trigger
for (int jobId: ringItemData) {
// 触发 任务
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear 时间轮
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// 下一秒 校对秒数
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
任务触发 JobTriggerPoolHelper.trigger
JobTriggerPoolHelper.trigger 方法触发任务
流程:
1.根据前面任务的执行情况,判断走 快速线程还是满线程
2.做任务调用,通过http 请求执行器
3.统计执行时间,1分钟内 累计投递超过500ms 的次数超过10次,那么下一次此任务执行将放入慢任务池
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// 选择线程池
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // 1分钟内作业超时10次
//替换成 慢线程池
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// 做任务调用
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
// 计算出现在的 分
// 是否在同一分钟执行的,不在同一分钟 清空超时计数器
// 清空是因为 慢作业的定义是 1分钟内 累计投递超过500ms 的次数超过10次
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
// 超时阈值500毫秒
if (cost > 500) { // ob-timeout threshold 500ms
//放入 jobTimeoutCountMap
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
XxlJobTrigger-trigger做任务触发
流程:
1.从数据库查询出任务信息
2.获取设置的重试次数
3.查询出对应的任务组信息
4.判断是否在执行任务时录入了执行器地址,如果没有录入则用默认的执行器列表
5.数据分片处理
6.processTrigger 做 http 请求 执行器
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// 从数据库 加载任务数据
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
//获取重试次数
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
//查询出 任务对应的 执行器组信息
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// 如果手动录入了 执行器地址列表,那么覆盖默认的执行器地址列表
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// 分片处理
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
// 分片流程
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
// 正常流程
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // 阻塞策略--串行
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // 路由策略
//分片参数
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、保存日志
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、初始化触发器参数
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、初始化地址信息
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
// 如果是分片广播
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
// 根据 index 选择出 执行器
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
// 只有一台则 就选择这一台
address = group.getRegistryList().get(0);
}
} else {
// 非分片,策略模式,根据对应的 路由策略 返回执行器地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
// 没有执行者
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、触发远程执行器
ReturnT<String> triggerResult = null;
if (address != null) {
// 运行执行器
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、记录 触发信息
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、保存触发信息
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
消息告警机制实现
在 核心-JobFailMonitorHelper 中会根据执行日志结果及是否设置了告警机制,触发告警
xxl-job 默认提供了 邮件告警,可以自行扩展,只需要实现 JobAlarm 接口即可,这里是运用到了 策略的设置模式
xxl-job-admin 提供的接口
JobApiController 用于对执行器提供接口
分别提供了三种接口:
callback:任务执行结果的回调,更新执行日志信息,如果设置了子任务触发子任务。
registry:作为注册接口也是心跳💗接口,更新 信息的 update_time
registryRemove:注销执行器接口
/**
* api 处理 client 来的请求
*
* @param uri
* @param data
* @return
*/
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
// valid
if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
// 也用作 心跳接口, 存在就更新时间,不存在则 添加
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}
代码:https://gitee.com/zhang-purui/xxl-job-admin-code-understand.git
评论区