Java线程池之ThreadPoolExecutor
日常开发中对于多线程的使用,一般很少直接new Thread。因为线程的频繁创建、销毁会耗费大量的系统资源。为此基于池化技术的线程池应运而生

Executors类
在Executors类中提供了很多创建线程池的工厂方法。这里介绍下一些常见的工厂方法
newFixedThreadPool
该方法创建一个固定数量线程的线程池
public class TestThreadPool {
    /**
     * 定长的线程池
     */
    public static void test1() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        work(executor);
        executor.shutdown();
    }
    private static void work(ExecutorService executor) {
        for (int i=0; i<10; i++) {
            executor.submit( new Job("Job-"+i) );
        }
    }
}
从测试结果可以看到,无论多少个任务,可用的线程数量都是固定的

newCachedThreadPool
该方法创建一个线程池,当没有空闲线程可用时,其会一直创建新的线程来处理任务
public class TestThreadPool {
    /**
     * 可缓存的线程池
     */
    public static void test2() {
        ExecutorService executor = Executors.newCachedThreadPool();
        work(executor);
        executor.shutdown();
    }
}
测试结果如下所示

newSingleThreadExecutor
该方法创建的线程池中只有一个线程,故提交至此的任务会依次执行
public class TestThreadPool {
    /**
     * 单线程的线程池
     */
    public static void test3() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        work(executor);
        executor.shutdown();
    }
}
测试结果如下所示

newScheduledThreadPool
该方法创建的线程池可用于执行定时任务
public class TestThreadPool {
    /**
     * 定时任务的线程池
     */
    public static void test4() {
        Consumer task = (String taskName) -> {
            String now = DateUtil.format( new Date(), "HH:mm:ss" );
            String info = " + now + " [Thread]: " + Thread.currentThread().getName() + " : "  + taskName;
            System.out.println( info );
            try{
                // 耗时模拟:5秒
                Thread.sleep(1000*5);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        System.out.println( " + DateUtil.format( new Date(), "HH:mm:ss") );
        // 一次性任务, 延迟10秒后执行
        executorService.schedule( ()->task.accept("OneTime"), 10, TimeUnit.SECONDS );
        // 定时任务: 延迟30秒启动,每次间隔10秒开始执行
        executorService.scheduleAtFixedRate( ()->task.accept("fixedRate"), 30, 10, TimeUnit.SECONDS);
        // 定时任务: 延迟30秒启动,每次完成10秒后再次执行
        executorService.scheduleWithFixedDelay( ()->task.accept("fixedDelay") , 30, 10, TimeUnit.SECONDS);
    }
}
 其中上述代码中scheduleAtFixedRate、scheduleWithFixedDelay方法第三个参数的含义分别是两次任务开始执行的间隔时间、上一次任务结束至本次任务开始的间隔时间。与SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途类似。值得一提的是,对于scheduleAtFixedRate而言,当 我们指定的两次任务开始执行的间隔时间 小于 该任务执行一次所需的耗时 时,将会以 该任务执行所需的耗时 作为 两次任务开始执行的实际间隔时间
测试结果如下所示。从蓝框可以看出,三个任务的第一次执行时机均按指定的延时时间(分别延迟10秒、30秒、30秒)启动;从绿框可知,对于名为fixedRate的任务而言,每次开始执行的间隔为10秒;从红框可知,对于名为fixedDelay的任务而言,每次开始执行的间隔为15秒。因为其上一次任务结束至本次任务开始的时间间隔为10秒,加上该任务本身耗时5秒,故累计为15秒

ThreadPoolExecutor类
概述
事实上对于上述的工厂方法而言,其内部是使用线程池ThreadPoolExecutor类。该类的继承结构如下所示

与线程类似。对于线程池而言,其整个生命周期阶段也存在若干不同的状态。具体如下
Running:该状态下,线程池可以接受新任务,并能够处理阻塞队列中的任务 ShutDown:该状态下,线程池不再可以接受新任务,但能够继续处理阻塞队列中的任务 Stop:该状态下,线程池不再可以接受新任务,也不会继续处理阻塞队列中的任务。同时会中断正在处理的任务 Tidying:该状态下,线程池中的工作线程数量为0。并且会调用terminated()钩子方法(hook method) Terminated:当terminated()钩子方法(hook method)执行完毕后,线程池进入该状态 
各状态的变化流程如下所示

值得一提的是,在ThreadPoolExecutor的实现过程中。其通过一个AtomicInteger类型的原子变量ctl实现了对线程池状态、工作线程数的记录。具体来说,是将高3位用于表示线程池状态,剩余位表示工作线程数。runStateOf方法用于获取线程池状态信息,workerCountOf方法用于获取工作线程数

在实际应用过程中,线程池ThreadPoolExecutor常见参数如下:
corePoolSize:线程池的核心线程数 maximumPoolSize:线程池的最大线程数 keepAliveTime:空闲线程的超时时间,用于终止空闲线程。通常其只对线程池中超过corePoolSize的多余线程生效。除非allowCoreThreadTimeOut属性设为true,才会对核心线程生效 unit:keepAliveTime参数的时间单位。其可选值定义在枚举类TimeUnit中 workQueue:任务的阻塞队列 handler:当 任务队列workQueue已满 且 线程数已达到maximumPoolSize ,提交新任务时的拒绝策略 
其在接收任务后的基本流程如下所示

拒绝策略
JDK提供了四种拒绝策略,均实现了RejectedExecutionHandler接口
DiscardPolicy 丢弃策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会直接被丢弃且不会产生任何异常
public class RejectedPolicyDemo {
    /**
     * 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
    public static void test1() {
        // 拒绝策略:直接丢弃
        System.out.println("-------------------- 拒绝策略:直接丢弃 --------------------");
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardPolicy() );
        executeJob( "clean" );
        executor.shutdown();
    }
    private static void executeJob(String name) {
        for (int i=0; i<10; i++) {
            executor.submit( new Job( name+"-"+i) );
        }
    }
}
测试结果如下所示,#3-#9号任务被直接丢弃了

DiscardOldestPolicy 丢弃最老策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会丢弃队列中最旧的任务以释放空间来存储该任务
public class RejectedPolicyDemo {
    /**
     * 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
    public static void test2(){
        // 拒绝策略:丢弃队列中最旧的任务
        System.out.println("-------------------- 拒绝策略:丢弃队列中最旧的任务 --------------------");
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy() );
        executeJob( "register" );
        executor.shutdown();
    }
}
测试结果如下所示。当#0、#1号任务在执行时,#2~#9号任务不断被存储到队列、然后被丢弃以存放最新的任务。所以#9号任务最终被保留并执行

AbortPolicy 中止策略
在该策略下,当线程池无法继续接收提交的任务时会抛出RejectedExecutionException异常。其也是线程池的默认拒绝策略。显然抛出异常的方式可以让开发者更好的把握系统的运行状态。当然在此种拒绝策略下,我们需要处理好其所抛出的异常,以免打断当前的执行流程
public class RejectedPolicyDemo {
    /**
     * 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
    public static void test3(){
        // 拒绝策略:抛异常
        System.out.println("-------------------- 拒绝策略:抛异常 --------------------");
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
        try{
            executeJob( "request" );
        }catch (RejectedExecutionException e) {
            System.out.println("[Error] 提交到线程池的任务量过多");
        }
        executor.shutdown();
    }
}
测试结果如下所示

CallerRunsPolicy 调用者执行策略
在该策略下,当线程池无法继续接收提交的任务时,其会交由调用者(提交任务的线程)去执行完成
public class RejectedPolicyDemo {
    /**
     * 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
    public static void test4() {
        // 拒绝策略:由调用者执行
        System.out.println("-------------------- 拒绝策略:由调用者执行 --------------------");
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
        executeJob( "caller" );
        executor.shutdown();
    }  
}
测试结果如下所示,符合预期。线程池无法继续接收新任务时,其会被提交任务的线程(即这里的main线程)执行完成

Note
当线程池的拒绝策略为DiscardPolicy、DiscardOldestPolicy时,则对于被拒绝任务的Future实例而言。如果在其上调用无参的get()方法,则会导致一直被阻塞。故在此种场景下,推荐使用支持超时机制的get()方法。测试代码如下所示 
public class ThreadPoolDemo {
    @Test
    public void test1() throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        Future future1 = executor.submit( taskFactory("Task 1") );
        Future future2 = executor.submit( taskFactory("Task 2") );
        Future future3 = executor.submit( taskFactory("Task 3") );
        future1.get();
        System.out.println("future 1 Over");
        future2.get();
        System.out.println("future 2 Over");
        future3.get();
        System.out.println("future 3 Over");
    }
    private static Runnable taskFactory(String taskName) {
        return () -> {
            // 模拟业务耗时
            try{Thread.sleep(3000);} catch (Exception e){}
            System.out.println(taskName + ": Over");
        };
    }
}
测试结果如下所示,符合预期

线程池使用完毕后,应通过 shutdown() 方法进行关闭。实例代码如下所示 
public static void main(String[] args) {
    System.out.println("Hello World");
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    threadPoolExecutor.execute( ()-> System.out.println("Test Task") );
    // 关闭线程池
    //threadPoolExecutor.shutdown();
    System.out.println("Main Over");
}
测试结果如下所示,左下角处的红色方块表明由于线程池未关闭,JVM依然存在并未退出。正确做法是放开上述代码中对线程池关闭操作的注释

参考文献
Java并发编程之美 翟陆续、薛宾田著 
