@Schedule定时任务+分布式环境,这些坑你一定得注意!!!
Java后端技术
共 24854字,需浏览 50分钟
·
2024-08-02 09:19
往期热门文章:
2、放弃Java8的Stream流,我选择使用JDFrame!
5、一天干了多少活儿,摸了多少鱼,这个工具一目了然给你统计出来
来源:juejin.cn/post/7155872110252916766
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.2</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
@SpringBootApplication
public class ApplicationScheduling {
public static void main(String[] args) {
SpringApplication.run(ApplicationScheduling.class, args);
}
}
/**
* @description:
* @author: Ning Zaichun
*/
@Slf4j
@Component
@EnableScheduling
public class ScheduleService {
// 每五秒执行一次,cron的表达式就不再多说明了
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule() {
log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());
}
}
二、问题::执行时间延迟和单线程执行
按照上面代码中给定的cron表达式@Scheduled(cron = "0/5 * * * * ? ")每五秒执行一次,那么最近五次的执行结果应当为:
2022-09-06 00:21:10
2022-09-06 00:21:15
2022-09-06 00:21:20
2022-09-06 00:21:25
2022-09-06 00:21:30
2022-09-06 19:42:10.018 INFO 24496 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:42:15.015 INFO 24496 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:42:20.001 INFO 24496 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:42:25.005 INFO 24496 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:42:30.007 INFO 24496 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule() {
try {
Thread.sleep(10000);
log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
}
}
2022-09-06 19:46:50.019 INFO 27236 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:47:05.024 INFO 27236 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:47:20.016 INFO 27236 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:47:35.005 INFO 27236 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 19:47:50.006 INFO 27236 --- [ scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
-
执行时间延迟: 从时间上可以明显看出,不再是每五秒执行一次,执行时间延迟很多,造成任务的 -
单线程执行: 从始至终都只有一个线程在执行任务,造成任务的堵塞.
三、为什么会出现上述问题?
问题的根本:线程阻塞式执行,执行任务线程数量过少。
那到底是为什么呢?
回到启动类上,我们在启动上标明了一个@EnableScheduling注解。
大家在看到诸如@Enablexxxx这样的注解的时候,就要知道它一定有一个xxxxxAutoConfiguration的自动装配的类。
@EnableScheduling也不例外,它的自动装配的类是TaskSchedulingAutoConfiguration。
我们来看看它到底做了一些什么设置?我们如何修改?
@ConditionalOnClass(ThreadPoolTaskScheduler.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskSchedulingProperties.class)
@AutoConfigureAfter(TaskExecutionAutoConfiguration.class)
public class TaskSchedulingAutoConfiguration {
@Bean
@ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
return builder.build();
}
// ......
}
public ThreadPoolTaskScheduler build() {
return configure(new ThreadPoolTaskScheduler());
}
protected ScheduledExecutorService createExecutor(
int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler)
四、解决方式
1、@EnableConfigurationProperties(TaskSchedulingProperties.class) ,自动装配类通常也都会对应有个xxxxProperties文件滴,TaskSchedulingProperties也确实可以配置核心线程数等基本参数,但是无法配置线程池中最大的线程数量和等待队列数量,这种方式还是不合适的。
4.1、修改配置文件
可以配置的就下面几项~
spring:
task:
scheduling:
thread-name-prefix: nzc-schedule- #线程名前缀
pool:
size: 10 #核心线程数
# shutdown:
# await-termination: true #执行程序是否应等待计划任务在关机时完成。
# await-termination-period: #执行程序应等待剩余任务完成的最长时间。
2022-09-06 20:49:15.015 INFO 7852 --- [ nzc-schedule-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 20:49:30.004 INFO 7852 --- [ nzc-schedule-2] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>66
2022-09-06 20:49:45.024 INFO 7852 --- [ nzc-schedule-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>64
2022-09-06 20:50:00.025 INFO 7852 --- [ nzc-schedule-3] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>67
2022-09-06 20:50:15.023 INFO 7852 --- [ nzc-schedule-2] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>66
2022-09-06 20:50:30.008 INFO 7852 --- [ nzc-schedule-4] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>68
请注意:这里的配置并非是一定生效的,修改后有可能成功,有可能失败,具体原因未知,但这一点是真实存在的。
4.2、执行逻辑改为异步执行
首先我们先向Spring中注入一个我们自己编写的线程池,参数自己设置即可,我这里比较随意。
@Configuration
public class MyTheadPoolConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置核心线程数
executor.setCorePoolSize(10);
//设置最大线程数
executor.setMaxPoolSize(20);
//缓冲队列200:用来缓冲执行任务的队列
executor.setQueueCapacity(200);
//线程活路时间 60 秒
executor.setKeepAliveSeconds(60);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
// 这里我继续沿用 scheduling 默认的线程名前缀
executor.setThreadNamePrefix("nzc-create-scheduling-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
/**
* @description:
* @author: Ning Zaichun
*/
@Slf4j
@Component
@EnableScheduling
public class ScheduleService {
@Autowired
TaskExecutor taskExecutor;
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule() {
CompletableFuture.runAsync(()->{
try {
Thread.sleep(10000);
log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
}
},taskExecutor);
}
}
2022-09-06 21:00:00.019 INFO 18356 --- [te-scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>66
2022-09-06 21:00:05.022 INFO 18356 --- [te-scheduling-2] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>67
2022-09-06 21:00:10.013 INFO 18356 --- [te-scheduling-3] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>68
2022-09-06 21:00:15.020 INFO 18356 --- [te-scheduling-4] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>69
2022-09-06 21:00:20.026 INFO 18356 --- [te-scheduling-5] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>70
/**
* @description:
* @author: Ning Zaichun
*/
@Slf4j
@Component
@EnableAsync
@EnableScheduling
public class ScheduleService {
@Autowired
TaskExecutor taskExecutor;
@Async(value = "taskExecutor")
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule() {
try {
Thread.sleep(10000);
log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
2022-09-06 21:10:15.022 INFO 22760 --- [zc-scheduling-1] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>66
2022-09-06 21:10:20.021 INFO 22760 --- [zc-scheduling-2] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>67
2022-09-06 21:10:25.007 INFO 22760 --- [zc-scheduling-3] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>68
2022-09-06 21:10:30.020 INFO 22760 --- [zc-scheduling-4] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>69
2022-09-06 21:10:35.007 INFO 22760 --- [zc-scheduling-5] com.nzc.service.ScheduleService : 当前执行任务的线程号ID===>70
4.4、小结
/**
* 定时任务
* 1、@EnableScheduling 开启定时任务
* 2、@Scheduled开启一个定时任务
* 3、自动装配类 TaskSchedulingAutoConfiguration
*
* 异步任务
* 1、@EnableAsync:开启异步任务
* 2、@Async:给希望异步执行的方法标注
* 3、自动装配类 TaskExecutionAutoConfiguration
*/
在单体项目中,也许上面的问题是解决了,但是站在分布式的情况下考虑,就并非是安全的了。
假如这个定时任务是收集某个信息,发送给消息队列,如果多台机器同时执行,同时给消息队列发送信息,那么必然导致之后产生一系列的脏数据。这是非常不可靠的
解决方式:分布式锁
很简单也不简单,加分布式锁~ 或者是用一些分布式调度的框架
如使用XXL-JOB实现,或者是其他的定时任务框架。
大家在执行这个定时任务之前,先去获取一把分布式锁,获取到了就执行,获取不到就直接结束。
我这里使用的是 redission,因为方便,打算写分布式锁的文章,还在准备当中。
加入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.6</version>
</dependency>
/**
* @description:
* @author: Ning Zaichun
*/
@Configuration
public class MyRedissonConfig {
/**
* 所有对Redisson的使用都是通过RedissonClient
* @return
* @throws IOException
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient() throws IOException {
//1、创建配置
Config config = new Config();
// 这里规定要用 redis://+IP地址
config.useSingleServer().setAddress("redis://xxxxx:6379").setPassword("000415"); // 有密码就写密码~ 木有不用写~
//2、根据Config创建出RedissonClient实例
//Redis url should start with redis:// or rediss://
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
/**
* @description:
* @author: Ning Zaichun
*/
@Slf4j
@Component
@EnableAsync
@EnableScheduling
public class ScheduleService {
@Autowired
TaskExecutor taskExecutor;
@Autowired
RedissonClient redissonClient;
private final String SCHEDULE_LOCK = "schedule:lock";
@Async(value = "taskExecutor")
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule() {
//分布式锁
RLock lock = redissonClient.getLock(SCHEDULE_LOCK);
try {
//加锁 10 为时间,加上时间 默认会去掉 redisson 的看门狗机制(即自动续锁机制)
lock.lock(10, TimeUnit.SECONDS);
Thread.sleep(10000);
log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
} finally {
// 一定要记得解锁~
lock.unlock();
}
}
}
思考:继续往深处思考,在分布式情况下如果一个定时任务抢到锁,但是它在执行业务过程中失败或者是宕机了,这又该如何处理呢?如何补偿呢?
往期热门文章:
1、编程语言座次图,谁才是老大?(PS:原来这么多编程语言都有同一个祖宗) 2、如果网站的 Cookie 超过 4K,会发生什么情况? 3、如何优雅的实现在线人数统计功能? 4、JetBrains再出手,这波秀翻了。。 5、FullGC 40次/天到10天1次,真牛B!! 6、不服不行,这才是后端API接口应该有的样子! 7、一个强大的分布式锁框架——Lock4j 8、Stream很好,Map很酷,但答应我别用toMap() 9、你合并代码用 merge 还是用 rebase ? 10、99%的时间里只使用这14个Git命令就够了!!!
评论