原来Kafka源码也在用二分搜索!

JavaEdge

共 4878字,需浏览 10分钟

 ·

2020-12-05 21:58


  点击上方“JavaEdge”,关注公众号

设为“星标”,好文章不错过!

Kafka的索引组件使用二分搜索,而且社区还针对Kafka自身特点对其改良。


1 索引架构



如下几个类都位于该包下:


  • AbstractIndex.scala
    最顶层抽象类:封装了索引类型的公共操作

  • LazyIndex.scala
    定义了AbstractIndex上的一个包装类,实现索引项延迟加载,该类只为提高性能

  • OffsetIndex.scala
    偏移索引,保存<位移值,文件物理磁盘位置>对。

  • TimeIndex.scala
    时间戳索引,保存<时间戳,位移值>对。

  • TransactionIndex.scala
    事务索引,为已中止事务(Aborted Transcation)保存重要元数据。
    只有启用Kafka事务特性后,该索引才可能出现


2 AbstractIndex代码结构




2.1 类定义





2.2 属性


  • 索引文件(file)
    每个索引对象在磁盘上都对应一个索引文件。该字段是var型,说明它可被修改。难道索引对象还能动态更换底层索引文件?是的。1.1.0版本后,Kafka允许迁移底层的日志路径,所以,索引文件自然要是可以更换的

  • 起始位移值(baseOffset)

    索引对象对应日志段对象的起始位移值。查看Kafka日志路径,日志文件和索引文件都是成组出现。比如若日志文件是00000000000000000123.log,一定还有一组索引文件00000000000000000123.index00000000000000000123.timeindex等。这里的“123”就是这组文件的起始位移值,即baseOffset

  • 索引文件最大字节数(maxIndexSize)

    控制索引文件的最大长度。Kafka源码传入该参数的值是Broker端参数segment.index.bytes值,即10MB。所以默认下所有Kafka索引文件大小都是10MB。

  • 索引文件打开方式(writable)

    “True”:以“读写”方式打开,“False”:以“只读”方式打开。


每个继承AbstractIndex的子类负责定义具体的索引项结构,基于此架构设计,AbstractIndex定义抽象方法entrySize表示不同索引项的大小

// OffsetIndexoverride def entrySize = 8// TimeIndexoverride def entrySize = 12




为什么选择8、12?


在OffsetIndex中,位移值4字节,物理磁盘位置4字节,所以共8字节。但位移值不是长整型吗,不是应该8字节?。
其实AbstractIndex已保存baseOffset,这里的位移值,实际上是相对于baseOffset的相对位移值,即

真实位移值 - baseOffset

使用相对位移值能有效节省磁盘空间
而Broker端参数log.segment.bytes是整型,这说明Kafka中每个日志段文件的大小不会超过2^32,即4GB,这说明同一个日志段文件上的 位移值 - baseOffset 一定在整数范围内。因此,源码只需4字节保存。

同理,TimeIndex中的时间戳类型是长整型,占8字节,位移依然使用相对位移值,占用4个字节,因此共需12字节。

3 Kafka的索引底层实现原理


内存映射文件,即Java中的MappedByteBuffer。

内存映射文件的主要优势在于,它有很高的I/O性能,特别是对于索引这样的小文件来说,由于文件内存被直接映射到一段虚拟内存上,访问内存映射文件的速度要快于普通的读写文件速度。

在Linux的这段映射的内存区域就是内核的页缓存(Page Cache)。里面的数据无需重复拷贝到用户态空间,避免了大量不必要的时间、空间消耗。

在AbstractIndex中,这个MappedByteBuffer就是名为mmap的变量。接下来,我用注释的方式,带你深入了解下这个mmap的主要流程。

这些代码最主要的作用就是创建mmap对象。要知道,AbstractIndex其他大部分的操作都是和mmap相关。

案例:

  • 计算索引对象中当前有多少个索引项

protected var _entries: Int = mmap.position() / entrySize
  • 计算索引文件最多能容纳多少个索引项

private[this] var _maxEntries: Int = mmap.limit() / entrySize

再进一步,有了这两个变量,我们就能够很容易地编写一个方法,来判断当前索引文件是否已经写满:

 def isFull: Boolean = _entries >= _maxEntries

AbstractIndex最重要的就是这个mmap变量。事实上,AbstractIndex继承类实现添加索引项的主要逻辑,也就是向mmap中添加对应的字段。



写入索引项


下面这段代码是OffsetIndex的append方法,用于向索引文件中写入新索引项。


append方法的执行流程





查找索引项


索引项的写入逻辑并不复杂,难点在于如何查找索引项。AbstractIndex定义了抽象方法parseEntry用于查找给定的索引项,如下所示:

protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry


“n”表示要查找给定ByteBuffer中保存的第n个索引项(在Kafka中也称第n个槽)。IndexEntry是源码定义的一个接口,里面有两个方法:indexKey和indexValue,分别返回不同类型索引的对。

OffsetIndex实现parseEntry的逻辑如下:

   override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {        OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))      }


OffsetPosition是实现IndexEntry的实现类,Key就是之前说的位移值,而Value就是物理磁盘位置值。所以,这里你能看到代码调用了relativeOffset(buffer, n) + baseOffset计算出绝对位移值,之后调用physical(buffer, n)计算物理磁盘位置,最后将它们封装到一起作为一个独立的索引项返回。

我建议你去看下relativeOffset和physical方法的实现,看看它们是如何计算相对位移值和物理磁盘位置信息的。

有了parseEntry方法,我们就能够根据给定的n来查找索引项了。但是,这里还有个问题需要解决,那就是,我们如何确定要找的索引项在第n个槽中呢?其实本质上,这是一个算法问题,也就是如何从一组已排序的数中快速定位符合条件的那个数。

4 二分查找算法


到目前为止,从已排序数组中寻找某个数字最快速的算法就是二分查找了,它能做到O(lgN)的时间复杂度。Kafka的索引组件就应用了二分查找算法。


Kafka索引应用二分查找算法快速定位待查找索引项位置,之后调用parseEntry来读取索引项。不过,这真的就是无懈可击的解决方案了吗?



改进版


显然不是!我前面说过了,大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用LRU(Least Recently Used)或类似于LRU的机制来管理页缓存。

Kafka写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU机制是非常适合Kafka的索引访问场景的。

但,这里有个问题是,当Kafka在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(Page Fault)。此时,Kafka线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。

下面我举个例子来说明一下这个情况。假设Kafka的某个索引占用了操作系统页缓存13个页(Page),如果待查找的位移值位于最后一个页上,也就是Page 12,那么标准的二分查找算法会依次读取页号0、6、9、11和12,具体的推演流程如下所示:

通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page #12不断地被填充新的索引项。如果此时索引查询方都来自ISR副本或Lag很小的消费者,那么这些查询大多集中在对Page #12的查询,因此,Page #0、6、9、11、12一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。后面当新的索引项填满了Page #12,页缓存就会申请一个新的Page来保存索引项,即Page #13。

现在,最新索引项保存在Page #13中。如果要查找最新索引项,原版二分查找算法将会依次访问Page #0、7、10、12和13。此时,问题来了:Page 7和10已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用Page #13,就会发生Page Fault,等待那些冷页数据从磁盘中加载到页缓存。根据国外用户的测试,这种加载过程可能长达1秒。

显然,这是一个普遍的问题,即每当索引文件占用Page数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。

基于这个问题,社区提出了改进版的二分查找策略,也就是缓存友好的搜索算法。总体的思路是,代码将所有索引项分成两个部分:热区(Warm Area)和冷区(Cold Area),然后分别在这两个区域内执行二分查找算法,如下图所示:

乍一看,该算法并没有什么高大上的改进,仅仅是把搜寻区域分成了冷、热两个区域,然后有条件地在不同区域执行普通的二分查找算法罢了。实际上,这个改进版算法提供了一个重要的保证:它能保证那些经常需要被访问的Page组合是固定的。

想想刚才的例子,同样是查询最热的那部分数据,一旦索引占用了更多的Page,要遍历的Page组合就会发生变化。这是导致性能下降的主要原因。

这个改进版算法的最大好处在于,查询最热那部分数据所遍历的Page永远是固定的,因此大概率在页缓存中,从而避免无意义的Page Fault。

下面我们来看实际的代码。我用注释的方式解释了改进版算法的实现逻辑。一旦你了解了冷区热区的分割原理,剩下的就不难了。


5 总结


AbstractIndex是Kafka所有类型索引的抽象父类,里面的mmap变量是实现索引机制的核心,你一定要掌握它。
改进版二分查找算法:社区在标准原版的基础上,对二分查找算法根据实际访问场景做了定制化的改进。你需要特别关注改进版在提升缓存性能方面做了哪些努力。改进版能够有效地提升页缓存的使用率,从而在整体上降低物理I/O,缓解系统负载瓶颈。你最好能够从索引这个维度去思考社区在这方面所做的工作。

实际上,无论是AbstractIndex还是它使用的二分查找算法,它们都属于Kafka索引共性的东西,即所有Kafka索引都具备这些特点或特性。

往期推荐


大厂如何解决数值精度/舍入/溢出问题

大厂数据库事务实践-事务生效就能保证正确回滚?

线上问题事迹(一)数据库事务居然都没生效?

硬核干货:HTTP超时、重复请求必见坑点及解决方案

给大忙人们看的Java NIO教程之Channel





目前交流群已有 800+人,旨在促进技术交流,可关注公众号添加笔者微信邀请进群


喜欢文章,点个“在看、点赞、分享”素质三连支持一下~

浏览 36
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报