spark sql解析过程中对tree的遍历(源码详解)

静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!

【本文大纲】

1、执行计划回顾

2、遍历过程概述

3、遍历过程详解

4、思考小问题

执行计划回顾

Spark  sql执行计划的生成过程:

  1. 接收 sql 语句,初步解析成 logical plan

  2. 分析上步生成的 logical plan,生成验证后的 logical plan

  3. 对分析过后的 logical plan,进行优化

  4. 对优化过后的 logical plan,生成 physical plan

  5. 根据 physical plan,生成 rdd 的程序,并且提交运行

SELECT A,B FROM TESTDATA2 WHERE A>2

 结合上图,写测试用例,每一步生成的执行计划如下:

Spark sql解析会生成四种plan:

Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan

上面这四种plan,无论是 LogicalPlan 还是 PhysicalPlan,都是通过树的形式表示。每一步都是对树进行操作,生成新的树。在这个过程中,对树的遍历非常重要。

遍历过程概述

最常用到的有  后序遍历 和 前序遍历  两种

后序遍历

TreeNode 中的 transformUp方法以及AnalysisHelper 中的 resolveOperatorsUp方法 等

这两个方法类似,以TreeNode 中的 transformUp为例:

def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {// 先遍历子节点,得到叶子节点val afterRuleOnChildren = mapChildren(_.transformUp(rule))//对节点执行规则val newNode = if (this fastEquals afterRuleOnChildren) {CurrentOrigin.withOrigin(origin) {//这里用到了PartialFunction的applyOrElse方法,用来避免undefined的情况发生。如果当前节点应用rule没有匹配的话,则返回默认的当前节点本身rule.applyOrElse(this, identity[BaseType])}} else {CurrentOrigin.withOrigin(origin) {rule.applyOrElse(afterRuleOnChildren, identity[BaseType])}}// If the transform function replaces this node with a new one, carry over the tags.newNode.copyTagsFrom(this)newNode
}

递归逻辑:

  • 递归结束条件:如果是子节点,那么使用该规则执行该节点,并且返回执行规则后的节点

  • 递归继续条件:如果有子节点,那么先根据遍历子节点的结果,生成新节点。最后在使用该规则执行新节点

前序遍历

TreeNode 中的 transformDown方法以及AnalysisHelper 中的 resolveOperatorsDown方法 等

TreeNode 中的 transformDown为例:

def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {// 对当前节点,调用rule函数。val afterRule = CurrentOrigin.withOrigin(origin) {// 这里rule函数有可能会生成新的节点,新节点的子节点可能不一样rule.applyOrElse(this, identity[BaseType])}
​// Check if unchanged and then possibly return old copy to avoid gc churn.//再遍历子节点 if (this fastEquals afterRule) {
// 如果当前节点没有变化,则继续遍历它的子节点mapChildren(_.transformDown(rule))} else {// 如果当前节点发生改变,需要对改变后的节点进行遍历afterRule.copyTagsFrom(this)afterRule.mapChildren(_.transformDown(rule))}
}

递归逻辑:

  • 递归结束条件:如果是叶子节点,那么使用规则对该节点操作,并且返回操作后的节点。

  • 递归继续条件:如果不是叶子节点,那么先使用该规则对该节点操作。对操作后的该节点,继续遍历其子节点,用子节点的返回结果,来构建成新的节点。

遍历中的通用方法

上面几种方法中,都用到了TreeNode中的mapChildren、mapProductIterator方法

mapChildren

mapChildren 会依次调用函数对子节点操作,根据返回的结果生成一个新的节点。

def mapChildren(f: BaseType => BaseType): BaseType = {
//如果不是叶子节点,那么会执行mapChildren(f, forceCopy = false)方法,遍历构造函数的参数。如果参数是子节点,那么递归遍历
if (containsChild.nonEmpty) {mapChildren(f, forceCopy = false)} else {
//如果是叶子节点,则返回自身节点 this}
}
​
​
private def mapChildren(f: BaseType => BaseType,forceCopy: Boolean): BaseType = {var changed = false
​def mapChild(child: Any): Any = child match {case arg: TreeNode[_] if containsChild(arg) =>val newChild = f(arg.asInstanceOf[BaseType])if (forceCopy || !(newChild fastEquals arg)) {changed = truenewChild} else {arg}case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) =>val newChild1 = if (containsChild(arg1)) {f(arg1.asInstanceOf[BaseType])} else {arg1.asInstanceOf[BaseType]}
​val newChild2 = if (containsChild(arg2)) {f(arg2.asInstanceOf[BaseType])} else {arg2.asInstanceOf[BaseType]}
​if (forceCopy || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {changed = true(newChild1, newChild2)} else {tuple}case other => other}
​
// 调用了mapProductIterator方法,遍历构造函数的参数,返回新的构造参数val newArgs = mapProductIterator {
// 如果参数是TreeNode子类,并且是当前节点的子节点case arg: TreeNode[_] if containsChild(arg) =>
// 递归调用函数遍历 这里的f可能是 transformUp or transformDownval newChild = f(arg.asInstanceOf[BaseType])
// 如果子节点发生变化了,则更改changed标识if (forceCopy || !(newChild fastEquals arg)) {changed = truenewChild} else {arg}case Some(arg: TreeNode[_]) if containsChild(arg) =>val newChild = f(arg.asInstanceOf[BaseType])if (forceCopy || !(newChild fastEquals arg)) {changed = trueSome(newChild)} else {Some(arg)}case m: Map[_, _] => m.mapValues {case arg: TreeNode[_] if containsChild(arg) =>val newChild = f(arg.asInstanceOf[BaseType])if (forceCopy || !(newChild fastEquals arg)) {changed = truenewChild} else {arg}case other => other}.view.force // `mapValues` is lazy and we need to force it to materializecase d: DataType => d // Avoid unpacking Structscase args: Stream[_] => args.map(mapChild).force // Force materialization on streamcase args: Iterable[_] => args.map(mapChild)case nonChild: AnyRef => nonChildcase null => null}// 如果子节点发生变化,则利用新的构造参数,实例化新的节点if (forceCopy || changed) makeCopy(newArgs, forceCopy) else this
}

mapProductIterator

TreeNode 继承了 Product 接口,TreeNode 的子类实现了 Product 接口,所以支持访问构造方法的参数。TreeNode 的 mapProductIterator 方法,接收一个函数用来遍历当前节点的构造参数

这里有一个知识点(ClassTag用法):https://dzone.com/articles/scala-classtag-a-simple-use-case

//ClassTag用法def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
// protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {val arr = Array.ofDim[B](productArity)var i = 0while (i < arr.length) {arr(i) = f(productElement(i))i += 1}arr
}

​遍历过程详解

下面以Parsed Logical Plan --> Analyzed Logical Plan的过程中 ,某个规则为例,详细跟踪一下这两种遍历方式。

分析一下当前的Parsed Logical Plan

当前sql

SELECT A,B FROM TESTDATA2 WHERE A>2

 生成的Parsed Logical Plan:

== Parsed Logical Plan =='Project ['A, 'B]+- 'Filter ('A > 2)   +- 'UnresolvedRelation [TESTDATA2]

上面执行计划涉及到 三个类(Project、Filter、UnresolvedRelation):

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)

两个参数:

  • Project-projectList: Seq['A, 'B]

  • Project-child(LogicalPlan):

  • 'Filter ('A > 2)

  •    +- 'UnresolvedRelation [TESTDATA2]

Project有一个子节点Filter

case class Filter(condition: Expression, child: LogicalPlan)

两个参数:

  • Filter-condition:('A > 2)

  • Filter--child(LogicalPlan): 

      'UnresolvedRelation [TESTDATA2]

Filter有一个子节点UnresolvedRelation

case class UnresolvedRelation( multipartIdentifier: Seq[String])

一个参数:

  • UnresolvedRelation-multipartIdentifier: Seq[TESTDATA2]

UnresolvedRelation无子节点

Project、Filter、UnresolvedRelation与 LogicalPlan、 TreeNode的继承关系如下:

Project、Filter、UnresolvedRelation本身是Logical Plan 、TreeNode。

后序遍历(AnalysisHelper.resolveOperatorsUp)

Parsed Logical Plan 需要 通过Analyzer类中的一系列rule 转换生成Analyzed Logical Plan。

下图是Analyzer类中rule,会提前初始化在batches里:

​这里的rule通过apply方法遍历Parsed Logical Plan 的每个节点,依据定好的规则生成Analyzed Logical Plan,以  ResolveHints.ResolveJoinStrategyHint为例:

主要通过AnalysisHelper 中的 resolveOperatorsUp(后序遍历的)方法:

// 入参为rule,偏函数
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {if (!analyzed) {AnalysisHelper.allowInvokingTransformsInAnalyzer {// 1、先遍历子节点,得到叶子节点val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))//2、为节点执行规则if (self fastEquals afterRuleOnChildren) {CurrentOrigin.withOrigin(origin) {// 如果遍历后当前节点没有发生变化,对当前的plan执行rule规则rule.applyOrElse(self, identity[LogicalPlan])}} else {CurrentOrigin.withOrigin(origin) {// 如果遍历后 当前 节点发 生了变化,则新负值的afterRuleOnChildren执行rule规则rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])}}}} else {self}
}

当前的节点为Project,执行Project 的 resolveOperatorsUp 方法,该方法会先遍历Project的子节点。

第一层遍历:

执行Project 的 mapChildren方法

​第二层遍历:

执行Filter 的 mapChildren方法

​第三层遍历:

执行UnresolvedRelation 的 mapChildren方法

由 于 UnresolvedRelation为子节点,返回节点本 身,为UnresolvedRelation执行rule。

为UnresolvedRelation节点执行ResolveJoinStrategyHint的apply方法:

// 该规则主要是针对Hint节点起作用 ,目前是UnresolvedRelation节点
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>if (h.parameters.isEmpty) {// If there is no table alias specified, apply the hint on the entire subtree.ResolvedHint(h.child, createHintInfo(h.name))} else {// Otherwise, find within the subtree query plans to apply the hint.val relationNamesInHint = h.parameters.map {case tableName: String => UnresolvedAttribute.parseAttributeName(tableName)case tableId: UnresolvedAttribute => tableId.namePartscase unsupported => throw new AnalysisException("Join strategy hint parameter " +s"should be an identifier or string but was $unsupported (${unsupported.getClass}")}.toSetval relationsInHintWithMatch = new mutable.HashSet[Seq[String]]val applied = applyJoinStrategyHint(h.child, relationNamesInHint, relationsInHintWithMatch, h.name)
​// Filters unmatched relation identifiers in the hintval unmatchedIdents = relationNamesInHint -- relationsInHintWithMatchhintErrorHandler.hintRelationsNotFound(h.name, h.parameters, unmatchedIdents)applied}
}

​这个规则主要是 对Hint节点起作用,但目前是UnresolvedRelation节点,不能匹配的上。因此通过

rule.applyOrElse(self, identity[LogicalPlan])

之后,返回UnresolvedRelation本身。

UnresolvedRelation返回后,就会接着先后为Filter-->Project执行ResolveJoinStrategyHint规则,最后返回Project本身。

到此,整个ResolveJoinStrategyHint对Logical plan的 遍历及执行规则的 过 程 就结束了。 

前序遍历(AnalysisHelper.resolveOperatorsDown)

Analyzer 中的 ExtractWindowExpressions规则

 主要通过AnalysisHelper 中的 resolveOperatorsDown方法:

/** Similar to [[resolveOperatorsUp]], but does it top-down. */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {if (!analyzed) {AnalysisHelper.allowInvokingTransformsInAnalyzer {   val afterRule = CurrentOrigin.withOrigin(origin) {// 1、为当前节点执行规则rule.applyOrElse(self, identity[LogicalPlan])}
​//  2、对执行完规则后的新节点遍历迭代if (self fastEquals afterRule) {//如果执行完规则后的节点没有变化(即规则没有起到作用),则对节点遍历迭代mapChildren(_.resolveOperatorsDown(rule))} else {//如果执行完规则后的节点发生变化,则对新节点遍历迭代afterRule.mapChildren(_.resolveOperatorsDown(rule))}}} else {self}
}

先为Project节点执行ExtractWindowExpressions.apply方法:

 Project节点模式匹配case p: LogicalPlan if !p.childrenResolved => p 返回Project节点本身

第一层遍历:

 得到Project的子节点Filter,执行Filter.resolveOperatorsDown方法,先对Filter节点执行ExtractWindowExpressions.apply方法,跑一遍规则,最后由于sql没有用到window相关函数,返回Filter节点本身,开始对Filter节点进行遍历

第二层遍历:

得到Filter的子节点UnresolvedRelation,执行UnresolvedRelation.resolveOperatorsDown方法,先对UnresolvedRelation节点执行ExtractWindowExpressions.apply方法,跑一遍规则,返回UnresolvedRelation节点本身,开始对UnresolvedRelation节点进行遍历

第三层遍历:

UnresolvedRelation没有子节点,在mapChildren方法被返回。

最终 返回Project节点,ExtractWindowExpressions执行完成。

思考

什么rule适合用后序遍历?什么rule适合前序遍历?

当我们自己开发规则时,该怎么选呢?

推荐阅读--

SparkSql源码成神之路

sparksql源码系列 |  最全的logical plan优化规则整理(spark2.3)

Sparksql源码系列 | 读源码必须掌握的scala基础语法

澄清 | snappy压缩到底支持不支持split? 为啥?

以后的事谁也说不准

转型【数仓开发】该怎么学

大数据开发轻量级入门方案

OLAP | 基础知识梳理


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部