蚂蚁金服后端一面:Java怎么使用线程池/理解/处理什么问题

Java专栏

共 15416字,需浏览 31分钟

 ·

2021-06-08 19:51

点击关注公众号,Java干货及时送达

真香!24W字的Java面试手册(点击查看)


什么是线程池呢,字面意思,池是一个容器,比如我们之前还学过JDBC的连接池,还有内存池,对象池等等。

线程池就是事先准备一些线程资源,需要的时候拿去用,用完的时候还回来。

在之前有个推文讲了SpringBoot中集成的线程池用法

SpringBoot中的线程池,你真的会用么?

线程池好处

每次创建线程和销毁线程都是消耗一定的资源,将线程统一进行管理,可以大大减少了创建/销毁的次数,而且还能提高响应的速度。另外多个线程去争夺CPU资源时会造成堵塞,线程池可以统一对资源进行分配,提高了线程的稳定性和可控制性。

总结:

  • 每个线程都可以被重复利用,减少了创建和销毁线程的次数。降低资源消耗
  • 灵活的调整线程的数量,方便管理
  • 提高响应速度

在面试中,经常会问到线程池的 四大方法、七个参数、四种拒绝策略

四大方法

即创建线程池的四种方法,如何创建线程池呢,很多人使用Executors来创建线程池,比如

ExecutorService threadPool = Executors.newFixedThreadPool(5);

创建线程有四种方法

  • newFixedThreadPool:创建固定大小的线程池

  • newSingleThreadExecutor:创建只有一个线程的线程池,确保任务串行执行

  • newCachedThreadPool:创建一个不限制线程数的线程池。如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模无限制。

  • newScheduledThreadPool(不是很常用,适用于特定场景):创建一个固定长度的线程池,而且以延迟或者定时的方式来执行

四个方法的源码如下。

// 1
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

// 2
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

// 3
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

// 4
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory)
 
{
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

从源码中可以看到除了最后一个newScheduledThreadPool,其余的都是 return 了一个ThreadPoolExecutor对象。

所以,开启线程池的本质就是调用了ThreadPoolExecutor方法

在阿里巴巴开发手册中明确规定:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

  1. CachedThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

如果你的IDE安装了阿里巴巴编码规约的插件你会发现会有这样的提示因此创建线程正确的方法是使用ThreadPoolExecutor来手动创建(newScheduledThreadPool除外),这样可以更好的规避资源耗尽的风险。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
 
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, handler);
}


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
 
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

从上面看,ThreadPoolExecutor有7个参数,对应的含义如下:

  • int corePoolSize:核心线程池大小
  • int maximumPoolSize:最大线程池大小
  • long keepAliveTime:线程最大的空闲时间(标记线程空闲多久释放)
  • TimeUnit unit:keepAliveTime的时间单位
  • BlockingQueue wordQueue:阻塞队列
  • ThreadFactory threadFactory:线程的创建工厂,一般不用动
  • RejectedExecutionHandle:拒绝策略

newSingleThreadExecutor(创建只有一个线程的线程池) 默认核心线程池大小是1,最大线程池也是1,而创建固定大小线程池的newFixedThreadPoolnewSingleThreadExecutor基本一样,只不过把线程池大小和最大线程池大小动态化了。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,....);
}

// 2
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,..,threadFactory));
}

newCachedThreadPool,核心线程池大小是0,但是默认最大线程池大小是Integer.MAX_VALUE ,这是一个亿级的数值,如果一个服务器上有上亿个线程那消耗的资源必然是非常大的,因此正如前面讲到的阿里开发规范中建议通过ThreadPoolExecutor来手动创建线程,指定相应的参数,避免发生OOM。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());}

举个🌰

咱们去银行办理业务。先抽号,然后在候客区等着叫号,假设有6个窗口,因为今天人不是很多,其中4个窗口关闭了,只开了2个办理业务的窗口

关于核心线程池/ 最大线程池/ 空闲时间 / 阻塞队列 / 拒绝策略 ,下图应该很形象了。

拒绝策略:

查看源代码发现 RejectedExecutionHandler 有四种实现,也就是四种拒绝策略。

四种拒绝策略分别有什么用呢?还是从源码上来看。

AbortPolicy

直接throw 一个异常,不做处理

public static class AbortPolicy implements RejectedExecutionHandler {
 
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  }
}

CallerRunsPolicy

如果添加线程池失败,就把任务交给主线程去执行。好比排队领妹子,你不让我排队,那我自己去找妹子.

public static class CallerRunsPolicy implements RejectedExecutionHandler {


  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      r.run();
    }
  }
}

DiscardOldestPolicy

如果阻塞队列满了,就把最开始加入队列的任务移除出去。再尝试加入队列。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      e.getQueue().poll();
      e.execute(r);
    }
  }
}

DiscardPolicy

不做任何处理,rejectedExecution是一个空方法

public static class DiscardPolicy implements RejectedExecutionHandler {

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  }
}

ok!每个参数都搞明白了,用代码实现一下上面的去银行办业务的例子

package bilibili;

import java.awt.desktop.AboutHandler;
import java.util.concurrent.*;

public class Pool {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2// 核心线程2
                6// 最大线程6
                3// 最大空闲时间3秒
                TimeUnit.SECONDS, // 时间单位秒
                new LinkedBlockingDeque<>(4), // 阻塞队列,容量4
                Executors.defaultThreadFactory(), // 默认的创建工厂
                new ThreadPoolExecutor.AbortPolicy() //AbortPolicy拒绝策略,抛出异常
        );

        try {
            for (int i = 0; i < 5; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("正在执行: " + Thread.currentThread().getName());
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }
}

运行结果

正在执行: pool-1-thread-2
正在执行: pool-1-thread-1
正在执行: pool-1-thread-2
正在执行: pool-1-thread-2
正在执行: pool-1-thread-1

可以看到在for循环中i < 5,没有超过最大线程,此时使用的是 2 个核心线程在执行

i < 7,大于总线程数,但是小于总线程+ 阻塞线程(6 + 4)

此时,使用了pool-1-thread-3线程

如果正好10个线程,也就是最大线程数6  + 阻塞线程4。刚好够用,

上面写法过于臃肿,这里使用函数式编程的方法

//其他代码省略
try {
  for (int i = 0; i < 10; i++) {
    threadPool.execute(()-> {
      System.out.println("正在执行: " + Thread.currentThread().getName());
    });
  }
catch (Exception e) {
  e.printStackTrace();
}finally {
  threadPool.shutdown();
}

执行结果:

可以看到线程池中的 6 个线程都被用到了

正在执行: pool-1-thread-4
正在执行: pool-1-thread-4
正在执行: pool-1-thread-4
正在执行: pool-1-thread-4
正在执行: pool-1-thread-4
正在执行: pool-1-thread-1
正在执行: pool-1-thread-2
正在执行: pool-1-thread-5
正在执行: pool-1-thread-6
正在执行: pool-1-thread-3

但是!当循环11次,也就是11个人来办理业务时,因为超过了最大的线程数 + 阻塞线程,就会触发 拒绝策略。

因为在定义的时候使用的是AbortPolicy,因此会抛出异常。

image-20210601163032900

如果此时拒绝策略,改为CallerRunsPolicy, 也就是说多出来的一个人交给主线程去执行。

拒绝策略为DiscardOldestPolicy或者DiscardPolicy时,控制台只会输出10条结果。区别在于DiscardOldestPolicy会把最开始加入队列的任务移除出去,尝试去竞争线程池资源。而DiscardPolicy则将最后多出来的那一个不做任何处理。被干掉了。

本文主要讲了线程池的作用,以及三种创建方法,7个参数,4种拒绝策略。这在面试中是经常问到的。如有不对之处欢迎指出。

如有文章对你有帮助,

欢迎关注❤️、点赞👍、转发📣!



推荐 Java面试手册 
内容包括网络协议、Java基础、进阶、字符串、集合、并发、JVM、数据结构、算法、MySQL、Redis、Mongo、Spring、SpringBoot、MyBatis、SpringCloud、Linux以及各种中间件(Dubbo、Nginx、Zookeeper、MQ、Kafka、ElasticSearch)等等...

点击文末“阅读原文”可直达

浏览 150
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报