动态调整线程池参数实践

老周聊架构

共 19450字,需浏览 39分钟

 ·

2021-05-02 11:03

点击上方老周聊架构关注我


一、线程池遇到的挑战

我们上一篇 《一文读懂线程池的实现原理 》已经从线程池如何维护自身状态、线程池如何管理任务、线程池如何管理线程三个维度来深入剖析线程池的底层原理与源码剖析,这让我们对线程池的原理有了较为深入的理解。这对我们多线程编程有很大的帮助,但在使用线程池时还是会面临几个棘手的问题。

  • 开发人员个人经验与水平参差不齐,配置线程池参数都是按照自己想法来,没有统一的一个配置标准。

  • 线程池执行情况与任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大。

  • 当你配置好线程池后,有的时间段流量高峰期,导致线程池忙不过来;有的时间段流量低峰期,线程池比较空闲。这就会导致资源调度失衡,降低了系统的稳定性。

我们先来看下美团调研的业界一些线程池参数配置方案:


  • 第一种方案是出自《Java并发编程实践》,显然和业务场景有所偏离。

  • 第二种方案也不太合理,为什么呢?我们一个项目里一般来说不止一个自定义线程池吧?比如有专门处理数据异步持久化的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。

  • 第三种方案虽然考虑到了业务场景,但这是理想状态。流量是不可能这么均衡的,就拿美团来说,下午 3、4 点的流量,能和 12 点左右午饭时的流量比吗?

基于上面线程池的几个痛点,那有没有好的解决方案呢?有的,那就是动态调整线程池参数。

尽管业界没有一些成熟的经验配置策略,那么我们是不是可以从修改线程池参数的成本入手?毕竟每次线上线程池故障的话,都得修改代码里的线程池相应的参数,然后再部署上线,这个过程在对可用性要求极高的项目中那是极其慢的,可能给公司造成巨大的损失。

既然改代码里的线程池相应的参数并上线这个过程慢,那我们是不是可以把相应的线程池参数配置到分布式配置中心上去?实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:


二、动态化线程池

2.1 整体设计

动态化线程池的核心设计包括以下三个方面:

2.1.1 简化线程池配置

线程池构造参数有 7 个,但是最核心的是 3 个:corePoolSize、maximumPoolSize、workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:

  • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。

  • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求。

2.1.2 参数可动态修改

为了解决参数不好配,修改参数成本高等问题。在 Java 线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。

2.1.3 增加线程池监控

对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。



2.2 功能架构

动态化线程池提供如下功能:

  • 动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。

  • 任务监控:支持应用粒度、线程池粒度、任务粒度的 Transaction 监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、P95/P99线等。

  • 负载告警:支持告警规则配置,当超过阈值时(线程池队列任务积压到一定值、线程池负载数达到一定阈值)会通知相关的开发负责人。

  • 操作监控:创建、修改和删除线程池都会通知到应用的开发负责人。

  • 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。

  • 权限校验:只有应用开发负责人才能够修改应用的线程池参数。


三、Apollo 分布式配置中心

了解完动态化线程池的整体设计与功能架构后,我相信你也可以设计出一款动态线程池组件出来的。下面跟着老周来实践一下动态调整线程池参数,可能不像上面设计的那样那么全面,但会把动态调整线程池参数的核心给实现一下。

不难发现动态化线程池的核心是配置管理,那我们就得找一个分布式配置中心,这里老周用的 Apollo,还有其它的像 Spring Cloud Config、disconf、某些大型互联网公司自研的分布式配置中心等,根据自己的项目情况以及使用场景来选择就行。

3.1 Apollo 总体设计

Apollo(阿波罗)是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

3.1.1 基础模型

如下图即是 Apollo 的基础模型:

  1. 用户在配置中心对配置进行修改并发布

  2. 配置中心通知 Apollo 客户端有配置更新

  3. Apollo 客户端从配置中心拉取最新的配置、更新本地配置并通知到应用



3.1.2 架构模块




上图简要描述了 Apollo 的总体设计,我们可以从下往上看:

  • Config Service 提供配置的读取、推送等功能,服务对象是 Apollo 客户端

  • Admin Service 提供配置的修改、发布等功能,服务对象是 Apollo Portal(管理界面)

  • Config Service 和 Admin Service 都是多实例、无状态部署,所以需要将自己注册到 Eureka 中并保持心跳

  • 在 Eureka 之上我们架了一层 Meta Server 用于封装 Eureka 的服务发现接口

  • Client 通过域名访问 Meta Server 获取 Config Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Client 侧会做 load balance、错误重试

  • Portal 通过域名访问 Meta Server 获取 Admin Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Portal 侧会做 load balance、错误重试

  • 为了简化部署,我们实际上会把 Config Service、Eureka 和 Meta Server 三个逻辑角色部署在同一个 JVM 进程中

3.2 服务端设计

3.2.1 配置发布后的实时推送设计

在配置中心中,一个重要的功能就是配置发布后实时推送到客户端。本文重点分析配置更新推送方式,下面我们简要看一下这块是怎么设计实现的。



上图简要描述了配置发布的大致过程:

  1. 用户在 Portal 操作配置发布

  2. Portal 调用 Admin Service 的接口操作发布

  3. Admin Service 发布配置后,发送 ReleaseMessage 给各个 Config Service

  4. Config Service 收到 ReleaseMessage 后,通知对应的客户端

上图的发送 ReleaseMessage 的实现方式详情请往下继续看:

Admin Service 在配置发布后,需要通知所有的 Config Service 有配置发布,从而 Config Service 可以通知对应的客户端来拉取最新的配置。

从概念上来看,这是一个典型的消息使用场景,Admin Service 作为 producer 发出消息,各个 Config Service 作为 consumer 消费消息。通过一个消息组件(Message Queue)就能很好的实现 Admin Service 和 Config Service 的解耦。

在实现上,考虑到 Apollo 的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。



3.3 客户端设计




上图简要描述了 Apollo 客户端的实现原理:

  1. 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过 Http Long Polling 实现)

  2. 客户端还会定时从 Apollo 配置中心服务端拉取应用的最新配置。

  3. 客户端从 Apollo 配置中心服务端获取到应用的最新配置后,会保存在内存中。

  4. 客户端会把从服务端获取到的配置在本地文件系统缓存一份。

  5. 应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。

我们从基础模型、服务端设计、客户端设计三个维度来分析了 Apollo 总体设计,相信你对 Apollo 分布式配置中心有了全面且清晰的理解了。为了照顾没有用过 Apollo 这款分布式配置中心的同学,老周这里还是简单给个 Apollo 开发样例演示,希望对你后面的动态调整线程池参数实践有所帮助。

四、动态调整线程池参数实践

我们了解原理以及架构后,那我们开始实践了。

4.1 服务端安装

请看官方文档进行相应的安装:https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

执行启动脚本后,当看到如下输出后,就说明启动成功了!



启动成功后访问地址:http://localhost:8070



默认输入用户名:apollo、密码:admin,进行登录。



点击 SampleApp,我们看到在 DEV 环境包含一个 timeout 配置项,100 是这个配置项的值,下面我们在应用程序读取这个配置项:



4.2 应用程序

4.2.1 引入依赖

<dependencies>
    <dependency>
        <groupId>com.ctrip.framework.apollo</groupId>
        <artifactId>apollo-client</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>    

4.2.2 样例测试

/**
 * 动态获取Apollo配置
 * 注意要配置:-Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080
 *
 * @author 微信公众号【老周聊架构】
 */

public class GetApolloConfigTest {
    public static void main(String[] args) throws InterruptedException {
        Config config = ConfigService.getAppConfig();
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                System.out.println("Changes for namespace " + changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    System.out.println(String.format("Found change - key: %s, oldValue: %s, newValue: %s, changeType: %s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));
                }
            }
        });
        Thread.sleep(1000000L);
    }
}

我们现在把配置项默认的值 100 改为 200 程序输出结果如下:



控制台会出现以下日志,表明动态获取 Apollo 配置成功了。


4.3 动态线程池

上面我们把 Apollo 的动态监听修改配置的功能整明白了以后,再把线程池和 Apollo 结合起来构建动态线程池那就方便多了。首先我们用默认值构建一个线程池,然后线程池会监听 Apollo 关于相关配置项,如果相关配置有变化则刷新相关参数。


代码演示:

/**
 * 动态线程池工厂
 *
 * @author 微信公众号【老周聊架构】
 */

@Slf4j
@Component
public class DynamicThreadPoolFactory {
    /** 这里是你的namespace,我这里是默认的application **/
    private static final String NAME_SPACE = "application";

    /** 线程执行器 **/
    private volatile ThreadPoolExecutor executor;

    /** 核心线程数 **/
    private Integer corePoolSize = 10;

    /** 最大值线程数 **/
    private Integer maximumPoolSize = 20;

    /** 待执行任务的队列的长度 **/
    private Integer workQueueSize = 1000;

    /** 线程空闲时间 **/
    private Long keepAliveTime = 1000L;

    /** 线程名 **/
    private String threadName;

    public DynamicThreadPoolFactory() {
        Config config = ConfigService.getConfig(NAME_SPACE);
        init(config);
        listen(config);
    }

    /**
     * 初始化
     */

    private void init(Config config) {
        if (executor == null) {
            synchronized (DynamicThreadPoolFactory.class) {
                if (executor == null) {
                    String corePoolSizeProperty = config.getProperty(ParamsEnum.CORE_POOL_SIZE.getParam(), corePoolSize.toString());
                    String maximumPoolSizeProperty = config.getProperty(ParamsEnum.MAXIMUM_POOL_SIZE.getParam(), maximumPoolSize.toString());
                    String keepAliveTImeProperty = config.getProperty(ParamsEnum.KEEP_ALIVE_TIME.getParam(), keepAliveTime.toString());
                    BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);
                    executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),
                            Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);
                }
            }
        }
    }

    /**
     * 监听器
     */

    private void listen(Config config) {
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                log.info("命名空间发生变化={}", changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    String newValue = change.getNewValue();
                    refreshThreadPool(key, newValue);
                    log.info("发生变化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());
                }
            }
        });
    }

    /**
     * 刷新线程池
     */

    private void refreshThreadPool(String key, String newValue) {
        if (executor == null) {
            return;
        }
        if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) {
            executor.setCorePoolSize(Integer.valueOf(newValue));
            log.info("修改核心线程数key={},value={}", key, newValue);
        }
        if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) {
            executor.setMaximumPoolSize(Integer.valueOf(newValue));
            log.info("修改最大线程数key={},value={}", key, newValue);
        }
        if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) {
            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
            log.info("修改线程空闲时间key={},value={}", key, newValue);
        }
    }

    public ThreadPoolExecutor getExecutor(String threadName) {
        return executor;
    }
}
@AllArgsConstructor
public enum ParamsEnum {
    CORE_POOL_SIZE("corePoolSize""核心线程数"),
    MAXIMUM_POOL_SIZE("maximumPoolSize""最大线程数"),
    KEEP_ALIVE_TIME("keepAliveTime""线程空闲时间"),
    ;

    @Getter
    private String param;

    @Getter
    private String desc;
}
/**
 * 动态线程池执行器
 *  
 * @author 微信公众号【老周聊架构】
 */

@Component
public class DynamicThreadExecutor {
    @Resource
    private DynamicThreadPoolFactory threadPoolFactory;

    public void execute(String bizName, Runnable job) {
        threadPoolFactory.getExecutor(bizName).execute(job);
    }

    public Future<?> sumbit(String bizName, Runnable job) {
        return threadPoolFactory.getExecutor(bizName).submit(job);
    }
}
@Slf4j
public class DynamicThreadPoolExecutorTest {
    @Resource
    private DynamicThreadExecutor dynamicThreadExecutor;

    /**
     * 记得 IDEA VM options 要记得加下面的参数
     * -Dapp.id=SampleApp -Denv=DEV -Dapollo.meta=http://localhost:8080
     */

    @Test
    public void testExecute() throws InterruptedException {
        while (true) {
            dynamicThreadExecutor.execute("bizName"new Runnable() {
                @Override
                public void run() {
                    System.out.println("bizInfo");
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

这里可以通过 JDK 自带的 JVisualVM 工具可以查看到相应的线程使用情况。

我们在配置中心修改配置项把核心线程数设置为 50,最大线程数设置为 100:



你会观察到线程数显著上升

这里还可以在代码中通过打印相应的线程状态,更加直观的从日志上观察到核心线程、最大线程数的修改情况。

private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {
    BlockingQueue<Runnable> queue = executor.getQueue();
    System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
            "核心线程数:" + executor.getCorePoolSize() +
            " 活动线程数:" + executor.getActiveCount() +
            " 最大线程数:" + executor.getMaximumPoolSize() +
            " 线程池活跃度:" + divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
            " 任务完成数:" + executor.getCompletedTaskCount() +
            " 队列大小:" + (queue.size() + queue.remainingCapacity()) +
            " 当前排队线程数:" + queue.size() +
            " 队列剩余大小:" + queue.remainingCapacity() +
            " 队列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
}

这样的话就可以实现动态调整线程池参数,这就很好的解决了我们线程池现有的痛点,不至于线上出了问题还得改代码部署那么漫长的修复时间了,动态线程池大大简化了运维以及开发快速修复相关问题的难度。



欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

喜欢的话,点赞、再看、分享三连。

点个在看你最好看



浏览 72
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报