Master资源调度--SparkContext向所有master注册

Youtube视频分享

SparkContext启动向master发送消息

  • ClientEndpoint向master发送消息: RegisterApplication

    /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

master处理消息RegisterApplication

  • 创建 Application 并注册到master上

  • Application 保存到 master 存储引擎中

  • 向driver发送已注册成功消息: RegisteredApplication

  • 过滤所有已注册的Worker(状态为ALIVE)

  • 遍历 waitingDrivers,如果有等待中的Drivers,给worker发送启动Driver消息: LaunchDriver

  • 调用在worker上启动executor方法

  • 过滤waitingApps,刚才注册的Application已经在ArrayBuffer中

  • 对已注册的worker进行过滤

  • 过滤条件状态为ALIVE,可用cpu内核数大于等于每个executor的内核数,可用内存大于等于Application在每个executor需要的内存数

  • 对可用worker进行排序(按可用内核数从大到小排序)

  • 调用方法 scheduleExecutorsOnWorkers,worker给executor分配多少个cpu内核

  • 进行具体的当前Application在Worker上给executor分配几个cpu内核

  • 分配worker资源给executor

  • 给worker发送启动executor消息: LaunchExecutor

  • 给driver发送Executor已增加消息:ExecutorAdded

Last updated

Was this helpful?