DDIA:数仓和大数据的双向奔赴
DDIA 读书分享会,会逐章进行分享,结合我在工业界分布式存储和数据库的一些经验,补充一些细节。每两周左右分享一次,欢迎加入,Schedule 在这里[1]。我们有个对应的分布式&数据库讨论群,每次分享前会在群里通知。如想加入,可以加我的微信号:qtmuniao,简单自我介绍下,并注明:分布式系统群。
在 MapReduce 流行这些年之后,针对大数据集的分布式批处理执行引擎已经逐渐成熟。到现在(2017年)已经有比较成熟的基础设施可以在上千台机器上处理 PB 量级的数据。因此,针对这个量级的基本数据处理问题可以认为已经被解决,大家的注意力开始转到其他问题上:
- 完善编程模型
- 提升处理性能
- 扩大处理领域
之前我们讨论过,由于 MapReduce 提供的编程接口实在太过难用,像 Hive, Pig,Cascading 和 Crunch 等处理 API 和框架逐渐流行。Apache Tez 更进一步,可以让原来的代码不做过多改动就可以迁移。Spark 和 Flink 也各自有其高层的数据流 API,基本借鉴自 FlumeJava。
这些数据流工具基本都是用关系型的算子来表达计算过程:
- 基于某些字段对数据集进行连接的 Join 算子
- 基于关键字对元组进行聚类的 Group 算子
- 基于条件对元组进行过滤的 Filter 算子
- 对元素进行聚合和统计的 Aggregate 算子
等等。这些算子内部实现时,会用到我们本章之前提到的各种 join 和 group 算法。
除了能够显著降低使用方的代码量外,这些高层的框架通常还支持交互式的使用。因此,你可以在 shell 中增量式的构建分析代码,且能够方便的多次跑以查看运行结果。当我们拿到一个新的数据集,需要做实验探索该如何对其进行分析时,这种交互式的方式非常方便。这其实也是我们之前讨论过的 Unix 编程哲学的一个体现。
这些高层的 API 不仅让用户可以更高效的使用体验,还能够提升任务在物理层面的执行效率。
向声明式方向靠拢
相比直接实现代码进行 Join,使用关系型的 Join 算子给了处理框架分析数据集特点、选择最高效 Join 算的优化空间。Hive,Spark 和 Flink 都有基于代价的优化器,可以对执行路径进行优化。甚至,可以交换 Join 的顺序,来最小化中间数据集的物化。
不同 Join 算法的选择对批处理任务的性能影响极大,但我们最好避免将选择的心智负担推给用户,而可以自动地根据情况进行优化。使用声明式风格的接口使这种自动优化称为可能:用户侧仅需要指定哪些数据集需要 Join,而查询优化器会根据数据特点动态的决定其最优 Join 方式。我们在数据查询语言一节中讨论过这种思想。
但从另一方面来说,MapReduce 和其后继的数据流框架和 SQL 这种完全的声明式语言又不一样。MapReduce 是基于回调函数来构建的:对于任意的一条或一批数据,用户可以自定义处理函数(Mapper 或者 Reducer),调用任何库代码、决定其输出格式。这种方式的优点是,你可以复用很多现成的库来减少开发工作量,比如 Parsing、自然语言分析、图像分析和一些数理统计算法方面的库。
在很长一段时间内,能够自由地跑任意的代码是批处理系统和 MPP 数据库的一个重要区分点。尽管数据库也支持 UDF(user defined function),但使用起来较为复杂,且不能很好的和编程语言的包管理工具(比如 Maven 之于 Java,npm 之于 JavaScript,Rubygems 之于 Ruby)相整合。
然而,在 Join 之外,更进一步地引入声明式功能也对数据流工具有诸多好处。例如,一个过滤函数只有很简单的过滤条件(过滤行)、或只是从原数据集中选择几列(过滤列),则针对每条数据都调用一遍回调函数会有很大的额外性能损耗。如果这些简单的过滤和投影能够用声明式的方式表达,则优化器可以充分利用面向列的存储格式(参见列存),只读取需要的列。Hive,Spark DataFrames 和 Impala 还使用了列式执行引擎(vectorized execution):
以一种 CPU 缓存友好的方式,紧凑地进行迭代(每次取一个 Cache Line,使用 SIMD 指令进行运算),以减少函数调用次数。
Spark 使用 JVM 字节码、Impala 使用 LLVM 来通过生成代码的方式优化这些 Join 内层循环。
通过在高层 API 中注入声明式的特性、在运行时使用优化器动态地优化,批处理框架长得越来越像 MPP 数据库(也获得了类似性能)。但同时,仍然保持原来允许运行任意库代码、读取任意格式数据的扩展性,让这些框架仍然可以保持原有的灵活性。
不同领域的特化
保留运行任意代码的自由度很有必要,但对于很多非常通用、反复出现的处理模式,我们有必要提供系统实现以方便用户复用。传统上,MPP 数据库通常充当商业智能(BI)分析和商业汇报领域的生态位,但这个方向只是批处理众多应用方向的一个。
另外一个越来越重要的方向是数值统计算法,其在推荐和分类的机器学习算法中常常用到。可复用的实现逐渐多了起来:例如 Mahout 在 MapReduce、Spark 和 Flink 之上实现了很多机器学习算法;MADlib 也在 MPP 数据库之上实现了类似的功能模块。
其他有用的算法还有—— k 最近邻算法(k-nearest neighbors)——一种在多维空间中搜索与给定数据条目相似度最高的数据算法,是一种近似性搜索算法。近似搜索对于基因组分析算法也很重要,因为在基因分析中,常需要找不同但类似的基因片段。近年来较火的向量数据库也是主要基于该算法。
批处理引擎被越来越多的用到不同领域算法的分布式执行上。随着批处理系统越来越多支持内置函数和高层声明式算子、MPP 数据库变的越来越可编程和灵活度高,他们开始长的越来越像——说到底,本质上他们都是用于存储和处理数据的系统。
小结在本章,我们探讨了批处理的话题。我们从 Unix 的命令行工具 awk、grep 和 sort 开始,探讨其背后的思想被如何应用到 MapReduce 框架和更近的数据流框架中。这些核心设计原则包括:
- 输入数据不可变
- 一个组件的输出可以喂给另一个组件成为输入
- 通过组合“解决好一件事的小工具”来解决复杂问题
在 Unix 世界中,让所有命令行具有可组合性的统一抽象是——文件和管道,在 MapReduce 中,这个抽象是分布式文件系统。之后我们注意到,数据流工具通过增加各自的“类管道”的数据传输方式,避免了将中间结果物化到分布式文件系统中的额外损耗,但最外侧的输入和输出仍然是在 HDFS 上。
分布式处理框架最主要解决的两个问题是:
-
分片
在 MapReduce 中,会根据输入数据的文件块(file chunk)的数量来调度 mappers。mappers 的输出会在二次分片、排序、合并(我们通常称之为 shuffle)到用户指定数量的 Reducer 中。该过程是为了将所有相关的数据(如具有相同 key)集结到一块。
后 MapReduce 时代的数据流工具会尽量避免不必要的排序(因为代价太高了),但他们仍然使用了和 MapReduce 类似的分区方式。
-
容错
MapReduce 通过频繁的(每次 MapReduce 后)刷盘,从而可以避免重启整个任务,而只重新运行相关子任务就可以从其故障中快速恢复过来。但在错误频率很低的情况下,这种频繁刷盘做法代价很高。数据流工具通过尽可能的减少中间状态的刷盘(当然,shuffle 之后还是要刷的),并将其尽可能的保存在内存中,但这意味着一旦出现故障就要从头重算。算子的确定性可以减少重算的数据范围(确定性能保证只需要算失败分区,并且结果和其他分区仍然一致)。
接下来我们讨论了几种基于 MapReduce 的 Join 算法,这些算法也常被用在各种数据流工具和 MPP 数据库里。他们很好的说明了基于数据分区的算法的工作原理:
-
Sort-merge joins
分桶排序。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 Reducer 函数会将 join 结果进行输出。
-
Broadcast hash joins
小表广播。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 Mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。
-
Partitioned hash joins
分桶哈希。如果两个待 join 输入使用相同的方式进行分片(相同的 key、相同的哈希函数和分区数),则广播哈希算法可以在每个分区内单独应用。
分布式批处理引擎使用了受限的编程模型:回调函数需要是无状态的,且除了输出之外没有其他的副作用。在此设定下,框架可以向应用层屏蔽很多分布式系统的实现细节:当遇到宕机或者网络问题时,子任务可以安全的进行重试;失败任务的输出可以自由抛弃;如果有多个冗余计算过程都成功了,则只有其中一个可以作为输出对后面可见。
由于框架的存在,用户侧的批处理代码无需关心容错机制的实现细节:即使在物理上有大量错误重试的情况下,框架可以保证在逻辑上最终的输出和没有任何故障发生是一致的。这种可靠性语义保证(reliable semantics)通常远强于我们在在线服务中常见到的、将用户的请求写到数据库中的容错性。
批处理任务的基本特点是——读取输入,进行处理,产生输出的过程中,不会修改原数据。换句话说,输出是输入的衍生数据。其中一个重要特点是,输入数据是有界的(bounded):输入的大小是固定的、事先确定的(比如输入是包含一组日志的数据或者一个快照点的数据)。唯其有界,处理任务才能知道什么时候输入读取结束了、什么时候计算完成了。
但在下一章中,我们将会转到流处理(stream processing)上,其中,输入是无界的(unbounded)——你的任务面对的是不知道何时结束的无限数据流。在这种情况下,任何时刻都有可能有新的数据流入,任务会永不结束。我们之后可以看到,虽然批处理和流处理在某些方面有相似之处,但对于输入的无界假设,会在构建系统时对我们的设计产生诸多影响。
参考资料
[1]DDIA 读书分享会: https://ddia.qtmuniao.com/
DDIA 学习会
这本书由于涵盖知识点实在太多,没有一定基础读的时候会有很多问题,如果没有人交流和解惑,往往坚持一半就容易弃掉。 因此我依托小报童平台建立了一个针对本书的学习和答疑的学习会。 主要目的,是结合我的一些工业经验,给大家提供一些细节的答疑,但能力所限,也难免有疏漏和谬误之处,如果你觉得我的回答有问题,欢迎 challenge,交流才能进步! 次要目的,就是为所有喜欢这本书的同学创造一个交流讨论环境。
入会方式:订阅专栏:https://xiaobot.net/p/large-scale-sys (最下方,点阅读原文即可达)。 如果有任何疑问和建议,订购前可以加我微信:qtmuniao。
加我微信(vx: qtmuniao)拉你入学习会的群,进行答疑、讨论和信息发布。大致形式:每两周过一章,发一篇以章节标题+时间段的空白文章。
-
留言答疑:大家在上述文章下面打卡和提问。
-
文字答疑:微信群,每天我会抽时间集中回复。
-
视频答疑:腾讯会议,每周一次,每次一小时。
-
答疑汇总:将大家的问题和我的回答整理处理,以文章形式发在小报童专栏中;答疑录音会发到小宇宙上。
最后欢迎关注我的公众号: 木鸟杂记 ,专注分布式系统、数据库和存储等大规模数据系统,关注后可回复“资料”领取一份我总结的分布式系统和数据库的入门资料大全。