SparkSQL 整体运行架构和底层实现

程序源代码

共 9694字,需浏览 20分钟

 ·

2020-08-26 23:02

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!

暴走大数据
点击右侧关注,暴走大数据!


对于:SELECT  a1,a2,a3  FROM  tableA  Where  condition,Spark SQL会做什么?


可以看得出来,该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。那么,SQL语句在实际的运行过程中是怎么处理的呢?一般的数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation-->Data Source-->Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。


以上过程看上去非常简单,但实际上会包含很多复杂的操作细节在里面。而这些操作细节都和Tree有关,在数据库解析(Parse)SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理,如下面一个查询,会形成一个含有多个节点(TreeNode)的Tree,然后在后续的处理过程中对该Tree进行一系列的操作。



下图给出了对Tree的一些可能的操作细节,对于Tree的处理过程中所涉及更多的细节,可以查看相关的数据库论文。


1:Tree和Rule

SparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。


/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText)) //parseSql(sqlText)对sql语句进行语法解析
} else {
sys.error(s"Unsupported SQL dialect: $dialect")
}
}

sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成Unresolved LogicalPlan。

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

类SchemaRDD继承自SchemaRDDLike


/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  */
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike

SchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成Unresolved LogicalPlan,这里的baseLogicalPlan就是指Unresolved LogicalPlan。

/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala  */
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient val baseLogicalPlan: LogicalPlan
private[sql] def baseSchemaRDD: SchemaRDD

lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

sqlContext.executePlan做了什么呢?它调用了QueryExecution类

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

QueryExecution类的定义:

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
protected abstract class QueryExecution {
def logical: LogicalPlan

//对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
//对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
lazy val optimizedPlan = optimizer(analyzed)
// 将optimized LogicalPlan转换成PhysicalPlan
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
}
// PhysicalPlan执行前的准备工作,生成可执行的物理计划
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

//执行可执行物理计划
lazy val toRdd: RDD[Row] = executedPlan.execute()

......
}

sqlContext总的一个过程如下图所示:
1.SQL语句经过SqlParse解析成UnresolvedLogicalPlan;
2.使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan;
3.使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
4.使用SparkPlan将LogicalPlan转换成PhysicalPlan;
5.使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6.使用execute()执行可执行物理计划;
7.生成SchemaRDD。
在整个运行过程中涉及到多个sparkSQL的组件,如SqlParse、analyzer、optimizer、SparkPlan等等.

3:hiveContext的运行过程

在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。

 从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
override def sql(sqlText: String): SchemaRDD = {
// 使用spark.sql.dialect定义采用的语法解析器
if (dialect == "sql") {
super.sql(sqlText) //如果使用sql解析器,则使用sqlContext的sql方法
} else if (dialect == "hiveql") { //如果使用和hiveql解析器,则使用HiveQl.parseSql
new SchemaRDD(this, HiveQl.parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
}
}

hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect == "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。

/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = {
try {
if (条件) {
//非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
.....
} else {
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
NativeCommand(sql)
} else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other
}
}
}
} catch {
//异常处理
......
}
}

因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:

  • 首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache table、add jar等,将这些语句转换成command类型的LogicalPlan;

  • 如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成LogicalPlan。

/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */  
/** * Returns the AST for the given SQL string. */
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))

和sqlContext一样,类SchemaRDD继承自SchemaRDDLike,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */ 
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan =
optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))

override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
......
}

所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
hiveContext的catalog,是指向 Hive Metastore:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
override def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {

LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
}
}

hiveContext的analyzer,使用了新的catalog和functionRegistry:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false)

hiveContext的planner,使用新定义的hivePlanner:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
@transient
override protected[sql] val planner = hivePlanner

所以hiveContext总的一个过程如下图所示:
SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
1.使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan;
2.使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了3.ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;
4.使用hivePlanner将LogicalPlan转换成PhysicalPlan;
5.使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6.使用execute()执行可执行物理计划;
7.执行后,使用map(_.copy)将结果导入SchemaRDD。
hiveContxt还有很多针对hive的特性,更细节的内容参看源码。


4:catalyst优化器

SparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:

  • core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;

  • catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;

  • hive对hive数据的处理

  • hive-ThriftServer提供CLI和JDBC/ODBC接口
    在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是catalyst的一个设计图:



    其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:

  • sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;

  • Analyzer,主要完成绑定工作,将不同来源的Unresolved LogicalPlan和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolved LogicalPlan;

  • optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;

  • Planner将LogicalPlan转换成PhysicalPlan;

  • CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
    这些组件的基本实现方法:

  • 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。

  • Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的属性而转换成resolved LogicalPlan;

  • optimizer使用Optimization Rules,对resolved LogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成* optimized LogicalPlan;

  • Planner使用Planning Strategies,对optimized LogicalPlan

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|冷眼丶
微信公众号|import_bigdata


欢迎点赞+收藏+转发朋友圈素质三连


文章不错?点个【在看】吧! ?

浏览 21
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报