Iceberg 实践 | 胡争:如何基于 Flink 和 Iceberg 构建云原生数据湖?
共 6693字,需浏览 14分钟
·
2022-03-06 03:30
分享嘉宾:胡争 阿里云 技术专家
编辑整理:张德通 Treelab
出品平台:DataFunTalk
导读:今天的分享主要介绍阿里云云原生数据湖Iceberg产品实现方案和对数据湖技术的一些思考。今天的分享围绕下几点展开:
数据湖背景介绍
上云面临的挑战
Iceberg解决方案
总结
数据湖和数据库、数据仓库的区别在哪里?这些系统的特点分别是什么?
常用的数据库如MySQL、HBase、Redis和Neo4j图数据库。数据库的特点是QPS吞吐高,单机QPS几千到上万。请求延迟低,一般是毫秒级延迟,单次请求数据集比较小。OLTP数据库在大数据集的扫描和聚合性能表现不如OLAP数据库。数据库对宕机的容忍度低,一般如果MySQL或Redis服务出现异常,业务方会第一时间找到基础服务测催促恢复服务,需要在分钟级时间段内恢复服务。
数据库使用B-Tree、LSMTree等索引,目的是优化请求延迟,其存储成本较高。数据类型方面的话,交易数据、在线的数据偏多。数据库的计算模型相对简单、固定,MySQL是SQL模型,Redis、HBase等KV数据库支持点查,图数据库使用图的方式查询数据库。
数据仓库领域内成熟的产品如Hive、ClickHouse、Snowflake。Hive常用于离线数仓,ClickHouse常用于秒级和分钟级别的纯实时数据的存储,Snowflake是云上数据仓库。数据仓库单次查询的数据集大,单次请求响应延迟一般在秒级、分钟级甚至小时级。数据仓库对可用性的容忍程度稍高一些,例如当数仓服务发生故障时,可在数仓服务恢复后,重启数仓的离线作业继续执行计算任务。数仓存储的数据量大,存储成本高,ClickHouse会建立更复杂的索引进行查询加速、优化;存储的数据类型也会是经过数仓加工后的数据类型。数仓的计算场景比较固定,以SQL查询为主,对于图、机器学习等计算,还需要把数仓中存储的数据导出到对应的数据库中进行进一步分析。
数据湖常见的是Delta Lake、Iceberg、Hudi。数据湖的一些场景和特性与数据仓库类似,但数据湖直接构建在列存文件之上,省去了为数据建立复杂索引的操作,存储成本更低。数据湖存储的数据一般是原始数据集。数据湖的计算模式比数据仓库更丰富,它规定了入湖的数据格式,对上层计算模型没有绑定,SQL、图、机器学习训练都可以支持。数据湖定义了数据格式,上层可以选择任意的模式进行数据计算。
常见的数据湖架构如下图所示,上层可以是Spark、Flink、Hive,甚至是机器学习计算引擎,第二层是数据湖对Table Format的定义,其下层是存储系统。存储系统分为两部分,一部分是数据文件实际存储的位置,可以是对象存储、HDFS等;另一部分是数据缓存,作用是加速数据访问。在存储计算分离的场景下每一次数据访问都需要通过网络做数据查询,太多数据网络数据查询会影响整体系统性能。如果对最近访问的热点数据做本地缓存,则可以大大加速查询性能。
1. 为何上云?
有一些用户使用了Hadoop和HDFS,在传统的环境下已经可以做到很流畅地扩容,那么云可以为这类用户带来什么?
云提供了弹性。在非云环境中部署的HDFS,遇到容量不够的问题时扩容操作复杂、时间长,对突发的流量高峰响应不及时、扩容后流量高峰已过资源被浪费。云上的对象存储可以认为存储容量是无限大的,但HDFS需要用户时刻关心HDFS存储容量是否已经用完、是否需要进行存储扩容操作。
OSS和S3的存储成本比HDFS低很多,这是由于云厂商的存储做了EC优化、云存储有硬件加速和协议栈优化等。云厂商的规模效应让所有客户都可以享受到云平台对存储组件优化带来的红利,云厂商对用户需求也能够做到专业且及时的响应。
使用云服务,不再需要自己维护基础组建,使用云厂商提供的服务减少了用户自己维护基础存储组件的麻烦,能够更专注地做业务。
2. 以Hive为例,数仓上云
Netflix的所有基础设施都是基于AWS的云服务,做过Hive上云的尝试。在Hive上云的过程中发现了一些问题:
数据变更和回溯困难:由于Hive系统设计问题,很难做到ACID语义的保证,当数据有变更时很难做到读写影响相互隔离,如数据改动会影响分析任务。Hive也无法解决多个数据改动者同时写入造成的数据冲突。Hive无法有效回溯历史版本。
存储替换HDFS为S3困难:Hive数据访问接口直接依赖HDFS API。S3在文件操作的语义上支持并不完善,Hive使用rename完成原子性写文件的操作,这在S3等对象存储上无法实现相同的语义。Hive大量依赖文件目录list接口,而对象存储的文件list操作效率低。
太多细节问题:Schema变更时,不同文件格式行为不一致。不同 FileFormat 甚至连数据类型的支持都不一致,如Timestamp在不同数据格式上的精度不同,需要在开发过程中时刻关心数据精度,对业务开发造成了很大负担。Hive Metastore仅维护partition级别的统计信息,在扩容metastore时甚至需要搭建分布式MySQL集群,维护困难、难以扩展。非partition字段不能做partition prune,对开发人员不友好。
3. 数据湖上云的挑战
统一的元数据中心:数据湖需要对数据做标准化的定义,数据源数量多的情况下如何管理来自异构存储的数据源是一个大问题。另一个问题是如何让数据湖与其他上层系统无缝对接,能让闭源和开源的计算服务访问到数据。在数据源多的情况下元数据信息多,如何保障元数据中心服务的可扩展性也是需要面对的问题。
多源实时数据入湖方案:数据入湖直接把Kafka的数据导入就好了,考虑多源实时入湖有意义吗?为不同的客户接入数据时数据入湖有很多需要考虑的细节。数据入湖需要隔离写入对分析的影响,例如需要考虑增加字段后,数据流如何不断开、分析作业能不崩溃。如何保证稳定可靠的端到端数据变更。数据实时入湖对系统的压力,如元数据激增、数据文件数量增长,如何保障数据分析性能。数据实时入湖和分析CDC数据如何做。
企业级的数据安全保证:私有化的大数据系统对数据加密和权限隔离天然地具有数据隔离的特点,但作为云上的数据湖服务提供商,就必须要提供严格的数据正确性、端到端数据加密、租户之间的权限隔离。例如用户向Iceberg内写数据,其他用户是否可以通过S3拿到数据?数据的安全性在公有云服务上是必须要保障的。
接下来我们详细介绍阿里云的Iceberg云原生数据湖解决方案。
1. Apache Iceberg核心特性
Iceberg是向通用化的方向设计,既不会特别面向某个存储设计、也不会特别面向哪个计算引擎设计,有利于设立Table Format的标准。
2. Apache Iceberg存储结构
Iceberg的data和metadata都是存储在文件系统中的。
3. Apache Iceberg Snapshot视图
从存储视角上看Iceberg分为三层。最底层是具体的文件,中间是每次transaction提交时生成的manifest对象,最上层是快照。快照指的是每个时间点看到的整个表全局数据。下面几张图有助于快速理解数据存储和快照:
第一次Transaction时候写了阴影部分的文件,读取也是读取阴影部分的数据。
第二次Transaction写入数据后,读到的数据是新的阴影部分数据。
第三次Transaction时候读取的是下图的阴影数据。
第四次Transaction时读取的是下图的阴影数据。
Iceberg写入的流程在Table Format的设计上考虑了流和批的两种场景。Iceberg把流和批的场景进行了融合,可以高效地做批量读取、流式增量读取、历史版本回溯,能做到较好的近实时处理。
4. 选择Apache Iceberg的公司数量众多
Netflix、阿里云、网易等公司都在不同场合分享过Iceberg的使用。
5. 阿里云如何解决Iceberg数据库上云的挑战?
在阿里云的Iceberg数据湖产品中,Iceberg作为Table Fomat位于一个中间核心位置,数据通过Flink写入阿里云OSS。数据湖的访问分为两部分,一部分是“入湖”,是不同数据源写入数据到数据湖中;另一部分是“出湖”,使用流分析、批分析甚至机器学习的方式分析数据,此时需要根据业务场景选择计算框架,如Hive、Presto、Spark等。
(1) 统一的元数据中心——DLF
元数据分为两部分,包括Database、Table和Table到location的映射关系;另一部分包含了Table内的数据的信息,包括Table的Schema、partition、partition内文件分布和文件内的Schema信息。对于云原生数据湖架构,Table以上的元数据由DLF Catalog管理,该服务对的外名称为阿里云数据湖构建(Data Lake Formation,DLF)产品。Iceberg对Table一下的信息做了很好的抽象,管理元数据方便,因此Table以下的元数据由Iceberg进行管理。
DLF做了统一的元数据管理,只要入湖,都可以在DLF中找到数据的表名、归属的Database和Catalog。有统一的元数据视图后,数据不会特别混乱。元数据还可以用来追踪数据的血缘关系,一份数据从A转换到B后各数据之间的依赖关系可以通过元数据管理工具做到数据血缘关系的追踪。
数据湖会和上游的计算框架、模型做对接,对接的入口就是DLF。例如Spark访问某一个表,表的Location对应着文件系统上的位置,通过DLF可以拿到该位置。
关于权限和认证问题,由于数据湖是一个统一数据中心,公司内部、不同组织的业务方做数据访问必须做数据隔离,否则机密数据可以被所有人访问会导致很多问题。DFL作为数据的入口,也负责管理数据访问权限。
(2) 多源实时数据入湖方案
隔离写一半的数据对分析作业的影响
写入作业如何隔离和分析作业的影响?阿里云主推Flink配合Iceberg,隔离同时读和写的影响。Flink可以很好地实现Exactly Once语义,在故障情况下也可以保障数据一致性的语义。Flink数据管道范围内可以保障数据不多一条、不少一条。Iceberg可以通过存储的ACID特性隔离写入对数据分析作业的影响。当不断有数据写入数据湖时,由于Iceberg的ACID特性,分析作业读取数据湖的表时一直在读Transaction内的数据,其他的写入任务不会影响读取的数据。最终数据的计算结果是一致性的结果。
保证稳定可靠的端到端数据变更
入湖容易遇到数据源、数据管道、数据湖之间的数据变更,最典型的场景是增加字段。如果要实现字段新增,旧的数据变更方案中,首先要在数据源中改掉字段;然后修改Flink任务,使Flink可以识别该字段;最后在数据湖中把数据湖中所有的数据都加上该字段。这个流程冗长,成本和开销比较大,我们尝试利用Flink配合Iceberg进行优化。
目前我们在推进的新方案是:Flink可以直接捕捉到上游DDL变更事件、转发变更事件到下游,下游收到事件后直接应用到数据湖。Iceberg支持Schema变更的能力,利用该功能可以瞬间完成Schema同步。用户不再需要感知到端到端数据变更,数据入湖的链路已经自动化地处理了Schema变更的问题。解放了管理数据湖入湖流程的人力。
实时数据入湖的副作用与应对方法
当遇到数据从小时级别延迟缩短到分钟级的改造需求时候,需要对数据入湖链路做实时化改造。实时化改造会遇到多种问题:数据要尽可能快地提交到数仓/数据湖,如果利用Hive做数据存储,Hive的元数据会快速增长,小文件数量很多、查询效率会变慢、元数据存储介质也需要进行响应改造和升级。
在阿里云的云原生数据湖方案中,Iceberg很好地解决了元数据中心化的问题。Iceberg的Table Format把Table内的元信息托管到文件系统中,因此元数据的扩展性与文件系统的扩展性近似。统计信息存储到对象存储中,非常方便地实现了可扩展性。
随着小文件增多,查询效率变低,有三种实现解决该问题:第一种方式是在Flink写入数据时按照数据湖设定的Bucket来Shuffle方式进行写入,一个Bucket的数据只有一个并发写入,写入时已经减小了小文件生成的概率。第二种方式是使用批处理任务,定期批量检查、合并小文件。第三种方式是自动增量合并小文件。Flink流式增量合并适合处理小量的增量合并,大规模合并会影响流作业的稳定性、对资源的开销比较大。流式入湖合并适合做小范围的文件合并,可以以较低的成本实现较好的性能,全表级别的小文件合并需要交给批作业,减小资源消耗。
实时入湖并处理CDC数据
当前市面上能够处理更改数据捕获(Change Data Capture,CDC)的存储不多、而且各有缺陷。
Flink是一个实时性很好的Exactly once的流式计算引擎,适合作为MySQL CDC与数据湖之间的管道。Iceberg能很好地在云上存储和分析CDC数据,利用开源的解决方案就可以做到CDC链路串通。
首先定义MySQL源表,再在Iceberg里定义Sink表,中间使用Flink作业即可利用Insert into语句从Mysql源表导入数据到Iceberg。生产环境中使用该方案依然可能遇到小文件多的问题,需要进行批量compaction;但批量compaction操作可能和流式入湖的任务有冲突,对语义有一定影响。在Iceberg社区对这个问题已经进行了很好的讨论,后续阿里云会把与此相关的优化工作推到社区。
(3) 企业级的数据安全保证
经过上述的MySQL源、Flink数据管道,数据进入数据湖,数据湖内的数据再提供给各个计算引擎,每个系统之间的串联比较复杂。如果发现数据有问题,最常出问题的点是数据入湖这一过程。数据一旦入湖,基本不会再对数据有进一步操作,因此核心要保障数据入湖的服务稳定、准确。
为了保障服务稳定性、数据正确性,我们专门为数据入湖出湖的整套流程设计了一套测试框架,对数据做了自动化的、严谨的端到端验证。我们会人为地产生一些数据,使用Flink入湖,再通过多计算引擎(Flink / Spark / Hive / Presto)对比验证数据的一致性;使用Flink入湖,多种批处理引擎验证源数据和计算引擎框架计算结果。测试框架已经应用到研发过程中,主要检查数据从源端到入湖的正确性,保障项目迭代过程中软件的准确性。
我们为了保障异常情况下的数据准确性,测试框架也会模拟一些异常情况,注入异常并检查数据正确性。我们注入的异常包括:JobManager重启或挂掉,一个TaskManager重启或挂掉,多个TaskManager重启或挂掉,无法正常访问HDFS / OSS服务,无法正常访问DLF / Hive-metastore服务,机器负载高、某个CPU利用率长期100%、某块磁盘访问慢、网络不可访问或丢包严重等错误注入等。
阿里云Iceberg云原生数据湖方案核心优势有以下四点:
1. 数据格式开放
完全兼容Iceberg Table Format格式。Parquet、ORC、Avro格式也可以很平滑地迁移到Iceberg。如果历史数据存储在OSS、HDFS,不需要做数据文件级别拷贝,只需要利用一个Spark或Flink作业,为存量数据parquet文件重新生成一份Iceberg元数据即可完成入湖。对parquet格式的存量数据用户也非常友好。
2.计算多样性
数据湖核心优势之一是能支持不同计算场景、满足用户的需求。计算多样性对业务场景的迭代和研发很灵活。
3. 资源弹性化
相对于自己部署的软件,云原生的弹性扩缩容特性实现了用户按需付费。
4. 服务专业性
阿里云每个细分产品都有几十到上百人的团队进行支持,用户可以享受到专业性很强的服务。
今天的分享就到这里,谢谢大家。
在文末分享、点赞、在看,给个3连击呗~
分享嘉宾:
🧐分享、点赞、在看,给个3连击呗!👇