Hudi 原理 | 一文彻底掌握 Apache Hudi 异步 Clustering 部署
1. 摘要
在之前的一篇博客中,我们介绍了Clustering(聚簇)
的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clustering
,本篇博客中,我们将讨论近期社区做的一些改进以及如何通过HoodieClusteringJob
和DeltaStreamer
工具来部署异步Clustering
。
2. 介绍
通常讲,Clustering
根据可配置的策略创建一个计划,根据特定规则对符合条件的文件进行分组,然后执行该计划。Hudi支持并发写入[1],并在多个表服务之间提供快照隔离,从而允许写入程序在后台运行Clustering
时继续摄取。有关Clustering
的体系结构的更详细概述请查看上一篇博文。
3. Clustering策略
如前所述Clustering
计划和执行取决于可插拔的配置策略。这些策略大致可分为三类:计划策略
、执行策略
和更新策略
。
3.1 计划策略
该策略在创建Clustering计划时发挥作用。它有助于决定应该对哪些文件组进行Clustering。让我们看一下Hudi提供的不同计划策略。请注意,使用此配置[2]可以轻松地插拔这些策略。
•SparkSizeBasedClusteringPlanStrategy:根据基本文件的小文件限制[3]选择文件切片并创建Clustering
组,最大大小为每个组允许的最大文件大小。可以使用此配置[4]指定最大大小。此策略对于将中等大小的文件合并成大文件非常有用,以减少跨冷分区分布的大量文件。•SparkRecentDaysClusteringPlanStrategy:根据以前的N
天分区创建一个计划,将这些分区中的小文件片进行Clustering
,这是默认策略,当工作负载是可预测的并且数据是按时间划分时,它可能很有用。•SparkSelectedPartitionsClusteringPlanStrategy:如果只想对某个范围内的特定分区进行Clustering
,那么无论这些分区是新分区还是旧分区,此策略都很有用,要使用此策略,还需要在下面设置两个配置(包括开始和结束分区):
hoodie.clustering.plan.strategy.cluster.begin.partition
hoodie.clustering.plan.strategy.cluster.end.partition
注意:所有策略都是分区感知的,后两种策略仍然受到第一种策略的大小限制的约束。
3.2 执行策略
在计划阶段构建Clustering
组后,Hudi主要根据排序列和大小为每个组应用执行策略,可以使用此配置[5]指定策略。
SparkSortAndSizeExecutionStrategy
是默认策略。使用此配置进行Clustering
时,用户可以指定数据排序列。除此之外我们还可以为Clustering
产生的Parquet文件设置最大文件大小[6]。该策略使用bulk_insert
将数据写入新文件,在这种情况下,Hudi隐式使用一个分区器,该分区器根据指定列进行排序。通过这种策略改变数据布局,不仅提高了查询性能,而且自动平衡了重写开销。
现在该策略可以作为单个Spark作业或多个作业执行,具体取决于在计划阶段创建的Clustering
组的数量。默认情况下Hudi将提交多个Spark作业并合并结果。如果要强制Hudi使用单Spark作业,请将执行策略类配置设置为SingleSparkJobExecutionStrategy
。
3.3 更新策略
目前只能为未接收任何并发更新的表/分区调度Clustering
。默认情况下更新策略的配置设置为SparkRejectUpdateStrategy
。如果某个文件组在Clustering
期间有更新,则它将拒绝更新并引发异常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多数文件组。简单拒绝更新的默认策略似乎不公平。在这种用例中用户可以将配置设置为SparkAllowUpdateStregy
。
我们讨论了关键策略配置,下面列出了与Clustering
相关的所有其他配置。在此列表中一些非常有用的配置包括:
配置项 | 解释 | 默认值 |
hoodie.clustering.async.enabled | 启用在表上的异步运行Clustering 服务。 | false |
hoodie.clustering.async.max.commits | 通过指定应触发多少次提交来控制异步Clustering 的频率。 | 4 |
hoodie.clustering.preserve.commit.metadata | 重写数据时保留现有的_hoodie_commit_time 。这意味着用户可以在Clustering 数据上运行增量查询,而不会产生任何副作用。 | false |
4. 异步Clustering
之前我们已经了解了用户如何设置同步Clustering
[7]。此外用户可以利用HoodiecClusteringJob
[8]设置两步异步Clustering
。
4.1 HoodieClusteringJob
随着Hudi版本0.9.0的发布,我们可以在同一步骤中调度和执行Clustering
。我们只需要指定-mode
或-m
选项。有如下三种模式:
•schedule(调度):制定一个Clustering计划。这提供了一个可以在执行模式下传递的instant
。•execute(执行):在给定的instant
执行Clustering计划,这意味着这里需要instant
。•scheduleAndExecute(调度并执行):首先制定Clustering计划并立即执行该计划。
请注意要在原始写入程序仍在运行时运行作业请启用多写入:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
使用spark submit
命令提交HoodieClusteringJob
示例如下:
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g
clusteringjob.properties
配置文件示例如下
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2
4.2 HoodieDeltaStreamer
接着看下如何使用HudiDeltaStreamer
。现在我们可以使用DeltaStreamer
触发异步Clustering。只需将hoodie.clustering.async.enabled
为true
,并在属性文件中指定其他Clustering配置,在启动Deltastreamer
时可以将其位置设为-props
(与HoodieClusteringJob
配置类似)。
使用spark submit
命令提交HoodieDeltaStreamer
示例如下:
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clustering_kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--table-type COPY_ON_WRITE \
--target-base-path /path/to/hudi_table/basePath \
--target-table impressions_cow_cluster \
--op INSERT \
--hoodie-conf hoodie.clustering.async.enabled=true \
--continuous
4.3 Spark Structured Streaming
我们还可以使用Spark结构化流启用异步Clustering,如下所示。
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
def getAsyncClusteringOpts(isAsyncClustering: String,
clusteringNumCommit: String,
executionStrategy: String):Map[String, String] = {
commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
)
}
def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
val streamingInput = // define the source of streaming
Future {
println("streaming starting")
streamingInput
.writeStream
.format("org.apache.hudi")
.options(hudiOptions)
.option("checkpointLocation", basePath + "/checkpoint")
.mode(Append)
.start()
.awaitTermination(10000)
println("streaming ends")
}
}
def structuredStreamingWithClustering(): Unit = {
val df = //generate data frame
val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
val f1 = initStreamingWriteFuture(hudiOptions)
Await.result(f1, Duration.Inf)
}
5. 总结和未来工作
在这篇文章中,我们讨论了不同的Clustering策略以及如何设置异步Clustering。未来的工作包括:
•Clustering支持更新。•支持Clustering的CLI工具。
另外Flink支持Clustering已经有相应Pull Request[9],有兴趣的小伙伴可以关注该PR。
可以查看JIRA[10]了解更多关于此问题的开发,我们期待社会各界的贡献,希望你喜欢这个博客!
引用链接
[1]
并发写入: https://hudi.apache.org/docs/concurrency_control#enabling[2]
此配置: http://hudi.apache.org/docs/next/configurations#hoodieclusteringplanstrategyclass[3]
小文件限制: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategysmallfilelimit[4]
此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategymaxbytespergroup[5]
此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringexecutionstrategyclass[6]
最大文件大小: http://hudi.apache.org/docs/next/configurations/#hoodieparquetmaxfilesize[7]
同步Clustering
: http://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro#setting-up-clustering[8]
HoodiecClusteringJob
: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance#RFC19Clusteringdataforfreshnessandqueryperformance-SetupforAsyncclusteringJob[9]
Pull Request: https://github.com/apache/hudi/pull/3599[10]
JIRA: https://issues.apache.org/jira/browse/HUDI-1042
推荐阅读
Apache Hudi 0.9.0版本重磅发布!更强大的流式数据湖平台
引用链接
[1]
并发写入: https://hudi.apache.org/docs/concurrency_control#enabling[2]
此配置: http://hudi.apache.org/docs/next/configurations#hoodieclusteringplanstrategyclass[3]
小文件限制: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategysmallfilelimit[4]
此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategymaxbytespergroup[5]
此配置: http://hudi.apache.org/docs/next/configurations/#hoodieclusteringexecutionstrategyclass[6]
最大文件大小: http://hudi.apache.org/docs/next/configurations/#hoodieparquetmaxfilesize[7]
同步Clustering
: http://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro#setting-up-clustering[8]
HoodiecClusteringJob
: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance#RFC19Clusteringdataforfreshnessandqueryperformance-SetupforAsyncclusteringJob[9]
Pull Request: https://github.com/apache/hudi/pull/3599[10]
JIRA: https://issues.apache.org/jira/browse/HUDI-1042