Hudi 原理 | 聊一聊 Apache Hudi 原理

共 5878字,需浏览 12分钟

 ·

2022-03-18 22:38

作为这个公众号的第二篇文章,来讲讲近年来比较火,并有越来越火的趋势的存储系统——Hudi。Hudi经常被拿来跟Delta,Iceberg一起,并称为“数据湖三剑客”,最近的热度也是越来越高,被很多的大公司采用(例如字节bilibili顺丰等),相信有不少朋友也正在考虑引入Hudi,或者已经进入调研试用的阶段。然而Hudi的概念很多,文档写得稍微语焉不详,估计有些朋友看完文档以后还是“有点懵”。这篇文章就是希望讲清楚Hudi的原理,帮助大家更好地理解Hudi的工作机制和每个配置项的含义,然后在生产环境可以用好Hudi。


01


首先我会讲一讲Hudi的背景,因为背景对理解一个项目很关键。(只想看原理的朋友,可以直接跳到第二节)


Hudi,正式的全称是Hadoop Upsert Delete and Incremental。其实关于这个名字也不用太较真,因为都是后来附会上去的,从源代码里可以看到这个项目最初的名字是hoodie,和现在的名字发音相同。但这个名字还是透露了一些信息的,那就是Hudi项目最初的设计目标:hadoop上实现update和delete操作


为什么会有update和delete的需求?uber在开源Hudi的文章中解释了:


最初uber使用的是Lambda架构,但是有个问题是计算逻辑分为批量和实时两种,要保持两者的逻辑完全一致很困难(毕竟是两套代码)


然后uber转向了Kappa架构,使得两套代码变为一套,但是存储依然有两套,分别支持实时写入和批量写入。


为了把存储也统一起来,减少运维的压力,就需要让负责批量写入的存储系统也能支持实时写入,这就产生了update和delete的需求。为什么呢?有多种原因,例如实时计算常有的迟到数据,还有业务时效性要求以及一些合规需求(GDPR要求平台允许用户删除自己的数据)。而众所众知的是,无论是HDFS还是云平台的对象存储(例如aws的s3,阿里云的oss等),都不支持update而只能overwrite,因此要实现update和delete功能,就必须在底层存储之上做文章。Hudi于是应运而生。


02


讲完了背景,接下来我们会深入Hudi的实现部分。和上一篇文章《详解Parquet格式》一样,这次我同样会循着一条主线来讲解Hudi,这条主线就是Hudi的标志性功能——Upsert


Upsert可以说是Hudi的招牌,正如上一节所说,Hudi最初的设计目标就是在hadoop上实现数据的update。于是这里的核心问题就是


如何在一个只能overwrite的文件系统上实现update操作?


Hudi解决了这个问题,使用了一种很简单的思想,那就是


把一个完整的文件拆分为多个“小文件”,当需要更新其中某条记录时,只要把包含这条记录的“小文件”给重写一遍即可。


到这里还没有出现任何Hudi的概念,例如Copy on Write(简称COW)或Merge on Read(简称MOR),是不是?别急,马上我就会拿COW表来举例。之所以先讲COW表,是因为这种类型的表原理更加简单,也是MOR表的基础。而且Hudi最初的版本只支持COW表,可见这是Hudi的立项之本。


接下来我会用一个例子直观地展示下COW表的upsert是如何实现的。


首先,假设我们向一张Hudi表中预先写入了5行数据,如下


txn_iduser_id
item_id
amount
date
11
1
220220101
221120220101
31
2
3
20220101
4
13120220102
5
2
3220220102


这时在我们的hdfs里面,会有下面2个目录,以及1个隐藏的.hoodie目录。


warehouse├── .hoodie├── 20220101│   └── fileId1_001.parquet└── 20220102    └── fileId2_001.parquet

文件名分为两部分,fileId是Hudi中的一个概念,后面会做解释,001则是commitId。


画成图就是


可以看到,属于20220101分区的3条数据保存在一个parquet文件:fileId1_001.parquet,属于20220102分区的2条数据则保存在另一个parquet文件:fileId2_001.parquet。


然后我们再写入3条新的数据。其中有2条数据是新增,1条数据是更新。写入的数据如下


txn_iduser_id
item_idamountdate
3
125
20220101
6
14
120220103
7
232
20220103


写入完成后,hdfs里面的文件结构会变成这样


warehouse├── .hoodie├── 20220101│   ├── fileId1_001.parquet│   └── fileId1_002.parquet├── 20220102│   └── fileId2_001.parquet└── 20220103    └── fileId3_001.parquet

注意.hoodie这个目录,里面保存了hudi的元数据


画成图就是



可以看到,更新的那一条记录,实际被写入到了同一个分区下的新文件:fileId1_002.parquet。这个新文件的fileId和上一个相同,只不过commitId变成了002。同时还有一个新文件:fileId3_001.parquet。


update到这里就算完成了,那么使用这张表的用户又是如何读到更新以后的数据呢?Hudi客户端在读取这张表时,会根据.hoodie目录下保存的元数据信息,获知需要读取的文件是:fileId1_002.parquet,fileId2_001.parquet,fileId3_001.parquet。这些文件里保存的正是最新的数据


读取的是最新的文件


以上就是Hudi实现update的原理。在有了相对直观的理解之后,我们就可以进一步深入实现细节了。


03


这一节会对Hudi的写入逻辑进行更细节的讲解。首先来看下Hudi使用spark接口进行upsert的代码


df.write.format("hudi").  option(RECORDKEY_FIELD_OPT_KEY, "txn_id").  option(PARTITIONPATH_FIELD_OPT_KEY, "date").  option(TABLE_NAME, tableName).  mode(Overwrite).  save()


注意到这里有两个必填的配置项:RECORDKEY_FIELD_OPT_KEY和PARTITIONPATH_FIELD_OPT_KEY,它们的含义是“作为recordKey的字段名”,“作为partitionPath的字段名”。请记住这两个字段,在后面的写入过程中有非常重要的作用。


Upsert的过程整体分为3步(这里省略了很多不太重要的步骤):


  1. 根据partitionPath进行重新分区。

  2. 根据recordKey确定哪些记录需要插入,哪些记录需要更新。对于需要更新的记录,还需要找到旧的记录所在的文件。(这个过程被称为tagging)

  3. 把记录写入实际的文件。


Step1. 重新分区


无论DataFrame在写入前是如何分区的,Hudi都会对它们进行重新分区。重新分区的依据就是partitionPath。partitionPath相同的record都会被分到同一个partition,并交给一个executor负责写入。上面例子中的配置项PARTITIONPATH_FIELD_OPT_KEY就是用来指定record里面的哪个字段作为partitionPath。


Step2. Tagging


在确定了每个record的partition之后,接下来做的就是tagging。tagging是写入过程中最重要的一步,核心逻辑是确定每条record是insert还是update,以及如果是update,则定位到上次写入时的fileId


Hudi如何确定一条record是insert还是update?是通过recordKey。用户在写入时需要指定每条record的recordKey,Hudi会用这个recordKey和现有的数据进行比对,如果找到一条key相同的record,则认为这次新的写入是update,否则就是insert


对于一条update的数据,也就是说之前曾经插入过相同key的record,那么Hudi会把旧的record的fileId取出来,作为这条新record的fileId。之前一直没有解释fileId的含义,现在可以解释下了。fileId是Hudi为每条record赋予的id,用于标识这条record被保存在哪个文件里,或者更严格地说,是“哪一批文件”里。由于每次update都会生成一个新的文件,但是共享同一个fileId,所以最终会变成一批文件。Hudi把具有相同fileId的一批文件称为file group。最后,fileId本身是一个uuid,是全局唯一的。


warehouse├── .hoodie├── 20220101│   ├── fileId1_001.parquet│   └── fileId1_002.parquet├── 20220102│   └── fileId2_001.parquet└── 20220103    └── fileId3_001.parquet

文件名里包含fileId1的2个文件就是一个file group


整个tagging过程还有一个显而易见的问题,那就是tagging需要在已有的数据里寻找key相同的record,如果表的数据量比较大时会非常耗时。为了解决这个问题,Hudi引入了index机制,下一节我会更详细地讲一讲。


Step3. 写入文件


当tagging完成以后,就会开始真正地写入数据。Hudi会把需要写入的数据分为insert和update两部分,update的数据会用原来的fileId进行写入,insert的数据则会生成一个新的fileId用于写入。值得一提的是,insert的数据也不会全部写入到同一个文件,而是到达了一定阈值(由hoodie.parquet.max.file.size配置项控制)以后,关闭当前文件,换一个新的文件继续写入(同时也会生成一个新的fileId)。


04


讲完了Hudi的upsert过程,Hudi的基础框架就已经比较清楚了。后面的大部分工作都是在这个基础上的优化。这里试讲下其中的几个


Merge on Read


Hudi最大的特征就是表分为Copy on Write和Merge on Read两种类型。Copy on Write的工作原理上文已经解释过了,Merge on Read则是对Copy on Write的优化。优化了什么呢?主要是写入性能


从上面的例子中可以看到,对于COW表,每次更新都会生成一个新的文件,里面包括了更新的数据以及属于同一个文件但没有被更新的老数据。所以这个文件比较大,写入也会比较慢。


txn_id=3是更新的数据,1和2没有变化,是老数据


为了加快写入(主要是update)的速度,Hudi引入了MOR表。和COW表最大的不同就是,MOR表在更新时只会把更新的那部分数据写入一个.log文件,因为.log文件不包含老数据,也不涉及tagging,又是顺序写入的,所以写入会非常快。而当客户端要读取数据时,会有两种选择:


  1. 读取时动态地把.log文件和原始数据文件(称为base文件)进行merge

  2. 异步地把.log文件和base文件merge,如果merge还没完成,只能读到上个版本的数据


无论是哪一种办法,都有利有弊。第一种办法的优点是数据保证最新,缺点是读取的性能较差。第二种办法的优点是读取的性能和COW表相同,缺点是异步merge(称为compaction)有一定的延迟。这也就是Hudi官网上展示的snapshot query和read optimised query的差异来源



Index

在upsert的工作原理中,我们提到了tagging过程中需要使用index确定每一条数据之前是否已经插入过。这个index也有很多门道,Hudi默认提供了3种index实现,同时允许用户实现自己的index。


这3种index分别是:Bloom IndexSimple IndexHBase Index


  • Bloom Index:实现原理是bloom filter。优点是效率高,缺点是bloom filter固有的假阳性问题,所以Hudi对bloom filter里存在的key,还需要回溯原文件再查找一遍。Hudi默认使用的是Bloom Index

  • Simple Index:实现原理是把新数据和老数据进行join。优点是实现最简单,无需额外的资源。缺点是性能比较差。

  • HBase Index:实现原理是把index存放在HBase里面。优点是性能最高,缺点是需要外部的系统,增加了运维压力。


Index还有一个概念是global index和non-global index。这两者有什么区别呢?global index里面存放了一张表里所有record的key,而non-global index是每个partition都有一个对应的index,里面只存放了本partition的key。所以如果用户使用non-global index,就必须保证同一个key的record不会出现在多个partition里面。看起来global index比non-global index更好,为什么还要有non-global index?主要是出于index的维护成本和写入性能考虑。因为维护一个global index的难度更大,对写入性能的影响也更大。


05


在这一篇文章里,我整体介绍了COW表的写入原理,可以说这是Hudi的基础,有助于理解Hudi的所有方面。下一篇文章,我会对MOR表的实现原理,以及Hudi增量写入的原理等,再做一些介绍。

浏览 1655
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报