10问10答:你真的了解线程池吗?

《Java开发手册》中强调,线程资源必须通过线程池提供,而创建线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时造成OOM。但是如果参数配置错误,还是会引发上面的两个问题。所以本节我们主要是讨论ThreadPoolExecutor的一些技术细节,并且给出几个常用的最佳实践建议。
我在查找资料的过程中,发现有些问题存在争议。后面发现,一部分原因是因为不同JDK版本的现实是有差异的。因此,下面的分析是基于当下最常用的版本JDK1.8,并且对于存在争议的问题,我们分析源码,源码才是最准确的。
1 corePoolSize=0会怎么样
这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样回答这个问题的:
提交任务后,先判断当前池中线程数是否小于corePoolSize,如果小于,则创建新线程执行这个任务。
否则,判断等待队列是否已满,如果没有满,则添加到等待队列。
否则,判断当前池中线程数是否大于maximumPoolSize,如果大于则拒绝。
否则,创建一个新的线程执行这个任务。
int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);// 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
线程池提交任务后,首先判断当前池中线程数是否小于corePoolSize。
如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列。
如果添加队列成功,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
如果添加到等待队列失败,一般是队列已满,才会再尝试创建新的线程。
但在创建之前需要与maximumPoolSize比较,如果小于则创建成功。
否则执行拒绝策略。
prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
prestartAllCoreThreads:Starts all core threads.
corePoolSize=0:在一般情况下只使用一个线程消费任务,只有当并发请求特别多、等待队列都满了之后,才开始用多线程。
allowsCoreThreadTimeOut=true && corePoolSize>1:在一般情况下就开始使用多线程(corePoolSize个),当并发请求特别多,等待队列都满了之后,继续加大线程数。但是当请求没有的时候,允许核心线程也终止。
在这个while条件中,有个getTask()方法是核心中的核心,它所做的事情就是从等待队列中取出任务来执行:
如果没有达到corePoolSize,则创建的Worker在执行完它承接的任务后,会用workQueue.take()取任务、注意,这个接口是阻塞接口,如果取不到任务,Worker线程一直阻塞。
如果超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在空闲了之后,会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务。注意,这个接口只阻塞等待keepAliveTime时间,超过这个时间返回null,则Worker的while循环执行结束,则被终止了。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 看这里,核心逻辑在这里while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 注意,核心中的核心在这里Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
虚拟机栈 本地方法栈 程序计数器
ThreadLocal:业务代码是否使用了ThreadLocal?就算没有,Spring框架中也大量使用了ThreadLocal,你所在公司的框架可能也是一样。
局部变量:线程处于阻塞状态,肯定还有栈帧没有出栈,栈帧中有局部变量表,凡是被局部变量表引用的内存都不能回收。所以如果这个线程创建了比较大的局部变量,那么这一部分内存无法GC。
TLAB机制:如果你的应用线程数处于高位,那么新的线程初始化可能因为Eden没有足够的空间分配TLAB而触发YoungGC。
线程池保持空闲的核心线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存一般不大。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理。
如果你的应用线程数处于高位,那么需要观察一下YoungGC的情况,估算一下Eden大小是否足够。如果不够的话,可能要谨慎地创建新线程,并且让空闲的线程终止;必要的时候,可能需要对JVM进行调参。
如果我们使用execute()提交任务,我们一般要在Runable任务的代码加上try-catch进行异常处理。
如果我们使用submit()提交任务,我们一般要在主线程中,对Future.get()进行try-catch进行异常处理。
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)// 核心代码s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 死循环for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 只有任务的状态是’已完成‘,才会跳出死循环if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}// get方法中依赖的,报告执行结果private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
不论是用execute还是submit,都可以自己在业务代码上加try-catch进行异常处理。我一般喜欢使用这种方式,因为我喜欢对不同业务场景的异常进行差异化处理,至少打不一样的日志吧。
如果是execute,还可以自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。
或者实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该handler传递给线程池的ThreadFactory。
但是注意,afterExecute和UncaughtExceptionHandler都不适用submit。因为通过上面的FutureTask.run()不难发现,它自己对Throwable进行了try-catch,封装到了outcome属性,所以底层方法execute的Worker是拿不到异常信息的。
shutdown => 平缓关闭,等待所有已添加到线程池中的任务执行完再关闭。
shutdownNow => 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务。
/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.*/
【强制】使用ThreadPoolExecutor的构造函数声明线程池,避免使用Executors类的 newFixedThreadPool和newCachedThreadPool。
【强制】 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。即threadFactory参数要构造好。
【建议】建议不同类别的业务用不同的线程池。
【建议】CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,可以将线程数设置为N(CPU核心数)+1,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间。
【建议】I/O密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间段内不会占用CPU来处理,这时就可以将CPU交出给其它线程使用。因此在I/O密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是2N。
【建议】workQueue不要使用无界队列,尽量使用有界队列。避免大量任务等待,造成OOM。
【建议】如果是资源紧张的应用,使用allowsCoreThreadTimeOut可以提高资源利用率。
【建议】虽然使用线程池有多种异常处理的方式,但在任务代码中,使用try-catch最通用,也能给不同任务的异常处理做精细化。
【建议】对于资源紧张的应用,如果担心线程池资源使用不当,可以利用ThreadPoolExecutor的API实现简单的监控,然后进行分析和优化。

private static final ThreadPoolExecutor pool;static {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512), threadFactory, new ThreadPoolExecutor.AbortPolicy());pool.allowCoreThreadTimeOut(true);}
threadFactory:给出带业务语义的线程命名。
corePoolSize:快速启动4个线程处理该业务,是足够的。
maximumPoolSize:IO密集型业务,我的服务器是4C8G的,所以4*2=8。
keepAliveTime:服务器资源紧张,让空闲的线程快速释放。
pool.allowCoreThreadTimeOut(true):也是为了在可以的时候,让线程释放,释放资源。
workQueue:一个任务的执行时长在100~300ms,业务高峰期8个线程,按照10s超时(已经很高了)。10s钟,8个线程,可以处理10 * 1000ms / 200ms * 8 = 400个任务左右,往上再取一点,512已经很多了。
handler:极端情况下,一些任务只能丢弃,保护服务端。
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
评论
