峰值超2亿/秒,Kafka在美团数据平台的实践
共 9604字,需浏览 20分钟
·
2022-02-25 11:55
导读:本文将介绍Kafka在美团数据平台的实践,主要内容包括:① Kafka在美团数据平台的发展现状和面临的挑战,主要是海量数据下如何保证读写延迟的问题,以及大规模的集群管理与优化;② 面对上述挑战,美团所做的优化实践;③ 未来美团数据平台Kafka的优化方向。
1. 现状
首先了解一下Kafka在美团数据平台的现状。
如图1-1所示,蓝色部分描述了Kafka在数据平台定位为数据存储层。主要的职责是做数据的缓存和分发,它会将收集到的binlog日志分发到不同的数据系统里,这些日志来源于业务日志、用户行为日志以及业务的数据库。这里的数据系统包括通过ODS入仓提供离线计算使用、直接供实时计算使用、通过DataLink同步到日志中心,以及OLAP做分析使用。
Kafka在美团的集群规模总体机器数已经超过了7500台,单集群的最大机器数也已经到了1500台。数据规模上,天级消息量已经超过了21P,天级的消息条数达到了11.3万亿,天级消息量峰值也达到了2.46亿/秒。
随着集群规模的增大,数据量的增长,Kafka面临的挑战也是愈发的严峻。下面看一下具体的挑战有哪些。
2. 挑战
图1-2 Kafka在美团数据平台面临的挑战
如图1-2所示,具体的挑战可以概括为两部分:
第一部分是慢节点影响读写,这里慢节点参考了HDFS的一个概念,具体定义指的是读写延迟tp99大于300ms的broker。影响慢节点的原因有三个,第一个是集群负载不均衡会导致局部热点,就是整个集群的磁盘空间很充裕或者ioutil很低,但部分磁盘即将写满或者ioutil打满;第二个是pagecache容量不足会导致磁盘IO瓶颈。比如说,80GB的pagecache在170MB/s的写入量下仅能缓存8分钟的数据量。那么如果消费的数据是8分钟前的数据,就有可能触发磁盘读;第三个是consumer线程模型缺陷会导致延时指标的失真。例如当消费的分区处于同一broker时,tp90可能小于100ms,但是当他们处于不同broker时,tp90可能会大于100ms。
第二部分是大规模集群运维的复杂性,具体表现有四个方面,一是不同topic之间会相互影响,某些或某个topic的流量突增,或者某些消费者的回溯读会影响整体集群的稳定性;二是Kafka原生的broker粒度的指标不够健全,问题的根因分析变得很困难;三是故障的感知无法做到及时,故障处理成本很高;四是Rack级别的故障会造成部分分区不可用。
接下来介绍一下针对读写延迟问题,美团的数据平台做了哪些优化。首先从宏观上将受影响因素分为应用层和系统层,然后详细介绍应用层和系统层存在的问题,并给出对应的解决方案,包括流水线加速、fetcher隔离、迁移取消和cgroup资源隔离等,下面具体来看一下各种优化方案是如何实现的。
1. 概览
图2-1 Kafka概览图
如图2-1,这张图是针对读写延迟碰到的问题,以及对应优化方案的一个概览图。我们把整个受影响的因素分为应用层和系统层。
应用层,主要表现在系统设计的不合理导致,包括消费者端的单线程模型存在缺陷导致运维指标失真,并且单consumer消费的分区数是不受限制的,当消费的分区数增多的时候可能会引起回溯读,因为消费能力不足就无法跟上实时最新的数据;其次是broker端,broker端主要表现在负载不均衡上,具体表现是磁盘使用率不均衡方面。
我们针对此做了磁盘均衡,但磁盘均衡需要使用分区迁移,分区迁移又引入了一些新的问题,包括迁移只能按批提交,这存在长尾问题,以及迁移fetcher和实时拉取fetcher存在资源竞争,分区迁移的fetcher会影响实时消费。
系统层,主要包括三个方面,一是pagecache的容量不足会导致磁盘读写,磁盘读写的性能显著慢于内存,而且容量不足时还会导致pagecache污染,pagecache污染后,磁盘读和回溯读会影响实时读;另一方面,Kafka目前使用的disk主要是HDD,HDD是比较符合顺序读写的场景。但是对于随机读写的场景,它的性能是不足的;最后由于CPU的资源竞争,在美团这边为了提高资源的利用率,IO密集型的服务(比如Kafka)会和CPU密集型的服务(比如实时作业)混布,混布其实是存在资源竞争的,也会影响读写的延迟。
针对刚才提到的应用层和系统层存在的各种问题,我们这边分层的去解决。对于应用层提到的每一点问题都会有针对性的解决方案,比如说限流、流水线加速、资源隔离等。针对系统层存在的问题,我们做了cgroup的优化以及物理核的隔离来保证当CPU实时计算的飙升时不会影响读写延迟。
2. 应用层
① 磁盘均衡
图2-2 Kafka应用层磁盘均衡
下面介绍一下读写延迟在应用层遇到到的问题,磁盘热点导致磁盘利用率不均衡,它会带来两个问题:
磁盘实时读写延迟变高,比如说tp99超过300ms就已经造成慢节点了;
集群整体利用率不足,虽然集群容量非常充裕,但是部分磁盘已经写满,这个时候甚至会导致某些分区暂时中断服务。
针对这两个问题我们做了基于空闲磁盘优先这样一个分区迁移计算计划,整个计划分为5个点。如图2-2 所示,首先会有一个组件叫rebalancer,rebalancer通过目标的使用率和Kafka monitor组件不断从Kafka broker集群上报上来的当前磁盘的使用状况这两类指标持续生成具体的分区迁移计划,执行迁移计划并检查进度;然后rebalancer会向zookeeper的reassign节点提交刚才生成的迁移计划,Kafka的controller收到这个reassign事件之后会向整个Kafka broker集群提交reassign事件,然后Kafka broker集群让整体磁盘利用率趋于均衡值这样一个目标执行磁盘迁移计划。
如图2-2所示,对于所有的disk,三个分区属于一个相对均衡的状态,那么如果有一个四个分区的disk,就会把其中一个分区迁移到另外两个分区的disk上,最终尽可能地保证整体磁盘利用率是均衡的。但是Kafka的分区迁移只能是按组提交的,在执行分区迁移过程中碰到了许多新的问题,下面会继续介绍这些问题是怎么解决的。
分区迁移存在一个迁移效率不足的问题,因为是按组提交的,在上一批没有完成之前,下一批无法开始提交,这样就会导致整体迁移进度受阻,进而对读写请求造成影响。
针对迁移效率问题以及带来的它带来的影响,我们主要做了三点改进:第一点是做流水线加速,流水线加速能够保证长尾分区不影响整体迁移进度;第二点是迁移取消,在原生Kafka版本中,当一个分区迁移被提交后,是无法中断的,只能等他迁移完成,那么如果他在影响一个实时读写请求的时候,如果它迟迟不能完成,可能另一个实时读写的请求一直都会受到影响;第三点是做fetcher隔离,Kafka在做分区迁移的时候会利用follower通过最近读去拉数据同步,当发起最近读的迁移请求和某一个实时写请求共享同一个fetcher的时候,迁移分区的读请求会影响实时分区的读请求,后面会进一步详细描述具体的问题和对应的解决方案。
② 迁移优化
优化一,流水线加速
图2-3 流水线加速
针对长尾分区问题,我们主要是做了流水线加速,如图2-3所示,箭头以上原生Kafka版本只支持按组提交,比如说一批提交了四个分区,当tp4这个分区一直卡着无法完成的时候,后续所有分区都无法继续进行。采用流水线加速之后,即使tp4这个分区还没有完成,当其它三个分区已经完成的时候,后续就可以继续提交新的分区。可以看出在相同的时间内,原有的方案受阻于tp4没有完成后续所有分区都没办法完成,但是在新的方案中,tp4分区已经迁移到tp11分区了。图中虚线代表了一个无序的时间窗口,主要用于控制并发,目的是为了和原有的按组提交的个数保持一致,避免过多的迁移影响读写请求服务。
优化二,迁移取消
图2-4 迁移取消
如图2-4所示,箭头左侧描述了因为迁移影响的三种线上类型。第一种是因为迁移会触发最旧读,同步大量的数据,在这个过程中会首先将数据回刷到pagecache上,那么可能会污染pagecache,进而导致某个实时读的分区发生cache miss,就会导致实时读触发磁盘度进而影响读写请求;第二类和第三类分别描述的是当存在某些异常节点导致迁移hang住的时候,想对topic做某些操作,比如对topic扩容,例如在午高峰时由于流量上涨想对topic扩容,实际上这个时候扩容是无法完成的。因为在Kafka迁移过程中这些操作都被限制住。第三个和第二个有些类似,它的主要问题是当目标节点挂了,这个时候topic扩容也是无法完成的,用户可能一直忍受读写请求受影响,直到迁移完成。针对这种场景,线上无法忍受由于长时间迁移导致读写延迟变高的。
针对上面提到的各种问题,我们支持了一个功能叫迁移取消。当遇到这类问题时,管理员可以调用迁移取消命令,中断正在迁移的分区,针对第一种场景,pagecache就不会被污染,实时读得以保证;在二、三类场景中,因为迁移取消,扩容得以完成。
迁移取消必须要删除那些还没有完成的分区,大量的删除会导致磁盘IO,称为性能瓶颈进而影响读写,因此在这里我们针对迁移取消做了平滑删除,避免因大量删除影响性能问题。
优化三,fetcher隔离
图2-5 fetcher隔离
如图2-5,绿色代表实时读,红色代表延时读。当某一个follower的实时读和延时读共享同一个fetcher时,延时读会影响实时读。因为每一次延时读的数据量是显著大于实时读的,而且延时读容易触发磁盘读,可能数据已经不在pagecache中了,显著的拖慢了fetcher的拉取效率。
针对这种问题我们做的策略叫fetcher隔离。也就是说所有isr的follower共享fetcher,所有非isr的follower共享fetcher,这样就能保证所有isr中的实时读不会被非isr的回溯读所影响。
③ Consumer异步化
首先来了解一下Kafka-broker分阶段延时统计模型,当一个Kafka的producer或consumer请求进入到Kafka-broker时,首先由processor将请求写入RequestQueue里面,然后RequestHandler就会从RequestQueue源源不断地去拉取请求进行处理,在RequestQueue中的等待时间是RequestQueueTime,RequestHandler具体的执行时间为LocalTime,当RequestHandler执行完毕后会将请求扔到DelayedPurgatory组件中,这个实际上就是一个延时队列。这个延时队列当触发某一个延时条件完成了以后会把请求塞到ResponseQueue中,在DelayedPurgatory队列持续的时间为RemoteTime,processor会不断的从ResponseQueue中将数据拉取出来发往客户端,标红的ResponseTime是可能会被客户端影响的,因为如果客户端接收能力不足,那么ResponseTime就会一直持续增加,从Kafka-broker角度,每一次请求总的延迟叫RequestTotalTime包含了刚才所有流程分阶段计时总和。
图2-7 Consumer异步化
主要问题是因为Kafka原生consumer基于NIO的单线程模型存在缺陷。如图2-7所示,在phase1,user首先在调用poll请求时,当本地无数据时,同时向broker1、broker2和broker3发送请求,实际上broker1的数据先回来了,Kafka Client立即将数据写入CompleteQueue,这个时候立即返回,不会再拉取broker2和broker3的数据,此时user线程会直接从CompleteQueue中读取数据,然后直接返回。此时broker2和broker3服务端可能已经处理好,数据已经准备就绪。user线程会继续调用poll,访问下一批请求,可是因为CompleteQueue依然存在broker1上次拉取的数据,这时user线程直接返回了,这样就会导致broker2和broker3中已就绪的数据一直得不到拉取。如图中phase2,因为单线程模型存在缺陷导致waitFetch这部分时长变大,导致Kafka-broker延时指标不断升高。
针对这个问题我们的改进是引入异步拉取线程。异步拉取线程会及时的拉取就绪的数据,避免服务端延时指标受影响,而且原生Kafka并没有限制同时拉取的分区数,我们在这里做了限速,避免GC和OOM的发生。异步线程在后台持续不断地拉取数据放到CompleteQueue中。
3. 系统层
① Raid卡加速
图2-9 Raid卡加速
众所周知,Kafka的写入借助了Zero Copy技术将数据直接写入pagecache,但是随着随机读写并发量的提升,随机写导致的性能不足问题就会显现出来。表现是随机写入的延时会显著升高,针对这个问题我们引入了Raid卡。Raid卡有一个好处是自带缓存,而且Raid卡使用的是Raid-0模式,并没有冗余,与pagecache类似,在这一层会继续做merge,把数据merge成更大的block写入disk。更加充分利用顺序写HDD的带宽,借助Raid卡保证了随机写的性能是比较稳定的。
② cgroup隔离优化
图2-10 cgroup隔离
在介绍cgroup的隔离优化之前需要提到的背景是,为了提高资源利用率,美团数据平台将IO密集型应用和CPU密集型应用混布。IO密集型应用在这里指的就是Kafka,CPU密集型应用在这里指的是Flink和Storm,但是原有的隔离策略存在两个问题:首先是物理核本身会存在资源竞争,在这个物理核下,共享的L1cache和L2cache都存在竞争,当实时平台CPU飙升时会导致Kafka读写延时受到影响;另外,Kafka的HT跨NUMA,增加内存访问耗时,如图2-10所示,跨NUMA节点是通过QPI去做远程访问,而这个远程访问的耗时是40ns。
针对这两个问题我们改进了隔离策略,针对物理核的资源竞争,我们新的混布策略Kafka是独占物理核的,也就是说在新的隔离策略中,不存在同一个物理核被Kafka和Flink同时使用;然后是保证Kafka的所有超线程处于同一侧的NUMA,避免Kafka跨NUMA带来的访问延时。通过新的隔离策略,Kafka的读写延时不再受Flink CPU飙升的影响。
4. 混合层-SSD新缓存架构
图2-11 混合层SSD新缓存架构
首先来了解一下Kafka的数据消费模型,Kafka利用操作系统提供的ZeroCopy技术处理数据读取请求,pagecache容量充裕时数据直接从pagecache拷贝到网卡,有效降低了读取延时。但是实际上往往pagecache的的容量是不足的,因为它不会超过一个机器的内存,容量不足时,ZeroCopy就会触发磁盘读,磁盘读不仅显著变慢,还会污染pagecache影响其他读写。
如图2-11中左半部分所示,当一个延迟消费者去拉取数据时,发现pagecache中没有它想要的数据,这个时候就会触发磁盘读,磁盘读后会将数据回写到pagecache,导致pagecache污染,自己读写延迟变慢同时也会导致另一个实时消费受影响,因为对于实时消费而言,它一直读的是最新的数据,最新的数据按正常来说时不应该出发磁盘读的。
图2-12 SSD新缓存架构方案选型
针对这个问题,我们这边在做方案选型时有两种方案,
方案一,读磁盘时不回写pagecache,比如使用DirectIO,不过Java并不支持;
方案二,在内存和HDD之间引入中间层,比如SDD,如图2-12所示,随着读取并发的增加,SSD的性能并不会显著降低,非常适合我们的使用场景。
图2-13 SSD新缓存架构决策
针对SSD的方案也有两种选型:
方案一,可以基于操作系统的内核实现,这种方案SSD与HDD存储空间按照固定大小分块,并且SSD与HDD建立映射关系,同时会基于数据局部性原理,cache miss后数据会按LRU和LFU替换SSD中部分数据,业界典型方案包括OpenCAS和FlashCache。优势是数据路由对应用层透明,对应用代码改动量小,并且社区活跃可用性好;但是问题是局部性原理并不满足Kafka的读写特性,而且缓存空间污染问题并未得到根本解决,因为它会根据LRU和LFU去替换SSD中的部分数据。
方案二,是基于Kafka的应用层去实现,具体就是Kafka的数据按照时间维度存储在不同设备上,对于近实时数据直接放在SSD上,针对较为久远的数据直接放在HDD上,然后leader直接根据offset从对应设备读取数据。这种方案的优势是他的缓存策略充分考虑了Kafka的读写特性,确保近实时的数据消费请求全部落在SSD上,保证这部分请求处理的低延迟,同时从HDD读取的数据不回刷到SSD防止缓存污染,同时由于每个日志段都有唯一明确的状态,因此每次请求目的明确,不存在因cache miss带来的额外性能开销。同时劣势也很明显,需要在server端代码上进行改进,涉及的开发以及测试工作量较多。
图2-14 SSD新缓存架构具体实现
下面来介绍一下SSD新缓存架构的具体实现。
首先新的缓存架构会将log内的多个segment按时间维度存储在不同的存储设备上,如图2-14中的红圈1,新缓存架构数据会有三种典型状态,一种叫only cache,指的是数据刚写进SSD,还未同步到HDD上;第2个是cached,指数据既同步到了HDD也有一部分缓存在SSD上;第三种类型叫withoutCache,指的是同步到了HDD但是SSD中已经没有缓存了;
然后后台异步线程持续地将SSD数据同步到HDD上;
随着SSD的持续写入,当存储空间达到阈值后,会按时间顺序删除距当前时间最久的数据,因为SSD他的数据空间也是有限的;
副本可根据可用性要求灵活开启是否写入SSD;
从HDD读取的数据是不会回刷到SSD上的,防止缓存污染。
图2-15 SSD新缓存架构细节优化
介绍了具体实现之后,再来看一下细节优化。
首先是关于日志段同步,就是刚才说到的segment,只同步Inactive的日志段,Inactive指的是现在并没有在写的日志段,低成本解决数据一致性问题;
其次是做同步限速优化,在SSD向HDD同步时是需要限速的,同时保护了两种设备,不会影响其他IO请求的处理,向SSD写入数据也是需要限速的,因为SSD的使用寿命是有限的。
了解了读写延迟优化之后,下面来看一下Kafka在美团数据平台是如何保证大规模集群的稳定性的。
1. 隔离优化
图3-1 隔离优化
由于Kafka服务于多个业务,这些业务的topic混布在一起的话很有可能造成不同业务的不同topic之间相互影响。例如broker如果和controller混布在一起,当broker负载明显变高的时候,会导致controller无法及时处理请求,从而可能会造成整个集群发生故障,因为元数据的变更请求无法发送出去。
针对这些相互影响的问题,我们从业务、角色和优先级三个维度来做隔离优化。
第一点是业务隔离,如图3-1所示,每一个大的业务会有一个独立的kafka集群,比如外卖、团购、优选;
第二点是分角色隔离,这里Kafka的broker和controller以及他们依赖的组件zookeeper是部署在不同机器上的,避免之间相互影响;
第三点是可以分优先级,有的业务可能它的topic可用性等级特别高,那么我们就可以给他划分为VIP集群,给他更多的资源冗余去保证可用性。
2. 全链路监控
图3-2 全链路优化
随着集群规模的增大分区数变多,读写的客户端也会变多,Kafka当前提供的broker端粒度的延时指标在很多情况下无法真实反映某些客户端是否慢,还有一类问题是当集群发生故障时,如何能及时得到感知和处理。
针对这两个问题,我们做了全链路监控这样一个项目。把Kafka核心组件以及指标全部收集起来做了一个全链路的追踪,通过分析上报上来的日志和指标,我们就可以建立细粒度的日志大盘。当某一个读写请求变慢时,我们通过日志大盘很容易就知道他具体是慢在哪个环节。日志和指标的解析服务可以自动实时感知故障还有一些慢节点,这两类故障有一部分我们可以做到自动处理,我们会把他通过事件的方式通知到Kafka manager,然后Kafka manager会根据这个事件自动去处理这些故障。还有一类故障是无法得到自动处理的,比如说僵尸节点,僵尸节点指的是zookeeper的临时节点还没有掉线,但是这个节点不管是controller也好还是客户端也好,都已经无法访问了,访问就会报错或者超时,这一类故障需要人工介入处理,会直接发给具体的管理员。
3. 服务生命周期管理
图3-3 服务生命周期管理
首先介绍一下当集群规模增大以后存在的一系列问题。之前版本的Kafka也有一套自动化运维的系统,但是它存在一些问题,首先是状态语义存在歧义,无法真实反映系统状态,往往需要借助日志和指标去找到真实系统是否健康或者异常;然后是状态不全面,异常case需人工介入处理,误操作风险极大。
基于这两点问题,我们引入了生命周期管理机制,保证状态能真实反映系统状态。生命周期管理指的是从服务开始运行到机器报废停止服务的全流程管理。并且做到了服务状态和机器状态联动,无需人工同步变更。而且新的生命周期管理机制的状态变更由特定的自动化运维触发,禁止人工变更。
4. TOR容灾
图3-4 TOR容灾-挑战
最后一个集群管理优化是TOR容灾。随着集群规模的变大,Rack级别的故障变得平凡起来,而我们是无法容忍Rack级别的故障的,因为Rack级别的故障可能会导致分区不可用,原因是分区的多副本在同一个rack下,特别是在流存储环境下,当某些分区不可用时,它会导致收集侧的拥堵,影响其他topic的收集上报。并且当实时作业的某个分区出现异常时,它会影响整个链路。
如图3-4所示,当rack1发生故障时,TopicPartition1是完全不可用的,因为他的两个副本都在rack1上,TopicPartition2也是不可用的,虽然他有三个副本,但是他的两个副本都已经不可用了。
图3-5 TOR容灾-改进
针对Rack级别的故障,我们做了TOR容灾。改进了副本的分配算法,保证同一个分区的不同副本不在同一个rack下,如图3-5所示,即使rack1整个发生故障,也能保证所有分区是可用的。
最后介绍一下美团数据平台的Kafka未来可以做哪些优化:首先我们会继续去做Kafka的高可用建设,比如说客户端主动去做一些故障节点的避让,服务端通过多队列的方式去隔离一些异常请求,避免它们之间相互影响。另外,高可靠方面会去做quorum write多数派写优化,因为Kafka的ack等于1时是需要写入所有副本的。我们还会去做流批一体的存储架构,比如Kafka on HDFS。