Worker Startup Source Code Analysis

The start-slave.sh startup script

  • The worker startup script works the same way as the master's: both delegate to spark-daemon.sh, and only the class being launched differs.

# start-slave.sh (simplified): spark-daemon.sh launches the Worker class
CLASS="org.apache.spark.deploy.worker.Worker"
spark-daemon.sh start $CLASS
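
In practice start-slave.sh is invoked with the master's Spark URL, e.g. ./sbin/start-slave.sh spark://<master-host>:7077, and spark-daemon.sh forwards the remaining options (web UI port, worker port, master URL) to the Worker class; the exact arguments vary slightly between Spark versions.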

The Worker main entry point

Key source code

  • Starts the 'sparkWorker' service (the Worker's RPC environment)

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    // Parse command-line arguments (host, port, web UI port, cores, memory, master URLs, work dir)
    val args = new WorkerArguments(argStrings, conf)
    // Create the RpcEnv, register the Worker endpoint, then block until the RpcEnv terminates
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }
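
rpcEnv.setupEndpoint hands the newly constructed Worker over to the RPC framework; the life-cycle of a Spark RPC endpoint is constructor -> onStart -> receive* -> onStop, which is why the next thing to look at is Worker.onStart. Below is a minimal, self-contained sketch of that life-cycle contract, where SketchEndpoint, SketchRpcEnv and LifecycleDemo are illustrative stand-ins rather than Spark's (private) org.apache.spark.rpc classes:

// Illustrative sketch of the endpoint life-cycle that setupEndpoint kicks off.
// SketchEndpoint and SketchRpcEnv are toy stand-ins, not Spark's internal rpc classes.
trait SketchEndpoint {
  def onStart(): Unit = {}                                        // called once, right after registration
  def receive: PartialFunction[Any, Unit] = PartialFunction.empty // handles incoming messages
  def onStop(): Unit = {}                                         // called when the environment shuts down
}

class SketchRpcEnv {
  private var endpoints = Map.empty[String, SketchEndpoint]

  def setupEndpoint(name: String, endpoint: SketchEndpoint): SketchEndpoint = {
    endpoints += (name -> endpoint)
    endpoint.onStart() // registering the endpoint is what triggers onStart, i.e. the next section
    endpoint
  }

  def send(name: String, msg: Any): Unit =
    endpoints.get(name).filter(_.receive.isDefinedAt(msg)).foreach(_.receive(msg))
}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val env = new SketchRpcEnv
    env.setupEndpoint("Worker", new SketchEndpoint {
      override def onStart(): Unit = println("onStart: start the web UI, register with masters")
      override def receive: PartialFunction[Any, Unit] = { case msg => println(s"received: $msg") }
    })
    env.send("Worker", "RegisteredWorker")
  }
}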

The Worker onStart method

WorkerUI

  • Starts the WorkerUI, the worker's web UI (see the sketch below)
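
Below is a minimal, self-contained sketch of the onStart ordering, assuming illustrative names (WebUI and the method names are stand-ins, not Spark's internals) and the standalone worker's default web UI port 8081: the web UI is created and bound first, and only then does registration with the configured masters begin.

// Toy model of the onStart flow: bring the web UI up, then start master registration.
object OnStartFlowSketch {
  final class WebUI(port: Int) {
    def bind(): Unit = println(s"WorkerWebUI listening on port $port")
  }

  def onStart(webUiPort: Int, masterUrls: Seq[String]): Unit = {
    val webUi = new WebUI(webUiPort)       // 1. build the worker's web UI
    webUi.bind()                           // 2. bind it so its pages are reachable
    masterUrls.foreach(registerWithMaster) // 3. only then register with every configured master
  }

  private def registerWithMaster(masterUrl: String): Unit =
    println(s"sending RegisterWorker to $masterUrl")

  def main(args: Array[String]): Unit =
    onStart(webUiPort = 8081, masterUrls = Seq("spark://m1:7077", "spark://m2:7077"))
}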

Registering with all masters

  • A registration thread pool dedicates one thread per master; each thread registers the worker with its master

  • The worker sends the registration message RegisterWorker to the master via masterEndpoint.ask

  • After the master receives and processes the RegisterWorker message, it replies to the worker with RegisteredWorker

  • On receiving RegisteredWorker, the worker sets registered = true and refreshes the master information it keeps in memory (see the handshake sketch below)
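
Below is a minimal, self-contained sketch of this handshake using plain Scala futures in place of Spark's RPC layer. RegisterWorker and RegisteredWorker are simplified stand-ins here (the real deploy messages carry more fields, such as the worker id, host, port, endpoint reference, cores and memory), and the thread pool mirrors the one-thread-per-master registration described above.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Simplified stand-ins for the deploy messages exchanged during registration.
case class RegisterWorker(workerId: String, cores: Int, memory: Int)
case class RegisteredWorker(masterUrl: String)

object RegistrationSketch {
  private val masterUrls = Seq("spark://m1:7077", "spark://m2:7077")

  // One thread per master, mirroring the dedicated registration thread pool.
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(masterUrls.size))

  @volatile private var registered = false
  @volatile private var activeMaster: Option[String] = None

  // Stand-in for masterEndpoint.ask(RegisterWorker(...)): the real call is an async RPC to the master.
  private def ask(masterUrl: String, msg: RegisterWorker): Future[RegisteredWorker] = Future {
    println(s"$masterUrl handling $msg")
    RegisteredWorker(masterUrl)
  }

  private def registerWithAllMasters(): Unit =
    masterUrls.foreach { url =>
      ask(url, RegisterWorker("worker-example-id", cores = 4, memory = 8192)).foreach {
        case RegisteredWorker(masterUrl) =>
          registered = true              // mark this worker as registered
          activeMaster = Some(masterUrl) // refresh the in-memory view of the active master
          println(s"registered with $masterUrl")
      }
    }

  def main(args: Array[String]): Unit = {
    registerWithAllMasters()
    Thread.sleep(500) // crude wait so the asynchronous replies can arrive in this demo
    println(s"registered=$registered, activeMaster=$activeMaster")
    sys.exit(0)       // the fixed thread pool is non-daemon, so exit explicitly
  }
}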
