SparkContext 源码分析

SparkContext 源码分析,带Youtub视频

Youtub 视频分享

文档说明

原文

Main entry point for Spark functionality.
A SparkContext represents the connection to a Spark cluster, 
and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. 
This limitation may eventually be removed; see SPARK-2243 for more details.

翻译

).Spark功能主要入口点
).一个SparkContext表示与一个Spark集群的连接
).在Spark集群上,能创建RDDs,累加器,广播变量
).每个JVM仅仅只有一个SparkContext可能是活动的
).在创建一个新的SparkContext之前,你必须停掉活动的SparkContext,这个限制最终可能被 移除,看SPARK-2243 更多详情

SparkContext原理图

124KB
Open
SparkContext 原理图

SparkContext原理图

xmind文件下载

SparkContext原理图xmind文件

https://github.com/opensourceteams/spark-scala-maven/blob/master/md/images/spark/SparkContext.xmind

配置信息

可配置信息

  • spark.jars = jar文件路径(可迭代的)

  • spark.files = 文件路径

  • spark.eventLog.dir=/tmp/spark-events // 事件日志目录

  • spark.eventLog.compress=false //事件日志是否压缩

  • spark.shuffle.manager=sort //指定shuffler manager

  • spark.memory.useLegacyMode=true //指定内存管理器

  • spark.cores.max=2 设置executor占用cpu内核个数

  • spark.scheduler.mode=FIFO //TaskSchedulerImpl 调度模式,可选(FIFO,FAIR,NONE)

  • spark.executor.extraJavaOptions= //设置executor启动执行的java参数

  • spark.executor.extraClassPath= //设置 executor 执行的classpath

  • spark.executor.extraLibraryPath= //设置 executor LibraryPath

  • spark.executor.cores= //executor core 个数分配

  • spark.rpc.lookupTimeout="120s" //设置 RPCTimeout超时时间

  • spark.network.timeout="120s" //设置 RPCTimeout超时时间

Spark系统设置配置信息

  • spark.driver.host = Utils.localHostName()

  • spark.driver.port = 0

  • spark.executor.id = driver

主要内容

创建作业进度监听器

创建SparkEnv

  • 创建DriverEnv

  • 指定默认的 spark.rpc = org.apache.spark.rpc.netty.NettyRpcEnvFactory

  • 创建NettyRpcEnv并启动,此时启动 'sparkDriver'

  • 创建ActorSystem并启动,此时启动 'sparkDriverActorSystem'

  • 指定spark序列化器: org.apache.spark.serializer.JavaSerializer

val serializer = instantiateClassFromConfSerializer logDebug(s"Using serializer: ${serializer.getClass}")

  • 注册 MapOutputTracker 到 NettyRpcEndpointRef(通信用 和map输出信息的追踪)

  • 实例化ShuflleManager

  • 实例化内存管理器

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", true) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { UnifiedMemoryManager(conf, numUsableCores) }

  • 缓存管理器 实例化BlockManager

  • 创建测量系统

  • 创建临时目录,如果是分布式模式,这是一个executor的当前工作目录

  • 注册 OutputCommitCoordinator 到 NettyRpcEndpointRef(通信用)

  • new SparkEnv 并返回

创建SparkUI

注册心跳接收器

创建和初使化调度器(TaskScheduler,DAGScheduler)

  • org.apache.spark.scheduler.TaskSchedulerImpl 文档说明

  • Standalone模式创建TaskSchedulerImpl并初使化中指定 backend为SparkDeploySchedulerBackend

任务调度器启动

  • 任务调度器启动_taskScheduler.start()

  • 调用SparkDeploySchedulerBackend start方法

  • 再调用CoarseGrainedSchedulerBackend 的start方法 registerRpcEndpoint 注册(通信用) [CoarseGrainedScheduler]

  • 实例化ApplicationDescription 包含 command (org.apache.spark.executor.CoarseGrainedExecutorBackend)

  • 启动 AppClient registerRpcEndpoint 注册(通信用)[AppClient]

  • CoarseGrainedSchedulerBackend 文档

  • 调用ClientEndpoint 的 onStart方法 异步向所有master注册,向master发送消息: RegisterApplication

org.apache.spark.rpc.netty.Dispatcher

类文档说明

注册RPC端点 (关键通信及OnStart方法的调用)

Inbox

A inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.

入口代码块 400行

Last updated

Was this helpful?