2.3.1 Master Startup Source Code Analysis

Master Source Code Analysis

org.apache.spark.deploy.master.Master

The Master companion object

The entry point for starting the Master is the main method of the Master companion object:

private[deploy] object Master extends Logging {
    val SYSTEM_NAME = "sparkMaster"
    val ENDPOINT_NAME = "Master"
    // Entry point for starting the Master
    def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        val conf = new SparkConf
        // Build the argument parser, e.g. for: --host hadoop201 --port 7077 --webui-port 8080
        val args = new MasterArguments(argStrings, conf)
        // Start the RPC environment and register the Master endpoint
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
    }

    /**
      * Start the Master and return a three tuple of:
      * (1) The Master RpcEnv
      * (2) The web UI bound port
      * (3) The REST server bound port, if any
      */
    def startRpcEnvAndEndpoint(
                                  host: String,
                                  port: Int,
                                  webUiPort: Int,
                                  conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        // Create the Master-side RpcEnv. Example arguments: sparkMaster hadoop201 7077 conf securityMgr
        // The concrete type is NettyRpcEnv
        val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        // Create the Master object, which is itself an RpcEndpoint, and register it with the RpcEnv.
        // setupEndpoint returns an RpcEndpointRef, the reference used to send messages to this endpoint
        val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
            new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        // Ask the Master endpoint for its bound ports and wait for a BoundPortsResponse.
        // BoundPortsResponse is a case class with three fields: rpcEndpointPort, webUIPort, restPort
        val portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
    }
}
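
The main method above hands the raw command line to MasterArguments. As a rough illustration of how options such as --host, --port and --webui-port can be folded into a settings object, here is a minimal sketch. MasterArgs and MasterArgsSketch are hypothetical names, not Spark classes, and the default host used here is an assumption chosen only for the example.

```scala
// Hypothetical, simplified stand-in for MasterArguments; not Spark's actual class.
final case class MasterArgs(host: String, port: Int, webUiPort: Int)

object MasterArgsSketch {
  // Defaults assumed for illustration only.
  val Defaults: MasterArgs = MasterArgs("localhost", 7077, 8080)

  // Consume the argument list two tokens at a time, updating the accumulator.
  @scala.annotation.tailrec
  def parse(args: List[String], acc: MasterArgs = Defaults): MasterArgs = args match {
    case "--host" :: value :: tail       => parse(tail, acc.copy(host = value))
    case "--port" :: value :: tail       => parse(tail, acc.copy(port = value.toInt))
    case "--webui-port" :: value :: tail => parse(tail, acc.copy(webUiPort = value.toInt))
    case Nil                             => acc
    case other :: _ =>
      throw new IllegalArgumentException(s"Unrecognized option: $other")
  }
}
```

Calling MasterArgsSketch.parse(List("--host", "hadoop201", "--port", "7077", "--webui-port", "8080")) yields the three values that main then passes on to startRpcEnvAndEndpoint.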

Creating the RpcEnv

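The RpcEnv.create call in startRpcEnvAndEndpoint passes five arguments; it is an overload that forwards to the seven-argument create shown below, and the actual construction is done by the create method of NettyRpcEnvFactory.

When the NettyRpcEnv is created, it also sets up the message dispatcher, the inboxes, and a Map from remote addresses to outboxes.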

RpcEnv.scala

def create(
              name: String,
              bindAddress: String,
              advertiseAddress: String,
              port: Int,
              conf: SparkConf,
              securityManager: SecurityManager,
              clientMode: Boolean): RpcEnv = {
    // Holds the configuration of the RpcEnv
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
        clientMode)
    // Create the NettyRpcEnv
    new NettyRpcEnvFactory().create(config)
}

NettyRpcEnvFactory

private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

    def create(config: RpcEnvConfig): RpcEnv = {
        val sparkConf = config.conf
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        // Serializer used for objects sent over RPC
        val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf)
            .newInstance()
            .asInstanceOf[JavaSerializerInstance]
        // Instantiate the NettyRpcEnv
        val nettyEnv = new NettyRpcEnv(
            sparkConf,
            javaSerializerInstance,
            config.advertiseAddress,
            config.securityManager)
        if (!config.clientMode) {
            // Define the function that starts the NettyRpcEnv server on a given port
            val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
                nettyEnv.startServer(config.bindAddress, actualPort)
                (nettyEnv, nettyEnv.address.port)
            }
            try {
                // Start the NettyRpcEnv; startServiceOnPort retries on another port if binding fails
                Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
            } catch {
                case NonFatal(e) =>
                    nettyEnv.shutdown()
                    throw e
            }
        }
        nettyEnv
    }
}
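
The factory above passes a start function to Utils.startServiceOnPort, which keeps trying until the server binds. The following is a minimal sketch of that retry idea, not Spark's implementation: PortRetrySketch is a hypothetical name, and the sketch simply moves to the next port number on each bind failure, leaving out details such as port-range wrap-around and configuration lookup.

```scala
import java.net.BindException
import scala.annotation.tailrec

object PortRetrySketch {
  /**
   * Call `start` on startPort, then startPort + 1, and so on, for at most
   * maxRetries additional attempts. Port 0 (let the OS choose) is never incremented.
   * Returns whatever `start` returns: the service and the port it bound to.
   */
  def startServiceOnPort[T](startPort: Int, maxRetries: Int)(start: Int => (T, Int)): (T, Int) = {
    @tailrec
    def attempt(offset: Int): (T, Int) = {
      val tryPort = if (startPort == 0) 0 else startPort + offset
      val result: Either[BindException, (T, Int)] =
        try Right(start(tryPort))
        catch { case e: BindException => Left(e) }
      result match {
        case Right(bound)                    => bound
        case Left(e) if offset >= maxRetries => throw e // retries exhausted
        case Left(_)                         => attempt(offset + 1)
      }
    }
    attempt(0)
  }
}
```

Passing the start function as a parameter, as the factory does with startNettyRpcEnv, keeps the retry policy independent of the service being started.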

The Master class (starting the Master-side RpcEndpoint)

Master is an RpcEndpoint.

Its lifecycle methods are called in the following order:

constructor -> onStart -> receive* -> onStop
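
To make this ordering concrete, here is a toy, single-threaded model of an endpoint and its inbox. MiniEndpoint, MiniInbox, RecordingEndpoint and LifecycleDemo are hypothetical names that only echo Spark's RpcEndpoint; the sketch assumes messages are processed one at a time and is not how the Netty-based dispatcher is actually implemented.

```scala
import scala.collection.mutable

// Hypothetical miniature of an endpoint; not Spark's RpcEndpoint API.
trait MiniEndpoint {
  def onStart(): Unit = ()
  def receive: PartialFunction[Any, Unit]
  def onStop(): Unit = ()
}

// A toy inbox that enforces the order onStart -> receive* -> onStop.
final class MiniInbox(endpoint: MiniEndpoint) {
  private val messages = mutable.Queue.empty[Any]

  def post(message: Any): Unit = messages.enqueue(message)

  def run(): Unit = {
    endpoint.onStart()
    val handler = endpoint.receive
    while (messages.nonEmpty) {
      val msg = messages.dequeue()
      if (handler.isDefinedAt(msg)) handler(msg) // unmatched messages are dropped
    }
    endpoint.onStop()
  }
}

// Records each lifecycle step so the order can be inspected.
final class RecordingEndpoint(log: mutable.ListBuffer[String]) extends MiniEndpoint {
  log += "constructor"
  override def onStart(): Unit = { log += "onStart"; () }
  override def receive: PartialFunction[Any, Unit] = {
    case msg => log += s"receive($msg)"; ()
  }
  override def onStop(): Unit = { log += "onStop"; () }
}

object LifecycleDemo {
  def trace(): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val inbox = new MiniInbox(new RecordingEndpoint(log))
    inbox.post("A")
    inbox.post("B")
    inbox.run()
    log.toList
  }
}
```

LifecycleDemo.trace() returns the steps in the order constructor, onStart, receive(A), receive(B), onStop.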

Key excerpt from onStart

// Create the web UI server
webUi = new MasterWebUI(this, webUiPort)
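// Check at a fixed rate whether any Worker has timed out. The task simply sends the
// CheckForWorkerTimeOut message to the Master itself. By default the check runs once per minute.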
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
        // CheckForWorkerTimeOut is handled in the receive method
        self.send(CheckForWorkerTimeOut)
    }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
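
The excerpt above uses a scheduled task that does nothing but post a message back to the endpoint, so the real work still runs on the endpoint's own message loop. A minimal, self-contained sketch of that pattern with the JDK scheduler follows. SelfSendSketch, its mailbox queue and the collect helper are hypothetical and stand in for self.send and the Master's inbox.

```scala
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}

object SelfSendSketch {
  // Stand-in for the message the Master sends to itself.
  case object CheckForWorkerTimeOut

  /** Schedule a fixed-rate self-send and return the first `ticks` messages the mailbox received. */
  def collect(periodMs: Long, ticks: Int): List[AnyRef] = {
    val mailbox = new LinkedBlockingQueue[AnyRef]() // hypothetical stand-in for the endpoint's inbox
    val latch = new CountDownLatch(ticks)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        mailbox.put(CheckForWorkerTimeOut) // plays the role of self.send(...)
        latch.countDown()
      }
    }, 0L, periodMs, TimeUnit.MILLISECONDS)
    try {
      latch.await(5, TimeUnit.SECONDS) // wait until enough ticks have fired
    } finally {
      task.cancel(true)
      scheduler.shutdownNow()
    }
    Iterator.continually(mailbox.poll()).take(ticks).toList
  }
}
```

Routing the timer through a message rather than calling the check directly means the timeout logic never runs concurrently with other message handlers.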

The method that handles Worker timeouts:

/** Check for, and remove, any timed-out workers */
private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    // Collect the Workers whose last heartbeat is older than the timeout
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
        if (worker.state != WorkerState.DEAD) {
            logWarning("Removing %s because we got no heartbeat in %d seconds".format(
                worker.id, WORKER_TIMEOUT_MS / 1000))
            removeWorker(worker)
        } else {
            if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
                workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
            }
        }
    }
}
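
The two-stage behavior of timeOutDeadWorkers can be expressed as a pure function, which makes the thresholds easier to check. The sketch below is an illustration under stated assumptions: WorkerRecord, MarkDead and Cull are hypothetical types, and it assumes (as the cull comment in the excerpt suggests) that removeWorker leaves a DEAD entry in the set that is only dropped after (REAPER_ITERATIONS + 1) timeout periods.

```scala
object WorkerTimeoutSketch {
  sealed trait State
  case object Alive extends State
  case object Dead extends State

  // Hypothetical, trimmed-down view of a worker entry.
  final case class WorkerRecord(id: String, lastHeartbeat: Long, state: State)

  sealed trait Action
  final case class MarkDead(id: String) extends Action // corresponds to removeWorker(worker)
  final case class Cull(id: String) extends Action     // corresponds to workers -= worker

  /** Decide what to do with each worker whose heartbeat is older than timeoutMs. */
  def decide(
      workers: Seq[WorkerRecord],
      now: Long,
      timeoutMs: Long,
      reaperIterations: Int): Seq[Action] =
    workers.filter(_.lastHeartbeat < now - timeoutMs).flatMap { w =>
      val action: Option[Action] =
        if (w.state != Dead) Some(MarkDead(w.id))
        else if (w.lastHeartbeat < now - (reaperIterations + 1) * timeoutMs) Some(Cull(w.id))
        else None // already DEAD, but not yet old enough to drop
      action
    }
}
```

Taking the current time as a parameter instead of reading the clock inside the function is a choice made for this sketch so the logic can be exercised with fixed timestamps.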

At this point, the Master has finished starting up.

Copyright © 尚硅谷大数据 2019, all rights reserved. Powered by Gitbook
Last revised: 2019-06-08 08:51:16