贝壳基于Druid的OLAP引擎应用实践
分享嘉宾:王啸@贝壳找房 资深研发工程师
编辑整理:王建新
出品平台:DataFunTalk
导读:贝壳作为全国领先的房产交易和租赁在线服务平台,有很多业务场景会产出大量实时和离线数据,针对这些数据进行查询分析,对于企业发展和业务拓展至关重要。不同业务线不同查询场景下,单一技术手段很难满足业务方的需求,Druid就是我们在探索之路上发现的比较切合业务方需求的OLAP引擎之一,基于Druid我们做了深入地实践,接下来就由我和业界朋友们一起分享。
内容包括:
贝壳OLAP平台介绍
OLAP技术选型策略
Druid在贝壳的应用实践
Druid结合贝壳业务场景的改进
未来规划
1. 平台简介
平台的使用对象主要是经纪人、运营人员、房产分析师和客户。平台的架构就如上图所示,整个平台分为四层,第一层为应用层,应用层主要是看板和报表。第二层是指标层,提供了一个一站式的指标平台,主要使用对象是数仓人员,数仓人员可以在一站式平台上做数据建模、例行作业任务配置、指标定义加工以及指标API输出。第三层为路由层,路由层是一个统一查询引擎,提供查询语义转换、查询缓存、查询降级以及不同类型的引擎切换。第四层是OLAP引擎,目前使用的主要引擎是Kylin、Druid以及Clickhouse,其中Kylin和Druid主要是分担离线指标业务,Clickhouse负担实时指标业务。在2020年4月份之前,平台底层的离线指标引擎主要依托Kylin为主,在2020年5月份之后,逐渐引入Druid引擎,目前两个引擎的流量比例Druid占60%左右,kylin在40%左右。
2. 引入Druid的原因
引入Druid引擎是因为在使用离线指标的时候发现用Kylin引擎存在一些问题,主要包括五大问题:
使用Kylin的数据源构建时间比较长,有些业务方要求在某一个时间点前数据必须就绪。
数据源的底层存储占用比较大。
查询的灵活性比较差,有时候为了适配不一样的查询,需要构建多个cube进行适配。
源数据的膨胀率巨大,相对于源数据表里ORC的文件大小,它的膨胀率比较惊人,有可能产生可怕的维度爆炸。
它的调优门槛相对较高,需要对数仓的同学做一定的培训才能上手。
1. 为什么选择Druid
首先分享下贝壳在选用Druid时候采用了哪些选型策略。其实选型最重要的是大家要知道自己需要什么样的一个OLAP引擎。针对贝壳来说,主要有五点要求,第一个是PB级别的数据量;第二个是亚秒级别的响应;第三个是支持较高的并发,在贝壳的业务场景下,QPS来说相对较高,平均QPS在五六百左右,峰值可达到2000;第四个是灵活应用的查询接口,支持在QE引擎层灵活接入,让统一SQL查询引擎屏蔽底层OALP引擎查询语法差异;第五个是快捷导入数据,按时生成查询数据源,满足业务方查询需求。
根据如上五点要求,可选用OLAP引擎主要是Druid、Kylin、Doris、Clickhouse这四种。根据贝壳需要支持高并发和支持精确去重的需求,Druid的并发性能和Kylin的并发性能比较接近,Doris比Clickhouse好一些,Clickhouse的高并发性能没有那么好。Druid的原生版本支持SQL级别的精确去重,但是离线的多列精确去重原生版本是不支持的。在社区里有伙伴用bitmap实现的一个版本,它参照Kylin的字典编码用AppendTrie Tree实现了离线多列精确去重。综合以上因素及运营成本,最终选定了使用Druid。
2. Druid与Kylin对比测试
我们做了一些初步测试来验证两种引擎的性能
在构建时长方面,选用了包括全量表,增量表在内的线上常用七个数据源,使用相同的计算资源情况下,对近一个月的平均导入时长进行对比,Druid导入时长比Kylin导时长要短,基本上是Kylin的1/3左右。
在平均查询时长方面,对比了在200QPS左右的查询时长,选取的查询范围包括日/周/月/季度/半年等时间范围,Druid的查询时间和Kylin比较接近。理论上Kylin的查询时间应该更快,因为Kylin的预聚合程度更高,相当于把所有的查询条件及度量都已经进行了预计算,只要调优的技术比较好,它的查询速度应该是最好的。
在数据源占用HDFS存储及相较于源数据的膨胀率方面,也做了一些统计,发现Druid在HDFS占用存储量相较与Kylin的cube占用存储明显占优。图中前面一排蓝色是Druid,后面几个颜色都是kylin的cube,可能有同学会问为什么有的数据源有很多cube,因为kylin要适配不同的查询类型,会预聚合多个cube来满足查询速度。从膨胀率看,相较于ADS层的hive源表数据,Druid的膨胀率大概是1到3倍左右,Kylin的膨胀率在18倍到100倍。
3. Druid的架构
Druid架构主要有四层,第一个是查询服务层,第二个是数据存储层,第三个是集群管理层,第四个是数据摄取层。查询服务层主要是它的broker,用于接收用户端的查询请求。数据存储层在生产部署的时候,再分两层,一个是数据热层,一个是数据冷层。数据热层一般存储近半年的数据,以天为聚合条件;数据冷层一般存储超过半年的数据,包括一年、两年甚至五年的数据,以月度粒度聚合。它们的存储介质也有区别,热层用NVMe的SSD盘,冷层用HDD普通的机械硬盘做raid10,以提升IO读写性能。数据摄取执行层主要是负责离线任务、实时任务执行。集群管理层就是管理数据摄取的master即 overlord和数据存储层的master即coordinator。
1. 指标构建
Druid在贝壳的应用是通过一站式指标平台实现的,平台整合了数仓建模、指标定义、指标加工、指标API四项功能。
一个Druid指标的创建流程如下:首先,用户在元数据平台上去找到目标OLAP表;然后,创建model和cube,这里model和cube参考kylin建模思想。其中model指定事实表和维度表的join关系,同时指定度量列和维度列。cube是在model的基础上再次做维度和度量选取,指定度量列的度量规则,包括count distinct/sum/count/avg等度量规则。在创建完模型后一站式指标平台会自动构建一个Hive2Druid作业任务。
目前离线指标任务支持小时级别、天级、周级、月级,也支持in多少个pt(日期分区)、大于等于、小于等于这些复杂的时间表达式,用于支持用户做历史数据重刷。任务构建完成之后,会自动在定时的时间点往Druid里灌入数据。最后用户可以在一站式指标平台上创建指标,比如经纪人带客户看房的指标,建完后关联相应的cube就可用了。一些看板的RD研发同学可以通过API的方式直接调用,还有一些用户可以直接在Odin上配置看板,来查询底层数据。
2. 应用效果
从支撑的查询量看,当下查询量每日6000万左右,四月份前约为3000万左右,相较于年初翻了一倍,Kylin和Druid的分担比例大概是4:6,Druid占了60%。
从构建时长看,Druid的数据源构建时长仅为Kylin构建时长的1/2左右。
从存储占用大小看,据不完全统计,在2020年4月份,Kylin底层占用600TB,全部迁移Druid后,存储量预计为Kylin的1/10左右,节省的底层存储资源非常可观。
从三秒内到达率看,Druid三秒内到达率基本维持在99.9%以上,kylin维持在99.3%-99.4%的水平,因为Kylin的cube调优比较麻烦,所以预期也会比Druid稍低。
1. 改进总体说明
关于Druid,贝壳结合业务场景做了一些改进,本次分享的改进点主要从两个方面介绍,第一个是Druid数据源的导入方式,第二个是保障Druid集群稳定性。
2. Druid数据源导入优化
Index hadoop作业类型数据导入优化
第一个方面的优化是针对离线的hadoop作业类型数据导入。
首先,介绍一下整个数据导入过程:
① 之前提到cube构建好后,会自动构建一个调度任务,在触发点触发的时候,会从hive仓库中获取数据。
② 从hive数仓中获取数据落盘形成parquet的文件,parquet文件就绪后会通知Druid overload(执行数据灌入的master节点)数据已经就绪。
③ overload会去加载hdfs上的parquet文件,开始执行hadoop index作业;Hadoop index作业的执行主要是分为三个步骤,第一步是partition作业,这一步决定会分多少个segments。第二步是构建字典左右,当前构建字典部分引用的是社区小伙伴提供的离线精确去重的版本,注意只有度量规则中指定count distinct列的时候,才会去触发这个作业。第三步是生成索引作业,针对维度列和度量列生成倒排索引和bitmap索引。
④ Hadoop index作业运行完后,segment持久化到深存hdfs,落盘后historical从hdfs上拉取文件,生成自有存储格式,这样整个数据导入就结束了。
关于上述步骤中离线数据灌入的作业时长,主要取决于两个因素,第一个是源表本身的数据量大小,第二个是使用了多少map reduce的资源。
这里举一个例子,对于一个数据量是1亿4000万、列数40列、有count distinct和sum度量,基数在600万左右的数据增量表,去hive里查数据生成parquet文件时候,我们预先repartition出20个分区,500万行一个分区,生成20个parquet文件,分区的数量会决定partition job的map数量;第二个是时序字段里面只有一天,因为它是一个增量表,当日PT里面只有昨天一日的数据,如果按照昨天一日数据直接向druid里面灌入,在生成索引阶段,它的reduce数量只有一个,作业运行效率非常差,这里我们根据经验直接指定numShards的数量是5,每个map reduce的内存资源是8G。按如上步骤进行优化后数据导入效率提升明显。
上图为近七日优化前和优化后数据源导入时长对比,优化后的时间约为优化前的1/3。
使用index hadoop作业类型往Druid导入数据存在列精确去重的一些问题,如果列为高基数列,比如5000万、6000万、1亿这样的高基数,index generator job在map阶段去拉取字典生成bitmap数组的时候,在container里会出现full GC的问题。一般的解决方法就是调整map的内存,但是不可能无限制的调整,这个也是我们未来优化的一个重点。
Kafka Index 作业类型增加多列实时精确去重能力
第二个改进是针对kafka index作业类型增加多列实时精确去重的能力,主要是因为我们的业务方有GMV、GSV、分享数实时精确去重统计需求。Druid的原生版本支持sql语法的精确去重,但这种查询性能并不高且只能支持单列精确去重,也就是说一个查询语句里面只能执行select count(distinct A) from table 1,不能执行select count(distinct A), count(distinct B) from table 1。社区之前提供的离线精确去重版本不能支持实时场景,仅可在近实时场景中采用(即小时级别任务),秒级别或者分钟级别不适合。
贝壳的解决方法是借鉴社区的一些经验,实现了一个CommonUnique的扩展。扩展的实现主要有三点,第一点是用雪花算法生成数字编码,就是在执行kafka index job时候在本地服务内起动一个生成雪花ID的service,这样可以使得ID生成速率更快。第二点是用redis集群实现字典存储,通过redis基于缓存的分布式锁,可以保证字符串的数值编码的唯一性。因为ID是一直递增的,传递进来一个字符串生成一个ID,再往Redis里set的时候,如果发现value值已经有了,直接就返回已有值。第三点是查询的时候使用64位的bitmap做去重计数,这是因为雪花ID生成的数值编码是long型,所以用64位更合理。测试了近一个月内的查询,基本可以达到秒级以下的返回。
上图是如何使用CommonUnique这种指标类型的说明,在数据摄入阶段,可以在metricsSpec里去定义CommonUnique这种指标计算类型,fieldName是原始列,name是需要进行bitmap编码的列。右边举的是一个groupby的查询例子,在数据查询的时候,在aggregation里可以指定CommonUnique,然后name和fieldName都指定bitmap编码列,就可以实现一个select count distinct这一列的查询需求。
3. 保障Druid集群稳定的措施
背景说明
当前Druid查询的高峰期在上午7点到中午12点,Druid峰值QPS最高约1200左右,上层统一SQL查询引擎峰值QPS在2000左右。
在Druid上面承载了20多个业务线的查询需求,如果仅仅依靠Druid原生提供的负载限流策略是没有办法满足的。因为每个业务线的查询重要程度不一样,查询的sql的复杂程度也不一样,所以需要针对不同的业务线、不同的查询数据源做精细化的控制,原生控制粒度太粗,单纯只是超时时长自动kill在高峰期很难满足不同业务线的查询需求。
三项集群稳定措施:
① 查询缓存
② 动态限流
③ hdfs存储优化。
查询缓存
这里的查询缓存不是指Druid自身的缓存(即Druid broker、historical上的缓存)。此处的缓存是上层服务针对Druid查询结果的缓存,即指标API缓存和统一SQL查询引擎缓存。在实际应用当中,指标API层缓存命中率约为30%,查询引擎的缓存命中率约为17%,这样上层就可以缓解掉40%多的请求。
既然使用了查询缓存,就需要思考在什么样的时间点去清理查询缓存,不让用户查到一些脏数据。我们缓存清理的时机一般设置在historical segment cache就绪的一个时间点。这里需要提一下,Druid数据摄取任务的hadoop index task作业结束的时间跟segment落盘的时间是不一致的,也就是说task任务结束了,但segment落盘可能还要很长时间。segment落盘取决于两个因素,一个是historical的数量,一个是historical上用于load segment的线程数。此时就不能用task执行结束时间作为数据就绪的时间。在社区里面也有人提过能不能让task执行结束的时间就是historical segment落盘的时间,但社区没有同意这个改良建议,因为如果task里需要很多segment去落盘,比如说要灌两年的数据,就要每次落700多个segment,有的用户还可能要落五年的数据,这样去落盘时会影响task线程,落的数据量大会导致task一直处于占用状态,进而会影响新的数据摄取,浪费线程资源。我们的解决办法是用户提交这种index hadoop作业的时候,会将taskid放在一个队列里,当任务是success后,去比较成功任务的执行时间和落到盘上的segment的version版本时间。如果segment version版本时间比 hadoop index作业的时间戳要大,就认为已经落盘成功了,这样才会去触发清理缓存,如果没有的话,就直接放到队列里面等待下次轮询,超过一定生命周期会自动清理。
动态限流
第二个举措是动态限流,原生Druid限流策略是在broker端限流,比如说集群能扛400QPS,如果超过400QPS就直接拒绝,但是这不足维持我们业务场景下集群规模限流,因为当流量打过来的时候,400QPS内的查询如果语法复杂度比较高,会直接把historical的CPU打满,进而影响到其他高优先业务线的查询。我们采用的限流策略是通过broker端去收集historical的CPU load负载信息,如果historical的负载信息相对比较高,会根据业务线的重要程度及近五分钟内高热度的查询数据源逐级去限流,也就是说会去保障最高优先级的业务线不被限流,对次要的业务线进行限流。实际应用中有些业务线的查询请求不是人工触发的,而是他们为了展现报表速度更快会用程序去刷非常高的QPS。像这种只要保证它一次成功就可以。所以我们可以针对这种查询进行限流,在CPU负载比较低的时候再去执行该查询请求,保证高优的一级报表产出。
上图反映的是限流后一些效果,高峰期7点到12点左右会有很多毛刺现象,其实是CPU打得比较高的时候会针对某些次要的业务线或者是它使用的一些数据源进行限流,起到一个削峰填谷的作用,保证高优先级的业务线查询不受到较大影响。
深存优化
第三个举措是针对深存HDFS上的优化。当前整个平台的Druid引擎承载了300多个数据源,10万多个segment,但是在HDFS的数目上,有一个特别有趣的现象,就是它的目录数竟然达到400万,文件数也达到400万,小于20兆的小文件非常多,占比在50%以上。主要原因是有些数据源是全量表,一次作业任务可能刷一年、两年的数据,它的segment是按照天进行聚合的,因此当任务例行了一个月或者是一个季度,它在hdfs上目录数会特别多。如果目录数太多会影响整个hdfs namenode的性能。我们的优化策略主要是三个方面,第一个是近半年的数据用天粒度去聚合,其他历史数据用月粒度的聚合。第二个就是在查询低峰期的时候,用Druid的健康检查功能自动获取哪些数据源的分片不合理,我们需要进行shard合并,然后在低峰区触发compact任务合并多余的分片信息。大家需要注意是合并任务一般不要放在查询高峰期执行,因为会影响整个集群的线程资源,特别是影响segment落盘,对查询性能影响比较大。第三个是刚才说的那种全量表加载时间跨度比较大,短则一年,多则五年,历史沉淀的segment只保留最近三天的版本。
关于未来的规划,主要涉及两个方面:
第一个是实时指标在Druid上的深入实践,目前Druid主要承载的是离线指标,在实时指标方面,主要是用Clickhouse做了100+的实时指标,数量还比较少,后续会把实时指标业务逐渐往Druid上倾斜。因为Druid已经实现了实时精确去重能力,相较于Clickhouse较高的运维成本,具备了分担实时业务的能力。
第二个是离线导入方式的进一步探索,主要分为三点:
① 针对离线作业类型,之前原生支持的是index hadoop,计划尝试用index spark job作为导入类型,这样会比map reduse的导入速度有较大提升。② 尝试使用index parallel job这种针对小的数据源导入,如果数据源的数据级别不大,可以不依赖hadoop方式,因为分配map reduce资源也会比较耗时。
③ 使用hive做全局字典,因为在做高基数列精确去重的时候,index generator map阶段很容易出现fullGC的问题,因为不可能无限制地对map内存进行调优,所以希望能参照kylin 4.0实现用hive做字典存储用于精确去重。