Iceberg 功能 | 保障数据质量的利器:Iceberg 的 WAP 功能
这两年来,越来越多的公司把他们的数据迁移到数据湖存储格式之上(Delta,Hudi,Iceberg),之所以会有这种趋势,和过去的存储方案(基于HDFS和Parquet,Orc,Csv等)越来越难以满足新的需求关系很大。这些新需求包括对数据更新的需求,对ACID的需求,更好的写入性能,更快的查询性能等等。今天就来介绍一下Iceberg的一个国内较少被提到的功能:WAP(Write-Audit-Publish),以及WAP如何用于数据质量校验。
使用方法
WAP,全称是Write-Audit-Publish(写入-审核-发布),是#342中新增的功能。什么是“Write-Audit-Publish”呢?简单来说是一种写入模式,在WAP模式下,新写入的数据会首先被写入staging空间,只有经过审核(Audit)并且发布(Publish)之后,才算真正写入完成(也就是说,可以被查询到)。这个功能很匹配数据质量校验中一个常见的需求:新数据只有当通过质量校验,才能被下游任务看到和使用。
接下来讲一讲WAP功能具体怎么用。
打开WAP功能
首先,WAP功能默认是关闭的,需要用户手动打开。配置的对象是表,对需要打开WAP功能的表,运行以下SQL
ALTER TABLE metrics
SET TBLPROPERTIES (
'write.wap.enabled'='true'
)
这样WAP功能就算打开了。但是光打开功能还不够,在负责写入的spark session里,还需要设置wapId
。这个wapId
是个标识符,可以是任意的字符串(常用的是UUID),主要用于之后获取数据的。
sparkSession.conf().set('spark.wap.id', wapId)
只有当WAP功能打开,同时也设置了wapId
,写入的数据才会进入staging空间,否则就和普通的写入一样(写入成功时立即可见)。
读取staging空间的数据
写入staging的数据无法用普通的SELECT
语法读取,所以当我们要对这些数据进行质量校验时,就得按照以下步骤:
用 wapId
查找对应的snapshot-id
根据上面查到的 snapshot-id
,使用time travel功能读取
通过wapId
查找snapshot-id
使用的是下面这条语句
SELECT snapshot_id FROM metrics.snapshots
WHERE summary['wap.id'] = {wapId}
在得到了snapshot-id
之后,然后就可以使用正常的time travel功能来读取写入的数据,并对数据进行各种校验。
如果是Spark 3.3,可以使用SQL API
SELECT * FROM metrics VERSION AS OF {snapshot-id}
如果是Spark 3.2以下,则只能使用DataFrame API
spark.read
.option("snapshot-id", snapshot-id)
.format("iceberg")
.load("metrics")
.collect()
发布数据
当数据校验成功,确定质量没有问题后,就可以着手数据的发布了。
数据发布使用的也是Iceberg现成的功能:cherrypick_snapshot
。通过调用内置的函数cherrypick_snapshot
来发布数据。
CALL {catelog_name}.system.cherrypick_snapshot('metrics', {snapshot-id})
这样,我们就算是走完一个完整的“Write-Audit-Publish”流程了。
实现原理
如果你对Iceberg的原理比较熟悉(读过《谈一谈Iceberg的原理》),那可能无需我的讲解,就能猜个八九不离十。本质上来说,WAP的实现原理就是写入时创建新的snapshot,但不更新commit history,只有发布以后才并入主流。
在实现层面,WAP大量复用Iceberg已有的能力,甚至到了“偷懒”的地步。主要是以下3个功能:
snapshot summary time travel cherrypick
其中第二项和第三项都是面向用户的功能,Iceberg的官方文档有详细介绍,我就不赘述了。第一项summary
是主要面向开发者的功能,定义在在Iceberg Table Spec里,规定了v2格式的snapshot里必须包含summary
字段,是个kv结构,用于保存table的一些配置和属性信息,如过期时间等。WAP也是借用了这个字段,用于存储wapId
。
可以说,WAP就是把以上三个功能“拼起来”实现的。固然实现起来是简单了,但带来一个副作用:接口显得特别难用。至少我个人觉得使用流程非常不直觉。
总结
这篇文章以介绍WAP功能为主,因为这项功能在数据质量校验里非常有用。推荐已经迁移到Iceberg的朋友可以尝试一下。
但是目前的WAP也有一些限制,可能有时候会“不那么好用”:
目前只支持Spark引擎 用户需要负责wapId的生成、存储等工作 对于Spark 3.2以下版本,无法只用SQL API完成
如果觉得这篇文章对你有所帮助,
请点一下赞或在看,是对我的肯定和支持~