spark的stage划分的原理

????????在 Apache Spark 中,stage?是执行作业时的重要执行单元。一个 Spark 作业会被划分为若干个?stage,每个?stage?由一组可以并行执行的任务组成。这种划分主要依赖于 RDD 中的操作类型(窄依赖和宽依赖)。下面我们来讨论 Spark?stage?的创建和划分的原理以及代码实现的核心逻辑。

Spark Stage 划分的原理

  1. RDD 依赖(窄依赖和宽依赖)

    • Spark 中,RDD 可以有两种依赖关系:
      • 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如?mapfilter?等。
      • 宽依赖(wide dependency):父 RDD 的每个分区可能被多个子 RDD 的分区使用,典型的操作如?reduceByKeygroupByKey?等,这类操作会触发?shuffle
    • 窄依赖的 RDD 操作可以被划分到同一个?stage?中,而宽依赖的 RDD 操作会触发?shuffle,导致?stage?划分。
  2. DAG(有向无环图)

    Spark 的作业会构建一个 RDD 的依赖图(DAG)。这个 DAG 中每个 RDD 的窄依赖操作会被合并成一个?stage,宽依赖操作会划分出不同的?stage,并在两个?stage?之间插入?shuffle
  3. Stage?划分规则

    • 每当遇到一个宽依赖(如?reduceByKeyjoingroupByKey?等),Spark 会创建一个新的?stage,并将之前的 RDD 操作划分到一个?stage?中,形成一个有序的?stage?执行链。
    • stage?划分的核心任务是:将窄依赖操作尽可能合并到一起,直到遇到需要?shuffle?的宽依赖操作。

Spark?Stage?划分的核心代码逻辑

????????Spark 的 DAG 划分及?stage?划分主要在?DAGScheduler?中实现。DAGScheduler?是 Spark 作业调度的核心组件,负责将逻辑作业(job)划分为多个?stage,并调度这些?stage?执行。

以下是 Spark 3.x 版本中有关?stage?划分的核心逻辑及其简化代码片段。

1.?DAGScheduler 类

? DAGScheduler?类位于?org.apache.spark.scheduler?包下,它负责管理 RDD 依赖关系并创建?stageDAGScheduler?会根据 RDD 的依赖图和操作类型,生成任务的 DAG 并划分?stage

class DAGScheduler(
    // 参数略...
) extends Logging {

  // stage 列表
  private val stages = new HashMap[StageId, Stage]()

  // 提交 Job 时触发的函数
  def submitJob(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, _) => Unit,
      properties: Properties = null): JobWaiter[_] = {

    // 根据 RDD 和依赖关系生成最终的 ResultStage
    val finalStage = createFinalStage(rdd, partitions, callSite)

    // 提交该 stage 执行
    submitStage(finalStage)
  }

  // 创建 ResultStage 和后续的 Stage
  private def createFinalStage(
      rdd: RDD[_],
      partitions: Seq[Int],
      callSite: CallSite): ResultStage = {
    // 创建该作业的最终的 stage,并递归创建所有依赖的 stage
    val finalStage = newStage(rdd, partitions)
    finalStage
  }

  // 递归生成各个 Stage,核心逻辑
  private def newStage(rdd: RDD[_], partitions: Seq[Int]): Stage = {
    // 检查缓存,避免重复生成 Stage
    stages.getOrElseUpdate(rdd.id, {
      val shuffleDeps = getShuffleDependencies(rdd)

      // 如果存在宽依赖,则要划分为不同的 stage
      if (shuffleDeps.nonEmpty) {
        val parentStages = shuffleDeps.map { dep =>
          newStage(dep.rdd, dep.rdd.partitions.indices)
        }
        val newStage = new ShuffleMapStage(rdd, parentStages)
        stages(newStage.id) = newStage
        newStage
      } else {
        // 如果只有窄依赖,当前操作在同一个 stage 内
        val parentStages = getNarrowDependencies(rdd).map { dep =>
          newStage(dep.rdd, dep.rdd.partitions.indices)
        }
        val newStage = new ResultStage(rdd, parentStages)
        stages(newStage.id) = newStage
        newStage
      }
    })
  }

  // 获取 RDD 的 shuffle 依赖(宽依赖)
  private def getShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {
    rdd.dependencies.collect {
      case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep
    }
  }

  // 获取 RDD 的窄依赖
  private def getNarrowDependencies(rdd: RDD[_]): List[Dependency[_]] = {
    rdd.dependencies.collect {
      case narrowDep: NarrowDependency[_] => narrowDep
    }
  }
}
2.?Stage 划分的基本过程
  • RDD 依赖遍历:通过?newStage?函数递归遍历 RDD 的依赖关系,将遇到的每一个?shuffle?依赖(宽依赖)创建一个新的?ShuffleMapStage,而?ResultStage?则用于最终计算结果。

  • 宽依赖处理:当遇到宽依赖(ShuffleDependency),说明需要进行?shuffle,因此要创建一个新的?stage

  • 窄依赖处理:当只有窄依赖时,RDD 可以继续合并在当前的?stage?中。

3.?ShuffleMapStage 和 ResultStage

ShuffleMapStage?和?ResultStage?是 Spark 中两种类型的?Stage

  • ShuffleMapStage:处理宽依赖(shuffle),该?stage?会产生?shuffle?文件供下游?stage?使用。
  • ResultStage:最终计算?Action(如?collectsaveAsTextFile?等)结果的?stage,是 DAG 中的最后一个?stage

代码流程总结

  1. DAGScheduler?在收到作业时,会从最后的?Action?开始,通过递归函数?newStage,根据 RDD 的依赖关系逐步向上遍历。
  2. 当遇到?shuffle?依赖时,会将其划分为不同的?stage,每个?shuffle?依赖会产生一个?ShuffleMapStage
  3. 所有的窄依赖 RDD 操作则合并为一个?stage,在同一个?stage?中执行。
  4. submitStage?负责将划分好的?stage?发送给 TaskScheduler,TaskScheduler 则进一步调度任务到集群执行。

总结

  • 窄依赖操作:操作在同一个?stage?中执行,尽可能合并,减少?shuffle
  • 宽依赖操作:每个宽依赖会触发新的?stage,并引入?shuffle,每个?shuffle?会将数据重新分布给后续的?stage
  • DAGScheduler?的作用:DAG 调度器负责将 RDD 操作链划分为多个?stage,并根据依赖关系生成一个 DAG。
上一篇:discuz 搭建论坛
下一篇:RabbitMQ 快速入门