4.5 Task 级别任务调度源码分析

taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

根据前面的分析介绍, DAGShedulerTask 提交给TaskScheduler时, 需要将多个 Task打包为TaskSet.

TaskSet是整个调度池中对Task进行调度管理的基本单位, 由调度池中的TaskManager来管理.

taskScheduler.submitTasks 方法

// 把 TaskSet 交给任务调度池来调度
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

schedulableBuilder的类型是:SchedulableBuilder, 它是一个Trait, 有两个已知的实现子类: FIFOSchedulableBuilderFairSchedulableBuilder

SchedulableBuilder(调度池构建器)

1. FIFOSchedulableBuilder

FIFOSchedulableBuilder.addTaskSetManager

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    // 对 FIFO 调度, 则直接交给根调度器来调度
    // 因为 FIFO 调度只有一个根调度度池
    rootPool.addSchedulable(manager)
}

说明:

  • rootPool是根调度池, 它的类型是Pool, 表示PollTaskSet的可调度实体.

  • FIFO 调度是默认调度算法

  • spark.scheduler.mode类设置调度算法:FIFO,FAIR

  • 根调度池是在初始化TaskSchedulerImpl的时候创建的.

  • FIFOSchedulableBuilder 不需要再构建新的子调度池, 只需要有 rootPoll就可以了

    override def buildPools() {
      // nothing
    }
    

2. FairSchedulableBuilder

不仅仅需要根调度池, 还需要创建更多的调度池

FairSchedulableBuilder.buildPools 方法内会创建更多的子调度池.


SchedulingAlgorithm(调度算法)

/**
  * An interface for sort algorithm
  *     用于排序算法的接口
  * FIFO: FIFO algorithm between TaskSetManagers
  *  FIFO:   TaskSetManager 之间的排序
  *
  * FS: FS algorithm between Pools, and FIFO or FS within Pools
  *  FS: 池之间排序
  */
private[spark] trait SchedulingAlgorithm {
    def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

1. FIFOSchedulingAlgorithm

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    // 是不是先调度 s1
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val priority1 = s1.priority
        val priority2 = s2.priority
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
            val stageId1 = s1.stageId
            val stageId2 = s2.stageId
            res = math.signum(stageId1 - stageId2)
        }
        res < 0  // 值小的先调度
    }
}

2. FairSchedulingAlgorithm

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val minShare1 = s1.minShare
        val minShare2 = s2.minShare
        val runningTasks1 = s1.runningTasks
        val runningTasks2 = s2.runningTasks
        val s1Needy = runningTasks1 < minShare1
        val s2Needy = runningTasks2 < minShare2
        val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
        val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
        val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
        val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

        var compare = 0
        if (s1Needy && !s2Needy) { // 谁的 runningTasks1 < minShare1 谁先被调度
            return true
        } else if (!s1Needy && s2Needy) {
            return false
        } else if (s1Needy && s2Needy) { // 如果都 runningTasks < minShare
            // 则比较 runningTasks / math.max(minShare1, 1.0) 的比值 小的优先级高
            compare = minShareRatio1.compareTo(minShareRatio2)
        } else {
            // 如果都runningTasks > minShare, 则比较 runningTasks / weight 的比值
            // 小的优先级高
            compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
        }

        if (compare < 0) {
            true
        } else if (compare > 0) {
            false
        } else {
            // 如果前面都一样, 则比较 TaskSetManager 或 Pool 的名字
            s1.name < s2.name
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-04-11 20:47:12

results matching ""

    No results matching ""