Java线程池之ThreadPoolExecutor
共 4252字,需浏览 9分钟
·
2021-10-25 10:13
日常开发中对于多线程的使用,一般很少直接new Thread。因为线程的频繁创建、销毁会耗费大量的系统资源。为此基于池化技术的线程池应运而生
Executors类
在Executors类中提供了很多创建线程池的工厂方法。这里介绍下一些常见的工厂方法
newFixedThreadPool
该方法创建一个固定数量线程的线程池
public class TestThreadPool {
/**
* 定长的线程池
*/
public static void test1() {
ExecutorService executor = Executors.newFixedThreadPool(3);
work(executor);
executor.shutdown();
}
private static void work(ExecutorService executor) {
for (int i=0; i<10; i++) {
executor.submit( new Job("Job-"+i) );
}
}
}
从测试结果可以看到,无论多少个任务,可用的线程数量都是固定的
newCachedThreadPool
该方法创建一个线程池,当没有空闲线程可用时,其会一直创建新的线程来处理任务
public class TestThreadPool {
/**
* 可缓存的线程池
*/
public static void test2() {
ExecutorService executor = Executors.newCachedThreadPool();
work(executor);
executor.shutdown();
}
}
测试结果如下所示
newSingleThreadExecutor
该方法创建的线程池中只有一个线程,故提交至此的任务会依次执行
public class TestThreadPool {
/**
* 单线程的线程池
*/
public static void test3() {
ExecutorService executor = Executors.newSingleThreadExecutor();
work(executor);
executor.shutdown();
}
}
测试结果如下所示
newScheduledThreadPool
该方法创建的线程池可用于执行定时任务
public class TestThreadPool {
/**
* 定时任务的线程池
*/
public static void test4() {
Consumer task = (String taskName) -> {
String now = DateUtil.format( new Date(), "HH:mm:ss" );
String info = " + now + " [Thread]: " + Thread.currentThread().getName() + " : " + taskName;
System.out.println( info );
try{
// 耗时模拟:5秒
Thread.sleep(1000*5);
}catch (InterruptedException e) {
e.printStackTrace();
}
};
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
System.out.println( " + DateUtil.format( new Date(), "HH:mm:ss") );
// 一次性任务, 延迟10秒后执行
executorService.schedule( ()->task.accept("OneTime"), 10, TimeUnit.SECONDS );
// 定时任务: 延迟30秒启动,每次间隔10秒开始执行
executorService.scheduleAtFixedRate( ()->task.accept("fixedRate"), 30, 10, TimeUnit.SECONDS);
// 定时任务: 延迟30秒启动,每次完成10秒后再次执行
executorService.scheduleWithFixedDelay( ()->task.accept("fixedDelay") , 30, 10, TimeUnit.SECONDS);
}
}
其中上述代码中scheduleAtFixedRate、scheduleWithFixedDelay方法第三个参数的含义分别是两次任务开始执行的间隔时间、上一次任务结束至本次任务开始的间隔时间。与SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途类似。值得一提的是,对于scheduleAtFixedRate而言,当 我们指定的两次任务开始执行的间隔时间 小于 该任务执行一次所需的耗时 时,将会以 该任务执行所需的耗时 作为 两次任务开始执行的实际间隔时间
测试结果如下所示。从蓝框可以看出,三个任务的第一次执行时机均按指定的延时时间(分别延迟10秒、30秒、30秒)启动;从绿框可知,对于名为fixedRate的任务而言,每次开始执行的间隔为10秒;从红框可知,对于名为fixedDelay的任务而言,每次开始执行的间隔为15秒。因为其上一次任务结束至本次任务开始的时间间隔为10秒,加上该任务本身耗时5秒,故累计为15秒
ThreadPoolExecutor类
概述
事实上对于上述的工厂方法而言,其内部是使用线程池ThreadPoolExecutor类。该类的继承结构如下所示
与线程类似。对于线程池而言,其整个生命周期阶段也存在若干不同的状态。具体如下
Running:该状态下,线程池可以接受新任务,并能够处理阻塞队列中的任务 ShutDown:该状态下,线程池不再可以接受新任务,但能够继续处理阻塞队列中的任务 Stop:该状态下,线程池不再可以接受新任务,也不会继续处理阻塞队列中的任务。同时会中断正在处理的任务 Tidying:该状态下,线程池中的工作线程数量为0。并且会调用terminated()钩子方法(hook method) Terminated:当terminated()钩子方法(hook method)执行完毕后,线程池进入该状态
各状态的变化流程如下所示
值得一提的是,在ThreadPoolExecutor的实现过程中。其通过一个AtomicInteger类型的原子变量ctl实现了对线程池状态、工作线程数的记录。具体来说,是将高3位用于表示线程池状态,剩余位表示工作线程数。runStateOf方法用于获取线程池状态信息,workerCountOf方法用于获取工作线程数
在实际应用过程中,线程池ThreadPoolExecutor常见参数如下:
corePoolSize:线程池的核心线程数 maximumPoolSize:线程池的最大线程数 keepAliveTime:空闲线程的超时时间,用于终止空闲线程。通常其只对线程池中超过corePoolSize的多余线程生效。除非allowCoreThreadTimeOut属性设为true,才会对核心线程生效 unit:keepAliveTime参数的时间单位。其可选值定义在枚举类TimeUnit中 workQueue:任务的阻塞队列 handler:当 任务队列workQueue已满 且 线程数已达到maximumPoolSize ,提交新任务时的拒绝策略
其在接收任务后的基本流程如下所示
拒绝策略
JDK提供了四种拒绝策略,均实现了RejectedExecutionHandler接口
DiscardPolicy 丢弃策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会直接被丢弃且不会产生任何异常
public class RejectedPolicyDemo {
/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test1() {
// 拒绝策略:直接丢弃
System.out.println("-------------------- 拒绝策略:直接丢弃 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardPolicy() );
executeJob( "clean" );
executor.shutdown();
}
private static void executeJob(String name) {
for (int i=0; i<10; i++) {
executor.submit( new Job( name+"-"+i) );
}
}
}
测试结果如下所示,#3-#9号任务被直接丢弃了
DiscardOldestPolicy 丢弃最老策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会丢弃队列中最旧的任务以释放空间来存储该任务
public class RejectedPolicyDemo {
/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test2(){
// 拒绝策略:丢弃队列中最旧的任务
System.out.println("-------------------- 拒绝策略:丢弃队列中最旧的任务 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy() );
executeJob( "register" );
executor.shutdown();
}
}
测试结果如下所示。当#0、#1号任务在执行时,#2~#9号任务不断被存储到队列、然后被丢弃以存放最新的任务。所以#9号任务最终被保留并执行
AbortPolicy 中止策略
在该策略下,当线程池无法继续接收提交的任务时会抛出RejectedExecutionException异常。其也是线程池的默认拒绝策略。显然抛出异常的方式可以让开发者更好的把握系统的运行状态。当然在此种拒绝策略下,我们需要处理好其所抛出的异常,以免打断当前的执行流程
public class RejectedPolicyDemo {
/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test3(){
// 拒绝策略:抛异常
System.out.println("-------------------- 拒绝策略:抛异常 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
try{
executeJob( "request" );
}catch (RejectedExecutionException e) {
System.out.println("[Error] 提交到线程池的任务量过多");
}
executor.shutdown();
}
}
测试结果如下所示
CallerRunsPolicy 调用者执行策略
在该策略下,当线程池无法继续接收提交的任务时,其会交由调用者(提交任务的线程)去执行完成
public class RejectedPolicyDemo {
/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test4() {
// 拒绝策略:由调用者执行
System.out.println("-------------------- 拒绝策略:由调用者执行 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
executeJob( "caller" );
executor.shutdown();
}
}
测试结果如下所示,符合预期。线程池无法继续接收新任务时,其会被提交任务的线程(即这里的main线程)执行完成
Note
当线程池的拒绝策略为DiscardPolicy、DiscardOldestPolicy时,则对于被拒绝任务的Future实例而言。如果在其上调用无参的get()方法,则会导致一直被阻塞。故在此种场景下,推荐使用支持超时机制的get()方法。测试代码如下所示
public class ThreadPoolDemo {
@Test
public void test1() throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
Future future1 = executor.submit( taskFactory("Task 1") );
Future future2 = executor.submit( taskFactory("Task 2") );
Future future3 = executor.submit( taskFactory("Task 3") );
future1.get();
System.out.println("future 1 Over");
future2.get();
System.out.println("future 2 Over");
future3.get();
System.out.println("future 3 Over");
}
private static Runnable taskFactory(String taskName) {
return () -> {
// 模拟业务耗时
try{Thread.sleep(3000);} catch (Exception e){}
System.out.println(taskName + ": Over");
};
}
}
测试结果如下所示,符合预期
线程池使用完毕后,应通过 shutdown() 方法进行关闭。实例代码如下所示
public static void main(String[] args) {
System.out.println("Hello World");
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
threadPoolExecutor.execute( ()-> System.out.println("Test Task") );
// 关闭线程池
//threadPoolExecutor.shutdown();
System.out.println("Main Over");
}
测试结果如下所示,左下角处的红色方块表明由于线程池未关闭,JVM依然存在并未退出。正确做法是放开上述代码中对线程池关闭操作的注释
参考文献
Java并发编程之美 翟陆续、薛宾田著