手把手教你设计一个任务调度器

共 6588字,需浏览 14分钟

 ·

2021-07-11 11:39

我们从业务现状出发,考虑需要覆盖的场景,设计调度器需要提供的角色。然后确定了调度器的关键接口,同时给出了简单实现。同时,也提到一些个人设计和实现时候的一些思考,比如多叉树和有向无环图。另外,考虑到 Rxjs 比较适合 task 的组织,借鉴 Rxjs 的 API 并将其应用到实践中。希望能给出一些启发,欢迎一起讨论~

现状

以腾讯文档业务中的 预加载流程举例,在进行预加载的过程中,有三个主要的流程,分别为

  1. 数据预加载

  2. 离线数据同步

  3. 预渲染

在代码实现过程中,这三大逻辑竟然写在一个方法里面,代码已经有上百行。三者的依赖图大致如下:

但是随着代码的膨胀,业务逻辑的复杂度指数增加。且维护的同学也不可能只有一个,慢慢地,相互之间就形成这样的情况:

我们项目中的任务调度

我们发现, 预加载整个流程中,很多业务逻辑是可以复用的。

假设有两个业务场景,一个是断网重连,一个是列表页滑动。在断网重连场景下,需要执行的流程为:

在列表页滑动场景下,需要执行的流程为:

不同业务场景下,需要执行的操作有很多相同的地方,比如以上两个场景,都有“检测文档状态”、“检测文档状态”和“预渲染”三个步骤。所以我们将每个场景需要执行的操作拆分为可以复用的细粒度的任务,这样可以减少重复代码。

为了更好地维护管理,减少耦合,我们打算引入一个任务调度器,以便能够做到:

  1. 将业务流程中的各个逻辑合理拆分,使其粒度更细,便于复用。

  2. 拆分后的子任务,之间的依赖顺序可以自由配置,灵活组配,适应更多的业务场景。

引入 task

我们首先将业务逻辑进行拆解,将业务逻辑的最小拆分单元,描述为 task,task 是调度器能处理的最小单元。

如何描述 task 之间的执行顺序

将业务逻辑拆分为一个个细粒度的 task 之后,如何控制 task 之间的执行顺序,如何使多个 task 灵活地并行和串行,是设计时遇到的一个难题。

我们想到的可能的场景有如下几个。

首先,最简单的串行任务。如图,Task1、Task2 以及 Task3 串行执行,后面的 task 需要前面的返回值:

其次,就是并行任务。如图,Task1 执行完之后,需要执行后续三个任务,这三个任务是并行执行的:

然后,还需要覆盖到有条件的分支流程。如图,Task1 执行完成在满足一定条件后,才能依次执行 Task2 和 Task3 。若不满足,则什么都不执行:

也有其他的条件分支流程,比如 Task1 执行完成在满足一定条件后,才能执行 Task2 。若不满足,则需要并行执行 Task3 和 Task4。如图:

最后,还有最复杂的情况,就是串行和并行交织在一起,再加上条件控制比较复杂的情况。举例如下:

设计思路

如何根据以上需要满足的场景,去设计我们调度器的架构呢?

一个任务完成后,可能接下来需要执行一个或者三个后续的流程。所以第一时间想到了多叉树。且是有向的多叉树。

有向多叉树

最初将这种结构抽象为 有向多叉树,可以满足很多场景。比如:

但是 有向多叉树不能表示这种:

有向无环图设计

我们发现 这种数据结构可能并不能满足需求,我们就想到图。又因为这种图,好像有一种流向,貌似可以进行拓扑排序。所以我们尝试用 有向无环图这种数据结构。

这样一来,我们的数据结构抽象为 DAG (有向无环图),问题也就变成了,如何构建 DAG ,并操作 DAG 。

其他角色

为了更好地实现任务调度器,除了引入 task 之外, 我们还引入 Job 和 Scheduler 的概念。

task

我们将业务逻辑的最小拆分单元,描述为 task。task 是调度器能处理的最小单元,Scheduler 管理 Job,不直接管理 task ,task 由其所在的 Job 管理。

Job

Job 用来管理一组互相关联的 task,成为一个有意义的作业,即一个 Job 由一个或多个 task 组成。

BasicJob 是 Job 基类,业务的 Job 需要继承并实现其抽象方法。

Scheduler

Scheduler 用来管理调度向其添加的 Job, 可以恢复和暂停自身的执行。

整体结构

上面介绍了如何描述 task 之间的执行顺序,具体的业务逻辑由 task 来执行,那如何管理 task 呢?介绍角色的时候提到,Job 用来管理一组互相关联的 task,成为一个有意义的 作业。且用 Scheduler 来管理调度向其添加的 Job 。

整体的架构如下:

实现分析

接口定义

task 的接口

我们由底层向高层逐渐来分析。上文提到,task 是最基本的运行单位,是最细粒度的拆分单元。那我们该如何规范 task 呢?

我们希望业务方可以将自己的业务逻辑抽象封装为一个 task ,而封装好的 task 能够被我们调度器所识别,并能被调度器正确处理。所以 task 需要实现我们自定义的接口,接口 ITask 规范 task 应该具备怎么样的能力,应该怎么样被实现。

下面看下:

interface ITask {  /**   * 执行 task 的具体逻辑   *   * @return {*}  {Promise<ITaskResult<Result>>} task 的返回结果   */  run(previousTaskResult: ITaskResult): Promise<ITaskResult>;
/** * 当 task 内部执行失败时,应做的后续处理。 * 比如回滚 task 的执行 * 比如仅仅上报日志 * onError 是可选的,可以不实现 */ onError?(error: Error): void;}

ITask 提供一个 run 方法,用来执行具体的业务逻辑。它接受上一个任务的执行结果,并按照规范返回自己的处理结果。

当 task 内部执行失败时,需要做一些注入回滚或者上报之类的异常处理逻辑。所以提供 onError 方法。

job 的接口

Job 用来管理 task,需要具备两大能力——

  1. Job 需要能添加要处理的 task

  2. Job 需要要能指定 task 的执行顺序

JobScheduler 的接口

JobScheduler 用来管理 Job, 由于我们赋予 task 灵活的组织方式,可以并行,可以串行,同时可以指定条件分支。用来适应各种业务场景。

我们发现 Job 之间不需要太复杂的组织方式,简单地串行即可。所以 JobScheduler 相对简单,能管理 Job 进行串行执行即可。另外, JobScheduler 需要具备暂停当前调度器执行的能力,相应地,也需要具备恢复执行的能力。

于是,我们简单定义 IJobScheduler 为:

interface IJobScheduler {  /**   * 添加 job 以便调度   * 如果当前没有 job 正在被执行,且该 JobSchedule 没有被 pause,则立即执行   * 否则,只是放在队列,以便后续执行   */  add(job: BasicJob): void;
/** * 暂停 JobScheduler 的调度行为,当前正在执行的 Job 不受影响 * 增加 key 是为了如果有多个原因要 pause ,那么需要所有的原因都 * 可以 resume 的时候,才能真正 resume * * @param {string} [key] 标识当前暂停原因的 key */ pause(key?: string): string;
/** * 恢复 JobSchedule 的执行 * * @param {string} key pause 返回的 key * @return {*} {boolean} resume 是否成功 */ resume(key: string): boolean;}

实现

接口 ITask 由业务方去执行即可,相对简单。调度器管理的 ITask 接口,不关心具体的实现。

BasicJob

Job 管理 task,我们需要实现其两大能力——

  1. Job 需要能添加要处理的 task

  2. Job 需要要能指定 task 的执行顺序

这里可以用构建和操作 有向无环图的方式实现。最初设想也是将问题转化为

  1. 将 task 的执行依赖抽象为 有向无环图的构建

  2. Job 的执行则是对 有向无环图进行遍历。

后面参考了 Rxjs 的 API 和 使用方式,我们决定使用 Rxjs 来实现 Job 。

变换式编程思考

为什么使用 Rxjs ?

所有程序其实都是对数据的一种变换——将输入转换为输出。然而,当我们在构思设计时,很少考虑创建变换过程。相反,我们关心的是类和模块、数据结构和算法、语言和框架。

我们认为,从这个角度关注代码往往忽略了要点——我们需要重新将程序视为输入到输出的一个变换。当这样做的时候,许多以前操心的细节就消失了。结构变得更清晰,错误处理更加一致,耦合下降了很多。

如果你不能将正在做的事情描述为一个流程,那表示你不知道自己在做什么。

————《程序员修炼之道》

而 Rxjs 做这件事就特别合适。其中主要用到了 Rxjs 中的 API 有:

  • pipe

  • concatMap

  • forkJoin

  • iif

我们可以将 task 的执行流程抽象为一个数据管道,首先需要创建一个数据流的起始点:

private source: Observable<TaskResultType>;
// defaultData 可以是初始化的数据,也可以是外界传入的配置信息private initSource(defaultData?: unknown): void { /** @type {ITaskResult} task 添加前的起始值 */ const startPoint: ITaskResult = { status: TASK_STATUS.success, data: defaultData, }; // 创建 Observable this.source = of(startPoint);}

这里的 source 是 Rxjs 中提供的 Observable 类型对象,即可以被观察的,相当于一个生产者。

另外这里也用到了 Rxjs 中提供的 of 操作符,简单说就是将数据转化为 Observable 类型对象。那如何添加并组织 task 呢?这里提供了三个方法:

/** * 添加多个 task ,串行执行 */public serialNext(tasks: ITask[]): void {}
/** * 添加多个 task ,并行执行 */public parallellNext(tasks: ITask[]): void {}
/** * 条件操作。 * 第一个参数是一个 条件函数,返回 true false,以此来决定走哪个子 Job */public iif(condition: (previousResult: unknown) => boolean, trueSource: BasicJob, falseSource?: BasicJob):void {}

JobScheduler

先分析下我们的构造函数:

public constructor(queue: IQueue = new FIFOQueue()) {}

这个 FIFOQueue为啥不直接使用数组在 JobScheduler实现呢?

其实, FIFOQueue的内部实现就是数组,代码不过十行左右。这样做,是为了将 queuejob-schedule解耦开,而 queue是可以依赖注入的,可以自定义实现的。简单说,就是不希望 queue的实现细节在 job-schedule中, job-schedule只存在 queue的接口操作。

再看 JobSchedulerpause()resume()方法,设计的时候考虑到,这里不能是简单的,调用 pause()的时候暂停执行,调用 resume()就恢复执行这么简单。

设想一种业务场景,如果有多个原因要暂停调度器的执行,那么可能有一个业务模块内的多处都执行了 pause()。那么当其中一一处需要恢复执行的时候,即某一个暂停执行的原因不再成立的时候,这时调用了 resume()的时候,调度器是否应该立即响应,去恢复执行呢?

要不要恢复执行,就看此时是否真的可以恢复调度器的执行。如果业务方只在调度器可以恢复执行的时候,才真正恢复。也就是所以暂停执行的原因都不再成立的时候,才执行 resume()。那么这样的话,调度器就把恢复和暂停的工作交给业务方来确保。

这样的弊端是:

  1. 增加了业务方的处理负担,需要关注调度器内部的执行状态。

  2. 当需要调用 resume()的时候,需要考虑到其他业务是否受当前执行状态的影响。

  3. debug 的时候也难以追踪,很难调试。

  4. 维护成本很高,需要知道当前模块下所有使用调度器的业务。

于是,我们将这一工作收敛到调度器内部。业务只关心自己应该暂停还是恢复,不需要关心其他业务会不会受影响。那么当调用 resume()的时候,可能并不会真正地开始恢复执行。

具体实现如下:

/** * 暂停 JobScheduler 的调度行为,当前正在执行的 Job 不受影响 * 增加 key 是为了如果有多个原因要 pause ,那么需要所有的原因都 * 可以 resume 的时候,才能真正 resume * * @param {string} [key] 标识当前暂停原因的 key */public pause(key?: string): string {  // 如果不传 key ,则生成一个随机字符串作为 key  const keyNotNull = key ?? this.generatePauseKey();  this.pausedKeys.add(keyNotNull);
return keyNotNull;}/** * 恢复 JobSchedule 的执行 * * @param {string} key pause 返回的 key * @return {*} {boolean} resume 是否成功 */public resume(key: string): boolean { // 如果 key 存在于 set 中,返回true,并成功移除。否则,返回 false const isContainKey = this.pausedKeys.delete(key);
// 如果移除后 pausedKeys 为空,则可以 resume if (this.pausedKeys.size === 0) { this.executeNextJob(); } return isContainKey;}

其他特性

除了上面介绍的,我们的调度器还支持

  • 指定任务的优先级

  • 任务被处理的各个时机回调

  • 超时处理

  • 异常处理

  • ...

总结

我们从业务现状出发,考虑需要覆盖的场景,设计调度器需要提供的角色。然后确定了调度器的关键接口,同时给出了简单实现。同时,也提到一些个人设计和实现时候的一些思考,比如多叉树和有向无环图。

另外,考虑到 Rxjs 比较适合 task 的组织,借鉴 Rxjs 的 API 并将其应用到实践中。希望能给出一些启发,欢迎一起讨论~

参考文章

  • 实现一个异步并发调度器

  • Rxjs 官网

  • learnrxjs

  • bullmq



内推社群


我组建了一个氛围特别好的腾讯内推社群,如果你对加入腾讯感兴趣的话(后续有计划也可以),我们可以一起进行面试相关的答疑、聊聊面试的故事、并且在你准备好的时候随时帮你内推。下方加 winty 好友回复「面试」即可。


浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报