提效7倍,Apache Spark 自适应查询优化在网易的深度实践及改进
前言
自适应查询优化 (Adaptive Query Execution, AQE) 是 Spark 3.0 版本引入的重大特性之一,可以在运行时动态的优化用户的 SQL 执行计划,很大程度上提高了 Spark 作业的性能和稳定性。AQE 包含动态分区合并、Join 数据倾斜自动优化、动态 Join 策略选择等多个子特性,这些特性可以让用户省去很多需要根据作业负载逐个手动调优,甚至修改业务逻辑的痛苦过程,极大的提升了 Spark 自身的易用性和灵活性。
作为网易大数据基础软件的缔造者,网易数帆旗下网易有数团队自 AQE 诞生起就关注其应用。第一个应用 AQE 的系统是 Kyuubi。Kyuubi 是网易开源的一款企业级数据湖探索平台,它基于 Spark SQL 实现了多租户 SQL on Hadoop 查询引擎。在网易内部,基于 Kyuubi 的 C/S 架构,在保证 SQL 兼容性的前提下,服务端可以平滑地实现 Spark 版本升级,将社区和内部的最新优化和增强快速赋能用户。从 Spark 3.0.2 开始,网易有数就在生产环境中逐步试用和推广 AQE 的特性。而在 Spark 3.1.1 发布后,AQE 在 Kyuubi 生产环境中已经是用户默认的执行方式。在这个过程中,我们还端到端地帮助某个业务迁移了 1500+ Hive 历史任务到 Spark 3.1.1 上,不仅实现了资源量减半,更将总执行时间缩短了 70% 以上,综合来看执行性能提升 7 倍多。
当然,AQE 作为一个“新”特性,在实践过程中我们也发现它在很多方面不尽如人意,还有很大的优化空间。秉着坚持开源策略,网易有数努力将团队遇到的问题和 Spark 社区分享,将我们的优化努力合进社区。以下章节,我们将展开介绍这半年多来 AQE 特性在网易的实践经验和优化改进。
AQE的设计思路
不同于传统以整个执行计划为粒度进行调度的方式,AQE 会把执行计划基于 shuffle 划分成若干个子计划,每个子计划用一个新的叶子节点包裹起来,从而使得执行计划的调度粒度细化到 stage 级别 (stage 也是基于 shuffle 划分)。这样拆解后,AQE 就可以在某个子执行计划完成后获取到其 shuffle 的统计数据,并基于这些统计数据再对下一个子计划动态优化。
图片来自 databricks 博客
有了这个调度流程之后,AQE 才可能有接下来的优化策略,从宏观上来看 AQE 优化执行计划的策略有两种:一是动态修改执行计划;二是动态生成 shuffle reader。
2.1 动态修改执行计划
动态修改执行计划包括两个部分:对其逻辑计划重新优化,以及生成新的物理执行计划。我们知道一般的 SQL 执行流程是,逻辑执行计划 -> 物理执行计划,而 AQE 的执行逻辑是,子物理执行计划 -> 父逻辑执行计划 -> 父物理执行计划,这样的执行流程提供了更多优化的空间。比如在对 Join 算子选择执行方式的时候可能有原来的 Sort Merge Join 优化为 Broadcast Hash Join。执行计划层面看起来是这样:
2.2 动态生成 Shuffle Reader
先明确一个简单的概念 map 负责写 shuffle 数据,reduce 负责读取 shuffle 数据。而 shuffle reader 可以理解为在 reduce 里负责拉 shuffle 数据的工具。标准的 shuffle reader 会根据预设定的分区数量 (也就是我们经常改的 spark.sql.shuffle.partitions),在每个 reduce 内拉取分配给它的 shuffle 数据。而动态生成的 shuffle reader 会根据运行时的 shuffle 统计数据来决定 reduce 的数量。下面举两个例子,分区合并和 Join 动态优化。
(1)分区合并是一个通用的优化,其思路是将多个读取 shuffle 数据量少的 reduce 合并到 1 个 reduce。假如有一个极端情况,shuffle 的数据量只有几十 KB,但是分区数声明了几千,那么这个任务就会极大的浪费调度资源。在这个背景下,AQE 在跑完 map 后,会感知到这个情况,然后动态的合并 reduce 的数量,而在这个 case 下 reduce 的数量就会合并为 1。这样优化后可以极大的节省 reduce 数量,并提高 reduce 吞吐量。
(2)Join 倾斜优化相对于分区合并,Join 倾斜优化则只专注于 Join 的场景。如果我们 Join 的某个 key 存在倾斜,那么对应到 Spark 中就会出现某个 reduce 的分区出现倾斜。在这个背景下,AQE 在跑完 map 后,会预统计每个 reduce 读取到的 shuffle 数据量,然后把数据量大的 reduce 分区做切割,也就是把原本由 1 个 reduce 读取的 shuffle 数据改为 n 个 reduce 读取。这样处理后就保证了每个 reduce 处理的数据量是一致的,从而解决数据倾斜问题。
AQE 优化规则实现都是非常巧妙的,其他更多优化细节就不展开了,推荐阅读 Kyuubi 与 AQE。
社区原生AQE的问题
网易在 AQE 上的改进
SPARK-35239,这个 issue 可以描述为当输入的 RDD 分区是空的时候无法对其 shuffle 的分区合并。看起来影响并不大,如果是空表的话那么就算空跑一些任务也是非常快的。但是在 Add hoc 场景下,默认的 spark.sql.shuffle.partitions 配置调整很大,这就会造成严重的 task 资源浪费,并且加重 Driver 的负担
SPARK-34899,当我们发现某些 shuffle 分区在被 AQE 的分区合并规则成功优化后,分区数居然没有下降,一度怀疑是没有找到正确使用 AQE 的姿势
SPARK-35168,一些 Hive 转过来的同学可能会遇到的 issue,理论上 MapReduce 中 reduce 的数量等价于 Spark 的 shuffle 分区数,所以 Spark 做了一些配置映射。但是在映射中出现了 bug 这肯定是不能容忍的。
4.3 内部优化(已开源)
除了和社区保持交流之外,网易数帆也做了许多基于 AQE 的优化,这些优化都在我们的开源项目 Kyuubi 里。
支持复杂场景下 Join 倾斜优化
社区版本对 AQE 的优化比较谨慎,只对标准的 Sort Merge Join 做了倾斜优化,也就是每个 Join 下的子算子必须包含 Sort 和 Shuffle,这个策略极大的限制了 Join 倾斜优化的覆盖率。举例来说,有一个执行计划先 Aggregate 再 Join,并且这两个算子之间没有出现 shuffle。我们可以猜到,在没有 AQE 的介入下,Aggregate 和 Join 之间的 shuffle 被剪枝了,这是一种常见的优化策略,一般是由于 Aggregate 的 key 和 Join 的 key 存在重复引起的。但是由于没有击中规则,AQE 无法优化这个场景的 Join。有一些可以绕过去的方法,比如手动在 Aggregate 和 Join 之间插入一个 shuffle,得到的执行计划长这样子:
我们在这种思路下,以增加规则的方式可以在不入侵 AQE 代码的前提下,自动增加 shuffle 来满足 Join 倾斜优化的触发条件。选择这样处理的理由有 3 个
增加 shuffle 可以带来另一个优秀的副作用,就是支持多表 Join 场景下的优化,可以说是一举两得
不用魔改 AQE 的代码,可以独立于我们内部的 Spark 分支快速迭代
当然这不是最终的解决方案,和社区的交流还在继续
小文件合并以及 stage 级别的配置隔离
Spark 的小文件问题已经存在很多年了,解决方案也有很多。而 AQE 的出现看起来可以天然的解决小文件问题,因此网易内部基于 AQE 的分区合并优化规则,对每个涉及写操作的 SQL,在其执行计划的顶端动态插入一个 shuffle 节点,从执行计划的角度看起来是这样的:
再结合可以控制每个分区大小的相关配置,看起来一切都是这么美好。但问题还是来了,其中有两个最明显的问题:
简单添加一个 shuffle 节点无法满足动态分区写的场景
假设我们最终产生 1k 个分区,动态插入的分区值的数量也是 1k,那么最终会产生的文件数是 1k x 1k = 1m。这肯定是不能被接受的,因此我们需要对动态分区字段做重分区,让包含相同分区值的数据落在同一个分区内,这样 1k 个分区生成的文件数最多也是 1k。但是这样处理后还有有一个潜在的风险点,不同分区值的分布是不均匀的,也就是说可能出现数据倾斜问题。对于这样情况,我们又额外增加了与业务无关的重分区字段,并通过配置的方式帮助用户快速应对不同的业务场景。
单分区处理的数据量过大导致性能瓶颈
成也萧何,败也萧何。把 spark.sql.adaptive.advisoryPartitionSizeInBytes 调大后小文件的问题是解决了,但是过程中每个分区处理的数据量也随之增加,这导致过程中的并发度无法达到预期的要求。因此 stage 级别的配置隔离出现了。我们直接把整个 SQL 配置划分为两部分,最后一个 stage 以及之前的 stage,然后把这两个部分之间的配置做了隔离。拿上面这个配置来说,在最后一个 stage 的样子是 spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes。在配置隔离的帮助下,我们可以完美解决小文件和计算性能不能兼得的问题,用户可以更加优雅地使用 AQE。
4.4 案例分享
多表 Join 倾斜
下面这两张图为 3 表 Join 的执行计划,由于长度的限制我们只截取到 Join 相关的片段,并且没有被优化的任务由于数据倾斜问题没有执行成功。可以明显看到社区版本无法对这类多表 Join 做倾斜优化,而我们在动态插入 shuffle 之后,两次 Join 都成功的被优化。在这个特性的帮助下,Join 倾斜优化的覆盖场景相对于社区有明显提升。
社区版本
内部版本
配置隔离前
配置隔离后
总结与展望
在优化细节上的角度,可以增加命中 AQE 优化的 case,比如 Join 倾斜优化增强,让用户不用逐个检查不能被优化的执行计划
在业务使用上的角度,可以同时支持 ETL,Add hoc 等侧重点不一样的场景,比如 stage 配置隔离这个特性,让关注写和读的业务都有良好的体验
在完成这个阶段性的优化后,接下来我们会继续深耕在 AQE 的覆盖场景上,比如支持 Union 算子的细粒度优化,增强 AQE 的代价估计算法等。除此之外,还有一些潜在的性能回归问题也是值得我们注意的,比如在做分区合并优化后会放大某些高时间复杂度算子的性能瓶颈。
作为可能是最快在线上使用 Apache Spark 3.1.1 的用户,网易在享受社区技术福利的同时也在反哺社区。这也是网易对技术的思考和理念:
因为开放,我们拥抱开源,深入社区
因为热爱,我们快速接收新的理论,实践新的技术