Source Code Analysis of the Yarn Client Mode Execution Mechanism
Run the following command:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
The resulting JVM launch command:
/opt/module/jdk1.8.0_172/bin/java 
-cp /opt/module/spark-yarn/conf/:/opt/module/spark-yarn/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ 
-Xmx1g 
org.apache.spark.deploy.SparkSubmit 
--master yarn 
--deploy-mode client 
--class org.apache.spark.examples.SparkPi 
./examples/jars/spark-examples_2.11-2.1.1.jar 100
Three different processes are started, in order:
SparkSubmit
ExecutorLauncher
CoarseGrainedExecutorBackend
In client mode, the user's main class is run directly.
The prepareSubmitEnvironment method:
/*
    In client mode, launch the user's main class directly
*/
if (deployMode == CLIENT || isYarnCluster) {
    // In client mode, childMainClass is the user's own class
    // In cluster mode, childMainClass is later reassigned to org.apache.spark.deploy.yarn.Client
    childMainClass = args.mainClass
}
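The assignment above means that in client mode SparkSubmit simply loads childMainClass and invokes its static main method via reflection. A minimal, self-contained sketch of that launch step (illustrative names, not Spark's actual code):

```scala
// Sketch (not Spark's actual code): how a main class is started reflectively,
// the way SparkSubmit runs the user class in client mode.
object MainClassRunner {
  def run(mainClassName: String, args: Array[String]): Unit = {
    val clazz = Class.forName(mainClassName)
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args) // main is static, so the receiver is null
  }
}
```

Because the user class runs inside the same JVM as SparkSubmit, the driver in yarn-client mode lives in the client process itself.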
So no ApplicationMaster is submitted at this point: SparkSubmit invokes the user class's main method directly, and that user code then instantiates SparkContext.
Instantiating SparkContext
val (sched, ts):(SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// Start the YarnScheduler (its start also starts the backend)
_taskScheduler.start()
The SparkContext.createTaskScheduler method
Key code:
private def createTaskScheduler(
        sc: SparkContext,
        master: String,
        deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._
    master match {
        case masterUrl =>
            // For master = "yarn", this returns the YarnClusterManager
            val cm = getClusterManager(masterUrl) match {
                case Some(clusterMgr) => clusterMgr
                case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
                // In client mode: creates a YarnScheduler
                val scheduler: TaskScheduler = cm.createTaskScheduler(sc, masterUrl)
                // In client mode: creates a YarnClientSchedulerBackend
                val backend: SchedulerBackend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                cm.initialize(scheduler, backend)
                (backend, scheduler)
            } catch {
                case se: SparkException => throw se
                case NonFatal(e) =>
                    throw new SparkException("External scheduler cannot be instantiated", e)
            }
    }
}
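getClusterManager itself is not shown above; in Spark it discovers ExternalClusterManager implementations on the classpath via java.util.ServiceLoader and keeps the single one whose canCreate accepts the master URL. A simplified sketch of that selection logic (trait and object names here are illustrative, and the discovered managers are passed in explicitly instead of loaded via ServiceLoader):

```scala
// Illustrative stand-in for ExternalClusterManager's canCreate contract.
trait ClusterManagerLike {
  def canCreate(masterURL: String): Boolean
}

object ClusterManagerLookup {
  // Among the managers found on the classpath, pick the one that
  // accepts this master URL; more than one match is an error.
  def getClusterManager(masterURL: String,
                        discovered: Seq[ClusterManagerLike]): Option[ClusterManagerLike] = {
    val candidates = discovered.filter(_.canCreate(masterURL))
    require(candidates.size <= 1,
      s"Multiple external cluster managers registered for '$masterURL'")
    candidates.headOption
  }
}
```

With master set to "yarn", only YarnClusterManager's canCreate returns true, which is why the match arm above always gets that manager.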
The YarnClusterManager class
private[spark] class YarnClusterManager extends ExternalClusterManager {
    override def canCreate(masterURL: String): Boolean = {
        masterURL == "yarn"
    }
    override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
        sc.deployMode match {
            case "cluster" => new YarnClusterScheduler(sc)
            case "client" => new YarnScheduler(sc)
            case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
    }
    override def createSchedulerBackend(sc: SparkContext,
                                        masterURL: String,
                                        scheduler: TaskScheduler): SchedulerBackend = {
        sc.deployMode match {
            case "cluster" =>
                new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
            case "client" =>
                new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
            case _ =>
                throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
    }
    override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }
}
_taskScheduler.start()
The start method of YarnClientSchedulerBackend:
/**
  * Create a Yarn client to submit an application to the ResourceManager.
  * This waits until the application is running.
  */
override def start() {
    val driverHost = conf.get("spark.driver.host")
    val driverPort = conf.get("spark.driver.port")
    val hostport = driverHost + ":" + driverPort
    val argsArrayBuf = new ArrayBuffer[String]()
    // Pass the driver's address to the AM so it can connect back
    argsArrayBuf += ("--arg", hostport)
    val args = new ClientArguments(argsArrayBuf.toArray)
    client = new Client(args, conf)
    // Submit the application through the Client
    bindToYarn(client.submitApplication(), None)
    // Block until the application reaches the RUNNING state
    waitForApplication()
}
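waitForApplication, called at the end of start, blocks until the YARN application report shows the application leaving its initial states. A hedged sketch of that polling pattern (state names simplified and the report function injected; this is not Spark's actual code):

```scala
// Simplified stand-ins for YARN's application states.
sealed trait AppState
case object Submitted extends AppState
case object Accepted  extends AppState
case object Running   extends AppState
case object Failed    extends AppState

object AppWait {
  // Poll the (injected) report function until the application has either
  // started running or terminated, then return the last state seen.
  def waitForApplication(poll: () => AppState, sleepMs: Long = 100): AppState = {
    var state = poll()
    while (state == Submitted || state == Accepted) {
      Thread.sleep(sleepMs)
      state = poll()
    }
    state
  }
}
```

This is why spark-submit in client mode appears to "hang" briefly after submission: the backend's start does not return until YARN reports the application as running.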
Revisiting the org.apache.spark.deploy.yarn.Client source
The submitApplication method:
yarnClient.submitApplication(appContext)
The ExecutorLauncher class
When yarnClient submits the application, the AM main class recorded in the container launch context is ExecutorLauncher, not ApplicationMaster. So in client mode, the process YARN starts is ExecutorLauncher:
// createContainerLaunchContext()
val amClass =
    if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
/**
  * This object does not provide any special functionality. It exists so that it's easy to tell
  * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
  */
object ExecutorLauncher {
    def main(args: Array[String]): Unit = {
        ApplicationMaster.main(args)
    }
}
Revisiting the ApplicationMaster source
The run method:
final def run(): Int = {
    try {
        if (isClusterMode) {
            runDriver(securityMgr)
        } else {
            // Client mode: the driver already runs in the client process,
            // so only register the AM and request executors
            runExecutorLauncher(securityMgr)
        }
    } catch {
        case e: Exception =>
            // error handling elided in this excerpt
    }
    exitCode
}
runExecutorLauncher
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    val driverRef = waitForSparkDriver()
    addAmIpFilter()
    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
        securityMgr)
    // In client mode the actor will stop the reporter thread.
    reporterThread.join()
}
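waitForSparkDriver (not shown above) repeatedly tries to connect back to the driver already running in the client process, using the host:port that was passed via --arg, until it succeeds or gives up. A minimal sketch of that retry loop (the connection attempt is injected and the names are illustrative, not Spark's actual code):

```scala
object DriverWait {
  // Retry an (injected) connection attempt until it succeeds or the
  // deadline passes; returns whether the driver was reached.
  def waitForDriver(tryConnect: () => Boolean,
                    timeoutMs: Long,
                    retryIntervalMs: Long = 10): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var connected = tryConnect()
    while (!connected && System.currentTimeMillis() < deadline) {
      Thread.sleep(retryIntervalMs)
      connected = tryConnect()
    }
    connected
  }
}
```

Only after this connection is established does the AM register itself with the ResourceManager and start requesting executor containers.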
From this point on, the execution flow is the same as in yarn-cluster mode, so it is not repeated here.
Summary
In yarn-client mode, SparkSubmit runs the user's main class (the driver) directly in the client process. Instantiating SparkContext creates a YarnScheduler and a YarnClientSchedulerBackend; the backend's start method submits the application to the ResourceManager with ExecutorLauncher as the AM class. ExecutorLauncher simply delegates to ApplicationMaster, which in client mode registers itself and allocates executors (runExecutorLauncher) instead of running the driver. Executor startup (CoarseGrainedExecutorBackend) then proceeds exactly as in yarn-cluster mode.