Iceberg 功能 | 保障数据质量的利器:Iceberg 的 WAP 功能

共 2399字,需浏览 5分钟

 ·

2022-08-03 12:51

这两年来,越来越多的公司把他们的数据迁移到数据湖存储格式之上(Delta,Hudi,Iceberg),之所以会有这种趋势,和过去的存储方案(基于HDFS和Parquet,Orc,Csv等)越来越难以满足新的需求关系很大。这些新需求包括对数据更新的需求,对ACID的需求,更好的写入性能,更快的查询性能等等。今天就来介绍一下Iceberg的一个国内较少被提到的功能:WAP(Write-Audit-Publish),以及WAP如何用于数据质量校验。

使用方法

WAP,全称是Write-Audit-Publish(写入-审核-发布),是#342中新增的功能。什么是“Write-Audit-Publish”呢?简单来说是一种写入模式,在WAP模式下,新写入的数据会首先被写入staging空间,只有经过审核(Audit)并且发布(Publish)之后,才算真正写入完成(也就是说,可以被查询到)。这个功能很匹配数据质量校验中一个常见的需求:新数据只有当通过质量校验,才能被下游任务看到和使用。

接下来讲一讲WAP功能具体怎么用。

打开WAP功能

首先,WAP功能默认是关闭的,需要用户手动打开。配置的对象是表,对需要打开WAP功能的表,运行以下SQL

ALTER TABLE metrics
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

这样WAP功能就算打开了。但是光打开功能还不够,在负责写入的spark session里,还需要设置wapId。这个wapId是个标识符,可以是任意的字符串(常用的是UUID),主要用于之后获取数据的。

sparkSession.conf().set('spark.wap.id', wapId)

只有当WAP功能打开,同时也设置了wapId,写入的数据才会进入staging空间,否则就和普通的写入一样(写入成功时立即可见)。

读取staging空间的数据

写入staging的数据无法用普通的SELECT语法读取,所以当我们要对这些数据进行质量校验时,就得按照以下步骤:

  1. wapId查找对应的snapshot-id
  2. 根据上面查到的snapshot-id,使用time travel功能读取

通过wapId查找snapshot-id使用的是下面这条语句

SELECT snapshot_id FROM metrics.snapshots
WHERE summary['wap.id'] = {wapId}

在得到了snapshot-id之后,然后就可以使用正常的time travel功能来读取写入的数据,并对数据进行各种校验。

如果是Spark 3.3,可以使用SQL API

SELECT * FROM metrics VERSION AS OF {snapshot-id}

如果是Spark 3.2以下,则只能使用DataFrame API

spark.read
    .option("snapshot-id", snapshot-id)
    .format("iceberg")
    .load("metrics")
    .collect()

发布数据

当数据校验成功,确定质量没有问题后,就可以着手数据的发布了。

数据发布使用的也是Iceberg现成的功能:cherrypick_snapshot。通过调用内置的函数cherrypick_snapshot来发布数据。

CALL {catelog_name}.system.cherrypick_snapshot('metrics', {snapshot-id})

这样,我们就算是走完一个完整的“Write-Audit-Publish”流程了。

实现原理

如果你对Iceberg的原理比较熟悉(读过《谈一谈Iceberg的原理》),那可能无需我的讲解,就能猜个八九不离十。本质上来说,WAP的实现原理就是写入时创建新的snapshot,但不更新commit history,只有发布以后才并入主流

在实现层面,WAP大量复用Iceberg已有的能力,甚至到了“偷懒”的地步。主要是以下3个功能:

  1. snapshot summary
  2. time travel
  3. cherrypick

其中第二项和第三项都是面向用户的功能,Iceberg的官方文档有详细介绍,我就不赘述了。第一项summary是主要面向开发者的功能,定义在在Iceberg Table Spec里,规定了v2格式的snapshot里必须包含summary字段,是个kv结构,用于保存table的一些配置和属性信息,如过期时间等。WAP也是借用了这个字段,用于存储wapId

可以说,WAP就是把以上三个功能“拼起来”实现的。固然实现起来是简单了,但带来一个副作用:接口显得特别难用。至少我个人觉得使用流程非常不直觉。

总结

这篇文章以介绍WAP功能为主,因为这项功能在数据质量校验里非常有用。推荐已经迁移到Iceberg的朋友可以尝试一下。

但是目前的WAP也有一些限制,可能有时候会“不那么好用”:

  • 目前只支持Spark引擎
  • 用户需要负责wapId的生成、存储等工作
  • 对于Spark 3.2以下版本,无法只用SQL API完成




如果觉得这篇文章对你有所帮助,

请点一下在看,是对我的肯定和支持~


浏览 68
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报