分布式定时任务,你了解多少?基于Quartz实现分布式任务解决方案!

架构师精进

共 9505字,需浏览 20分钟

 ·

2022-07-09 13:08

后台定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。但是随着业务逐渐增加,定时任务系统必须具备高可用和水平扩展的能力,单台服务器已经不能满足需求。因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。


分布式任务调度框架几乎是每个大型应用必备的工具,下面我们结合项目实践,对业界普遍使用的开源分布式任务调度框架的使用进行了探究实践,并分析了这几种框架的优劣势和对自身业务的思考。


一、分布式定时任务简介

1.什么是分布式任务?

分布式定时任务就是把分散的、批量的后台定时任务纳入统一的管理调度平台,实现任务的集群管理、调度和分布式部署管理方式。


2.分布式定时任务的特点

实际项目中涉及到分布式任务的业务场景非常多,这就使得我们的定时任务系统应该集管理、调度、任务分配、监控预警为一体的综合调度系统,如何打造一套健壮的、适应不同场景的系统,技术选型尤其重要。针对以上场景我们需要我们的分布式任务系统具备以下能力:

  • 支持多种任务类型(shell任务/Java任务/web任务)

  • 支持HA,负载均衡和故障转移

  • 支持弹性扩容(应对开门红以及促销活动)

  • 支持Job Timeout 处理

  • 支持统一监控和告警

  • 支持任务统一配置

  • 支持资源隔离和作业隔离


二、为什么需要分布式定时任务?

定时任务系统在应用平台中的重要性不言而喻,特别是互联网电商、金融等行业更是离不开定时任务。在任务数量不多、执行频率不高时,单台服务器完全能够满足。但是,为什么还需要分布式呢?主要有如下两点原因:

  • 高可用:单机版的定时任务调度只能在一台机器上运行,如果系统出现异常,就会导致整个后台定时任务不可用。这对于互联网企业来说是不可接受的。

  • 单机处理极限:单机处理的数据,任务数量是有限的。原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来一个统计需要1小时,现在业务方需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并行处理可以提高单位时间的处理效率,但是单机能力毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。


但我们遇到的问题还不止这些,比如容错功能、失败重试、分片功能、路由负载均衡、管理后台等。这些都是单机的定时任务系统所不具备的,因此需要把定时任务系统部署到集群中,实现分布式定时任务系统集群。


三、常见开源方案

目前,分布式定时任务框架非常多,而且大部分都已经开源,比较流行的有:xxl-job、elastic-job、quartz等。

  • elastic-job,是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。

  • xxl-job,是由个人开源的一个轻量级分布式任务调度框架 ,主要分为 调度中心和执行器两部分 , 调度中心在启动初始化的时候,会默认生成执行器的RPC代理对象(http协议调用), 执行器项目启动之后, 调度中心在触发定时器之后通过jobHandle 来调用执行器项目里面的代码,核心功能和elastic-job差不多,同时技术文档比较完善

  • quartz,是非常流行的开源的作业调度框架,它提供了巨大的灵活性而不牺牲简单性。你能够用它来为执行一个作业而创建简单的或复杂的调度。同时也提供了基于数据库的集群方案,通过在数据库中配置定时器信息,以数据库锁的方式达到同一个任务始终只有一个节点在运行。


以表列出了几个代表性的开源分布式任务框架的:

功能quartzelastic-jobxxl-job
HA多节点部署,通过数据库锁来保证只有一个节点执行任务通过zookeeper的注册与发现,可以动态添加服务器。支持水平扩容集群部署
任务分片支持支持
文档完善完善完善完善
管理界面支持支持
难易程度简单较复杂简单
公司OpenSymphony当当网个人
缺点没有管理界面,以及不支持任务分片等。不适用于分布式场景需要引入zookeeper , mesos, 增加系统复杂度, 学习成本较高通过获取数据库锁的方式,保证集群中执行任务的唯一性,性能不好。



四、基于Quartz实现分布式定时任务解决方案

1.Quartz的集群解决方案

Quartz单机版本相比大家应该比较熟悉,它的集群方案则是在单机的基础上加上一个公共数据库。通过在数据库中配置定时器信息, 以数据库锁的方式达到同一个任务始终只有一个节点在运行,集群架构如下:

通过上面的架构图可以看到,三个Quartz服务节点共享同一个数据库,如果某一个服务节点失效,那么Job会在其他节点上执行。各个Quartz服务器都遵守基于数据库锁的调度原则,只有获取了锁才能调度后台任务,从而保证了任务执行的唯一性。同时多个节点的异步运行保证了服务的可靠性。


2.实现基于Quartz的分布式定时任务

下面就通过示例,演示如何基于Quartz实现分布式定时任务。

1. 添加Quartz依赖

由于分布式的原因,Quartz中提供分布式处理的JAR包以及数据库和连接相关的依赖。示例代码如下:

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-quartz</artifactId></dependency> <!-- mysql --><dependency>   <groupId>mysql</groupId>   <artifactId>mysql-connector-java</artifactId></dependency> <!-- orm --><dependency>    <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>

在上面的示例中,除了添加Quartz依赖外,还需要添加mysql-connector-java和spring-boot-starter-data-jpa两个组件,这两个组件主要用于JOB持久化到MySQL数据库。


2. 初始化Quartz数据库

分布式Quartz定时任务的配置信息存储在数据库中,数据库初始化脚本可以在官方网站中查找,默认保存在quartz-2.2.3-distribution\src\org\quartz\impl\jdbcjobstore\tables-mysql.sql目录下。首先创建quartz_jobs数据库,然后在数据库中执行tables-mysql.sql初始化脚本。


3. 配置数据库和Quartz

修改application.properties配置文件,配置数据库与Quartz。具体操作如下:

# Quartz 数据库spring.datasource.url=jdbc:mysql://localhost:3306/quartz_jobs?useSSL=false&serverTimezone=UTCspring.datasource.username=rootspring.datasource.password=rootspring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.max-active=1000spring.datasource.max-idle=20spring.datasource.min-idle=5spring.datasource.initial-size=10 # 是否使用properties作为数据存储org.quartz.jobStore.useProperties=false# 数据库中表的命名前缀org.quartz.jobStore.tablePrefix=QRTZ_# 是否是一个集群,是不是分布式的任务org.quartz.jobStore.isClustered=true# 集群检查周期,单位为毫秒,可以自定义缩短时间。当某一个节点宕机的时候,其他节点等待多久后开始执行任务org.quartz.jobStore.clusterCheckinInterval=5000# 单位为毫秒,集群中的节点退出后,再次检查进入的时间间隔org.quartz.jobStore.misfireThreshold=60000# 事务隔离级别org.quartz.jobStore.txIsolationLevelReadCommitted=true# 存储的事务管理类型org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX# 使用的Delegate类型org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate# 集群的命名,一个集群要有相同的命名org.quartz.scheduler.instanceName=ClusterQuartz# 节点的命名,可以自定义。AUTO代表自动生成org.quartz.scheduler.instanceId=AUTO# rmi远程协议是否发布org.quartz.scheduler.rmi.export=false# rmi远程协议代理是否创建org.quartz.scheduler.rmi.proxy=false# 是否使用用户控制的事务环境触发执行任务org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

上面的配置主要是Quartz数据库和Quartz分布式集群相关的属性配置。分布式定时任务的配置存储在数据库中,所以需要配置数据库连接和Quartz配置信息,为Quartz提供数据库配置信息,如数据库、数据表的前缀之类。


4. 定义定时任务

后台定时任务与普通Quartz任务并无差异,只是增加了@PersistJobDataAfterExecution注解和@DisallowConcurrentExecution注解。创建QuartzJob定时任务类并实现Quartz定时任务的具体示例代码如下:

// 持久化@PersistJobDataAfterExecution// 禁止并发执行@DisallowConcurrentExecutionpublic class QuartzJob extends QuartzJobBean {    private static final Logger log = LoggerFactory.getLogger(QuartzJob.class);    @Override   protected void executeInternal(JobExecutionContext context) throws JobExecutionException {       String taskName = context.getJobDetail().getJobDataMap().getString("name");       log.info("---> Quartz job, time:{"+new Date()+"} ,name:{"+taskName+"}<----");    }}

在上面的示例中,创建了QuartzJob定时任务类,使用@PersistJobDataAfterExecution注解持久化任务信息。DisallowConcurrentExecution禁止并发执行,避免同一个任务被多次并发执行。


5. SchedulerConfig配置

创建SchedulerConfig配置类,初始化Quartz分布式集群相关配置,包括集群设置、数据库等。示例代码如下:

@Configurationpublic class SchedulerConfig {    @Autowired    private DataSource dataSource;     /**     * 调度器     *     * @return     * @throws Exception     */    @Bean    public Scheduler scheduler() throws Exception {       Scheduler scheduler = schedulerFactoryBean().getScheduler();       return scheduler;    }     /**     * Scheduler工厂类     *     * @return     * @throws IOException     */    @Bean    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {       SchedulerFactoryBean factory = new SchedulerFactoryBean();       factory.setSchedulerName("Cluster_Scheduler");       factory.setDataSource(dataSource);       factory.setApplicationContextSchedulerContextKey("applicationContext");       factory.setTaskExecutor(schedulerThreadPool());       //factory.setQuartzProperties(quartzProperties());       factory.setStartupDelay(10);// 延迟10s执行       return factory;    }     /**     * 配置Schedule线程池     *     * @return     */    @Bean    public Executor schedulerThreadPool() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();       executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());       executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());       executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());       return executor;    }}

在上面的示例中,主要是配置Schedule线程池、配置Quartz数据库、创建Schedule调度器实例等初始化配置。


6. 触发定时任务

配置完成之后,还需要触发定时任务,创建JobStartupRunner类以便在系统启动时触发所有定时任务。示例代码如下:

@Componentpublic class JobStartupRunner implements CommandLineRunner {    @Autowired    SchedulerConfig schedulerConfig;    private static String TRIGGER_GROUP_NAME = "test_trigger";    private static String JOB_GROUP_NAME = "test_job";     @Override    public void run(String... args) throws Exception {        Scheduler scheduler;        try {            scheduler = schedulerConfig.scheduler();            TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);            if (null == trigger) {                Class clazz = QuartzJob.class;                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob").build();               CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");                trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)                        .withSchedule(scheduleBuilder).build();               scheduler.scheduleJob(jobDetail, trigger);               System.out.println("Quartz 创建了job:...:" + jobDetail.getKey());            } else {               System.out.println("job已存在:{}" + trigger.getKey());            }             TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);            CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);            if (null == trigger2) {                Class clazz = QuartzJob2.class;                JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).usingJobData("name","weiz QuartzJob2").build();               CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");                trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)                       .withSchedule(scheduleBuilder).build();               scheduler.scheduleJob(jobDetail2, trigger2);               System.out.println("Quartz 创建了job:...:{}" + jobDetail2.getKey());            } else {               System.out.println("job已存在:{}" + trigger2.getKey());            }            scheduler.start();        } catch (Exception e) {           System.out.println(e.getMessage());        }    }}

在上面的示例中,为了适应分布式集群,我们在系统启动时触发定时任务,判断任务是否已经创建、是否正在执行。如果集群中的其他示例已经创建了任务,则启动时无须触发任务。


7. 验证测试

配置完成之后,接下来启动任务,测试分布式任务配置是否成功。启动一个实例,可以看到定时任务执行了,然后每10秒钟打印输出一次,如下图所示。

接下来,模拟分布式部署的情况。我们再启动一个测试程序实例,这样就有两个后台定时任务实例,如下所示。

后台定时任务实例1的日志输出:

后台定时任务实例2的日志输出:

从上面的日志中可以看到,Quartz Job和Quartz Job2交替地在两个任务实例进程中执行,同一时刻同一个任务只有一个进程在执行,这说明已经达到了分布式后台定时任务的效果。

接下来,停止任务实例1,测试任务实例2是否会接管所有任务继续执行。如下图所示,停止任务实例1后,任务实例2接管了所有的定时任务。这样如果集群中的某个实例异常了,其他实例能够接管所有的定时任务,确保任务集群的稳定运行。


最后

以上,就把分布式后台任务介绍完了,并通过Spring Boot + Quartz 实现了基于Quartz的分布式定时任务解决方案!

分布式任务调度框架几乎是每个大型应用必备的工具,作为程序员、架构师必须熟练掌握。


浏览 157
点赞
1评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
全部评论
QS26866549fce35db192023-11-07 17:07
是我的问题吗?我这边看不到代码
点赞回复
推荐
点赞
1评论
收藏
分享

手机扫一扫分享

分享
举报