CoarseGrainedExecutorBackend启动源码分析

Youtube视频分析

executor启动图解

316KB
Open

SparkContext向Master发送消息

  • SparkContext向Master发送消息RegisterApplication

    /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

Master向Worker发送消息

  • master处理RegisterApplication消息时调用,资源调度方法

  • 资源调度方法中调用launchExecutor方法

Worker处理LaunchExecutor消息

  • new ExecutorRunner 并启动新线程来进行executor进程

  • 向master发送Executor状态改变消息: ExecutorStateChange

CoarseGrainedExecutorBackend启动

  • main方法启动executor进程

Last updated

Was this helpful?