全网最详细的线程池 ThreadPoolExecutor 解读!
答应我, 不要再用 if (obj != null) 判空了 20个示例!详解 Java8 Stream 用法,从此告别shi山(垃圾代码) 利用Java8新特征,重构传统设计模式,你学会了吗? 竟然有一半的人不知道 for 与 foreach 的区别??? 利用多线程批量拆分 List 导入数据库,效率杠杠的!
一、ThreadPoolExecutor类讲解
1、线程池状态:
五种状态:

线程池的 shutdown()
方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态线程池的 shutdownNow()
方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。
注:
SHUTDOWN
状态 和 STOP 状态 先会转变为TIDYING
状态,最终都会变为TERMINATED
2、ThreadPoolExecutor构造函数:
ThreadPoolExecutor
继承自AbstractExecutorService
,而AbstractExecutorService
实现了ExecutorService
接口。

接下来我们分别讲解这些参数的含义。
2.1)线程池工作原理:
corePoolSize
:线程池中核心线程数的最大值maximumPoolSize
:线程池中能拥有最多线程数workQueue
:用于缓存任务的阻塞队列
当调用线程池execute()
方法添加一个任务时,线程池会做如下判断:
如果有空闲线程,则直接执行该任务; 如果没有空闲线程,且当前运行的线程数少于 corePoolSize
,则创建新的线程执行该任务;如果没有空闲线程,且当前的线程数等于 corePoolSize
,同时阻塞队列未满,则将任务入队列,而不添加新的线程;如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于 maximumPoolSize
,则创建新的线程执行任务;如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于 maximumPoolSize
,则根据构造函数中的 handler 指定的策略来拒绝新的任务。

2.2)KeepAliveTime:
keepAliveTime
:表示空闲线程的存活时间TimeUnit unit
:表示keepAliveTime的单位
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize
,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize
的大小。
注:如果线程池设置了
allowCoreThreadTimeout
参数为true(默认false),那么当空闲线程超过keepaliveTime
后直接停掉。(不会判断线程数是否大于corePoolSize
)即:最终线程数会变为0。
2.3)workQueue 任务队列:
workQueue
:它决定了缓存任务的排队策略ThreadPoolExecutor
线程池推荐了三种等待队列,它们是:SynchronousQueue
、LinkedBlockingQueue
和ArrayBlockingQueue
。
1)有界队列:
SynchronousQueue
:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用了这个队列。ArrayBlockingQueue
:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
2)无界队列:
LinkedBlockingQueue
:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE
)PriorityBlockingQueue
:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()
方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue
不会保证优先级一样的元素的排序。
注意:
keepAliveTime
和maximumPoolSize
及BlockingQueue
的类型均有关系。如果BlockingQueue
是无界的,那么永远不会触发maximumPoolSize
,自然keepAliveTime
也就没有了意义。
2.4)threadFactory:
threadFactory
:指定创建线程的工厂。(可以不指定)
如果不指定线程工厂时,ThreadPoolExecutor
会使用ThreadPoolExecutor.defaultThreadFactory
创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY
的优先级,以及名为 “pool-XXX-thread-
” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。
2.5)handler 拒绝策略:
handler
:表示当 workQueue
已满,且池中的线程数达到 maximumPoolSize
时,线程池拒绝添加新任务时采取的策略。(可以不指定)

最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。
3、常用方法:
除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

此外,还有一些方法:
getCorePoolSize()
:返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;getMaximumPoolSize()
:返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;getLargestPoolSize()
:记录了曾经出现的最大线程个数(水位线);getPoolSize()
:线程池中当前线程的数量;getActiveCount()
:Returns the approximate(近似) number of threads that are actively executing tasks;prestartAllCoreThreads()
:会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;prestartCoreThread()
:会启动一个核心线程(同上);allowCoreThreadTimeOut(true)
:允许核心线程在KeepAliveTime时间后,退出;
4、Executors类:
Executors类的底层实现便是ThreadPoolExecutor!Executors 工厂方法有:
Executors.newCachedThreadPool()
:无界线程池,可以进行自动线程回收Executors.newFixedThreadPool(int)
:固定大小线程池Executors.newSingleThreadExecutor()
:单个后台线程
它们均为大多数使用场景预定义了设置。不过在阿里java文档中说明,尽量不要用该类创建线程池。
二、线程池相关接口介绍:
1、ExecutorService接口:
该接口是真正的线程池接口。上面的ThreadPoolExecutor
以及下面的ScheduledThreadPoolExecutor
都是该接口的实现类。改接口常用方法:
Future submit(Runnable task)
:提交Runnable任务到线程池,返回Future对象,由于Runnable没有返回值,也就是说调用Future对象get()方法返回null;
:提交Callable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Callable的返回值;Future submit(Callable task)
:提交Runnable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Runnable的参数值;Future submit(Runnable task,T result) invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit)
:invokeAll会按照任务集合中的顺序将所有的Future添加到返回的集合中,该方法是一个阻塞的方法。只有当所有的任务都执行完毕时,或者调用线程被中断,又或者超出指定时限时,invokeAll方法才会返回。当invokeAll返回之后每个任务要么返回,要么取消,此时客户端可以调用get/isCancelled来判断具体是什么情况。invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit)
:阻塞的方法,不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。和invokeAll区别是只要有一个任务执行完了,就把结果返回,并取消其他未执行完的任务;同样,也带有超时功能;shutdown()
:在完成已提交的任务后关闭服务,不再接受新任;shutdownNow()
:停止所有正在执行的任务并关闭服务;isTerminated()
:测试是否所有任务都执行完毕了;isShutdown()
:测试是否该ExecutorService已被关闭。
1.1)submit方法示例:
我们知道,线程池接口中有以下三个主要方法,接下来我们看一下具体示例:

1)Callable:
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS,
new ArrayBlockingQueue(50),
new ThreadFactory(){ public Thread newThread(Runnable r) {
return new Thread(r, "schema_task_pool_" + r.hashCode());
}}, new ThreadPoolExecutor.DiscardOldestPolicy());
public static void callableTest() {
int a = 1;
//callable
Future future = threadPool.submit(new Callable(){
@Override
public Boolean call() throws Exception {
int b = a + 100;
System.out.println(b);
return true;
}
});
try {
System.out.println("feature.get");
Boolean boolean1 = future.get();
System.out.println(boolean1);
} catch (InterruptedException e) {
System.out.println("InterruptedException...");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
}
}
2)Runnable:
public static void runnableTest() {
int a = 1;
//runnable
Future future1 = threadPool.submit(new Runnable(){
@Override
public void run() {
int b = a + 100;
System.out.println(b);
}
});
try {
System.out.println("feature.get");
Object x = future1.get(900,TimeUnit.MILLISECONDS);
System.out.println(x);//null
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
3)Runnable+result:
class RunnableTask implements Runnable {
Person p;
RunnableTask(Person p) {
this.p = p;
}
@Override
public void run() {
p.setId(1);
p.setName("Runnable Task...");
}
}
class Person {
private Integer id;
private String name;
public Person(Integer id, String name) {
super();
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + "]";
}
}
public static void runnableTest2() {
//runnable + result
Person p = new Person(0,"person");
Future future2 = threadPool.submit(new RunnableTask(p),p);
try {
System.out.println("feature.get");
Person person = future2.get();
System.out.println(person);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
1.2)线程池执行时,Callable的call方法(Runnable的run方法)抛出异常后,会出现什么?
在上面的例子中我们可以看到,线程池无论是执行Callable
还是Runnable
,调用返回的Future对象get()
方法时需要处理两种异常(如果是调用get(timeout)
方法,需要处理三种异常),如下:
//在线程池上运行
Future