4.5 Source Code Analysis of Task-Level Scheduling
taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
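As the preceding analysis showed, when DAGScheduler submits Tasks to TaskScheduler, it first packages multiple Tasks into a TaskSet.
A TaskSet is the basic unit by which Tasks are scheduled and managed across the whole scheduling pool. Each TaskSet is managed by a TaskSetManager in the pool.
The taskScheduler.submitTasks method
// Hand the TaskSet over to the scheduling pool for scheduling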
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
schedulableBuilder has the type SchedulableBuilder. It is a trait with two known implementations: FIFOSchedulableBuilder and FairSchedulableBuilder.
SchedulableBuilder (scheduling pool builder)
1. FIFOSchedulableBuilder
FIFOSchedulableBuilder.addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    // With FIFO scheduling, hand the manager straight to the root pool,
    // because FIFO scheduling has only the one root pool
    rootPool.addSchedulable(manager)
}
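Notes:
rootPool is the root scheduling pool. Its type is Pool, a schedulable entity that represents a collection of Pools or TaskSetManagers.
FIFO is the default scheduling algorithm. The spark.scheduler.mode property sets the algorithm: FIFO or FAIR.
The root pool is created when TaskSchedulerImpl is initialized.
FIFOSchedulableBuilder does not need to build any child pools; rootPool alone is enough:
override def buildPools() {
    // nothing
}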
2. FairSchedulableBuilder
Fair scheduling needs more than the root pool: additional pools have to be created.
The FairSchedulableBuilder.buildPools method creates these additional child pools.
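To make the difference between the two builders concrete, here is a minimal sketch of the resulting pool layouts. It does not use Spark's classes: SketchPool, addFifo, addFair, and the pool and TaskSet names are all hypothetical stand-ins, and creating a missing child pool on demand is an assumption of this sketch rather than something the excerpts above show.

```scala
import scala.collection.mutable

object PoolLayoutSketch {
  // Hypothetical stand-in for Spark's Pool: a named node that can hold
  // child pools and the names of the TaskSets queued directly in it.
  final class SketchPool(val name: String) {
    val childPools = mutable.LinkedHashMap.empty[String, SketchPool]
    val taskSets = mutable.ArrayBuffer.empty[String]
  }

  // FIFO layout: every TaskSet hangs directly off the root pool.
  def addFifo(root: SketchPool, taskSet: String): Unit =
    root.taskSets += taskSet

  // FAIR layout: the TaskSet goes into a named child pool under the root.
  // Assumption of this sketch: a missing pool is created on demand.
  def addFair(root: SketchPool, poolName: String, taskSet: String): Unit = {
    val pool = root.childPools.getOrElseUpdate(poolName, new SketchPool(poolName))
    pool.taskSets += taskSet
  }

  val fifoRoot = new SketchPool("root")
  addFifo(fifoRoot, "TaskSet_0")
  addFifo(fifoRoot, "TaskSet_1")

  val fairRoot = new SketchPool("root")
  addFair(fairRoot, "default", "TaskSet_0")
  addFair(fairRoot, "production", "TaskSet_1")
  addFair(fairRoot, "production", "TaskSet_2")
}
```

In the FIFO layout the root has no children other than the TaskSets themselves. In the FAIR layout the root holds only pools, and the TaskSets sit one level down.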
SchedulingAlgorithm (scheduling algorithm)
/**
  * An interface for sort algorithm
  * FIFO: FIFO algorithm between TaskSetManagers
  *
  * FS: FS algorithm between Pools, and FIFO or FS within Pools
  */
private[spark] trait SchedulingAlgorithm {
    def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
1. FIFOSchedulingAlgorithm
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    // Returns true if s1 should be scheduled before s2
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val priority1 = s1.priority
        val priority2 = s2.priority
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
            val stageId1 = s1.stageId
            val stageId2 = s2.stageId
            res = math.signum(stageId1 - stageId2)
        }
        res < 0  // the smaller value is scheduled first
    }
}
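The FIFO rule can be tried out on its own with a small sketch. Entity and fifoBefore are hypothetical names that model only the two fields the comparator reads. The sample data assumes that priority carries the job ID, as the jobId argument in the TaskSet constructor call at the start of this section suggests.

```scala
object FifoOrderingSketch {
  // Hypothetical stand-in for a Schedulable: only the fields FIFO looks at.
  final case class Entity(name: String, priority: Int, stageId: Int)

  // Same decision rule as the comparator above: lower priority value first,
  // and on a tie the lower stageId first.
  def fifoBefore(s1: Entity, s2: Entity): Boolean = {
    val byPriority = math.signum(s1.priority - s2.priority)
    val res =
      if (byPriority != 0) byPriority
      else math.signum(s1.stageId - s2.stageId)
    res < 0
  }

  val queue = List(
    Entity("job1-stage3", priority = 1, stageId = 3),
    Entity("job0-stage1", priority = 0, stageId = 1),
    Entity("job1-stage2", priority = 1, stageId = 2),
    Entity("job0-stage0", priority = 0, stageId = 0))

  // sortWith takes exactly this kind of "comes before" predicate.
  val sorted: List[Entity] = queue.sortWith(fifoBefore)
}
```

Under these assumptions the earlier job runs first, and within one job the earlier stage runs first: job0-stage0, job0-stage1, job1-stage2, job1-stage3.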
2. FairSchedulingAlgorithm
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val minShare1 = s1.minShare
        val minShare2 = s2.minShare
        val runningTasks1 = s1.runningTasks
        val runningTasks2 = s2.runningTasks
        val s1Needy = runningTasks1 < minShare1
        val s2Needy = runningTasks2 < minShare2
        val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
        val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
        val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
        val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
        var compare = 0
        if (s1Needy && !s2Needy) { // an entity with runningTasks < minShare is scheduled first
            return true
        } else if (!s1Needy && s2Needy) {
            return false
        } else if (s1Needy && s2Needy) { // both are below their minShare
            // Compare runningTasks / math.max(minShare, 1.0); the smaller ratio has higher priority
            compare = minShareRatio1.compareTo(minShareRatio2)
        } else {
            // Both have runningTasks >= minShare, so compare runningTasks / weight;
            // the smaller ratio has higher priority
            compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
        }
        if (compare < 0) {
            true
        } else if (compare > 0) {
            false
        } else {
            // If everything above ties, compare the names of the TaskSetManagers or Pools
            s1.name < s2.name
        }
    }
}
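A small worked example may help with the three tiers of the fair comparison. The sketch below restates the same rule over a hypothetical Entity case class. The pool names and numbers are made up for illustration, and java.lang.Double.compare stands in for the compareTo calls in the listing above.

```scala
object FairOrderingSketch {
  // Hypothetical stand-in for a Schedulable: only the fields the fair rule reads.
  final case class Entity(name: String, minShare: Int, weight: Int, runningTasks: Int)

  def fairBefore(s1: Entity, s2: Entity): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble

    val compare =
      if (s1Needy && !s2Needy) -1      // only s1 is below its minShare
      else if (!s1Needy && s2Needy) 1  // only s2 is below its minShare
      else if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
      else java.lang.Double.compare(taskToWeightRatio1, taskToWeightRatio2)

    // Names break any remaining tie.
    if (compare != 0) compare < 0 else s1.name < s2.name
  }

  val pools = List(
    Entity("poolD", minShare = 0, weight = 1, runningTasks = 3), // satisfied, 3 / 1 = 3.0
    Entity("poolA", minShare = 2, weight = 1, runningTasks = 1), // needy, 1 / 2 = 0.5
    Entity("poolC", minShare = 0, weight = 2, runningTasks = 4), // satisfied, 4 / 2 = 2.0
    Entity("poolB", minShare = 4, weight = 1, runningTasks = 1)) // needy, 1 / 4 = 0.25

  val sorted: List[Entity] = pools.sortWith(fairBefore)
}
```

With these numbers the two needy pools come first, ordered by how far they are below their minShare (poolB, then poolA). The satisfied pools follow, ordered by running tasks per unit of weight (poolC, then poolD).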