Worker启动源码分析
Youtube视频分享
youtube: https://youtu.be/ll_Ae6rP7II
start-slave.sh启动脚本
worker启动脚本跟master一样,都调用spark-daemon.sh,只是启动类不一样
CLASS="org.apache.spark.deploy.worker.Worker"
spark-daemon.sh start $CLASS Worker main入口
主要源码
启动 'sparkWorker' 的服务
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
Worker onStart方法调用
WorkUI
启动WorkerUI
向所有master注册
线程池中每个master单独一个线程,向master注册worker
worker通过 masterEndpoint.ask向master发送注册worker消息 : RegisterWorker
master 接收到消息(RegisterWorker)处理后,回应worker消息 : RegisteredWorker
worker收到RegisteredWorker消息后,进行 registered = true,和刷新内存中的master信息
Last updated
Was this helpful?