深入浅出线程池

1、什么是线程
2、如何创建线程
2.1、JAVA中创建线程
/*** 继承Thread类,重写run方法*/class MyThread extends Thread {public void run() {System.out.println("myThread..." + Thread.currentThread().getName());} }/*** 实现Runnable接口,实现run方法*/class MyRunnable implements Runnable {public void run() {System.out.println("MyRunnable..." + Thread.currentThread().getName());} }/*** 实现Callable接口,指定返回类型,实现call方法*/class MyCallable implements Callable<String> {public String call() throws Exception {return "MyCallable..." + Thread.currentThread().getName();} }
2.2、测试一下
public static void main(String[] args) throws Exception {MyThread thread = new MyThread();thread.run(); //myThread...mainthread.start(); //myThread...Thread-0MyRunnable myRunnable = new MyRunnable();Thread thread1 = new Thread(myRunnable);myRunnable.run(); //MyRunnable...mainthread1.start(); //MyRunnable...Thread-1MyCallable myCallable = new MyCallable();FutureTask<String> futureTask = new FutureTask<>(myCallable);Thread thread2 = new Thread(futureTask);thread2.start();System.out.println(myCallable.call()); //MyCallable...mainSystem.out.println(futureTask.get()); //MyCallable...Thread-2}
2.3、问题
2.4、问题分析
class Thread implements Runnable { //Thread类实现了Runnalbe接口,实现了run()方法private Runnable target;public synchronized void start() {...boolean started = false;try {start0(); //可以看到,start()方法真实的调用时start0()方法started = true;} finally {...}}private native void start0(); //start0()是一个native方法,由JVM调用底层操作系统,开启一个线程,由操作系统过统一调度public void run() {if (target != null) {target.run(); //操作系统在执行新开启的线程时,回调Runnable接口的run()方法,执行我们预设的线程任务}}}
2.5、总结
-
JAVA不能直接创建线程执行任务,而是通过创建Thread对象调用操作系统开启线程,在由操作系 统回调Runnable接口的run()方法执行任务; -
实现Runnable的方式,将线程实际要执行的回调任务单独提出来了,实现线程的启动与回调任务 解耦; -
实现Callable的方式,通过Future模式不但将线程的启动与回调任务解耦,而且可以在执行完成后 获取到执行的结果;
1、什么是多线程
2、多线程有什么好处
2.1、串行处理
public static void main(String[] args) throws Exception {System.out.println("start...");long start = System.currentTimeMillis();for (int i = 0; i < 5; i++) {Thread.sleep(2000); //每个任务执行2秒System.out.println("task done..."); //处理执行结果}long end = System.currentTimeMillis();System.out.println("end...,time = " + (end - start));}//执行结果start...task done...task done...task done...task done...task done... end...,time = 10043
2.2、并行处理
public static void main(String[] args) throws Exception {System.out.println("start...");long start = System.currentTimeMillis();List<Future> list = new ArrayList<>();for (int i = 0; i < 5; i++) {Callable<String> callable = new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(2000); //每个任务执行2秒return "task done...";}};FutureTask task = new FutureTask(callable);list.add(task);new Thread(task).start();}list.forEach(future -> {try {System.out.println(future.get()); //处理执行结果 } catch (Exception e) {}});long end = System.currentTimeMillis();System.out.println("end...,time = " + (end - start));}//执行结果start...task done...task done...task done...task done...task done... end...,time = 2005
2.3、总结
-
多线程可以把一个任务拆分为几个子任务,多个子任务可以并发执行,每一个子任务就是一个线程。 -
多线程是为了同步完成多项任务,不是为了提高运行效率,而是为了提高资源使用效率来提高系统 的效率。
2.4、多线程的问题
1、如何设计一个线程池
1.1、线程池基本功能
-
多线程会创建大量的线程耗尽资源,那线程池应该对线程数量有所限制,可以保证不会耗尽系统资 源; -
每次创建新的线程会增加创建时的开销,那线程池应该减少线程的创建,尽量复用已创建好的线 程;
1.2、线程池面临问题
-
我们知道线程在执行完自己的任务后就会被回收,那我们如何复用线程? -
我们指定了线程的最大数量,当任务数超出线程数时,我们该如何处理?
1.3、创新源于生活
-
最开始货物来的时候,我们还没有货车,每批要运输的货物我们都要购买一辆车来运输; -
当货车运输完成后,暂时还没有下一批货物到达,那货车就在仓库停着,等有货物来了立马就可以 运输; -
当我们有了一定数量的车后,我们认为已经够用了,那后面就不再买车了,这时要是由新的货物来 了,我们就会让货物先放仓库,等有车回来再配送; -
当618大促来袭,要配送的货物太多,车都在路上,仓库也都放满了,那怎么办呢?我们就选择临 时租一些车来帮忙配送,提高配送的效率; -
但是货物还是太多,我们增加了临时的货车,依旧配送不过来,那这时我们就没办法了,只能让发货的客户排队等候或者干脆不接收了; -
大促圆满完成后,累计的货物已经配送完成了,为了降低成本,我们就将临时租的车都还了;
1.4、技术源于创新
-
当任务进来我们还没有线程时,我们就该创建线程执行任务; -
当线程任务执行完成后,线程不释放,等着下一个任务进来后接着执行; -
当创建的线程数量达到一定量后,新来的任务我们存起来等待空闲线程执行,这就要求线程池有个 存任务的容器; -
当容器存满后,我们需要增加一些临时的线程来提高处理效率; -
当增加临时线程后依旧处理不了的任务,那就应该将此任务拒绝; -
当所有任务执行完成后,就应该将临时的线程释放掉,以免增加不必要的开销;
2、线程池具体分析
2.1、 JAVA中的线程池是如何设计的
2.1.1、 线程池设计
public class ThreadPoolExecutor extends AbstractExecutorService {//线程池的打包控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//值为29,用来表示偏移量private static final int COUNT_BITS = Integer.SIZE - 3;//线程池的最大容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池的运行状态,总共有5个状态,用高3位来表示private static final int RUNNING = -1 << COUNT_BITS; //接受新任务并处理阻塞队列中的任务private static final int SHUTDOWN = 0 << COUNT_BITS; //不接受新任务但会处理阻塞队列中的任务private static final int STOP = 1 << COUNT_BITS; //不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务private static final int TIDYING = 2 << COUNT_BITS; //所有任务都已终止, 工作线程数量为0,即将要执行terminated()钩子方法private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已经执行结束//任务缓存队列,用来存放等待执行的任务private final BlockingQueue<Runnable> workQueue;//全局锁,对线程池状态等属性修改时需要使用这个锁private final ReentrantLock mainLock = new ReentrantLock();//线程池中工作线程的集合,访问和修改需要持有全局锁private final HashSet<Worker> workers = new HashSet<Worker>();// 终止条件private final Condition termination = mainLock.newCondition();//线程池中曾经出现过的最大线程数private int largestPoolSize;//已完成任务的数量private long completedTaskCount;//线程工厂private volatile ThreadFactory threadFactory;//任务拒绝策略private volatile RejectedExecutionHandler handler;//线程存活时间private volatile long keepAliveTime;//是否允许核心线程超时private volatile boolean allowCoreThreadTimeOut;//核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0private volatile int corePoolSize;//最大池大小,不得超过CAPACITYprivate volatile int maximumPoolSize;//默认的任务拒绝策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();//运行权限相关private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");...}
-
提供了线程创建、数量及存活时间等的管理; -
提供了线程池状态流转的管理; -
提供了任务缓存的各种容器; -
提供了多余任务的处理机制; -
提供了简单的统计功能;
2.1.2、线程池构造函数
//构造函数public ThreadPoolExecutor(int corePoolSize, //核心线程数int maximumPoolSize, //最大允许线程数long keepAliveTime, //线程存活时间TimeUnit unit, //存活时间单位BlockingQueue<Runnable> workQueue, //任务缓存队列ThreadFactory threadFactory, //线程工厂RejectedExecutionHandler handler) { //拒绝策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
-
构造函数告诉了我们可以怎样去适用线程池,线程池的哪些特性是我们可以控制的;
2.1.3、线程池执行
2.1.3.1、提交任务方法
-
public void execute(Runnable command); -
Future<?> submit(Runnable task); -
Future submit(Runnable task, T result); -
Future submit(Callable task);
public Future> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//第一步:创建核心线程if (workerCountOf(c) < corePoolSize) { //worker数量小于corePoolSizeif (addWorker(command, true)) //创建workerreturn;c = ctl.get();}//第二步:加入缓存队列if (isRunning(c) && workQueue.offer(command)) { //线程池处于RUNNING状态,将任务加入workQueue任务缓存队列int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) //双重检查,若线程池状态关闭了,移除任务reject(command);else if (workerCountOf(recheck) == 0) //线程池状态正常,但是没有线程了,创建workeraddWorker(null, false);}//第三步:创建临时线程else if (!addWorker(command, false))reject(command);}
-
核心线程数量不足就创建核心线程; -
核心线程满了就加入缓存队列; -
缓存队列满了就增加非核心线程; -
非核心线程也满了就拒绝任务;
2.1.3.2、创建线程
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//等价于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())//线程池已关闭,并且无需执行缓存队列中的任务,则不创建if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) //CAS增加线程数break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}//上面的流程走完,就可以真实开始创建线程了boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); //这里创建了线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w); //这里将线程加入到线程池中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); //添加成功,启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w); //添加线程失败操作}return workerStarted;}
-
增加线程数; -
创建线程Worker实例加入线程池; -
加入完成开启线程; -
启动失败则回滚增加流程;
2.1.3.3、工作线程的实现
private final class Worker //Worker类是ThreadPoolExecutor的内部类extends AbstractQueuedSynchronizerimplements Runnable{final Thread thread; //持有实际线程Runnable firstTask; //worker所对应的第一个任务,可能为空volatile long completedTasks; //记录执行任务数Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); //当前线程调用ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用}...继承AQS,实现了不可重入锁...}
-
此类持有一个工作线程,不断处理拿到的新任务,持有的线程即为可复用的线程; -
此类可看作一个适配类,在run()方法中真实调用runWorker()方法不断获取新任务,完成线程复用;
final void runWorker(Worker w) { //ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true; //标识线程是否异常终止try {while (task != null || (task = getTask()) != null) { //这里会不断从任务队列获取任务并执行w.lock();//线程是否需要中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); //执行任务前的Hook方法,可自定义Throwable thrown = null;try {task.run(); //执行实际的任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); //执行任务后的Hook方法,可自定义}} finally {task = null; //执行完成后,将当前线程中的任务制空,准备执行下一个任务w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); //线程执行完成后的清理工作}}
-
循环从缓存队列中获取新的任务,直到没有任务为止; -
使用worker持有的线程真实执行任务; -
任务都执行完成后的清理工作;
2.1.3.5、队列中获取待执行任务
private Runnable getTask() {boolean timedOut = false; //标识当前线程是否超时未能获取到task对象for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c)) //若线程存活时间超时,则CAS减去线程数量return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //允许超时回收则阻塞等待workQueue.take(); //不允许则直接获取,没有就返回nullif (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
2.1.3.6、清理工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w); //移除执行完成的线程} finally {mainLock.unlock();}tryTerminate(); //每次回收完一个线程后都尝试终止线程池int c = ctl.get();if (runStateLessThan(c, STOP)) { //到这里说明线程池没有终止if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false); //异常终止线程的话,需要在常见一个线程}}
-
真实完成线程池线程的回收; -
调用尝试终止线程池; -
保证线程池正常运行;
2.1.3.7、尝试终止线程池
final void tryTerminate() {for (;;) {int c = ctl.get();//若线程池正在执行、线程池已终止、线程池还需要执行缓存队列中的任务时,返回if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//执行到这里,线程池为SHUTDOWN且无待执行任务 或 STOP 状态if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE); //只中断一个线程return;}//执行到这里,线程池已经没有可用线程了,可以终止了final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS设置线程池终止try {terminated(); //执行钩子方法} finally {ctl.set(ctlOf(TERMINATED, 0)); //这里将线程池设为终态termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
-
实际尝试终止线程池; -
终止成功则调用钩子方法,并且将线程池置为终态。
2.2、JAVA线程池总结
2.2.1、主要功能
线程数量及存活时间的管理;
待处理任务的存储功能;
线程复用机制功能;
任务超量的拒绝功能;
2.2.2、扩展功能
-
简单的执行结果统计功能; -
提供线程执行异常处理机制; -
执行前后处理流程自定义; -
提供线程创建方式的自定义;
2.2.3、流程总结
2.3、JAVA线程池使用
public static void main(String[] args) throws Exception {//创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(5));//加入4个任务,小于核心线程,应该只有4个核心线程,队列为0for (int i = 0; i < 4; i++) {threadPoolExecutor.submit(new MyRunnable());}System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 4System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0//再加4个任务,超过核心线程,但是没有超过核心线程 + 缓存队列容量,应该5个核心线程,队列为3for (int i = 0; i < 4; i++) {threadPoolExecutor.submit(new MyRunnable());}System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 3//再加4个任务,队列满了,应该5个热核心线程,队列5个,非核心线程2个for (int i = 0; i < 4; i++) {threadPoolExecutor.submit(new MyRunnable());}System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 7System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5//再加4个任务,核心线程满了,应该5个热核心线程,队列5个,非核心线程5个,最后一个拒绝for (int i = 0; i < 4; i++) {try {threadPoolExecutor.submit(new MyRunnable());} catch (Exception e) {e.printStackTrace(); //java.util.concurrent.RejectedExecutionException}}System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 10System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5System.out.println(threadPoolExecutor.getTaskCount()); //共执行15个任务//执行完成,休眠15秒,非核心线程释放,应该5个核心线程,队列为0Thread.sleep(1500);System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0//关闭线程池threadPoolExecutor.shutdown();}
评论
