关于线程池的问题总结

共 5712字,需浏览 12分钟

 ·

2020-07-10 15:30

4c1ee023b8d6570d6211997962396e27.webp

出处:csdn.net/fly910905/article/details/81584675


合理利用线程池能够带来三个好处


    第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

    第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

    第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。


线程池的主要工作流程


c8545017df24d1985ffc154e1c9af07d.webp


从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:


  • 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。

  • 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。    

  • 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。


线程池的创建


我们可以通过ThreadPoolExecutor来创建一个线程池。


new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);

创建一个线程池需要输入几个参数:


  • corePoolSize - 线程池核心池的大小。

  • maximumPoolSize - 线程池的最大线程数。

  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

  • unit - keepAliveTime 的时间单位。

  • workQueue - 用来储存等待执行任务的队列。

  • threadFactory - 线程工厂。

  • handler - 拒绝策略。


corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

    

maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。


线程池大小


线程池有两个线程数的设置,一个为核心池线程数(corePoolSize),一个为最大线程数(maximumPoolSize)。

    

在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法

    

当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。


keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。


TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    

runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。


  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。


ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常有帮助。

   

RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。


  • ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。(默认)

  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 


当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。


线程池不允许使用Executors去创建


线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors各个方法的弊端:


1)newFixedThreadPool和newSingleThreadExecutor:


主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。


newSingleThreadExecutor


创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

    

此线程池保证所有任务的执行顺序,按照任务的提交顺序(FIFO, LIFO, 优先级)执行。


public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }


newFixedThreadPool


  • 创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。

  • 线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  • 可控制线程最大并发数,超出的线程会在队列中等待


public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }


2)newCachedThreadPool和newScheduledThreadPool:


主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。


newCachedThreadPool


  • 创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。

  • 此线程池不会对线程池大小做限制

  • 线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。


public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }


newScheduledThreadPool


创建一个定时线程池,支持定时及周期性任务执行


public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
 
 
 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
 
 
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue)
 
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


线程池创建方式(推荐)


根据阿里巴巴java开发规范,推荐了3种线程池创建方式     


推荐方式1:


首先引入:commons-lang3包


ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());


推荐方式 2:


首先引入:com.google.guava包


ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
 
    //Common Thread Pool
    ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
    pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();//gracefully shutdown


推荐方式 3:


spring配置线程池方式:自定义线程工厂bean需要实现ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入bean,调用execute(Runnable task)方法即可


<bean id="userThreadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="queueCapacity" value="2000" />
 
    <property name="threadFactory" value= threadFactory />
        <property name="rejectedExecutionHandler">
            <ref local="rejectedExecutionHandler" />
        property>
    bean>
    //in code
    userThreadPool.execute(thread);




浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报