Spark DataSource API v2 版本对比 v1有哪些改进?
原文:https://issues.apache.org/jira/browse/SPARK-15689
Data Source API V2.pdf
背景和动机
DataSource API v1 版本于 Spark 1.3 发布。
根据社区反馈,它具有下面的限制:
1. 由于其输入参数包括 DataFrame / SQLContext,因此 DataSource API 兼容性取决于这些上层的 API。 2. 物理存储信息(例如,划分和排序)不会从数据源传播,并且因此,Spark 的优化器无法利用。 3. 可扩展性不好,并且算子的下推能力受限。 4. 缺少高性能的列式读取接口。 5. 写入接口是如此普遍,不支持事务。 |
由于上面的限制和问题, Spark SQL 内置的数据源实现(如 Parquet,JSON等)不使用这个公共 DataSource API。
相反,他们使用内部/非公共的接口。这样很难使得外部的数据源实现像内置的一样快。
这让一些数据源开发人员感到失望,有时候为了使用 Spark ,他们不得不针对 Spark 做出昂贵的改变。
在这样的背景下,DataSource API v2 版本应运而生。
DataSource API v2版本旨在提供一个高性能的,易于维护的,易于扩展的外部数据源接口。
v2 的目标
针对 Scala / Java 设计一个新的 DataSource API:
Java Friendly
没有依赖 DataFrame,RDD, SparkSession 等
支持谓词下推和列剪裁。此外,易于添加更多的算子下推,而不会破坏向后的兼容性,例如 limit,sample,aggregate 等。请注意,二进制算子的下推,像 join 的下推,就超出了范围。
能够传播物理分区信息和其他的一些信息而不破坏向后的兼容性。例如,统计,索引和排序。这些可以被 Spark 用来优化查询。
有列式读取的接口(需要一种公共列式存储格式)和 InternalRow 读取接口(因为 InternalRow 不会发布,这仍然是一个实验性的接口)。
具有事务支持的写入接口。写入接口应当可插拔以允许只读的数据源。
能够替换 HadoopFsRelation。
能够替换内部 Hive 特定表的读/写计划。
DataSource API v2 版本主要关注读取,写入和优化扩展,而无需添加像数据更新一样的新功能。
v2 不希望达成的目标
定义 Scala 和 Java 以外的语言的数据源。
列式写入接口(尽管有的话会很好)
流数据源
目前我们没有数据源的新功能,例如 数据更新(现在我们只支持追加和覆盖),支持除 Hive 以外的 catalog,定制 DDL 语法等。
v2 中期望出现的API
保留Java 兼容性的最佳方法是在 Java 中编写 API。很容易处理 Scala 中的 Java 类/接口,但反之则不亦然。
读取接口返回输出数据的读取任务,而不是DataFrame / RDD,以最小化依赖关系。
补充的读取接口,还提供了 schema 推断接口。数据源可以实现:
需要用户指定 schema
用户指定的 schema 不允许,schema 会自动推断
尊重用户指定的 schema,如果不可用,则也可以自动推断 schema。
可以基于数据源实现支持 schema 的演进。Spark 仍然可以追加和读取那些不同的 来自数据源预定义或推断 schema 的数据。并不是所有的数据源都支持 Schema 的演进。例如,Parquet 和 JSON 支持 schema 的演进,但是 CSV 却没有。
所有的数据源优化,如列剪裁,谓词下推,列式读取等。应该定义为单独的 Java 接口,用户可以选择他们想要实现的任何优化。
DataSource API v2中不应该出现理想化的分区/分桶概念,因为它们是只是数据跳过和预分区的技术。但是,这 2 个概念在 Spark 中已经广泛使用了,例如 DataFrameWriter.partitionBy 和 像 ADD PARTITION 的DDL语法。为了保持一致性,我们需要添加分区/分桶到DataSource API v2 ,以便实现可以指定分区/分桶的读/写。
分桶可能不是唯一可以进行预分区的技术,DataSource API v2包含哈希分区下推。
写入接口遵循FileFormatWriter / FileCommitOctocol,并引入 task 和 job 级别的提交和中止。请注意,这只能保证 job 级别的事务。如果多个 job 中出现了单个查询,则此查询可能不是事务。
读取,写入和 shema 推断都将字符串作为选项带到字符串映射。每个数据源实现可以自由定义自己的选项。
DataSource 选项应该是不区分大小写的,并且显式的挑选CaseInsensitiveMap以表示选项。
除了通过为每个读写操作的字符串到字符串的映射来设置数据源选项 ,用户还可以在当前会话中设置它们,通过设置spark.datasource.SOURCE_NAME前缀的选项。例如,当用户发出命令spark.conf.set("spark.datasource.json.samplingRatio","0.5"),samplingRatio = 0.5 会在当前会话中随后的JSON 数据源读取中生效。