Spark的stage划分算法源码分析
Spark Application中可以有不同的Action触发多个Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
然而Stage划分的依据就是宽依赖,什么时候产生宽依赖(产生shuffle)呢?例如reduceByKey,groupByKey等等。
DAGScheduler的handleJobSubmitted方法主要是用来创建最后一个stage,同时将job划分成多个stage。
一、stage的划分算法’
/**** 来处理这次提交的Job来处理这次提交的Job*/private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],allowLocal: Boolean,callSite: CallSite,listener: JobListener,properties: Properties = null){// 一、使用触发job的RDD最后一个stagevar finalStage: Stage = nulltry {// New stage creation may throw an exception if, for example, jobs are run on a// HadoopRDD whose underlying HDFS files have been deleted.// stage的划分是从最后一个stage往前倒序划分的,最后一个就是一个stage// 并将stage放入DAGschedule的缓存中finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)} catch {case e: Exception =>logWarning("Creating new stage failed due to exception - job: " + jobId, e)listener.jobFailed(e)return}if (finalStage != null) {// 用finalStage创建job,也就是说这个job最后一个stage,肯定就是finalstage// 这里就创建了一个job了val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)clearCacheLocs()logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(job.jobId, callSite.shortForm, partitions.length, allowLocal))logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")logInfo("Parents of final stage: " + finalStage.parents)logInfo("Missing parents: " + getMissingParentStages(finalStage))val shouldRunLocally =localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1val jobSubmissionTime = clock.getTimeMillis()if (shouldRunLocally) {// Compute very short actions like first() or take() with no parent stages locally.listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))runLocally(job)} else {// 三、将job添加到内存缓存中jobIdToActiveJob(jobId) = jobactiveJobs += jobfinalStage.resultOfJob = Some(job)val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))// 四、提交最后一个stage// 这个方法会提交第一个stage 并把其余的stage放在缓存中submitStage(finalStage)}}// 在提交等待队列的stagesubmitWaitingStages()}
/*** 提交stage的方法* 同时包含stage的划分算法* @param stage*/private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {// 这个很重要,获取某个stage 的父stageval missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)// 如果返回为空if (missing == Nil) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {// 继续递归调用划分stagefor (parent <- missing) {submitStage(parent)}// 同时将stage加入到等待队列waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id)}}
获取某个stage的父stage
/*** 获取stage划分的父stage* @param stage* @return*/private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visiting// 压栈的方式 先入后出val waitingForVisit = new Stack[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddif (getCacheLocs(rdd).contains(Nil)) {// 遍历rdd的依赖for (dep <- rdd.dependencies) {dep match {// 宽依赖(shuffle依赖)case shufDep: ShuffleDependency[_, _, _] =>// 使用那个宽依赖创建一个ShuffleMapStage,并且会将isshuffleMap设置为true// 那么默认最后一个stage,不是shuffleMap stage// 但是finalStage之前所有的stage,都是shuffleMap stageval mapStage = getShuffleMapStage(shufDep, stage.jobId)if (!mapStage.isAvailable) {missing += mapStage}// 窄依赖,直接压栈case narrowDep: NarrowDependency[_] =>waitingForVisit.push(narrowDep.rdd)}}}}}// 首先往栈中压入一个RDDwaitingForVisit.push(stage.rdd)//遍历RDDwhile (!waitingForVisit.isEmpty) {visit(waitingForVisit.pop())}missing.toList}
通过以上两个方法递归循环调用将所有的stage保存在waitingStages缓存中。
循环调用下面的方法将stage提交
private def submitWaitingStages() {// TODO: We might want to run this less often, when we are sure that something has become// runnable that wasn't before.logTrace("Checking for newly runnable parent stages")logTrace("running: " + runningStages)logTrace("waiting: " + waitingStages)logTrace("failed: " + failedStages)val waitingStagesCopy = waitingStages.toArraywaitingStages.clear()// 循环提交stagefor (stage <- waitingStagesCopy.sortBy(_.jobId)) {submitStage(stage)}}
stage划分算法的总结:
1、stage从finalstage倒推
2、通过宽依赖,来对新的stage进行划分提交
3、通过递归的方式,优先提交父stage
对于产生shuffle的算子,底层会产生三个RDD,分别是MappartitionRDD、shuffleRDD和MappartitionRDD,第一个MappartitionRDD和ShuffleRDD之间会产生shuffle,所以这个就是stage分配的分割点。
二、task的最佳位置计算算法
/*** 提交stage,为stage创建一批task,task 的数量与partition数量相同*/private def submitMissingTasks(stage: Stage, jobId: Int) {logDebug("submitMissingTasks(" + stage + ")")// Get our pending tasks and remember them in our pendingTasks entrystage.pendingTasks.clear()// First figure out the indexes of partition ids to compute.// 获取创建task的数量val partitionsToCompute: Seq[Int] = {if (stage.isShuffleMap) {(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)} else {val job = stage.resultOfJob.get(0 until job.numPartitions).filter(id => !job.finished(id))}}val properties = if (jobIdToActiveJob.contains(jobId)) {jobIdToActiveJob(stage.jobId).properties} else {// this stage will be assigned to "default" poolnull}// 将stage加入到runningStagesrunningStages += stage// SparkListenerStageSubmitted should be posted before testing whether tasks are// serializable. If tasks are not serializable, a SparkListenerStageCompleted event// will be posted, which should always come after a corresponding SparkListenerStageSubmitted// event.stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))outputCommitCoordinator.stageStart(stage.id)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast// the serialized copy of the RDD and for each task we will deserialize it, which means each// task gets a different copy of the RDD. This provides stronger isolation between tasks that// might modify state of objects referenced in their closures. This is necessary in Hadoop// where the JobConf/Configuration object is not thread-safe.var taskBinary: Broadcast[Array[Byte]] = nulltry {// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).// For ResultTask, serialize and broadcast (rdd, func).val taskBinaryBytes: Array[Byte] =if (stage.isShuffleMap) {closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()} else {closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()}taskBinary = sc.broadcast(taskBinaryBytes)} catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString)runningStages -= stagereturncase NonFatal(e) =>abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")runningStages -= stagereturn}// 为stage创建指定数量的task// task最佳位置的计算算法// 最后一个stage的Task是ResultTask,其他的都是ShuffleMapTaskval tasks: Seq[Task[_]] = if (stage.isShuffleMap) {partitionsToCompute.map { id =>// 给每个partition创建一个task// 并计算task的最佳位置val locs = getPreferredLocs(stage.rdd, id)val part = stage.rdd.partitions(id)// 对于finalStage之外的stage,他的isshuffleMap设置为true// 所以会创建ShuffleMapTasknew ShuffleMapTask(stage.id, taskBinary, part, locs)}} else {// 不是ShuffleMapTask,那就是finalStage// finalStage,是用来创建ResultTask的val job = stage.resultOfJob.getpartitionsToCompute.map { id =>val p: Int = job.partitions(id)val part = stage.rdd.partitions(p)val locs = getPreferredLocs(stage.rdd, p)new ResultTask(stage.id, taskBinary, part, locs, id)}}if (tasks.size > 0) {logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")stage.pendingTasks ++= taskslogDebug("New pending tasks: " + stage.pendingTasks)// 最后通过taskScheduler提交task settaskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())} else {// Because we posted SparkListenerStageSubmitted earlier, we should post// SparkListenerStageCompleted here in case there are no tasks to run.outputCommitCoordinator.stageEnd(stage.id)listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))logDebug("Stage " + stage + " is actually done; %b %d %d".format(stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))runningStages -= stage}}
总结:
1、一个stage内部会有很多个task来执行
2、task的位置是根据cache和checkpoint决定的。
3、从stage的最后一个RDD开始,去查找RDD的partition上寻找是不是被cache了还是checkpoint了,如果有的话那么最佳位置就是cache或者checkpoint的partition上,因为在这个partition上,就不需要计算以前的父RDD了
4、如果既有cache还有checkpoint,那么以cache的partition为准。
5、如果没有查找到最佳位置,那么最后由taskschedule来决定。
6、只有最后一个stage的task是resultTask,其他的都是shuffleMaptTask
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
