Iceberg 剖析 | Iceberg 是如何提高查询性能的

共 4440字,需浏览 9分钟

 ·

2022-04-10 04:20



在上周介绍Iceberg原理的文章中,我有提到Iceberg的设计初衷是为了解决Hive数仓所遇到的问题,主要有4点:


  1. 没有ACID保证

  2. 只能支持partition粒度的谓词下推

  3. 确定扫描的文件需要使用文件系统的list操作

  4. partition字段必须显式出现在query里面


其中2和3都是直接影响query性能的问题,而Iceberg的主打卖点正是更快的查询速度。作者Ryan Blue在DataWorks Summit上宣讲Iceberg的ppt上就有列出Netflix在使用了Iceberg之后的性能提升情况

Hive光是确定查询计划就要9.6分钟,而Iceberg确定查询计划只要42秒

在腾讯关于Iceberg的《速度提升10倍,腾讯基于Iceberg的数据治理优化实践》这篇文章中,也提到了腾讯采用Iceberg的一部分原因正是看中了其对查询性能的提升。所以这篇文章的主题,就是来讲一讲Iceberg是如何减少查询的数据量,从而提高查询性能的

01


首先,对于存储层的数据湖系统(Delta,Hudi,Iceberg)来说,query在存储层发生的事情粗略地可分为两个阶段:

  1. 确定需要读取的文件列表(planning阶段

  2. 读取数据并传递给计算引擎(execution阶段


对于execution阶段,相信比较符合直觉,是所有人都认可的存储层需要做的事情。但是对于planning阶段,或许有些朋友会有疑问:Spark等计算引擎需要产生执行计划,我还能理解,为什么存储层也需要执行计划

其实存储层的执行计划没有计算层的那么复杂,但确实也有一些需要做计划的事情,主要是以下2点:

  1. 需要读取哪些partition

  2. 每个partition下有哪些文件


这两件事情的内容一目了然,我就不再赘述了。

讲完了planning阶段需要解决的问题,回到我们最初的主题,来讲讲Iceberg所做的优化。其实Iceberg在这两个阶段都有做优化,优化机制总的来说主要有2种:

  1. Partition Pruning(分区剪枝)
  2. Predicate Pushdown(谓词下推)

接下来就会讲一讲Iceberg是如何实现这两个功能的。

02


Partition Pruning(分区剪枝),主要针对的是planning阶段中的第一个问题:需要读取哪些partition

分区剪枝并不是一个新鲜事物。比如Hive就会根据查询条件来决定是否使用分区查询,以及具体查询哪个或哪几个分区,其实就是一种分区剪枝。

Hive会根据查询条件确定读取哪个分区


Iceberg没有沿用Hive相对简单的分区规则,而是自己实现了一套更为复杂的分区系统及分区剪枝算法,名为Hidden Partition。Iceberg选择自己实现,目的是为了克服Hive的分区功能在使用上的不方便以及容易出错。在Iceberg里面,分区是存储系统的一个实现细节,用户无需理解分区和文件系统的路径是如何对应的。(关于Hive的分区功能的问题,请见上一篇文章

接下来具体讲一讲Iceberg的分区剪枝算法。

首先在Iceberg中我们用下面的CREATE语句来创建分区表

CREATE TABLE foobar (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3))
可以看到建表语法和Hive是一样的

和Hive不同的是,Iceberg实现分区剪枝不是依赖文件所在的目录,而是利用了Iceberg特有的manifest文件。上一篇文章中有提到,Iceberg每次写入都会产生一个新的snapshot,而一个snapshot在文件系统上就对应一个manifest文件。

manifest文件的内容类似如下

{  ...  "snapshot_id": {    "long": 4370069137697126000  },  "data_file": {    "file_path": ".../table/data/id_trunc=0/00000-0-1c1865ee-a812-465c-87cb-7588478c2d8f-00001.parquet",    "file_format": "PARQUET",    "partition": {      "id_trunc": {        "long": 0      }    },    ...  }}{  ...  "snapshot_id": {    "long": 4370069137697126000  },  "data_file": {    "file_path": ".../table/data/id_trunc=3/00001-1-b18c1240-86bb-48da-85e9-4a0e41a82b26-00001.parquet",    "file_format": "PARQUET",    "partition": {      "id_trunc": {        "long": 3      }    },    ...  }}


其中的data_file字段记录的是这个snapshot里包含的数据文件的信息,注意到data_file字段里有一个字段是partition,里面记录的就是这个data_file所在的partition信息。Iceberg正是使用这些元数据确定每个分区里包含哪些文件的。

相比于Hive,Iceberg的这种partition实现方式有以下好处:

  1. 直接定位到parquet文件,无需调用文件系统的list操作。

  2. partition的存储方式对用户透明,用户在修改partition定义时Iceberg可以自动地修改存储布局而无需用户操作。


03


讲完了分区剪枝,接下来再讲一讲Predicate Pushdown(谓词下推)

前几篇文章中也有提到过,谓词下推是计算引擎(Spark等)把查询的过滤条件(where条件)下推到存储层,在存储层面就把一部分必然不满足条件的数据过滤掉,从而减少存储层返回给计算引擎的数据量。

在讲Iceberg如何实现谓词下推之前,我先讲一讲Spark是如何实现谓词下推的

Spark作为计算层的系统,自己并不实现谓词下推,而是交给文件格式的reader来解决。例如,对于parquet文件,Spark使用下面这个类读取parquet文件

/**   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.   * @param filter for filtering individual records   */  public ParquetRecordReader(ReadSupport readSupport, Filter filter) {    internalReader = new InternalParquetRecordReader(readSupport, filter);  }

注意ParquetRecordReader的第二个参数filter,就是parquet对计算引擎提供的接口,用于传入过滤条件。而ParquetRecordReader从parquet文件中读取数据后,会首先使用filter过滤掉不满足的record,然后再交给计算引擎。

相比于Spark,Iceberg做得更多一些。Iceberg会在两个层面实现谓词下推:

1. 在snapshot层面,过滤掉不满足条件的data file
2. 在data file层面,过滤掉不满足条件的数据

其中第一点是Iceberg特有的,也是利用了manifest文件里保存的元数据。

Iceberg在manifest文件里记录了每个字段的上界和下界。所以在planning阶段,Iceberg就可以利用这个信息,过滤掉不满足条件的文件,进一步减少文件的扫描量。

{  ...  "data_file": {    "file_path": ".../table/data/id_trunc=0/00000-0-1c1865ee-a812-465c-87cb-7588478c2d8f-00001.parquet",    ...    "lower_bounds": {      "array": [        {          "key": 1,          "value": 1        },        {          "key": 2,          "value": "a"        }      ]    },    "upper_bounds": {      "array": [        {          "key": 1,          "value": 2        },        {          "key": 2,          "value": "b"        }      ]    },    ...  }}
这个文件包含两条数据,(1, “a”)和(2, “b”)。可以看到上界和下界分别是1和2,a和b

第二点则和Spark类似,也是在读取文件时,过滤掉不满足条件的数据。不过和Spark不同的是,Iceberg作为存储层的系统,使用的是parquet更偏底层的ParquetFileReader接口,自己实现了过滤逻辑。和Spark的ParquetRecordReader有什么不同呢?Iceberg的这种实现方式可以直接跳过整个row group,更进一步地减少io量,不过碍于篇幅,细节我就不展开讲了。


04


以上就是Iceberg在优化查询性能方面所实现的优化机制,本质都是为了减少数据查询量。可以看到manifest文件在Iceberg的优化逻辑中起到非常关键的作用,正是因为Iceberg利用了云原生数据仓库的文件大多不可变的特性,收集了非常多关于数据分布的元数据的关系。相信未来存储层会在这方面有更多的发展,然后和计算引擎的结合更为紧密,从而进一步提高查询的性能。

最后你或许会问,既然Iceberg有这些query优化机制,那Hudi有同样的功能吗?就我从Hudi的源代码看来,现阶段Hudi实现了一半。Hudi实现了分区剪枝功能,但是谓词下推功能目前似乎还没有实现。

总结下这篇文章知识点:

  1. Iceberg对减少数据查询量提供了两种优化机制:分区剪枝和谓词下推。
  2. Iceberg收集了关于数据分布的元数据,并利用这些元数据实现更高效的数据裁剪,减少了数据的查询量。
浏览 307
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报