详述Java线程池实现原理
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
一、写在前面
1.1 线程池是什么
线程池(Thread Pool) 是一种池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销,操作系统调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务是创建销毁线程开销代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对操作系统内核的充分利用。
本文描述的线程池是JDK提供的ThreadPoolExecutor类
1.2 线程池解决的问题是什么
线程池解决的问题就是资源管理的问题。在并发环境下,系统不能够确定在任意时刻有多少任务需要执行,有多少资源需要投入。
二、线程池和核心设计与实现
2.1 总体设计
2.2 生命周期管理
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
// 计算当前运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; } 
// 计算当前线程数据
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
| 运行状态 | 状态描述 | 
|---|---|
| RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 | 
| SHUTDOWN | 状态关闭,不在接受新提交的任务,但是能继续处理阻塞队列已保存的让任务 | 
| STOP | 不接受新任务,也不处理队列中的任务,会中断正在处理任务的线程 | 
| TIDYING | 所有让任务都已终止,workerCount(有效处理让任务线程)状态为0 | 
| TERMINATED | 在terminated()方法执行结束后进入该状态 | 
2.3 任务调度机制
2.3.1 任务调度
2.3.2 任务缓冲
| 名称 | 描述 | 
|---|---|
| ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁 | 
| LinkedBlockingDeque | 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建此队列有容量危险 | 
| PriorityBlockingQueue | 一个支持线程优先级排序的无界队列,默认自然进行排序,也可以自定义实现compareTo()方法指定排序故障,不能保证同优先级元素的顺序。 | 
| DelayQueue | 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延迟期满后才能从队列中获取元素。 | 
| SynchronousQueue | 一个不存储元素的阻塞队列,每个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务来)创建新的线程,如果有空闲的线程就使用空闲线程,线程空闲60秒会被回收。 return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());  | 
| LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其他队列,LinkedTransferQueue多了transfer和tryTransfer方法 | 
| LinkedBlockingQueue | 一个由链表结构组成的双向阻塞队列,队列的头部和尾部都可以插入和删除元素,多线程并发时,可以将锁的竞争最多降到一半 | 
2.3.3 任务申请
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 判断线程池是否已停止运行
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  // 判断线程现阶段是否够多
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
  // 限时任务获取和阻塞获取
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
2.3.4 任务拒绝
public interface RejectedExecutionHandler {
    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
| 名称 | 描述 | 
|---|---|
| ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略,在任务不能在提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用该策略,这样子在系统不能承载更大并发的时候,能过及时的通过异常发现。 | 
| ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常。使用该策略,可能会使我们无法发现系统的异常状态。建议一些无关紧要的业务采用此策略。 | 
| ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交比拒接的任务。是否要采用此种策略,需要根据实际业务是否允许丢弃老任务来认真衡量 | 
| ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)来处理任务。这种情况是需要让所有的任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增加大吞吐量的手段,最终必须要让每个任务都执行 | 
/**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code CallerRunsPolicy}.
         */
    public CallerRunsPolicy() { }
    /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
/**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
         * Creates an {@code AbortPolicy}.
         */
    public AbortPolicy() { }
    /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardPolicy}.
         */
    public DiscardPolicy() { }
    /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
/**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
    public DiscardOldestPolicy() { }
    /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
2.4 Worker线程管理
2.4.1 Worker线程
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        // worker持有的线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 初始化的任务,可以为null
        Runnable firstTask;
    
        ...
}
/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
# workerQueue 源码定义
private final BlockingQueue<Runnable> workQueue;
/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
private final HashSet<Worker> workers = new HashSet<Worker>();
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            // 执行interruptIdleWorkers方法
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 执行interruptIdleWorkers
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
2.4.2 worker线程增加
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 判断线程是否已经停止
            // 判断线程是否正在停止 如果是则判断线程是否用于执行剩余任务firstTask
            // workQueue是否为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                // 获取线程数量
                int wc = workerCountOf(c);
                // 判断线程是否超过容量 
                // 判断线程是否超过对应核心数  上面讲了core 传true/false区别
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 尝试登记线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
     // 判断线程池状态是否改变
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 增加线程
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 增加成功启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
2.4.3 worker线程回收
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // 执行任务
        } finally {
                // 获取不到任务,主动回收自己
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    
    
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 回收
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
  ...
    }
2.4.4 worker线程执行任务
2.4.5 worker如何保证核心线程不被回收
public void execute(Runnable command) {
     // 提交任务为null 抛出异常
        if (command == null)
            throw new NullPointerException();
    // 获取线程池状态\线程池线程数据
        int c = ctl.get();
     // 小于核心线程数 addWorker()
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
     // 大于核心线程数,当前线程池是运行状态,向阻塞队列中添加任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
     // 队列添加失败 拒绝策略处理
        else if (!addWorker(command, false))
            reject(command);
    }
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
       // 死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
   // 如果当前线程状态是SHUTDOWN STOP TIDYING TERMINATED 并且SHUTDOWN状态时任务队列为空 返回false
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
   // 死循环
            for (;;) {
                int wc = workerCountOf(c);
                // core参数 true corePoolSize核心线程数 false maximumPoolSize最大线程数
                // CAPACITY integer最大值 (1 << COUNT_BITS) - 1;
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 如果增加任务成功,退出该循环执行下面代码,否则继续
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 重点代码 后续分析
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 内置锁 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
     // 判断线程池状态,防止使用过程中线程池被关闭
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 向正在被执行的任务队列workers中添加worker 
                        // 注意区分 
                        // HashSet<Worker> workers = new HashSet<Worker>()  线程池中线程
                        // private final BlockingQueue<Runnable> workQueue 等待被执行的任务
                        workers.add(w);
                        int s = workers.size();
                        // 记录任务最大数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 添加任务成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 添加任务成功,那么开始执行任务
                if (workerAdded) {
                    // 重点代码 -- 我们需要查看worker中的run()
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
 /** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
      // 获取worker对象中的任务 可以为null 
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 死循环
            // 判断任务是否为空,如果为空则getTask()获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务执行前调用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行后调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 重点代码,执行完任务将task设置为null 则会从getTask()重新获取
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 回收worker 
            processWorkerExit(w, completedAbruptly);
        }
    }
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
  // 死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
   // 判断线程池状态
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
   // 统计worker
            int wc = workerCountOf(c);
            // 如果设置了allowCoreThreadTimeOut(true) 或者当前运行的统计worker数大于设置的核心线程数,那么timed =true
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            // 核心代码
            try {
                // 看完这里就明白了
                // 阻塞队列获取
                // workQueue.poll() 规定时间获取任务
                // workQueue.take() 会一直等待,知道阻塞队列中任务不为空
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取任务返回
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:
https://blog.csdn.net/qq_41125219/article/details/117535516


评论












