干货|Spark优化之高性能Range Join

数据分析挖掘与算法

共 6146字,需浏览 13分钟

 ·

2021-12-14 20:00

作者|张兴超
编辑|林颖
供稿|ADI Carmel Team
本文共3884字,预计阅读时间10分钟

 导 读

Carmel是eBay内部基于Apache Spark打造的一款SQL-on-Hadoop查询引擎。通过对Apache Spark的改进,我们为用户提供了一套高可用高性能的服务,以满足eBay内部大量分析型的查询需求(如今单日查询量已超过30万)。
在生产中,我们发现有很多包含非等值连接的查询。因为BroadcastNestedLoop的实现,这些查询会产生效率问题,而变得非常耗时。本文就非等值连接中的Range Join进行分析,并重点介绍了我们对此所做的优化。

1 背 景

Background

Range Join 发生在两个表的连接(Join)条件中包含“点是否在区间中”或者“两个区间是否相交”的时候[1]。过去一周,我们的OLAP引擎(Spark)中,检测到7k多条这样的SQL查询语句,在所有包含非等值连接的SQL中占比82.95%(如下图所示)。
在现在的Spark实现中,Range Join作为一种非等值连接,是通过BroadcastNestedLoop(嵌套循环)的方式来实现的,时间复杂度为N*M,其中N为Stream表的行数,M为Build表的行数。当两个表都很大的时候,BroadcastNestedLoop效率不高的缺点就会变得愈发明显,连接过程可能会花费数个小时来完成,有的甚至无法给出结果。
比如下图中的两个例子:
案例1:数据分析师希望根据150w左右的用户登录IP,来查询用户所在的国家和地区。这就需要User Login IP表和eBay Data Warehouse(以下简称DW)中IP Range Lookup表(>1200w行)来进行连接,这在Spark引擎中需要4小时才能返回。
(点击可查看大图)

案例2:这个属于更为常见的案例,数据分析师会经常根据日期来查询相应时间段的关联数据,如下图所示,在我们系统中同样发现了很多耗时的查询语句(Query)。
(点击可查看大图)

无论从用户等待的耗时,还是系统资源的使用角度来看,这都是不能接受的。
本文中涉及的方案将在Spark中支持Range Join,以解决现有实现中效率低、耗时长的问题。结合Spark社区对Range Join的讨论[2-3],我们对原始方案进行了升级和重写,并成功应用于eBay OLAP的生产实践中。

2 Range Join的定义

Definition of Range Join

典型的Range Join主要有以下两种形式[1]
1)点在区间中
2)两个区间相交

Range Join的优化可以作用于有以下特点的连接上:
1)连接条件中包含“点在区间中”或者“两个区间重叠”;
2)连接条件中的所有值为以下类型:数值(Integral、Floating Point、Decimal)、日期(DATE)、时间戳(TIMESTAMP)或者空值(NULL);
3)连接条件中的Range值有相同的类型。如对于Decimal类型,要有相同的长度和精度;
4)连接类型可以是内连接(INNER JOIN)、交叉连接(CROSS JOIN)、左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。

3 方案设计

Project Design

我们对原始方案进行了升级和重写,主要包含以下几个步骤:
1)基于Build表创建一个Range Index数据;
2)Broadcast这个Index数据到Stream端;
3)Stream表基于这个Index进行连接匹配。
     和传统的嵌套循环连接(Nested Loop Join)相比,这会将连接的时间复杂度从n大幅降低为log(n),其中n是Build表的行数。

下图简要说明了该方案和传统Nested Loop Join的区别:Range Join的实现重点在构建Range Index,然后基于Index进行数据连接。
(点击可查看大图)

下面我们将分别阐述Index的构建过程和连接时的查找过程。

3.1 基于Range构建的查询方案设计
如下表所示,我们现有一个Range表(原始数据是非排序的,为了更好的展示例子,这里按照第一列做了排序),含有6行数据:
基于上述这个表,我们建立了一个Range Index,如下图所示,其数据结构包含5个部分:
1)Keys
对表中的Range列(即range_start 和 range_end)排序,并做Distinct后组成的一个有序数组。
2)Offsets
是一个有序数组。其下标Index和“Keys”中的下标Index相同,其值表示小于“Keys”中相同Index对应值的Rows数,同时也表示“activatedRows”的下标Index。
3)activiatedRows
记录了原始表中的数据。
4)activeRows
记录了和相应Key有重叠的Rows。
5)activeNewOffsets
主要用于边界情况检查。
(点击可查看大图)

3.1.1 Range Index的创建

Index的创建需要对Build表做一些预处理,过程如下:
1)基于Rows创建Range Event,一个包含Range的Row往往可以产生两个Range Event。比如(range_start, 0, (row, index))和(range_end, 1, (row, index)),其中0和1表示Range的开和闭,row表示原始Row的值,index表示原始Row的index;
2)对Range Event按照三元组的前两个值进行排序;
3)循环排序好的Range Event填充Range Index,比如“Keys”(为Build表中range start和range end唯一不同的值)、“activated Rows”(等价于原始Build表中的Rows)以及“Offsets”(用于映射“Keys”和“activatedRows”);
4)对于activeRows:
如果是Range Event起始,则把当前行加入到“CurrentActiveRows”;
如果是Range Event结束,则把当前行从“CurrentActiveRows”中移除;
如果本次循环的Key与上次循环的Key不同,则把“CurrentActiveRows”写入“activeRows”。

3.1.2 Range数据的查找

我们对上述Range表基于Range Index进行查找。
(点击可查看大图)

比如,对于一个Point(108),从上面的示意图中可以直观地得到可能匹配到的Rows——R1和R2。而对于一个Range(150, 310),从示意图中也可以得到可能匹配到的Rows——R3和R4,那么是如何通过算法来进行查找的呢?
1)点查找一个数据(如Point(108))
A. 采用二分查找算法,在“Keys”中找到比108小又最接近的Key:3->100
B. 在“activeRows”中找到下标3对应的Row:R1和R2
C. 得到最终结果为R1和R2
2)匹配一个Range(如Range(150, 310))
A. 采用二分查找算法,在Keys中找到比150小又最接近的Key:6->140
B. 在“activeRows”中找到下标6对应的Row:R3
C. 在“Keys”中找到比310小又最接近的Key:8->300
D. 结合步骤B中的下标“6”,我们要找到比6大而又小于C中“8+1”对应的Rows。于是,在Offsets中获得下标区间[6+1, 8+1],其对应的值为:(6+1)->4,(7+1)->4,(8+1)->5,即得到左闭右开的区间[4, 5)
E. 在“activatedRows”中根据区间[4, 5)找到对应的Row:R4
F. 得到最终结果:R3和R4

3.2 基于Point构建的查询方案设计
实践中,我们发现非Range表(不包含Range)一般比较小,是可以进行Broadcast的。对于这种情况,我们也可以建立只包含点的Range Index。比如下表所示的Point表(同样原始数据是非排序的,为了更好的展示例子,这里按照第一列做了排序),含有7行数据:

3.2.1 Range Index的创建

我们对Point列构建Range Index,得到的如下所示的Index数据。与Range表生成的Range Index不同的是:这次的Range Index中只有Keys、Offsets和activiatedRows被填充了数据。
(点击可查看大图)

3.2.2 Range数据的查找

我们对上Point表基于Range Index进行查找。
(点击可查看大图)

比如,对于一个Range(300, 600),从以上示意图中,可以直观地得到可能匹配到的Rows:R3、R4和R5。以下是通过算法进行的查找过程:
A. 采用二分查找算法,在“Keys”中找到比300小又最接近的Key:3->200
B. 在“Keys”中找到比600小又最接近的Key:5->500
C. 结合步骤A中的下标“3”,我们要找到比3大而又小于步骤B中“5+1”对应的Rows。于是,在Offsets中获得下标区间[3+1, 5+1],其对应的值为:4->3,5->4和6->6,即得到左闭右开的区间[3, 6)
D. 在 “activatedRows”中对应的下标区间[3, 6)找到对应的值:R3、R4和R5
E. 得到最终结果:R3、R4和R5

4 性能对比

 Performance Comparison

4.1 时间复杂度对比
相比嵌套循环连接(Nested Loop Join),时间复杂度的变化为:
其中,N = 大表中的Records数量;M = 小表中的Records数量;2 = 我们需要在Range Index分别查找下限和上限。
12M*1M12M*2*20,理论上可以节省99.996%的计算量。

4.2 优化后的SQL查询时间对比
我们可以看到经过优化以后(如下图所示),案例1“IP Range”可以在26秒内完成,节约了99.8%的时间,而案例2“Date Range”也节约了93.9%的查询时间。如此看来,基于Range Index数据进行的连接,表现得非常令人满意。
(点击可查看大图)


4.3 Spark DAG对比
相比于传统的BroadcastNestLoopJoin算子(如下表所示),我们引入了一种新的BroadcastRangeJoin算子来进行连接的计算,同时选择BroadcastRangeExechange来代替BroadcastExechange,用于基于Build表数据来创建RangeIndex。
(点击可查看大图)


4.4 和业界主流的OLAP引擎对比
如下表所示,我们选取了其中几个比较有代表性的引擎——OLAP中社区版Spark、Presto、Doris以及传统关系型数据库“Postgres”。通过对比可以发现,业界对Range Join的优化较少。
(点击可查看大图)


5 实 现

Realize

我们已经上线了Range Join优化中的大部分Feature,覆盖了线上85%含有Range形式的非等值连接。
其中的Feature主要包括:
1)支持Point in Interval(点在区间中)的Range Join。这是Range Join的第一个Feature,包含了
A. Range Join的识别和选择
B. Range Index的创建
C. BroadcastRangeJoin算子的实现
D. 对“A Between B and C”这样的连接场景的支持,比如
2)支持部分Range Join。这是对“A Between B and C”的扩展,支持了“AB”这样单一大小的比较场景,比如
3)重用Broadcast Range Exchange。BroadcastRangeJoin引入了BroadcastRangeExchange算子,同时增强了规范化相关的计算方式以支持Shuffle Exechange复用。
4)支持从复杂连接条件中检测Range形式[4],使其适用于Range Join。比如连接条件:
上述连接条件中隐含了以下两个Range:
(1)CAL_DT在区间[AD_STATUS_START, AD_STATUS_END]
(2)CAL_DT在区间[AD_ORGNL_START, AD_ACTL_END]
Range Join会自动选择其中一个Range条件来创建Range Index,另外一个Range条件或者其他条件会作为辅助条件在连接发生时进行进一步的匹配。
5)支持Interval Overlap(区间重叠)场景[5]。比如:
除了上述已实现的Range Join,我们正在进行进一步的优化,使其可以支持左/右/全外连接(Left/Right/Full Outer Join)。鉴于Broadcast Range Join已经非常高效,所以暂时还未支持代码生成。

6 总 结

Conclusion

对于Range Join这个案例,我们解决问题的整体基本思路是:
①发现问题:连接耗时长;
②发现Build表不是很大,而且一般可以做Broadcast;
③对Build表基于某种算法建立Index数据;
④基于Index数据进行连接,代替传统的Nested Loop Join基于Row数据的连接。
(点击可查看大图)

这种优化的方式可以用于解决其他类似的连接耗时问题,给那些可以Broadcast又可以建立某种Index数据的慢查询提供了一种优化思路。

参考链接

[1]https://docs.databricks.com/delta/join-performance/range-join.html

[2]https://issues.apache.org/jira/browse/SPARK-8682

[3]https://www.pilosa.com/blog/range-encoded-bitmaps/

[4]https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/AnalyzingData/Queries/Joins/RangeJoins.htm

[5]https://link.springer.com/article/10.1007/s00778-021-00692-3


猜你喜欢
Spark SQL知识点与实战
Hive计算最大连续登陆天数
Hadoop 数据迁移用法详解
数仓建模—宽表的设计
数仓建模分层理论
Flink 是如何统一批流引擎的

浏览 36
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报