老板今天让我搞 分布式定时任务....
共 10041字,需浏览 21分钟
·
2020-09-30 12:12
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | 荒野猛兽
来源 | urlify.cn/vqiYva
第一步:引入依赖
org.quartz-scheduler
quartz
${quartz.version}
org.quartz-scheduler
quartz-jobs
${quartz.version}
org.springframework
spring-context-support
第二步:创建MySQL表,Quartz是基于表来感知其他定时任务节点的,节点间不会直接通信。建表语句在jar包中自带了。
org\quartz-scheduler\quartz\2.3.0\quartz-2.3.0.jar!\org\quartz\impl\jdbcjobstore\tables_mysql_innodb.sql
第三步:配置线程池,我这里是因为项目的其他地方有用到线程池,你也可以选择在Quartz的配置类中注入。
(我在其他位置使用了线程池,占用了一个线程,所以当我将核心线程数量设置为1时,定时任务不会执行;需确保有足够的线程来执行)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author 1
* @Description 配置线程池交给Spring容器管理
* @Date 2020/8/26 18:23
**/
@Configuration
public class ExecturConfig {
@Bean("taskExector")
public Executor taskExector() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池数量
executor.setCorePoolSize(2);
//最大线程数量
executor.setMaxPoolSize(5);
//线程池的队列容量
executor.setQueueCapacity(10);
//线程名称的前缀
executor.setThreadNamePrefix("expireOrderHandle-");
//配置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
第四步:因为定时任务业务中需要使用到注入Spring容器的类,所以配置注入,否则报空指针异常。
参考了一位大佬的博客:https://blog.csdn.net/qq_39513430/article/details/104996237
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
@Component("myAdaptableJobFactory")
public class MyAdaptableJobFactory extends AdaptableJobFactory {
//AutowireCapableBeanFactory 可以将一个对象添加到SpringIOC容器中,并且完成该对象注入
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
/**
* 该方法需要将实例化的任务对象手动的添加到springIOC容器中并且完成对象的注入
*/
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object obj = super.createJobInstance(bundle);
//将obj对象添加Spring IOC容器中,并完成注入
this.autowireCapableBeanFactory.autowireBean(obj);
return obj;
}
}
第五步:添加Quartz属性文件
关于属性配置解释参考https://blog.csdn.net/github_36429631/article/details/63254055
#============================================================================
# Configure JobStore
# Using Spring datasource in SchedulerConfig.java
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
#============================================================================
#设置为TRUE不会出现序列化非字符串类到 BLOB 时产生的类版本问题
org.quartz.jobStore.useProperties=true
#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_
#开启分布式部署
org.quartz.jobStore.isClustered = true
#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 20000
#信息保存时间 默认值60秒
org.quartz.jobStore.misfireThreshold = 60000
#事务隔离级别为“读已提交”
org.quartz.jobStore.txIsolationLevelReadCommitted = true
#配置线程池实现类
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#============================================================================
# Configure Main Scheduler Properties
# Needed to manage cluster instances
#============================================================================
org.quartz.scheduler.instanceName = ClusterQuartz
org.quartz.scheduler.instanceId= AUTO
#如果你想quartz-scheduler出口本身通过RMI作为服务器,然后设置“出口”标志true(默认值为false)。
org.quartz.scheduler.rmi.export = false
#true:链接远程服务调度(客户端),这个也要指定registryhost和registryport,默认为false
# 如果export和proxy同时指定为true,则export的设置将被忽略
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
#============================================================================
# Configure ThreadPool
# Can also be configured in spring configuration
#============================================================================
#线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount = 5
#org.quartz.threadPool.threadPriority = 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
第六步:写任务类
package com.website.task;
import com.website.mapper.WebTeacherMapper;
import com.website.pojo.ao.TeacherSalaryRuleAO;
import com.website.pojo.bo.TeacherSalaryRuleDetailBO;
import com.website.pojo.bo.TeacherSalaryRuleRelationBO;
import com.website.pojo.bo.TeacherSalaryStatTempBO;
import com.website.pojo.bo.TeacherSalaryStatisticBO;
import io.jsonwebtoken.lang.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob extends QuartzJobBean {
@Autowired
private WebTeacherMapper webTeacherMapper;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
log.info("统计老师月薪资定时任务开始执行。");
System.out.println("任务编写位置");
log.info("统计老师月薪资定时任务执行完毕。");
}
}
第七步:配置定时器
import com.website.task.QuartzJob;
import org.quartz.Scheduler;
import org.quartz.TriggerKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;
@Configuration
public class SchedulerConfig {
@Autowired
private DataSource dataSource;
@Autowired
private Executor taskExector;
@Autowired
private MyAdaptableJobFactory myAdaptableJobFactory;
@Bean
public Scheduler scheduler() throws Exception {
Scheduler scheduler = schedulerFactoryBean().getScheduler();
TriggerKey triggerKey1 = TriggerKey.triggerKey("trigger1", "TriggerTest111");
/*========如果有必要可以配置删除任务,开始====================*/
//停止触发器
// scheduler.pauseTrigger(triggerKey1);
//移除触发器
// scheduler.unscheduleJob(triggerKey1);
// JobKey jobKey1 = JobKey.jobKey("job1111------", "quartzTest--------");
//删除任务
// boolean b = scheduler.deleteJob(jobKey1);
// System.out.println(b);
/*=========结束====================*/
scheduler.start();
return scheduler;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
//开启更新job
factory.setOverwriteExistingJobs(true);
//如果不配置就会使用quartz.properties中的instanceName
//factory.setSchedulerName("Cluster_Scheduler");
//配置数据源,这是quartz使用的表的数据库存放位置
factory.setDataSource(dataSource);
//设置实例在spring容器中的key
factory.setApplicationContextSchedulerContextKey("applicationContext");
//配置线程池
factory.setTaskExecutor(taskExector);
//配置配置文件
factory.setQuartzProperties(quartzProperties());
//设置调度器自动运行
factory.setAutoStartup(true);
//配置任务执行规则,参数是一个可变数组
factory.setTriggers(trigger1().getObject());
// 解决mapper无法注入问题,此处配合第四步的配置。
factory.setJobFactory(myAdaptableJobFactory);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
// 在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
@Bean
public JobDetailFactoryBean job1() {
JobDetailFactoryBean jobDetail = new JobDetailFactoryBean();
//配置任务的具体实现
jobDetail.setJobClass(QuartzJob.class);
//是否持久化
jobDetail.setDurability(true);
//出现异常是否重新执行
jobDetail.setRequestsRecovery(true);
//配置定时任务信息
jobDetail.setName("TeacherSalaryJob");
jobDetail.setGroup("TeacherSalaryJobGroup");
jobDetail.setDescription("这是每月1号凌晨统计教师薪资任务");
return jobDetail;
}
@Bean
public CronTriggerFactoryBean trigger1() {
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
//定时规则的分组
cronTrigger.setGroup("TeacherSalaryTriggerGroup");
cronTrigger.setName("TeacherSalaryTrigger");
//配置执行的任务jobdetail
cronTrigger.setJobDetail(job1().getObject());
//配置执行规则 每月一号0点过1分执行一次
cronTrigger.setCronExpression("0 1 0 1 * ? ");
return cronTrigger;
}
}
到此完毕,另外发现如果执行任务的代码中报错,会导致定时任务停止循环,重启也不会再执行。建议任务内容用try...catch代码块包裹起来,打印好日志。
已中断的任务清空Quartz所有表格,再启动项目即可再次触发启动任务。
如果某一天定时任务突然不执行了,网上很多情况都是远程调用没有加超时中断,从而导致线程阻塞引起的。
粉丝福利:Java从入门到入土学习路线图
???
?长按上方微信二维码 2 秒
感谢点赞支持下哈