Quartz学习
创始人
2024-02-06 23:47:47
0次
任务执行流程
- StdSchedulerFactory创建和属性初始化
如果自定义了属性,会在这里加载

- StdScheduler创建
入口为StdSchedulerFactory#getScheduler();
,首次进入时调用StdSchedulerFactory#instantiate
: - 如果没有配置自定义属性,则先读取默认的quartz.properties文件配置
- 创建ClassLoadHelper并初始化

- 创建
JobFactory
,默认实现为SimpleJobFactory,用来产生用户自定义的Job接口实现类的实例 - 创建
ThreadPool
,默认实现为org.quartz.simpl.SimpleThreadPool
,并将org.quartz.threadPool
开头的配置通过反射的方式赋予实例,这就是用来执行定时任务的线程池。
如org.quartz.threadPool.threadCount: 10
,org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
- 初始化
JobStore
,默认为RAMJobStore
,并将org.quartz.jobStore
相关配置赋值(如org.quartz.jobStore.misfireThreshold: 60000
控制misfire阈值) - 如果有数据源持久化配置,读取前缀为
org.quartz.dataSource
的进行配置 - 如果有SchedulerPlugin插件配置,读取前缀为
org.quartz.plugin
的进行配置 - 如果有JobListener配置,读取前缀为
org.quartz.jobListener
的进行配置 - 如果有TriggerListener配置,读取前缀为
org.quartz.triggerListener
的进行配置 - 创建ThreadExecutor,未配置时默认创建DefaultThreadExecutor ,execute(thread)时直接执行thread#start`
- 创建JobRunShellFactory,被用来构建JobRunShell实例
未开启org.quartz.scheduler.wrapJobExecutionInUserTransaction
时,默认创建JTAAnnotationAwareJobRunShellFactory
- 创建QuartzSchedulerResources,包装上述组件和一些配置信息
- 为ThreadPool#setInstanceName 为配置项
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
,setInstanceId为org.quartz.scheduler.instanceId:NON_CLUSTERED
- SimpleThreadPool#initialize
- 创建
org.quartz.threadPool.threadCount: 10
个实现自Thread的WorkerThread
,线程前缀为如DefaultQuartzScheduler_Worker-
,则工作线程名为如DefaultQuartzScheduler_Worker-1
。最后都放入列表workers
- 启动workers中所有WorkerThread,并加入
SimpleThreadPool.availWorkers
等待派发任务 - WorkerThread#run方法会不断循环,尝试执行内部runnable,没有派发的runnable就lock.wait(500)休息500ms;如果有任务,就执行,完成后将本WorkerThread重新加入
SimpleThreadPool.availWorkers
- 使用上述信息创建QuartzScheduler,可用来注册调度job、JobListener等
- 创建并启动QuartzSchedulerThread,初始
paused=true,halted=false
- QuartzSchedulerThread#run方法,halted==false时,会不断做以下工作:
- RAMJobStore#acquireNextTriggers,获取不超过当前时间30秒的、
Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())
个OperableTrigger。
具体来说,是从TreeSet timeTriggers
中获取。依次比较nextFireTime
越早越优先(trigger在首次scheduler#schedule时会调用computeFirstFireTime存储,以后每次触发就更新)、优先级越大越优先、key字典序排序优先。 - 对取出的Trigger计算Misfire规则
- misfireTime = now - org.quartz.jobStore.misfireThreshold: 60000
- 如果下次nextFireTime==null 或 nextFireTime > misfireTime 或 配置了 MisfireHandlingInstructionIgnoreMisfires,则说明不会misfire
- 否则说明会misfire,先通知triggerListeners该trigger发生misfire,然后根据MISFIRE_INSTRUCTION策略按需更新trigger.newFireTime
- 最后,如果nextFireTime为null,则说明已经STATE_COMPLETE,通知SchedulerListener,并从timeTriggers移除;否则继续判断newFireTime是否已经被更新,若未更新则表示没有misfire(ignore),否则表示已经更新且当次触发misfire。
- 如果trigger触发了mifire且nextFireTime不为null,则更新到timeTriggers
- 如果trigger.nextFireTime 超过当前时间30秒的,则加入timeTriggers
- 如果Job含有
@DisallowConcurrentExecution
注解,说明不能并行运行,则该jobKey除了第一个以外的Trigger都放入timeTriggers等待下次触发 - 更新可触发的trigger.state=TriggerWrapper.STATE_ACQUIRED
- 因为默认的maxBatchSize为1,所以这里就只取这个trigger
- 等到trigger.nextFireTime距离now 2ms以内时,就goAhead:RAMJobStore#triggersFired:
- 获取prevFireTime
- 更新nextFireTime
- tw.state = TriggerWrapper.STATE_WAITING;
- 将上述信息组装为运行时信息类TriggerFiredBundle
- Job含有
@DisallowConcurrentExecution
注解,则同jobKey的其他trigger变为TriggerWrapper.STATE_BLOCKED,并从timeTriggers移除,放入blockedJobs;否则放入timeTriggers等待下次触发 - JobRunShellFactory使用TriggerFiredBundle创建JobRunShell,传入StdCheduler、TriggerFiredBundle,并调用其initialize方法,创建用户自定义Job的实例
- threadPool#runInThread(JobRunShell),使用一个可用WorkerThread来运行JobRunShell
更多好文
- 阅读源码——Quartz源码分析之任务存储
- 阅读源码——Quartz源码分析之任务调度
相关内容