数据倾斜?Spark 3.0 AQE专治各种不服
CBO基本原理
Join应该选择哪种算法策略来执行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的执行策略对系统的资源要求不同,执行效率也有天壤之别,同一个SQL,选择到合适的策略执行可能只需要几秒钟,而如果没有选择到合适的执行策略就可能会导致系统OOM。
对于雪花模型或者星型模型来讲,多表Join应该选择什么样的顺序执行?不同的Join顺序意味着不同的执行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很显然需要大量的系统资源来运算,执行时间必然不会短。而如果使用A join C join B的执行顺序,因为C表很小,所以A join C会很快得到结果,而且结果集会很小,再使用小的结果集 join B,性能显而易见会好于前一种方案。
CBO这么难优化,Spark怎么解决?
动态合并shuffle分区(Dynamically coalescing shuffle partitions)
动态调整Join策略(Dynamically switching join strategies)
动态优化数据倾斜Join(Dynamically optimizing skew joins)
如果partition过少,每个partition数据量就会过多,可能就会导致大量数据要落到磁盘上,从而拖慢了查询。
如果partition过多,每个partition数据量就会很少,就会产生很多额外的网络开销,并且影响Spark task scheduler,从而拖慢查询。
非流式查询
包含至少一个exchange(如join、聚合、窗口算子)或者一个子查询
Spark CBO源码实现
// QueryExecution类
lazy val executedPlan: SparkPlan = {
executePhase(QueryPlanningTracker.PLANNING) {
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}
protected def preparations: Seq[Rule[SparkPlan]] = {
QueryExecution.preparations(sparkSession,
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
}
private[execution] def preparations(
sparkSession: SparkSession,
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf)
)
}
// InsertAdaptiveSparkPlan
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
// ...some checking
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
// Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
// Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
val subqueryMap = buildSubqueryMap(plan)
val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
val preprocessingRules = Seq(
planSubqueriesRule)
// Run pre-processing rules.
val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
logDebug(s"Adaptive execution enabled for plan: $plan")
AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
} catch {
case SubqueryAdaptiveNotSupportedException(subquery) =>
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
s"but is not supported for sub-query: $subquery.")
plan
}
} else {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
s"but is not supported for query: $plan.")
plan
}
case _ => plan
}
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
// 第一次调用 getFinalPhysicalPlan方法时为false,等待该方法执行完毕,全部Stage不会再改变,直接返回最终plan
if (isFinalPlan) return currentPhysicalPlan
// In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
// `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
// created in the middle of the execution.
context.session.withActive {
val executionId = getExecutionId
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
var result = createQueryStages(currentPhysicalPlan)
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
val errors = new mutable.ArrayBuffer[Throwable]()
var stagesToReplace = Seq.empty[QueryStageExec]
while (!result.allChildStagesMaterialized) {
currentPhysicalPlan = result.newPlan
// 接下来有哪些Stage要执行,参考 createQueryStages(plan: SparkPlan) 方法
if (result.newStages.nonEmpty) {
stagesToReplace = result.newStages ++ stagesToReplace
// onUpdatePlan 通过listener更新UI
executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
// Start materialization of all new stages and fail fast if any stages failed eagerly
result.newStages.foreach { stage =>
try {
// materialize() 方法对Stage的作为一个单独的Job提交执行,并返回 SimpleFutureAction 来接收执行结果
// QueryStageExec: materialize() -> doMaterialize() ->
// ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
// SparkContext: -> submitMapStage(shuffleDependency)
stage.materialize().onComplete { res =>
if (res.isSuccess) {
events.offer(StageSuccess(stage, res.get))
} else {
events.offer(StageFailure(stage, res.failed.get))
}
}(AdaptiveSparkPlanExec.executionContext)
} catch {
case e: Throwable =>
cleanUpAndThrowException(Seq(e), Some(stage.id))
}
}
}
// Wait on the next completed stage, which indicates new stats are available and probably
// new stages can be created. There might be other stages that finish at around the same
// time, so we process those stages too in order to reduce re-planning.
// 等待,直到有Stage执行完毕
val nextMsg = events.take()
val rem = new util.ArrayList[StageMaterializationEvent]()
events.drainTo(rem)
(Seq(nextMsg) ++ rem.asScala).foreach {
case StageSuccess(stage, res) =>
stage.resultOption = Some(res)
case StageFailure(stage, ex) =>
errors.append(ex)
}
// In case of errors, we cancel all running stages and throw exception.
if (errors.nonEmpty) {
cleanUpAndThrowException(errors, None)
}
// Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
// than that of the current plan; otherwise keep the current physical plan together with
// the current logical plan since the physical plan's logical links point to the logical
// plan it has originated from.
// Meanwhile, we keep a list of the query stages that have been created since last plan
// update, which stands for the "semantic gap" between the current logical and physical
// plans. And each time before re-planning, we replace the corresponding nodes in the
// current logical plan with logical query stages to make it semantically in sync with
// the current physical plan. Once a new plan is adopted and both logical and physical
// plans are updated, we can clear the query stage list because at this point the two plans
// are semantically and physically in sync again.
// 对前面的Stage替换为 LogicalQueryStage 节点
val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
// 再次调用optimizer 和planner 进行优化
val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
if (newCost < origCost ||
(newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
stagesToReplace = Seq.empty[QueryStageExec]
}
// Now that some stages have finished, we can try creating new stages.
// 进入下一轮循环,如果存在Stage执行完毕, 对应的resultOption 会有值,对应的allChildStagesMaterialized 属性 = true
result = createQueryStages(currentPhysicalPlan)
}
// Run the final plan when there's no more unfinished stages.
// 所有前置stage全部执行完毕,根据stats信息优化物理执行计划,确定最终的 physical plan
currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
isFinalPlan = true
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
currentPhysicalPlan
}
}
// SparkContext
/**
* Submit a map stage for execution. This is currently an internal API only, but might be
* promoted to DeveloperApi in the future.
*/
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
: SimpleFutureAction[MapOutputStatistics] = {
assertNotStopped()
val callSite = getCallSite()
var result: MapOutputStatistics = null
val waiter = dagScheduler.submitMapStage(
dependency,
(r: MapOutputStatistics) => { result = r },
callSite,
localProperties.get)
new SimpleFutureAction[MapOutputStatistics](waiter, result)
}
// DAGScheduler
def submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C],
callback: MapOutputStatistics => Unit,
callSite: CallSite,
properties: Properties): JobWaiter[MapOutputStatistics] = {
val rdd = dependency.rdd
val jobId = nextJobId.getAndIncrement()
if (rdd.partitions.length == 0) {
throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
}
// We create a JobWaiter with only one "task", which will be marked as complete when the whole
// map stage has completed, and will be passed the MapOutputStatistics for that stage.
// This makes it easier to avoid race conditions between the user code and the map output
// tracker that might result if we told the user the stage had finished, but then they queries
// the map output tracker and some node failures had caused the output statistics to be lost.
val waiter = new JobWaiter[MapOutputStatistics](
this, jobId, 1,
(_: Int, r: MapOutputStatistics) => callback(r))
eventProcessLoop.post(MapStageSubmitted(
jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
waiter
}
// AdaptiveSparkPlanExec
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, context.subqueryCache),
CoalesceShufflePartitions(context.session),
// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
// added by `CoalesceShufflePartitions`. So they must be executed after it.
OptimizeSkewedJoin(conf),
OptimizeLocalShuffleReader(conf),
ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
CollapseCodegenStages(conf)
)
Spark3.0AQE在FreeWheel的应用与实践
主要升级改动
"spark.sql.adaptive.enabled": true,
"spark.sql.adaptive.coalescePartitions.enabled": true,
"spark.sql.adaptive.coalescePartitions.minPartitionNum": 1,
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"
在 reduce 阶段从没有 AQE 的40320个 tasks 锐减到4580个 tasks,减少了一个数量级。
下图里下半部分是没有 AQE 的 Spark 2.x 的 task 情况,上半部分是打开 AQE 特性后的 Spark 3.x 的情况。
从更详细的运行时间图来看,shuffler reader后同样的 aggregate 的操作等时间也从4.44h到2.56h,节省将近一半。
左边是 spark 2.x 的运行指标明细,右边是打开 AQE 后通过custom shuffler reader后的运行指标情况。
性能提升
实践成果
历史数据 Pipeline 对于大 batch 的数据(200~400G/每小时)性能提升高达40%, 对于小 batch(小于 100G/每小时)提升效果没有大 batch 提升的那么明显,每天所有 batches平均提升水平27.5%左右。
预测数据性能平均提升30%。由于数据输入源不一样,目前是分别两个 pipelines 在跑历史和预测数据,产生的表的数目也不太一样,因此做了分别的评估。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧!