「Hudi系列」Hudi查询&写入&常见问题汇总
2. 「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
3. Apache Hudi 0.11 版本重磅发布,新特性速览!
1.Hudi基本概念 :
Apache Hudi(发音为“Hudi”)在DFS的数据集上提供以下流原语
插入更新 (如何改变数据集?) 增量拉取 (如何获取变更的数据?)
在本节中,我们将讨论重要的概念和术语,这些概念和术语有助于理解并有效使用这些原语。
时间轴
在它的核心,Hudi维护一条包含在不同的即时时间所有对数据集操作的时间轴,从而提供,从不同时间点出发得到不同的视图下的数据集。Hudi即时包含以下组件
操作类型 : 对数据集执行的操作类型 即时时间 : 即时时间通常是一个时间戳(例如:20190117010349),该时间戳按操作开始时间的顺序单调增加。 状态 : 即时的状态
Hudi保证在时间轴上执行的操作的原子性和基于即时时间的时间轴一致性。
执行的关键操作包括
COMMITS - 一次提交表示将一组记录原子写入到数据集中。 CLEANS - 删除数据集中不再需要的旧文件版本的后台活动。 DELTA_COMMIT - 增量提交是指将一批记录原子写入到MergeOnRead存储类型的数据集中,其中一些/所有数据都可以只写到增量日志中。 COMPACTION - 协调Hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩表现为时间轴上的特殊提交。 ROLLBACK - 表示提交/增量提交不成功且已回滚,删除在写入过程中产生的所有部分文件。 SAVEPOINT - 将某些文件组标记为"已保存",以便清理程序不会将其删除。在发生灾难/数据恢复的情况下,它有助于将数据集还原到时间轴上的某个点。
任何给定的即时都可以处于以下状态之一
REQUESTED - 表示已调度但尚未启动的操作。 INFLIGHT - 表示当前正在执行该操作。 COMPLETED - 表示在时间轴上完成了该操作。
上面的示例显示了在Hudi数据集上大约10:00到10:20之间发生的更新事件,大约每5分钟一次,将提交元数据以及其他后台清理/压缩保留在Hudi时间轴上。
观察的关键点是:提交时间指示数据的到达时间(上午10:20),而实际数据组织则反映了实际时间或事件时间,即数据所反映的(从07:00开始的每小时时段)。在权衡数据延迟和完整性时,这是两个关键概念。
如果有延迟到达的数据(事件时间为9:00的数据在10:20达到,延迟 >1 小时),我们可以看到upsert将新数据生成到更旧的时间段/文件夹中。
在时间轴的帮助下,增量查询可以只提取10:00以后成功提交的新数据,并非常高效地只消费更改过的文件,且无需扫描更大的文件范围,例如07:00后的所有时间段。
文件组织
Hudi将DFS上的数据集组织到基本路径下的目录结构中。数据集分为多个分区,这些分区是包含该分区的数据文件的文件夹,这与Hive表非常相似。
每个分区被相对于基本路径的特定分区路径区分开来。
在每个分区内,文件被组织为文件组,由文件id唯一标识。
每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(*.parquet)以及一组日志文件(*.log*),该文件包含自生成基本文件以来对基本文件的插入/更新。
Hudi采用MVCC设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收DFS上的空间。
Hudi通过索引机制将给定的hoodie键(记录键+分区路径)映射到文件组,从而提供了高效的Upsert。
一旦将记录的第一个版本写入文件,记录键和文件组/文件id之间的映射就永远不会改变。简而言之,映射的文件组包含一组记录的所有版本。
存储类型和视图
Hudi存储类型定义了如何在DFS上对数据进行索引和布局以及如何在这种组织之上实现上述原语和时间轴活动(即如何写入数据)。
反过来,视图定义了基础数据如何暴露给查询(即如何读取数据)。
存储类型
Hudi支持以下存储类型。
写时复制 : 仅使用列文件格式(例如parquet)存储数据。通过在写入过程中执行同步合并以更新版本并重写文件。 读时合并 : 使用列式(例如parquet)+ 基于行(例如avro)的文件格式组合来存储数据。更新记录到增量文件中,然后进行同步或异步压缩以生成列文件的新版本。
下表总结了这两种存储类型之间的权衡
视图
Hudi支持以下存储数据的视图
读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非Hudi列式数据集相比,具有相同的列式查询性能。 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟)。
下表总结了不同视图之间的权衡。
写时复制存储
写时复制存储中的文件片仅包含基本/列文件,并且每次提交都会生成新版本的基本文件。
换句话说,我们压缩每个提交,从而所有的数据都是以列数据的形式储存。在这种情况下,写入数据非常昂贵(我们需要重写整个列数据文件,即使只有一个字节的新数据被提交),而读取数据的成本则没有增加。
这种视图有利于读取繁重的分析工作。
以下内容说明了将数据写入写时复制存储并在其上运行两个查询时,它是如何工作的。
随着数据的写入,对现有文件组的更新将为该文件组生成一个带有提交即时时间标记的新切片,而插入分配一个新文件组并写入该文件组的第一个切片。
这些文件切片及其提交即时时间在上面用颜色编码。
针对这样的数据集运行SQL查询(例如:select count(*)统计该分区中的记录数目),首先检查时间轴上的最新提交并过滤每个文件组中除最新文件片以外的所有文件片。
如您所见,旧查询不会看到以粉红色标记的当前进行中的提交的文件,但是在该提交后的新查询会获取新数据。因此,查询不受任何写入失败/部分写入的影响,仅运行在已提交数据上。
写时复制存储的目的是从根本上改善当前管理数据集的方式,通过以下方法来实现
优先支持在文件级原子更新数据,而无需重写整个表/分区 能够只读取更新的部分,而不是进行低效的扫描或搜索 严格控制文件大小来保持出色的查询性能(小的文件会严重损害查询性能)。
读时合并存储
读时合并存储是写时复制的升级版,从某种意义上说,它仍然可以通过读优化表提供数据集的读取优化视图(写时复制的功能)。
此外,它将每个文件组的更新插入存储到基于行的增量日志中,通过文件id,将增量日志和最新版本的基本文件进行合并,从而提供近实时的数据查询。因此,此存储类型智能地平衡了读和写的成本,以提供近乎实时的查询。
这里最重要的一点是压缩器,它现在可以仔细挑选需要压缩到其列式基础文件中的增量日志(根据增量日志的文件大小),以保持查询性能(较大的增量日志将会提升近实时的查询时间,并同时需要更长的合并时间)。
以下内容说明了存储的工作方式,并显示了对近实时表和读优化表的查询。
此示例中发生了很多有趣的事情,这些带出了该方法的微妙之处。
现在,我们每1分钟左右就有一次提交,这是其他存储类型无法做到的。 现在,在每个文件id组中,都有一个增量日志,其中包含对基础列文件中记录的更新。在示例中,增量日志包含10:05至10:10的所有数据。与以前一样,基本列式文件仍使用提交进行版本控制。因此,如果只看一眼基本文件,那么存储布局看起来就像是写时复制表的副本。 定期压缩过程会从增量日志中合并这些更改,并生成基础文件的新版本,就像示例中10:05发生的情况一样。 有两种查询同一存储的方式:读优化(RO)表和近实时(RT)表,具体取决于我们选择查询性能还是数据新鲜度。 对于RO表来说,提交数据在何时可用于查询将有些许不同。请注意,以10:10运行的(在RO表上的)此类查询将不会看到10:05之后的数据,而在RT表上的查询总会看到最新的数据。 何时触发压缩以及压缩什么是解决这些难题的关键。通过实施压缩策略,在该策略中,与较旧的分区相比,我们会积极地压缩最新的分区,从而确保RO表能够以一致的方式看到几分钟内发布的数据。
读时合并存储上的目的是直接在DFS上启用近实时处理,而不是将数据复制到专用系统,后者可能无法处理大数据量。
该存储还有一些其他方面的好处,例如通过避免数据的同步合并来减少写放大,即批量数据中每1字节数据需要的写入数据量。
2.写入Hudi:
写操作
在此之前,了解Hudi数据源及delta streamer工具提供的三种不同的写操作以及如何最佳利用它们可能会有所帮助。这些操作可以在针对数据集发出的每个提交/增量提交中进行选择/更改。
UPSERT(插入更新) :这是默认操作,在该操作中,通过查找索引,首先将输入记录标记为插入或更新。在运行启发式方法以确定如何最好地将这些记录放到存储上,如优化文件大小之类后,这些记录最终会被写入。对于诸如数据库更改捕获之类的用例,建议该操作,因为输入几乎肯定包含更新。 INSERT(插入) :就使用启发式方法确定文件大小而言,此操作与插入更新(UPSERT)非常相似,但此操作完全跳过了索引查找步骤。因此,对于日志重复数据删除等用例(结合下面提到的过滤重复项的选项),它可以比插入更新快得多。插入也适用于这种用例,这种情况数据集可以允许重复项,但只需要Hudi的事务写/增量提取/存储管理功能。 BULK_INSERT(批插入) :插入更新和插入操作都将输入记录保存在内存中,以加快存储优化启发式计算的速度(以及其它未提及的方面)。所以对Hudi数据集进行初始加载/引导时这两种操作会很低效。批量插入提供与插入相同的语义,但同时实现了基于排序的数据写入算法,该算法可以很好地扩展数百TB的初始负载。但是,相比于插入和插入更新能保证文件大小,批插入在调整文件大小上只能尽力而为。
DeltaStreamer
HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件增量导入 支持json、avro或自定义记录类型的传入数据 管理检查点,回滚和恢复 利用DFS或Confluent schema注册表的Avro模式。 支持自定义转换操作
命令行选项更详细地描述了这些功能:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: [options]
Options:
--commit-on-errors
Commit even when some records failed to be written
Default: false
--enable-hive-sync
Enable syncing to hive
Default: false
--filter-dupes
Should duplicate records from source be dropped/filtered outbefore
insert/bulk-insert
Default: false
--help, -h
--hudi-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
Hudi client, schema provider, key generator and data source. For
Hudi client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, referto individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
FilebasedSchemaProvider
Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFSSource => max bytes to read, KafkaSource => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --target-base-path
base path for the target Hudi dataset. (Will be created if did not
exist first time around. If exists, expected to be a Hudi dataset)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. UDF to
transform raw source dataset to a target dataset (conforming to target
schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query template to be passed as a transformation function)
该工具采用层次结构组成的属性文件,并具有可插拔的接口,用于提取数据、生成密钥和提供模式。
从Kafka和DFS摄取数据的示例配置在这里:hudi-utilities/src/test/resources/delta-streamer-config
。
例如:当您让Confluent Kafka、Schema注册表启动并运行后,可以用这个命令产生一些测试数据(impressions.avro,由schema-registry代码库提供)
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
然后用如下命令摄取这些数据。
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
--op BULK_INSERT
在某些情况下,您可能需要预先将现有数据集迁移到Hudi。请参考迁移指南。
Datasource Writer
hudi-spark
模块提供了DataSource API,可以将任何数据帧写入(也可以读取)到Hudi数据集中。以下是在指定需要使用的字段名称的之后,如何插入更新数据帧的方法,这些字段包括recordKey => _row_key、partitionPath => partition和precombineKey => timestamp
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 可以传入任何Hudi客户端参数
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
与Hive同步
上面的两个工具都支持将数据集的最新模式同步到Hive Metastore,以便查询新的列和分区。如果需要从命令行或在独立的JVM中运行它,Hudi提供了一个HiveSyncTool
,在构建了hudi-hive模块之后,可以按以下方式调用它。
cd hudi-hive
./run_sync_tool.sh
[hudi-hive]$ ./run_sync_tool.sh --help
Usage: [options]
Options:
* --base-path
Basepath of Hudi dataset to sync
* --database
name of the target database in Hive
--help, -h
Default: false
* --jdbc-url
Hive jdbc connect url
* --pass
Hive password
* --table
name of the target table in Hive
* --user
Hive username
删除数据
通过允许用户指定不同的数据记录负载实现,Hudi支持对存储在Hudi数据集中的数据执行两种类型的删除。
Soft Deletes(软删除) :使用软删除时,用户希望保留键,但仅使所有其他字段的值都为空。通过确保适当的字段在数据集模式中可以为空,并在将这些字段设置为null之后直接向数据集插入更新这些记录,即可轻松实现这一点。 Hard Deletes(硬删除) :这种更强形式的删除是从数据集中彻底删除记录在存储上的任何痕迹。这可以通过触发一个带有自定义负载实现的插入更新来实现,这种实现可以使用总是返回Optional.Empty作为组合值的DataSource或DeltaStreamer。Hudi附带了一个内置的 org.apache.hudi.EmptyHoodieRecordPayload
类,它就是实现了这一功能。
deleteDF // 仅包含要删除的记录的数据帧
.write().format("org.apache.hudi")
.option(...) // 根据设置需要添加HUDI参数,例如记录键、分区路径和其他参数
// 指定record_key,partition_key,precombine_fieldkey和常规参数
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
存储管理
Hudi还对存储在Hudi数据集中的数据执行几个关键的存储管理功能。在DFS上存储数据的关键方面是管理文件大小和数量以及回收存储空间。例如,HDFS在处理小文件上性能很差,这会对Name Node的内存及RPC施加很大的压力,并可能破坏整个集群的稳定性。通常,查询引擎可在较大的列文件上提供更好的性能,因为它们可以有效地摊销获得列统计信息等的成本。即使在某些云数据存储上,列出具有大量小文件的目录也常常比较慢。
以下是一些有效管理Hudi数据集存储的方法。
Hudi中的小文件处理功能,可以分析传入的工作负载并将插入内容分配到现有文件组中,而不是创建新文件组。新文件组会生成小文件。 可以配置Cleaner来清理较旧的文件片,清理的程度可以调整,具体取决于查询所需的最长时间和增量拉取所需的回溯。 用户还可以调整基础/parquet文件、日志文件的大小和预期的压缩率,使足够数量的插入被分到同一个文件组中,最终产生大小合适的基础文件。 智能调整批插入并行度,可以产生大小合适的初始文件组。实际上,正确执行此操作非常关键,因为文件组一旦创建后就不能删除,只能如前所述对其进行扩展。 对于具有大量更新的工作负载,读取时合并存储提供了一种很好的机制,可以快速将其摄取到较小的文件中,之后通过压缩将它们合并为较大的基础文件。
3.查询Hudi:
从概念上讲,Hudi物理存储一次数据到DFS上,同时在其上提供三个逻辑视图,如之前所述。
数据集同步到Hive Metastore后,它将提供由Hudi的自定义输入格式支持的Hive外部表。一旦提供了适当的Hudi捆绑包,就可以通过Hive、Spark和Presto之类的常用查询引擎来查询数据集。
具体来说,在写入过程中传递了两个由table name命名的Hive表。例如,如果table name = hudi_tbl,我们得到
hudi_tbl 实现了由 HoodieParquetInputFormat 支持的数据集的读优化视图,从而提供了纯列式数据。
hudi_tbl_rt 实现了由 HoodieParquetRealtimeInputFormat 支持的数据集的实时视图,从而提供了基础数据和日志数据的合并视图。
如概念部分所述,增量处理所需要的一个关键原语是增量拉取(以从数据集中获取更改流/日志)。您可以增量提取Hudi数据集,这意味着自指定的即时时间起,您可以只获得全部更新和新行。这与插入更新一起使用,对于构建某些数据管道尤其有用,包括将1个或多个源Hudi表(数据流/事实)以增量方式拉出(流/事实)并与其他表(数据集/维度)结合以写出增量到目标Hudi数据集。增量视图是通过查询上表之一实现的,并具有特殊配置,该特殊配置指示查询计划仅需要从数据集中获取增量数据。
接下来,我们将详细讨论在每个查询引擎上如何访问所有三个视图。
Hive
为了使Hive能够识别Hudi数据集并正确查询,HiveServer2需要在其辅助jars路径中提供hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar
。这将确保输入格式类及其依赖项可用于查询计划和执行。
读优化表 {#hive-ro-view}
除了上述设置之外,对于beeline cli访问,还需要将hive.input.format
变量设置为org.apache.hudi.hadoop.HoodieParquetInputFormat
输入格式的完全限定路径名。对于Tez,还需要将hive.tez.input.format
设置为org.apache.hadoop.hive.ql.io.HiveInputFormat
。
实时表 {#hive-rt-view}
除了在HiveServer2上安装Hive捆绑jars之外,还需要将其放在整个集群的hadoop/hive安装中,这样查询也可以使用自定义RecordReader。
增量拉取 {#hive-incr-pull}
HiveIncrementalPuller
允许通过HiveQL从大型事实/维表中增量提取更改, 结合了Hive(可靠地处理复杂的SQL查询)和增量原语的好处(通过增量拉取而不是完全扫描来加快查询速度)。该工具使用Hive JDBC运行hive查询并将其结果保存在临时表中,这个表可以被插入更新。Upsert实用程序(HoodieDeltaStreamer
)具有目录结构所需的所有状态,以了解目标表上的提交时间应为多少。例如:/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}。已注册的Delta Hive表的格式为{tmpdb}.{source_table}_{last_commit_included}
。
以下是HiveIncrementalPuller的配置选项
| 配置 | 描述 | 默认值 | |hiveUrl| 要连接的Hive Server 2的URL | | |hiveUser| Hive Server 2 用户名 | | |hivePass| Hive Server 2 密码 | | |queue| YARN 队列名称 | | |tmp| DFS中存储临时增量数据的目录。目录结构将遵循约定。请参阅以下部分。| | |extractSQLFile| 在源表上要执行的提取数据的SQL。提取的数据将是自特定时间点以来已更改的所有行。| | |sourceTable| 源表名称。在Hive环境属性中需要设置。| | |targetTable| 目标表名称。中间存储目录结构需要。| | |sourceDataPath| 源DFS基本路径。这是读取Hudi元数据的地方。| | |targetDataPath| 目标DFS基本路径。这是计算fromCommitTime所必需的。如果显式指定了fromCommitTime,则不需要设置这个参数。| | |tmpdb| 用来创建中间临时增量表的数据库 | hoodie_temp | |fromCommitTime| 这是最重要的参数。这是从中提取更改的记录的时间点。| | |maxCommits| 要包含在拉取中的提交数。将此设置为-1将包括从fromCommitTime开始的所有提交。将此设置为大于0的值,将包括在fromCommitTime之后仅更改指定提交次数的记录。如果您需要一次赶上两次提交,则可能需要这样做。| 3 | |help| 实用程序帮助 | | 设置fromCommitTime=0和maxCommits=-1将提取整个源数据集,可用于启动Backfill。如果目标数据集是Hudi数据集,则该实用程序可以确定目标数据集是否没有提交或延迟超过24小时(这是可配置的),它将自动使用Backfill配置,因为增量应用最近24小时的更改会比Backfill花费更多的时间。该工具当前的局限性在于缺乏在混合模式(正常模式和增量模式)下自联接同一表的支持。
关于使用Fetch任务执行的Hive查询的说明:由于Fetch任务为每个分区调用InputFormat.listStatus(),每个listStatus()调用都会列出Hoodie元数据。为了避免这种情况,如下操作可能是有用的,即使用Hive session属性对增量查询禁用Fetch任务:set hive.fetch.task.conversion = none
;。这将确保Hive查询使用Map Reduce执行, 合并分区(用逗号分隔),并且对所有这些分区仅调用一次InputFormat.listStatus()。
Spark
Spark可将Hudi jars和捆绑包轻松部署和管理到作业/笔记本中。简而言之,通过Spark有两种方法可以访问Hudi数据集。
Hudi DataSource:支持读取优化和增量拉取,类似于标准数据源(例如:spark.read.parquet)的工作方式。
以Hive表读取:支持所有三个视图,包括实时视图,依赖于自定义的Hudi输入格式(再次类似Hive)。
通常,您的spark作业需要依赖hudi-spark或hudi-spark-bundle-x.y.z.jar, 它们必须位于驱动程序和执行程序的类路径上(提示:使用--jars参数)。
读优化表 {#spark-ro-view}
要使用SparkSQL将RO表读取为Hive表,只需按如下所示将路径过滤器推入sparkContext。对于Hudi表,该方法保留了Spark内置的读取Parquet文件的优化功能,例如进行矢量化读取。
spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);
如果您希望通过数据源在DFS上使用全局路径,则只需执行以下类似操作即可得到Spark数据帧。
Dataset hoodieROViewDF = spark.read().format("org.apache.hudi")
// pass any path glob, can include hudi & non-hudi datasets
.load("/glob/path/pattern");
实时表 {#spark-rt-view}
当前,实时表只能在Spark中作为Hive表进行查询。为了做到这一点,设置spark.sql.hive.convertMetastoreParquet = false
, 迫使Spark回退到使用Hive Serde读取数据(计划/执行仍然是Spark)。
$ spark-shell --jars hudi-spark-bundle-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages com.databricks:spark-avro_2.11:4.0.0 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client
scala> sqlContext.sql("select count(*) from hudi_rt where datestr = '2016-10-02'").show()
增量拉取 {#spark-incr-pull}
hudi-spark
模块提供了DataSource API,这是一种从Hudi数据集中提取数据并通过Spark处理数据的更优雅的方法。如下所示是一个示例增量拉取,它将获取自beginInstantTime
以来写入的所有记录。
Dataset hoodieIncViewDF = spark.read()
.format("org.apache.hudi")
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),
DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
)
.load(tablePath); // For incremental view, pass in the root/base path of dataset
请参阅设置部分,以查看所有数据源选项。
另外,HoodieReadClient通过Hudi的隐式索引提供了以下功能。
| API | 描述 | | read(keys) | 使用Hudi自己的索通过快速查找将与键对应的数据作为DataFrame读出 | | filterExists() | 从提供的RDD[HoodieRecord]中过滤出已经存在的记录。对删除重复数据有用 | | checkExists(keys) | 检查提供的键是否存在于Hudi数据集中 |
Presto
Presto是一种常用的查询引擎,可提供交互式查询性能。Hudi RO表可以在Presto中无缝查询。这需要在整个安装过程中将hudi-presto-bundle jar放入
。
4.Hudi常见问题:
1. ApacheHudi对个人和组织何时有用
如果你希望将数据快速提取到HDFS或云存储中,Hudi可以提供帮助。另外,如果你的ETL /hive/spark作业很慢或占用大量资源,那么Hudi可以通过提供一种增量式读取和写入数据的方法来提供帮助。
作为一个组织,Hudi可以帮助你构建高效的数据湖,解决一些最复杂的底层存储管理问题,同时将数据更快地交给数据分析师,工程师和科学家。
2. Hudi不打算达成的目标
Hudi不是针对任何OLTP案例而设计的,在这些情况下,通常你使用的是现有的NoSQL / RDBMS数据存储。Hudi无法替代你的内存分析数据库(至少现在还没有!)。Hudi支持在几分钟内实现近乎实时的摄取,从而权衡了延迟以进行有效的批处理。如果确实希望亚-分钟处理延迟,请使用你最喜欢的流处理解决方案。
3. 什么是增量处理?为什么Hudi一直在谈论它
增量处理是由Vinoth Chandar在O'reilly博客中首次引入的,博客中阐述了大部分工作。用纯粹的技术术语来说,增量处理仅是指以流处理方式编写微型批处理程序。典型的批处理作业每隔几个小时就会消费所有输入并重新计算所有输出。典型的流处理作业会连续/每隔几秒钟消费一些新的输入并重新计算新的/更改以输出。尽管以批处理方式重新计算所有输出可能会更简单,但这很浪费并且耗费昂贵的资源。Hudi具有以流方式编写相同批处理管道的能力,每隔几分钟运行一次。
虽然可将其称为流处理,但我们更愿意称其为增量处理,以区别于使用Apache Flink,Apache Apex或Apache Kafka Streams构建的纯流处理管道。
4. 写时复制(COW)与读时合并(MOR)存储类型之间有什么区别
写时复制(Copy On Write):此存储类型使客户端能够以列式文件格式(当前为parquet)摄取数据。使用COW存储类型时,任何写入Hudi数据集的新数据都将写入新的parquet文件。更新现有的行将导致重写整个parquet文件(这些parquet文件包含要更新的受影响的行)。因此,所有对此类数据集的写入都受parquet写性能的限制,parquet文件越大,摄取数据所花费的时间就越长。
读时合并(Merge On Read):此存储类型使客户端可以快速将数据摄取为基于行(如avro)的数据格式。使用MOR存储类型时,任何写入Hudi数据集的新数据都将写入新的日志/增量文件,这些文件在内部将数据以avro进行编码。压缩(Compaction)过程(配置为嵌入式或异步)将日志文件格式转换为列式文件格式(parquet)。
两种不同的格式提供了两种不同视图(读优化视图和实时视图),读优化视图取决于列式parquet文件的读取性能,而实时视图取决于列式和/或日志文件的读取性能。
更新现有的行将导致:a)写入从以前通过压缩(Compaction)生成的基础parquet文件对应的日志/增量文件更新;或b)在未进行压缩的情况下写入日志/增量文件的更新。因此,对此类数据集的所有写入均受avro /日志文件写入性能的限制,其速度比parquet快得多(写入时需要复制)。虽然,与列式(parquet)文件相比,读取日志/增量文件需要更高的成本(读取时需要合并)。
5. 如何为工作负载选择存储类型
Hudi的主要目标是提供更新功能,该功能比重写整个表或分区要快几个数量级。如果满足以下条件,则选择写时复制(COW)存储:
寻找一种简单的替换现有的parquet表的方法,而无需实时数据。 当前的工作流是重写整个表/分区以处理更新,而每个分区中实际上只有几个文件发生更改。 想使操作更为简单(无需压缩等),并且摄取/写入性能仅受parquet文件大小以及受更新影响文件数量限制 工作流很简单,并且不会突然爆发大量更新或插入到较旧的分区。COW写入时付出了合并成本,因此,这些突然的更改可能会阻塞摄取,并干扰正常摄取延迟目标。
如果满足以下条件,则选择读时合并(MOR)存储:
希望数据尽快被摄取并尽可能快地可被查询。 工作负载可能会突然出现模式的峰值/变化(例如,对上游数据库中较旧事务的批量更新导致对DFS上旧分区的大量更新)。异步压缩(Compaction)有助于缓解由这种情况引起的写放大,而正常的提取则需跟上上游流的变化。
不管选择何种存储,Hudi都将提供:
快照隔离和原子写入批量记录 增量拉取 重复数据删除能力
6. Hudi是分析型数据库吗
典型的数据库有一些长时间运行的服务器,以便提供读写服务。Hudi的体系结构与之不同,它高度解耦读写,为对应扩容挑战可以独立扩展写入和查询/读取。因此,它可能并不总是像数据库一样。
尽管如此,Hudi的设计非常像数据库,并提供类似的功能(更新,更改捕获)和语义(事务性写入,快照隔离读取)。
7. 如何对存储在Hudi中的数据建模
在将数据写入Hudi时,可以像在键-值存储上那样对记录进行建模:指定键字段(对于单个分区/整个数据集是唯一的),分区字段(表示要放置键的分区)和preCombine/combine逻辑(用于指定如何处理一批写入记录中的重复记录)。该模型使Hudi可以强制执行主键约束,就像在数据库表上一样。请参阅此处的示例。
当查询/读取数据时,Hudi只是将自己显示为一个类似于json的层次表,每个人都习惯于使用Hive/Spark/Presto 来对Parquet/Json/Avro进行查询。
8. Hudi是否支持云存储/对象存储
一般来说,Hudi能够在任何Hadoop文件系统实现上提供该功能,因此可以在Cloud Store(Amazon S3或Microsoft Azure或Google Cloud Storage)上读写数据集。Hudi还进行了特定的设计,使在云上构建Hudi数据集变得非常容易,例如S3的一致性检查,数据文件涉及的零移动/重命名。
9. Hudi支持Hive/Spark/Hadoop的哪些版本
从2019年9月开始,Hudi可以支持Spark 2.1 +,Hive 2.x,Hadoop 2.7+(非Hadoop 3)。
10. Hudi如何在数据集中实际存储数据
从更高层次上讲,Hudi基于MVCC设计,将数据写入parquet/基本文件以及包含对基本文件所做更改的日志文件的不同版本。所有文件都以数据集的分区模式存储,这与Apache Hive表在DFS上的布局方式非常相似。
11. 如何写入Hudi数据集
通常,你会从源获取部分更新/插入,然后对Hudi数据集执行写入操作。如果从其他标准来源(如Kafka或tailf DFS)中提取数据,那么DeltaStreamer将会非常有用,其提供了一种简单的自我管理解决方案,可将数据写入Hudi。你还可以自己编写代码,使用Spark数据源API从自定义源获取数据,并使用Hudi数据源写入Hudi。
12. 如何部署Hudi作业
写入Hudi的好处是它可以像在YARN/Mesos甚至是K8S群集上运行的任何其他Spark作业一样运行。只需使用Spark UI即可查看写入操作,而无需单独搭建Hudi集群。
13. 如何查询刚写入的Hudi数据集
除非启用了Hive同步,否则与其他任何源一样,通过上述方法写入Hudi的数据集可以简单地通过Spark数据源进行查询。
val hoodieROView =spark.read.format("org.apache.hudi").load(basePath +"/path/to/partitions/*")
val hoodieIncViewDF =spark.read().format("org.apache.hudi")
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), )
.load(basePath);
请注意:当前不支持从Spark数据源读取实时视图。请使用下面的Hive路径。
如果在deltastreamer工具或数据源中启用了Hive Sync,则该数据集会同步到Hive的几张表中,可以使用HiveQL,Presto或SparkSQL进行读取。点击这里查看更多。
14. Hudi如何处理输入中的重复记录
在数据集上执行 upsert操作时,提供的记录包含给定键的多条记录,然后通过重复调用有效负载类的 preCombine方法将所有记录合并为一个最终值。默认情况下会选择最大值的记录(由 compareTo决定)。
对于 insert或 bulk_insert操作,不执行 preCombine。因此,如果你的输入包含重复项,则数据集也将包含重复项。如果您不希望重复的记录,请使用upsert或在数据源或deltastreamer中指定删除重复数据的配置项。
15. 可以实现自定义合并逻辑处理输入记录和存储的记录吗
与上面类似,定义有效负载类定义的方法(combineAndGetUpdateValue(),getInsertValue()),这些方法控制如何将存储的记录与输入的更新/插入组合以生成最终值以写回到存储中。
16. 如何删除数据集中的记录
GDPR使删除成为数据管理工具箱中的必备工具。Hudi支持软删除和硬删除。
17. 如何将数据迁移到Hudi
Hudi对迁移提供了内置支持,可使用 hudi-cli提供的 HDFSParquetImporter工具将整个数据集一次性写入Hudi。也可以使用Spark数据源API读取和写入数据集。迁移后,可以使用此处讨论的常规方法执行写操作。这里也详细讨论该问题,包括部分迁移的方法。
18. 如何将Hudi配置传递给Spark作业
这里涵盖了数据源和Hudi写入客户端(deltastreamer和数据源都会内部调用)的配置项。在DeltaStreamer之类的工具上调用 --help都会打印所有使用选项。许多控制 upsert、调整文件大小的选项是在客户端级别定义的,下面是将它们传递给可用于写数据配置项的方式。
1). 对于Spark DataSource,可以使用DataFrameWriter的 options API来传递这些配置项。
inputDF.write().format("org.apache.hudi")
.options(clientOpts)// any of the Hudi client opts can be passed in as well
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),"_row_key")
...
2). 直接使用HoodieWriteClient时,只需使用配置来构造HoodieWriteConfig对象。
3). 使用HoodieDeltaStreamer工具提取时,可以在属性文件中设置配置项,并将该文件作为命令行参数 --props传递。
19. 可以在Apache Hive Metastore中注册Hudi数据集吗
可以, 可以通过独立的Hive Sync工具或使用deltastreamer工具或数据源中的选项来执行此操作。
20. Hudi索引的工作原理及其好处是什么
索引是Hudi写入的关键部分,它始终将给定的 recordKey映射到Hudi内部的文件组( FileGroup)。这样可以更快地识别受给定写入操作影响的文件组。
Hudi支持以下几种索引配置
HoodieBloomIndex(默认):使用bloom过滤器和范围信息,并在parquet/基础文件(不久后的日志文件也支持)的页脚中放置该信息。
HoodieGlobalBloomIndex:默认索引仅在单个分区内强制执行键的唯一性,即要求用户知道存储给定记录键的分区。这可以帮助非常大的数据集很好地建立索引。但是,在某些情况下,可能需要在所有分区上执行重复数据删除/强制唯一性操作,这就需要全局索引。如果使用此选项,则将传入记录与整个数据集中的文件进行比较,并确保仅在一个分区中存在 recordKey。
HBaseIndex:Apache HBase是一个键值存储,可以将索引存储在HBase内,如果已经在使用HBase,这将会非常方便。
也可以自定义索引,需要实现HoodieIndex类并在配置中配置索引类名称。
21. Hudi Cleaner是做什么的
Hudi Cleaner(清理程序)通常在 commit和 deltacommit之后立即运行,删除不再需要的旧文件。如果在使用增量拉取功能,请确保配置了清理项来保留足够数量的commit(提交),以便可以回退,另一个考虑因素是为长时间运行的作业提供足够的时间来完成运行。否则,Cleaner可能会删除该作业正在读取或可能被其读取的文件,并使该作业失败。通常,默认配置为10会允许每30分钟运行一次提取,以保留长达5(10 * 0.5)个小时的数据。如果以繁进行摄取,或者为查询提供更多运行时间,可增加 hoodie.cleaner.commits.retained
配置项的值。
22. Hudi的模式演进(schema evolution)是什么
Hudi使用 Avro
作为记录的内部表示形式,这主要是由于其良好的架构兼容性和演进特性。这也是摄取或ETL管道保持可靠的关键所在。只要传递给Hudi的模式(无论是在DeltaStreamer
显示提供还是由SparkDatasource
的Dataset
模式隐式)向后兼容(例如不删除任何字段,仅追加新字段),Hudi将无缝处理新旧数据的的读/写操作并会保持Hive模式为最新。
23. 如何压缩(compaction)MOR数据集
在MOR数据集上进行压缩的最简单方法是运行内联压缩(compaction inline),但需要花费更多时间。通常情况下,当有少量的迟到数据落入旧分区时,这可能特别有用,在这种情况下,你可能想压缩最后的N个分区,同时等待较旧的分区积累足够的日志。其最终会将大多数最新数据转化查询优化的列格式,即从日志log文件转化为parquet文件。
还可异步运行压缩,这可以通过单独压缩任务来完成。如果使用的是 DeltaStreamer,则可以在连续模式下运行压缩,在该模式下,会在单个spark任务内同时进行摄取和压缩。
24. Hudi写入的性能/最大延迟
写入Hudi的速度在写入操作以及在调整文件大小做了权衡。就像数据库在磁盘上的直接/原始文件产生I/O开销一样,与读取/写入原始DFS文件或支持数据库之类的功能相比,Hudi可能会产生开销。Hudi采用了数据库文献中的技术,以使这些开销最少,具体可参考下表。
与许多管理时间序列数据的系统一样,如果键具有时间戳前缀或单调增加/减少,则Hudi的性能会更好,而我们几乎总是可以实现这一目标。即便是UUID密钥,也可以按照以下技巧来获得有序的密钥另请参阅调优指南以获取有关JVM和其他配置的更多提示。
25. Hudi读取/查询的性能
对于读优化视图(Read optimized views),可以达到Hive/Spark/Presto的parquet表相同的查询性能。 对于增量视图( Incremental views),相对于全表扫描所花费的时间,速度更快。例如,如果在最后一个小时中,在1000个文件的分区中仅更改了100个文件,那么与完全扫描该分区以查找新数据相比,使用Hudi中的增量拉取可以将速度提高10倍。 对于实时视图(Real time views),性能类似于Hive/Spark/Presto中Avro格式的表。
26. 如何避免创建大量小文件
Hudi的一项关键设计是避免创建小文件,并且始终写入适当大小的文件,其会在摄取/写入上花费更多时间以保持查询的高效。写入非常小的文件然后进行合并的方法只能解决小文件带来的系统可伸缩性问题,其无论如何都会因为小文件而降低查询速度。
执行插入更新/插入操作时,Hudi可以配置文件大小。(注意:bulk_insert操作不提供此功能,其设计为用来替代 spark.write.parquet
。)
对于写时复制,可以配置基本/parquet文件的最大大小和软限制,小于限制的为小文件。Hudi将在写入时会尝试将足够的记录添加到一个小文件中,以使其达到配置的最大限制。例如,对于 compactionSmallFileSize=100MB
和 limitFileSize=120MB
,Hudi将选择所有小于100MB的文件,并尝试将其增加到120MB。
对于读时合并,几乎没有其他配置。可以配置最大日志大小和一个因子,该因子表示当数据从avro转化到parquet文件时大小减小量。
HUDI-26
将较小的文件组合并成较大的文件组,从而提升提升性能。
27. 如何使用DeltaStreamer或Spark DataSource API写入未分区的Hudi数据集
Hudi支持写入未分区数据集。如果要写入未分区的Hudi数据集并执行配置单元表同步,需要在传递的属性中设置以下配置:
hoodie.datasource.write.keygenerator.class=org.apache.hudi.NonpartitionedKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
28. 为什么必须进行两种不同的配置才能使Spark与Hudi配合使用
非Hive引擎倾向于自己列举DFS上的文件来查询数据集。例如,Spark直接从文件系统(HDFS或S3)读取路径。
Spark调用如下:
org.apache.spark.rdd.NewHadoopRDD.getPartitions org.apache.parquet.hadoop.ParquetInputFormat.getSplits org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
在不了解Hudi的文件布局的情况下,引擎只会简单地读取所有parquet文件并显示结果,这样结果中可能会出现大量的重复项。
有两种方法可以配置查询引擎来正确读取Hudi数据集
A) 调用HoodieParquetInputFormat#getSplits
和HoodieParquetInputFormat#getRecordReader
方法
Hive原生就会执行此操作,因为InputFormat是Hive中插入表格式的抽象。HoodieParquetInputFormat扩展了MapredParquetInputFormat,其是hive的一种输入格式,将Hudi表注册到Hive metastore中。 当使用 UseFileSplitsFromInputFormat注解时,Presto会使用输入格式来获取分片,然后继续使用自己的优化/矢量化parquet读取器来查询写时复制表。 可以使用 --conf spark.sql.hive.convertMetastoreParquet=false将Spark强制回退到 HoodieParquetInputFormat类。
B) 使引擎调用路径过滤器(path filter)或其他方式来直接调用Hudi类来过滤DFS上的文件并挑选最新的文件切片
即使我们可以强制Spark回退到使用InputFormat类,但这样做可能会失去使用Spark的parquet读取器的能力。 为保持parquet文件读取性能的优势,我们将 HoodieROTablePathFilter设置为路径过滤器,并在Spark 的Hadoop Configuration中指定,确保始终选择Hudi相关文件的文件夹(路径)或文件的最新文件片。这将过滤出重复的条目并显示每个记录的最新条目。
29. 已有数据集,如何使用部分数据来评估Hudi
可以将该数据的一部分批量导入到新的hudi表中。例如一个月的数据
spark.read.parquet("your_data_set/path/to/month")
.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "bulk_insert")
.option("hoodie.datasource.write.storage.type", "storage_type") // COPY_ON_WRITE or MERGE_ON_READ
.option(RECORDKEY_FIELD_OPT_KEY, "" ).
.option(PARTITIONPATH_FIELD_OPT_KEY, "" )
...
.mode(SaveMode.Append)
.save(basePath);
一旦有初始副本后,就可选择一些数据样本进行更新插入操作
spark.read.parquet("your_data_set/path/to/month").limit(n) // Limit n records
.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.option(RECORDKEY_FIELD_OPT_KEY, "" ).
.option(PARTITIONPATH_FIELD_OPT_KEY, "" )
...
.mode(SaveMode.Append)
.save(basePath);
对于读取的表的合并,若还需要调度和运行压缩(compaction)任务。则可以使用 spark sumbit直接提交 org.apache.hudi.utilities.HoodieCompactor运行压缩,也可以使用HUDI CLI运行压缩。
如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!