Quartz动态任务管理
简介
在实际应用中,我们经常需要在运行时动态地管理定时任务,例如根据业务需求动态添加新任务、修改已有任务的执行时间、暂停或恢复任务、删除不再需要的任务等。Quartz框架提供了丰富的API来支持这些动态任务管理操作。
本文将详细介绍如何使用Quartz实现动态任务管理,包括动态添加任务、修改任务、暂停和恢复任务、删除任务等功能,并提供完整的示例代码。
动态任务管理的核心API
Quartz提供了以下核心API用于动态任务管理:
scheduler.scheduleJob():添加新任务scheduler.rescheduleJob():修改已有任务scheduler.pauseJob()/scheduler.resumeJob():暂停和恢复任务scheduler.deleteJob():删除任务scheduler.triggerJob():立即执行任务
动态任务管理示例
1. 项目准备
首先创建一个Spring Boot项目,并添加Quartz依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>2. 动态任务管理器设计
创建一个动态任务管理器类,封装所有的动态任务管理操作:
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
@Component
public class DynamicJobManager {
@Autowired
private Scheduler scheduler;
/**
* 添加新任务
*/
public String addJob(Class<? extends Job> jobClass, String jobName, String jobGroup,
String cronExpression, JobDataMap jobDataMap) throws SchedulerException {
// 检查任务是否已存在
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (scheduler.checkExists(jobKey)) {
return "任务已存在: " + jobName + "-" + jobGroup;
}
// 创建JobDetail
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(jobKey)
.withDescription("动态创建的任务")
.setJobData(jobDataMap)
.storeDurably()
.build();
// 创建CronTrigger
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity("trigger_" + jobName, jobGroup)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
// 注册任务和触发器
scheduler.scheduleJob(jobDetail, cronTrigger);
return "任务添加成功: " + jobName + "-" + jobGroup;
}
/**
* 修改任务的执行时间
*/
public String updateJob(String jobName, String jobGroup, String cronExpression) throws SchedulerException {
// 检查任务是否存在
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return "任务不存在: " + jobName + "-" + jobGroup;
}
// 获取旧的触发器
TriggerKey triggerKey = TriggerKey.triggerKey("trigger_" + jobName, jobGroup);
CronTrigger oldTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 创建新的触发器
CronTrigger newTrigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
// 重新调度任务
scheduler.rescheduleJob(triggerKey, newTrigger);
return "任务更新成功: " + jobName + "-" + jobGroup;
}
/**
* 暂停任务
*/
public String pauseJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return "任务不存在: " + jobName + "-" + jobGroup;
}
scheduler.pauseJob(jobKey);
return "任务暂停成功: " + jobName + "-" + jobGroup;
}
/**
* 恢复任务
*/
public String resumeJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return "任务不存在: " + jobName + "-" + jobGroup;
}
scheduler.resumeJob(jobKey);
return "任务恢复成功: " + jobName + "-" + jobGroup;
}
/**
* 删除任务
*/
public String deleteJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return "任务不存在: " + jobName + "-" + jobGroup;
}
scheduler.deleteJob(jobKey);
return "任务删除成功: " + jobName + "-" + jobGroup;
}
/**
* 立即执行任务
*/
public String triggerJob(String jobName, String jobGroup, JobDataMap jobDataMap) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return "任务不存在: " + jobName + "-" + jobGroup;
}
scheduler.triggerJob(jobKey, jobDataMap);
return "任务触发成功: " + jobName + "-" + jobGroup;
}
/**
* 获取所有任务信息
*/
public List<JobInfo> getAllJobs() throws SchedulerException {
List<JobInfo> jobInfos = new ArrayList<>();
// 获取所有的JobGroup
List<String> jobGroups = scheduler.getJobGroupNames();
// 遍历每个JobGroup
for (String jobGroup : jobGroups) {
// 获取该JobGroup下所有的JobKey
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(jobGroup));
// 遍历每个JobKey
for (JobKey jobKey : jobKeys) {
// 获取JobDetail
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
// 获取与该Job关联的所有Trigger
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
// 遍历每个Trigger
for (Trigger trigger : triggers) {
JobInfo jobInfo = new JobInfo();
jobInfo.setJobName(jobKey.getName());
jobInfo.setJobGroup(jobKey.getGroup());
jobInfo.setJobDescription(jobDetail.getDescription());
jobInfo.setJobClassName(jobDetail.getJobClass().getName());
jobInfo.setJobDataMap(jobDetail.getJobDataMap());
// 设置Trigger信息
jobInfo.setTriggerName(trigger.getKey().getName());
jobInfo.setTriggerGroup(trigger.getKey().getGroup());
jobInfo.setTriggerType(trigger.getClass().getSimpleName());
// 设置Cron表达式(如果是CronTrigger)
if (trigger instanceof CronTrigger) {
jobInfo.setCronExpression(((CronTrigger) trigger).getCronExpression());
}
// 设置任务状态
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
jobInfo.setStatus(triggerState.name());
// 设置下次执行时间
Date nextFireTime = trigger.getNextFireTime();
jobInfo.setNextFireTime(nextFireTime);
jobInfos.add(jobInfo);
}
}
}
return jobInfos;
}
}3. 任务信息实体类
创建一个任务信息实体类,用于封装任务的相关信息:
import org.quartz.JobDataMap;
import java.util.Date;
public class JobInfo {
private String jobName;
private String jobGroup;
private String jobDescription;
private String jobClassName;
private JobDataMap jobDataMap;
private String triggerName;
private String triggerGroup;
private String triggerType;
private String cronExpression;
private String status;
private Date nextFireTime;
// getter和setter方法
// ...
}4. 示例任务类
创建一个示例任务类,用于测试动态任务管理:
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DynamicJob implements Job {
private static final Logger logger = LoggerFactory.getLogger(DynamicJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 获取当前时间
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取任务参数
String jobParam = context.getJobDetail().getJobDataMap().getString("jobParam");
logger.info("动态任务执行: jobParam={}, 执行时间: {}", jobParam, sdf.format(now));
}
}5. 控制器层
创建一个RESTful API控制器,提供动态任务管理的接口:
import org.quartz.JobDataMap;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/api/jobs")
public class DynamicJobController {
@Autowired
private DynamicJobManager dynamicJobManager;
/**
* 添加新任务
*/
@PostMapping("/add")
public String addJob(@RequestParam String jobName, @RequestParam String jobGroup,
@RequestParam String cronExpression, @RequestParam String jobParam) throws SchedulerException {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("jobParam", jobParam);
return dynamicJobManager.addJob(DynamicJob.class, jobName, jobGroup, cronExpression, jobDataMap);
}
/**
* 修改任务
*/
@PutMapping("/update")
public String updateJob(@RequestParam String jobName, @RequestParam String jobGroup,
@RequestParam String cronExpression) throws SchedulerException {
return dynamicJobManager.updateJob(jobName, jobGroup, cronExpression);
}
/**
* 暂停任务
*/
@PutMapping("/pause")
public String pauseJob(@RequestParam String jobName, @RequestParam String jobGroup) throws SchedulerException {
return dynamicJobManager.pauseJob(jobName, jobGroup);
}
/**
* 恢复任务
*/
@PutMapping("/resume")
public String resumeJob(@RequestParam String jobName, @RequestParam String jobGroup) throws SchedulerException {
return dynamicJobManager.resumeJob(jobName, jobGroup);
}
/**
* 删除任务
*/
@DeleteMapping("/delete")
public String deleteJob(@RequestParam String jobName, @RequestParam String jobGroup) throws SchedulerException {
return dynamicJobManager.deleteJob(jobName, jobGroup);
}
/**
* 立即执行任务
*/
@PostMapping("/trigger")
public String triggerJob(@RequestParam String jobName, @RequestParam String jobGroup) throws SchedulerException {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("triggerParam", "立即执行");
return dynamicJobManager.triggerJob(jobName, jobGroup, jobDataMap);
}
/**
* 获取所有任务
*/
@GetMapping("/all")
public List<JobInfo> getAllJobs() throws SchedulerException {
return dynamicJobManager.getAllJobs();
}
}动态任务管理的高级特性
1. 动态任务的持久化
在Spring Boot中,当使用spring-boot-starter-quartz依赖时,默认会将任务信息持久化到数据库中。需要在application.yml文件中配置数据库连接信息:
spring:
datasource:
url: jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
quartz:
job-store-type: jdbc # 使用JDBC存储
jdbc:
initialize-schema: always # 自动初始化数据库表结构
properties:
org:
quartz:
scheduler:
instanceName: quartzScheduler
instanceId: AUTO
jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
tablePrefix: QRTZ_
isClustered: false
clusterCheckinInterval: 10000
useProperties: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadCount: 10
threadPriority: 5
threadsInheritContextClassLoaderOfInitializingThread: true2. 动态任务的参数传递
可以通过JobDataMap在创建任务时传递参数,也可以在立即执行任务时传递额外参数:
// 创建任务时传递参数
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("param1", "value1");
jobDataMap.put("param2", 123);
// 立即执行任务时传递额外参数
JobDataMap triggerDataMap = new JobDataMap();
triggerDataMap.put("extraParam", "额外参数");
scheduler.triggerJob(jobKey, triggerDataMap);在任务执行时,可以通过JobExecutionContext获取这些参数:
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 获取创建任务时设置的参数
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
// 获取立即执行时设置的额外参数
JobDataMap triggerDataMap = context.getTrigger().getJobDataMap();
// 获取合并后的参数
JobDataMap mergedDataMap = context.getMergedJobDataMap();
// 使用参数...
}3. 动态任务的状态管理
可以通过Trigger.TriggerState枚举获取任务的当前状态:
Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey);
switch (triggerState) {
case NONE: // 不存在
case NORMAL: // 正常
case PAUSED: // 暂停
case COMPLETE: // 完成
case ERROR: // 错误
case BLOCKED: // 阻塞
break;
}4. 动态任务的并发控制
可以通过@DisallowConcurrentExecution注解控制任务的并发执行:
@DisallowConcurrentExecution
public class NonConcurrentJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 任务逻辑
}
}5. 动态任务的重试机制
可以通过JobExecutionException的setRefireImmediately()方法实现任务重试:
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
// 任务逻辑
} catch (Exception e) {
JobExecutionException jobException = new JobExecutionException(e);
jobException.setRefireImmediately(true); // 立即重试
// jobException.setRefireImmediately(false); // 不重试
// jobException.setUnscheduleAllTriggers(true); // 取消所有关联的Trigger
throw jobException;
}
}动态任务管理的注意事项
任务类的可序列化:如果使用JDBC存储,Job类需要实现
Serializable接口,否则会报序列化错误。任务参数的类型限制:
JobDataMap中只能存储可序列化的对象,建议只存储基本数据类型和字符串。并发安全:动态任务管理操作本身是线程安全的,但在多线程环境下仍需注意业务逻辑的线程安全问题。
事务管理:动态任务管理操作默认不包含在事务中,如果需要事务支持,可以在方法上添加
@Transactional注解。异常处理:所有的动态任务管理操作都可能抛出
SchedulerException异常,需要妥善处理。性能考虑:频繁的动态任务管理操作可能会影响Quartz的性能,建议合理设计任务的生命周期。
总结
本文详细介绍了Quartz动态任务管理的实现方法,包括动态添加任务、修改任务、暂停和恢复任务、删除任务等功能,并提供了完整的示例代码。通过这些技术,我们可以在运行时灵活地管理定时任务,满足各种复杂的业务需求。
在实际应用中,动态任务管理常用于以下场景:
- 后台管理系统中的任务管理功能
- 根据业务规则动态生成任务
- 任务的动态调度和优化
- 临时任务的创建和销毁
通过合理使用Quartz的动态任务管理功能,可以大大提高系统的灵活性和可扩展性。