Iceberg 实践 | Flink + Iceberg + 对象存储,构建数据湖方案
HBase技术社区
共 7858字,需浏览 16分钟
·
2021-07-12 21:50
数据湖和 Iceberg 简介
未来规划
演示方案 存储优化的一些思考
一、数据湖和 Iceberg 简介
1. 数据湖生态
首先我们认为它底下应具备海量存储的能力,常见的有对象存储,公有云存储以及 HDFS;
在这之上,也需要支持丰富的数据类型,包括非结构化的图像视频,半结构化的 CSV、XML、Log,以及结构化的数据库表;
除此之外,需要高效统一的元数据管理,使得计算引擎可以方便地索引到各种类型数据来做分析。
最后,我们需要支持丰富的计算引擎,包括 Flink、Spark、Hive、Presto 等,从而方便对接企业中已有的一些应用架构。
2. 结构化数据在数据湖上的应用场景
首先,可以看到数据源的类型很多,因此需要支持比较丰富的数据 Schema 的组织; 其次,它在注入的过程中要支撑实时的数据查询,所以需要 ACID 的保证,确保不会读到一些还没写完的中间状态的脏数据; 最后,例如日志这些有可能临时需要改个格式,或者加一列。类似这种情况,需要避免像传统的数仓一样,可能要把所有的数据重新提出来写一遍,重新注入到存储;而是需要一个轻量级的解决方案来达成需求。
3. 结构化数据在数据湖上的典型解决方案
4. Iceberg 表数据组织架构
快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 路径、当前快照等。 Manifest List:Manifest File 路径及其 Partition,数据文件统计信息。 Manifest File:Data File 路径及其每列数据上下边界。 Data File:实际表内容数据,以 Parque,ORC,Avro 等格式组织。
可以看到右边从数据文件开始,数据文件存放表内容数据,一般支持 Parquet、ORC、Avro 等格式; 往上是 Manifest File,它会记录底下数据文件的路径以及每列数据的上下边界,方便过滤查询文件; 再往上是 Manifest List,它来链接底下多个 Manifest File,同时记录 Manifest File 对应的分区范围信息,也是为了方便后续做过滤查询; Manifest List 其实已经表示了快照的信息,它包含当下数据库表所有的数据链接,也是 Iceberg 能够支持 ACID 特性的关键保障。 有了快照,读数据的时候只能读到快照所能引用到的数据,还在写的数据不会被快照引用到,也就不会读到脏数据。多个快照会共享以前的数据文件,通过共享这些 Manifest File 来共享之前的数据。 再往上是快照元数据,记录了当前或者历史上表格 Scheme 的变化、分区的配置、所有快照 Manifest File 路径、以及当前快照是哪一个。 同时,Iceberg 提供命名空间以及表格的抽象,做完整的数据组织管理。
5. Iceberg 写入流程
首先,Data Workers 会从元数据上读出数据进行解析,然后把一条记录交给 Iceberg 存储; 与常见的数据库一样,Iceberg 也会有预定义的分区,那些记录会写入到各个不同的分区,形成一些新的文件; Flink 有个 CheckPoint 机制,文件到达以后,Flink 就会完成这一批文件的写入,然后生成这一批文件的清单,接着交给 Commit Worker; Commit Worker 会读出当前快照的信息,然后与这一次生成的文件列表进行合并,生成一个新的 Manifest List 以及后续元数据的表文件的信息,之后进行提交,成功以后就形成一个新的快照。
6. Iceberg 查询流程
首先是 Flink Table scan worker 做一个 scan,scan 的时候可以像树一样,从根开始,找到当前的快照或者用户指定的一个历史快照,然后从快照中拿出当前快照的 Manifest List 文件,根据当时保存的一些信息,就可以过滤出满足这次查询条件的 Manifest File; 再往下经过 Manifest File 里记录的信息,过滤出底下需要的 Data Files。这个文件拿出来以后,再交给 Recorder reader workers,它从文件中读出满足条件的 Recode,然后返回给上层调用。
7. Iceberg Catalog 功能一览
它可以对 Iceberg 定义一系列角色文件; 它的 File IO 都是可以定制,包括读写和删除; 它的命名空间和表的操作 (也可称为元数据操作),也可以定制; 包括表的读取 / 扫描,表的提交,都可以用 Catalog 来定制。
二、对象存储支撑 Iceberg 数据湖
1. 当前 Iceberg Catalog 实现
2. 对象存储和 HDFS 的比较
对象存储在集群扩展性,小文件友好,多站点部署和低存储开销上更加有优势; HDFS 的好处就是提供追加上传和原子性 rename,这两个优势正是 Iceberg 需要的。
HDFS 架构是用单个 Name Node 保存所有元数据,这就决定了它单节点的能力有限,所以在元数据方面没有横向扩展能力。 对象存储一般采用哈希方式,把元数据分隔成各个块,把这个块交给不同 Node 上面的服务来进行管理,天然地它元数据的上限会更高,甚至在极端情况下可以进行 rehash,把这个块切得更细,交给更多的 Node 来管理元数据,达到扩展能力。
HDFS 基于架构的限制,小文件存储受限于 Name Node 内存等资源,虽然 HDFS 提供了 Archive 的方法来合并小文件,减少对 Name Node 的压力,但这需要额外增加复杂度,不是原生的。 同样,小文件的 TPS 也是受限于 Name Node 的处理能力,因为它只有单个 Name Node。对象存储的元数据是分布式存储和管理,流量可以很好地分布到各个 Node 上,这样单节点就可以存储海量的小文件。 目前,很多对象存储提供多介质,分层加速,可以提升小文件的性能。
对象存储支持多站点部署 全局命名空间 支持丰富的规则配置 对象存储的多站点部署能力适用于两地三中心多活的架构,而 HDFS 没有原生的多站点部署能力。虽然目前看到一些商业版本给 HDFS 增加了多站点负责数据的能力,但由于它的两个系统可能是独立的,因此并不能支撑真正的全局命名空间下多活的能力。
对于存储系统来说,为了适应随机的硬件故障,它一般会有副本机制来保护数据。 常见的如三副本,把数据存三份,然后分开保存到三个 Node 上面,存储开销是三倍,但是它可以同时容忍两个副本遇到故障,保证数据不会丢失。 另一种是 Erasure Coding,通常称为 EC。以 10+2 举例,它把数据切成 10 个数据块,然后用算法算出两个代码块,一共 12 个块。接着分布到四个节点上,存储开销是 1.2 倍。它同样可以容忍同时出现两个块故障,这种情况可以用剩余的 10 个块算出所有的数据,这样减少存储开销,同时达到故障容忍程度。 HDFS 默认使用三副本机制,新的 HDFS 版本上已经支持 EC 的能力。经过研究,它是基于文件做 EC,所以它对小文件有天然的劣势。因为如果小文件的大小小于分块要求的大小时,它的开销就会比原定的开销更大,因为两个代码块这边是不能省的。在极端情况下,如果它的大小等同于单个代码块的大小,它就已经等同于三副本了。 同时,HDFS 一旦 EC,就不能再支持 append、hflush、hsync 等操作,这会极大地影响 EC 能够使用的场景。对象存储原生支持 EC,对于小文件的话,它内部会把小文件合并成一个大的块来做 EC,这样确保数据开销方面始终是恒定的,基于预先配置的策略。
3. 对象存储的挑战:数据的追加上传
第一步先创建初始化的 MPU,拿到一个 Upload ID,然后给每一个分段赋予一个 Upload ID 以及一个编号,这些分块就可以并行上传; 在上传完成以后,还需要一步 Complete 操作,这样相当于通知系统,它会把基于同一个 Upload ID 以及所有的编号,从小到大排起来,组成一个大文件; 把机制运用到数据追加上传场景,常规实现就是写入一个文件,把文件缓存到本地,当达到分块要求大小时,就可以把它进行初始化 MPU,把它的一个分块开始上传。后面每一个分块也是一样的操作,直到最后一个分块上传完,最后再调用一个完成操作来完成上传。
缺点是 MPU 的分片数量有上限,S3 标准里可能只有 1 万个分片。想支持大文件的话,这个分块就不能太小,所以对于小于分块的文件,依然是要利用前面一种方法进行缓存上传; MPU 的优点在于并行上传的能力。假设做一个异步的上传,文件在缓存达到以后,不用等上一个分块上传成功,就可以继续缓存下一个,之后开始上传。当前面注入的速度足够快时,后端的异步提交就变成了并行操作。利用这个机制,它可以提供比单条流上传速度更快的上传能力。
4. 对象存储的挑战:原子提交
这里 Commit Worker 1 拿到了 v006 版本,然后合并自己的文件,提交 v007 成功。 此时还有另一个 Commit Worker 2,它也拿到了 v006,然后合并出来,且也要提供 v007。此时我们需要一个机制告诉它 v007 已经冲突,不能上传,然后让它自己去 Retry。Retry 以后取出新的 v007 合并,然后提交给 v008。
首先,Commit Worker 1 拿到 v006,然后合并文件,在提交之前先要获取这一把锁,拿到锁以后判断当前快照版本。如果是 v006,则 v007 能提交成功,提交成功以后再解锁。 同样,Commit Worker 2 拿到 v006 合并以后,它一开始拿不到锁,要等 Commit Worker 1 释放掉这个锁以后才能拿到。等拿到锁再去检查的时候,会发现当前版本已经是 v007,与自己的 v007 有冲突,因此这个操作一定会失败,然后它就会进行 Retry。
5. Dell EMC ECS 的数据追加上传
6. Dell EMC ECS 在并发提交下的解决方案
If-Match 就是说在 Commit Worker 1 提交拿到 v006 的时候,同时拿到了文件的 eTag。提交的时候会带上 eTag,系统需要判断要覆盖文件的 eTag 跟当前这个文件真实 eTag 是否相同,如果相同就允许这次覆盖操作,那么 v007 就能提交成功; 另一种情况,是 Commit Worker 2 也拿到了 v006 的 eTag,然后上传的时候发现拿到 eTag 跟当前系统里文件不同,则会返回失败,然后触发 Retry。
7. S3 Catalog - 统一存储的数据
三、演示方案
四、存储优化的一些思考
另外~《Apache Flink-实时计算正当时》电子书重磅发布,本书将助您轻松 Get Apache Flink 1.13 版本最新特征,同时还包含知名厂商多场景 Flink 实战经验,学用一体,干货多多!快扫描下方二维码获取吧~
(本次为抢鲜版,正式版将于 7 月初上线)
更多 Flink 相关技术交流,可扫码加入社区钉钉大群~
▼ 关注「Flink 中文社区」,获取更多技术干货 ▼
评论