Apache Iceberg在小红书的探索与实践
数据派THU
共 7580字,需浏览 16分钟
· 2022-08-25
![](https://filescdn.proginn.com/d7405684a30ca08be243f8364fac6173/6293a94904b84878688bd4cec2497dcb.webp)
本文约5200字,建议阅读10分钟 本文主要介绍了小红书数据流团队基于Apache Iceberg在实时数仓领域的探索与实践。
![截屏2022-07-05 下午6.22.56.png](https://filescdn.proginn.com/67725c056e470a29a818fe8a7165df87/4c8dd89e6766d1248ab616781d98c312.webp)
日志数据入湖 CDC实时入湖 实时湖分析探索 未来规划
![](https://filescdn.proginn.com/02452a9fa7347445067ffe3d555a81ed/b7a6812b27f90bc3646d26ca462145ac.webp)
![](https://filescdn.proginn.com/4ede96fc3104ea1c15f541cb3451501d/1ca1b029d1f58030508d99a73dc54adb.webp)
Distcp会变得非常慢,导致数据延迟在小时级以上。 流量小的很多文件集中在一个Task,导致查询性能差。
![](https://filescdn.proginn.com/235337dcda26681d30d05847adf5cb9a/5fb22d6719e8a32991ffcb1b60222249.webp)
异步的小文件合并为周期调度,但是Iceberg在commit之后,下游ETL读文件作业会立即执行,在这之后再挂异步合并作业的意义就不大了。 如果同步合并小文件,即在Flink入湖作业中挂一个合并算子,这样会引入跨云IO,并增加Flink作业的OOM风险。
![](https://filescdn.proginn.com/9bfaf3db64f91e2a0d6841e833373d25/78959b21e7b963f41d819fbc19365d7a.webp)
Fanout:下游Subtask的分区个数。 Residual:下游Subtask的分配流量和与目标流量差距。
![](https://filescdn.proginn.com/0a6c22f93c55dee89a8e0ba4f99a53c2/c97c070f94c6dd35eb10323d1ce62c7a.webp)
小文件的问题得到了解决。 Writer算子内存占用减少。
引入了Shuffle。 流量动态变化。暂时还不能根据流量变化动态调整分区分布,因为当前是在Flink 作业启动的时候读取Iceberg的元数据。
![](https://filescdn.proginn.com/4c770cf6476c9e1ef4ab2bd18c3154a4/ac7c9465473219b0a094a2d466bd8434.webp)
![](https://filescdn.proginn.com/a43fcef2488c4763e65214aa56fba563/ddce23b3390a4c65700d8f04bd7ab311.webp)
![](https://filescdn.proginn.com/124233565917f747a2af32467b2c0c04/252b5bd350c8bface81dd80b26894842.webp)
HttpsClients,我们将S3原生的HttpsClients(Java8自带的HTTP URL Connection)更换为了Apache HttpClient,其在Socket链接以及易用性上有一些提升。在写的过程中我们也遇到了一些问题,多云机器带来的问题是每个厂商机器的内核是不太一样的,例如在某云上发现有写S3超时的问题,我们与厂商一起抓包发现是内核参数的问题。 API Call Timeout,将S3的Timeout配置项暴露给Iceberg。 Credential Provider,S3 SDK从FlinkConf中读取密钥。
![](https://filescdn.proginn.com/819b09437812a8aac7136f53dd1158ab/a6c43ac004efa1278a9e571d43a558bc.webp)
第一个问题是Batch Read
第二个问题是Adhoc查询
hive_prod.Iceberg_test.table
![](https://filescdn.proginn.com/f428e1cfa19d46dbc6a9636cd575509c/788dc7435627f151c823bca58686a0c3.webp)
![](https://filescdn.proginn.com/86976edca524627a357a964cd45da90f/5906d8117afcd739e9cd8620f76ed31d.webp)
![](https://filescdn.proginn.com/223426e742c00b8494c10fbdff2ba32d/5bb87429b28c8b7801eb47765508d3fc.webp)
全增量,先发全量再发增量。 At-Least-Once,保证重复发送时保证有序(最终⼀致性)。 MQ Producer根据主键Hash(且分桶数固定,不受扩容影响)。
Shuffle Key 只能是主键的⼦集 + Immutable Columns。
Upsert Mode。
![](https://filescdn.proginn.com/be8036c953fc887140211ac5f92a6e80/9f0995176ca9dfcba273942f27904e5f.webp)
![](https://filescdn.proginn.com/0a06dbd1a65e752e9bcdc0bcfdc5844c/d8efa22ca1ebf8fd356cf88dab157d4c.webp)
在读数据时可以只查询关联分区,忽略其他分区。 错峰做File Compaction,减少冲突。例如在写当前小时分区时我们可以对之前的分区做File Compaction。
![](https://filescdn.proginn.com/a4f2d412d7d65ad6ff05597ac20aca05/c93ff67d743b65c225f82d66ab78a44a.webp)
关掉当前的Writer,以新的Schema去建立新的Writer写数据。 以Schema变更的时间点为分割,对Schema变更前的数据先提交,再对Schema 进行Update,之后再提交 Schema变更后的文件。
![](https://filescdn.proginn.com/4731e1350fbb8c01469dc6620207e29e/24b973443a123bdf217b987e9b25939e.webp)
Binlog Format。支持解析Canal PB格式。 Progressive Compaction。Compaction是我们接下来工作的重点,尤其在MySQL的量比较小的时候,如果想维持五分钟级别的CheckPoint,小文件问题就会非常突出。如何避开流式任务正在写的Partition去做Compaction 也是目前在做的事情。
![](https://filescdn.proginn.com/4f2e86c362875553480ee0cbea3f6e99/391af7fae2da436652457f7a362fa75a.webp)
![](https://filescdn.proginn.com/82d6b8e21292599ee0bdb58d394bf5bb/f5d949d3739dcb8a7c8fccbcdf542e68.webp)
![](https://filescdn.proginn.com/426011ca90ba31d590e462569846135a/684691666d8273caadca2a1a24c51d57.webp)
![](https://filescdn.proginn.com/7300db90921002d8f374c3a4bac013ba/bb6c59ef4736b6b474dd79a44b6cdc5f.webp)
首先在存储方面,我们需要对CloudNative FileIO持续优化,比如进一步减少Checkpoint的毛刺、进一步提高吞吐、提高跨云读写的稳定性。 关于计算,我们会跟更多引擎去集成,目前已经集成了Spark引擎,同时正在集成ClickHouse。另外StarRocks社区已经集成了Iceberg外表的Connector,我们以后也会在上面做一些应用。在查询方面,计划通过改变数据的组织形式,或者添加一些二级索引来做Data Skipping去加速查询。 管理方面,让Iceberg持续稳定的运行下去还是需要外挂表维护作业的,这对下游数仓同学来说还是引入了运维压力。我们接下来会将其服务化,思考如何智能地拉起一些作业,以及运用什么策略可以减少冲突的概率。
编辑:王菁
评论