​一文搞懂线程池的底层原理

袁柏舟

共 9912字,需浏览 20分钟

 ·

2022-02-20 10:09

线程池

较稀缺的资源,在应用程序中大量的创建和销毁线程是非常消耗资源的,在并发的情况下对性能有很大的影响。而使用线程池则可以减少创建和销毁线程的次数,对每个线程可以进行复用,提供使用效率,减少资源的消耗。

1. 线程池的创建

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue workQueue,
              ThreadFactory threadFactory,
              RejectedExecutionHandler handler)

       corePoolSize:            最小核心线程数;

    maximumPoolSize:       最大核心线程数;

    keepAliveTime:          空闲线程最大存活时间;

    unit:                   存活时间的单位:秒/分/时...;

    workQueue:             任务队列,用来存放提交的任务;

    threadFactory:           线程工厂用来创建线程池中工作的线程;

    handler:                拒绝策略,当线程数达到最大线程数,在提交任务线程池该如何操作。那么就由拒绝策略来决定,一共有4种,默认采用的是抛出异常。其他3种下面会进行介绍。


2. 线程池的工作原理

48b13b871232dc4ffb1f20fa768078ee.webp

    这里大致介绍线程池的工作原理,具体的细节需要查看源码,在以下的源码介绍中会进行详细介绍。假设我们提交的任务为while(1)无限循环,会一直执行下去。

线程池的工作原理:

    a. 在初始化线程池的时候,线程池的核心线程数和最大核心线程数都为0,任务队列为空;

    b. 当当前线程第一次通过execute()方法提交任务的时候,线程池会创建一个核心线程,来执行任务。

    c. 当提交的任务超过核心线程数的时候,线程池会把多余的任务加入到任务队列中;

    d. 当任务队列被填充满了的时候,在提交其他任务,线程池会创建最大核心线程来处理任务。

    e. 当创建的线程超过最大核心线程数的时候,继续提交任务会走相应的拒绝策略。默认拒绝策略就是报错。


3. 线程池的状态转换

   线程池共有五种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

private static final int RUNNING = -1 << COUNT_BITS;  // 111-00000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;  // 000-00000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;    // 001-00000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;  // 010-00000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 011-00000 00000000 00000000 00000000

    其中在线程池创建的时候初始状态为RUNNING

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111-00000 00000000 00000000 00000000
   线程池的状态转换图:

594412a298fc9ef000978a56b66cb6f1.webp

   1、RUNNING状态:能够接收新任务,以及对添加的任务进行处理。线程池的初始状态为RUNNING,并且线程池的任务数为0;

   2、SHUTDOWN状态:不接收新任务,但能够处理已添加的任务。调用线程池的shutdown()方法,线程池由RUNNING -> SHUTDOWN;

   3、STOP状态:不接受新任务,也不在处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()方法,线程池由(RUNNING or SHUTDOWN) -> STOP

   4、IDYING状态:当所有的任务已终止,ctl记录的 “任务数量” 为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()

          terminated()方法在线程池中的实现是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数进行实现。

     当在SHUTDOWN状态时,阻塞队列为空并且线程池中工作的线程数也为 0 时,就会由SHUTDOWN -> TIDYING

     当在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

   5、TERMINATED状态:线程池彻底终止,就会变为TERMINATED状态。线程池处于TIDYING状态时,执行完terminated()后, 就会由TIDYING -> TERMINATED


4. 线程池源码解析

    主要看线程池是如何工作的,那么就看它的提交方法的工作原理。线程池提供了两个函数进行任务提交,分别为execute方法和submit方法。通过底层可以看到submit方法底层仍然是调用execute方法。所以接下来主要以execute方法解析为主。   27c2ac5cb7e922d4b865215c5cf559ab.webp    在介绍线程池的 execute 方法之前,先介绍一下线程池中的一个核心状态属性 ctl,类型是AtomicInteger,可以看作是一个整数类型。它表示线程池的控制状态,这一个状态包含了两个原子的属性,分别为工作的线程数和线程的状态。

可能会奇怪,一个整数类型,怎么表达两个意思呢?这就是它设计的巧妙之处,采用的是位操作,前 3 位用来表示线程池的状态。后 29 位表示创建的线程数。

// ctlOf 进行与操作,wc 都为 0, 所以结果由 rc 来决定
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111-00000 00000000 00000000 00000000

private static int ctlOf(int rs, int wc) { return rs | wc; }
execute方法的源码解析,execute这个方法就可以体现出线程池的大致工作原理。其中工作线程数也就是线程池中已有的线程数。在 execute 代码中创建线程的工作主要在 addWorker 方法中。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取线程池的状态,这个状态包括 线程池的状态(高3位)| 工作的线程数 (低29位)
    int c = ctl.get();
    // 计算工作线程数是否小于最小核心线程数
    if (workerCountOf(c) < corePoolSize) {
    // true表示创建最小核心线程数的逻辑,一个工作线程 可以看成对应一个 Worker对象
        if (addWorker(command, true))
            // 表示创建成功
            return;
        c = ctl.get();
    }
    // 判断线程池的状态是否为Running状态,如果为true才加入到队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); // 再次进行判断,确保线程安全
        if (! isRunning(recheck) && remove(command)) // 如果当前线程池不是运行状态,则移除队列中的任务
            reject(command); // 使用拒绝策略
        else if (workerCountOf(recheck) == 0) // 如果工作线程数为0, 则需要在创建新的线程进行处理任务
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 绑定一个非核心线程数 (创建最大核心线程数逻辑)
        // 如果创建最大核心线程数失败,使用拒绝策略
        reject(command);
}
addWorker 方法创建线程,处理提交任务。在addWorker 方法中我们可以把一些判断逻辑去掉,主要看它主要的方法,其中最主要的就是 w = new Worker(firstTask); 创建一个工作线程对象。然后,获取工作线程的线程对象,调用它的start方法,启动工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /**
         * 如果 rs >= SHUTDOWN 表示当前线程池不可能在接收任务了
         *
         */

        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    !workQueue.isEmpty()))
            return false;
    
        for (;;) {
            int wc = workerCountOf(c);
            // core 为 true,表示是最小核心线程数的步骤, 如果 wc >= corePoolSize, 表示无法在创建最小核心线程数了
            // maximumPoolSize 也是一样,表示无法创建最大核心线程数了
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 先把工作线程数CAS + 1 操作
            if (compareAndIncrementWorkerCount(c))
                break retry; // 跳出 retry 标记层的循环
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    // 一个Worker 包含一个线程和一个任务
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 获取不到锁,在此阻塞
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if shut down before lock acquired.
                int rs = runStateOf(ctl.get());
    
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把工作的线程添加到Set集合中
                    workers.add(w);
                    // 记录线程池中工作线程的值
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 任务开始执行,实现在 Worker中的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

    Worker 工作线程类的具体实现。类中有两个非常重要的属性一个是创建的线程和提交的任务。也就是线程和任务绑定到一块。其中构造方法getThreadFactory()获取 线程工程默认DefaultThreadFactory类的实现,然后调用newThread方法创建一个线程,参数是Worker 类自身。

    也就是说,当线程启动的时候 (在addWorker 方法的 t.start() 完成) ,会调用 Worker的run方法实现。

    而 Worker 的run方法调用 runWorker 方法。其中 runWoker 方法是线程池工作的主要方法。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    ...
    Worker(Runnable firstTask) {
        ...
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
}


public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

    线程池工作的主要实现 就是在 runWorker 方法中。去掉一些不重要的代码,主要看它的 while 的循环条件和  task.run(); 方法。线程池能够不停的进行工作原理,就是依赖阻塞队列完成的。当前线程如果完成了提交任务,会从阻塞队列中进行拉取任务,如果阻塞队列任务为空,那么当前线程就会阻塞。直到往阻塞队列里面添加任务,即可唤醒线程继续工作。

    注意,当创建的核心线程数已满的时候,执行的任务已完成并且任务队列为空,那么创建的核心线程数拉取任务的时候就会阻塞,不会被释放掉。当再次提交任务的时候,不会在创建核心线程数了,而是唤醒阻塞的核心线程数继续从阻塞队列拉取任务,进行执行,如果执行完后,任务队列又为空,那么核心线程数再次阻塞。依次循环往复,到达线程复用的效果。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task != null 表示是创建新的 Worker线程
        // 如果 task == null, 那么会走 task = getTask() 表示该线程之前的任务已经执行完成了,从任务队列里获取任务
       // 通过循环不断的从当前提交的任务和从任务队列获取任务进行工作
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 扩展程序,未实现
                Throwable thrown = null;
                try {
                    task.run(); // 直接调用run, 而不是通过线程自身回调的
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 扩展程序,未实现
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

    线程复用的实现就是 getTask 方法。最大核心线程数过期的具体实现也在 getTask 方法中。如果 timed 为 true,说明当前线程数超过最小核心线程数(启用了最大核心线程数),会从阻塞队列中拉取并设置过期时间。

    如果在指定时间内,拉取不到就会解开阻塞,表示任务队列中没有任务。那么最大核心线程数就可以关闭了。把timedOut  值为 true,在下一次的 for(;;) 的循环中调用compareAndDecrementWorkerCount 把 ctl 的线程数减1,并返回null。当返回为 null 的时候 runWorker的 while 条件就不满足了,会直接跳出去,这个函数也就执行完成了,那么对应的线程也就释放了。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 是否为非核心线程数, (allowCoreThreadTimeOut 如果为ture 是允许关闭核心线程数的)
        // 保证线程数 符合核心线程数即可
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // wc > maximumPoolSize 大于最大核心线程数
            // timed && timedOut (timed 为 true 表示当前线程数大于最小核心线程数) (timedOut 表示任务队列为空,并超过当前线程的最大存活时间)
            // wc > 1 表示有可以剔除的线程数量
            // 表示队列为空,目前没有任务
            if (compareAndDecrementWorkerCount(c)) // 剔除一个线程数
                return null;
            continue;
        }

        try {
            // 当阻塞队列为空的时候,如果不设置过期时间,在此会进行阻塞
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}



浏览 20
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报