4. Key execution flows
Flow A: spark-submit → user application
| Step | Component | Detail |
|---|---|---|
| Trigger | Shell | bin/spark-submit --class com.example.App app.jar |
| 1 | spark-class | Runs org.apache.spark.launcher.Main with minimal classpath (launcher/target/.../classes + jars) |
| 2 | SparkSubmitCommandBuilder | Builds full java command: classpath, memory, main class SparkSubmit |
| 3 | SparkSubmit.doSubmit | Parses args via SparkSubmitArguments, builds SparkConf |
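| 4 | prepareSubmitEnvironment | Resolves the cluster manager from the master URL and validates the deploy mode (client/cluster); picks the child main class (the user class in client mode, a cluster-manager submission class in cluster mode) |
| 5 | runMain | Adds the user JAR to the classpath and starts the child main class via reflection: the user's main method in client mode, or a client that submits the application to YARN/K8s/Standalone in cluster mode |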
Validation: Master URL format, deploy mode compatibility, primary resource existence — in SparkSubmitArguments and SparkSubmitUtils.
Failure modes: Missing JAR, invalid master, classpath conflicts, cluster manager rejection.
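The routing and validation in steps 4 and 5 can be sketched as a toy Python model. The function names, cluster-manager labels, and error messages below are hypothetical, and only a subset of master URL forms (yarn, spark://, k8s://, local, local[N]) is handled; the real checks live in SparkSubmit and SparkSubmitArguments.

```python
from dataclasses import dataclass

# Toy model (hypothetical names) of how spark-submit routes a master URL to a
# cluster manager and rejects incompatible deploy modes. Only a subset of
# master URL forms is handled; e.g. local-cluster[...] is omitted.


@dataclass(frozen=True)
class SubmitPlan:
    cluster_manager: str
    deploy_mode: str


def resolve_cluster_manager(master: str) -> str:
    """Map a master URL to a cluster-manager label (simplified)."""
    if master == "yarn":
        return "YARN"
    if master.startswith("spark://"):
        return "STANDALONE"
    if master.startswith("k8s://"):
        return "KUBERNETES"
    if master == "local" or master.startswith("local["):
        return "LOCAL"
    raise ValueError(f"unsupported master URL: {master!r}")


def plan_submit(master: str, deploy_mode: str = "client") -> SubmitPlan:
    """Validate the master/deploy-mode pair and return where the app will run."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError(f"deploy mode must be client or cluster, got {deploy_mode!r}")
    manager = resolve_cluster_manager(master)
    # Local mode runs driver and executors in one JVM, so there is no
    # separate cluster to launch the driver on.
    if manager == "LOCAL" and deploy_mode == "cluster":
        raise ValueError(f"cluster deploy mode is not compatible with master {master!r}")
    return SubmitPlan(manager, deploy_mode)
```

For example, `plan_submit("yarn", "cluster")` returns `SubmitPlan(cluster_manager='YARN', deploy_mode='cluster')`, while `plan_submit("local[4]", "cluster")` raises ValueError before anything is launched.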
Flow B: RDD action (e.g. rdd.count())
Data structures: ActiveJob, Stage, TaskSet, TaskDescription (serialized to executor).
Side effects: Shuffle files written (map stages), blocks cached if persisted, metrics posted to LiveListenerBus.
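Output: each task returns its partition's record count; the driver sums these per-partition counts, while JobWaiter tracks task completion and signals when the job is done.
Failures: a failed task is retried until it reaches spark.task.maxFailures failed attempts, which aborts the job; a fetch failure triggers resubmission of the map stage that produced the missing output; the job fails if a stage exceeds spark.stage.maxConsecutiveAttempts consecutive attempts.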
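To make the count and retry semantics concrete, here is a toy, sequential Python model. In Spark's source, count() is essentially `sc.runJob(this, Utils.getIteratorSize _).sum`: each task computes its partition's size and the driver sums the results. run_job, partition_size, and JobAborted are hypothetical names, and only task-level retries are modeled (no fetch failures or stage retries).

```python
from typing import Callable, List, Sequence, TypeVar

R = TypeVar("R")


class JobAborted(Exception):
    """Hypothetical: raised when one task exhausts all of its attempts."""


def run_job(partitions: Sequence[Sequence[object]],
            func: Callable[[int, Sequence[object], int], R],
            max_failures: int = 4) -> List[R]:
    """Run `func(partition_id, rows, attempt)` on every partition.

    Like spark.task.maxFailures (default 4), a task may be attempted at most
    `max_failures` times; the failure that reaches that count aborts the job.
    Tasks run sequentially here; Spark runs them in parallel on executors.
    """
    results: List[R] = []
    for pid, rows in enumerate(partitions):
        for attempt in range(max_failures):
            try:
                results.append(func(pid, rows, attempt))
                break
            except RuntimeError:
                continue  # retry the same partition (Spark may pick another executor)
        else:
            raise JobAborted(f"task {pid} failed {max_failures} times; aborting job")
    return results


def partition_size(pid: int, rows: Sequence[object], attempt: int) -> int:
    """Per-task work for count(): the number of rows in the partition."""
    return sum(1 for _ in rows)
```

`sum(run_job([[1, 2, 3], [4, 5], [6]], partition_size))` plays the role of `rdd.count()` and evaluates to 6; the driver-side `sum` corresponds to the `.sum` applied to the array returned by runJob.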
Flow C: SQL query (spark.sql("SELECT ...").collect())
Validation: Catalyst's CheckAnalysis runs inside analyzer.executeAndCheck; it performs type checking and rejects plans with unresolved attributes.
Business logic: Optimizer rules (predicate pushdown, join reorder) in Catalyst; physical strategy selection in SparkStrategies.
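The analysis check and predicate pushdown can be illustrated with a toy plan tree. Relation, Project, Filter, check_analysis, and push_down_filters are hypothetical Python stand-ins, not Catalyst classes; the rule only handles a filter sitting directly on a column-only projection.

```python
from dataclasses import dataclass, replace
from typing import Set, Tuple, Union


@dataclass(frozen=True)
class Relation:
    name: str
    columns: Tuple[str, ...]

    def output(self) -> Set[str]:
        return set(self.columns)


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]  # plain column selection: no aliases or expressions
    child: "Plan"

    def output(self) -> Set[str]:
        return set(self.columns)


@dataclass(frozen=True)
class Filter:
    column: str  # simplified predicate: column > value
    value: int
    child: "Plan"

    def output(self) -> Set[str]:
        return self.child.output()


Plan = Union[Relation, Project, Filter]


def check_analysis(plan: Plan) -> None:
    """Reject references to columns the child does not produce."""
    if isinstance(plan, Relation):
        return
    check_analysis(plan.child)
    referenced = set(plan.columns) if isinstance(plan, Project) else {plan.column}
    missing = referenced - plan.child.output()
    if missing:
        raise ValueError(f"cannot resolve column(s) {sorted(missing)}")


def push_down_filters(plan: Plan) -> Plan:
    """Rewrite Filter(Project(x)) into Project(Filter(x)), bottom-up."""
    if isinstance(plan, Relation):
        return plan
    child = push_down_filters(plan.child)
    if isinstance(plan, Filter) and isinstance(child, Project):
        # Safe here because Project only selects columns; Catalyst's rules must
        # also substitute aliases and respect non-deterministic expressions.
        pushed = push_down_filters(Filter(plan.column, plan.value, child.child))
        return Project(child.columns, pushed)
    return replace(plan, child=child)
```

Moving the filter below the projection means rows are discarded before the projection runs; in a real plan the same idea lets a predicate travel further down toward the data source scan.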
Flow D: PySpark action
Gateway startup: launch_gateway (python/pyspark/java_gateway.py) spawns bin/spark-submit pyspark-shell-main and connects Py4J to the JVM.
Failure modes: Py4J connection loss, Python worker OOM, serializer mismatch.
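Data between the JVM and Python workers travels as length-prefixed frames. The sketch below is a simplified, hypothetical framing protocol loosely modeled on that idea (pickle payloads, a 4-byte big-endian length, and an invented END_OF_DATA sentinel); it is not PySpark's serializer code, but it shows why a lost connection or a writer using a different wire format surfaces as a read error rather than as silently wrong data.

```python
import io
import pickle
import struct
from typing import Any, BinaryIO, Iterable, Iterator, List

# Hypothetical sentinel: a negative length marks the end of the data section.
END_OF_DATA = -1


def write_frames(items: Iterable[Any], out: BinaryIO) -> None:
    """Write each item as a 4-byte big-endian length followed by its pickle."""
    for item in items:
        payload = pickle.dumps(item)
        out.write(struct.pack("!i", len(payload)))
        out.write(payload)
    out.write(struct.pack("!i", END_OF_DATA))


def read_frames(inp: BinaryIO) -> Iterator[Any]:
    """Yield items until the end marker; raise EOFError on a short read."""
    while True:
        header = inp.read(4)
        if len(header) < 4:
            # The peer went away before sending the end marker
            # (e.g. a lost socket or a crashed worker).
            raise EOFError("stream closed before end-of-data marker")
        (length,) = struct.unpack("!i", header)
        if length == END_OF_DATA:
            return
        if length < 0:
            raise ValueError(f"unknown control length {length}")
        payload = inp.read(length)
        if len(payload) < length:
            # Also what a writer using a different wire format tends to produce.
            raise EOFError(f"truncated frame: expected {length} bytes")
        yield pickle.loads(payload)


def roundtrip(items: Iterable[Any]) -> List[Any]:
    """Write items to an in-memory buffer and read them back."""
    buf = io.BytesIO()
    write_frames(items, buf)
    buf.seek(0)
    return list(read_frames(buf))
```

Feeding read_frames unframed bytes such as `b"hello world"` makes it interpret `b"hell"` as a huge length and fail with EOFError, which is the toy analogue of a serializer mismatch.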
Flow E: Structured Streaming micro-batch
State: StateStore instances run on executors; the checkpoint directory lives on a durable file system (spark.sql.streaming.checkpointLocation).
Failures: Batch retry from checkpoint; state schema evolution errors; sink commit failures.
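The batch-retry behavior can be sketched with an in-memory model of a checkpoint directory. A Spark checkpoint keeps, among other things, an offsets log (what a batch will read, written before the batch runs) and a commits log (written after the sink commits); a batch present in the first but not the second is replayed with the same offsets. Checkpoint, ToyStreamingQuery, and the versioned state dictionary are hypothetical, and the sink is made idempotent by keying its output on batch id.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class Checkpoint:
    """In-memory stand-in for a checkpoint directory."""
    offsets: Dict[int, Tuple[int, int]] = field(default_factory=dict)  # batch -> [start, end)
    commits: Set[int] = field(default_factory=set)  # batches whose sink write finished
    state: Dict[int, Dict[str, int]] = field(default_factory=lambda: {0: {}})  # version -> snapshot


class ToyStreamingQuery:
    """Running word count over `source`, `batch_size` records per micro-batch."""

    def __init__(self, source: List[str], ckpt: Checkpoint,
                 sink: Dict[int, Dict[str, int]], batch_size: int = 2) -> None:
        self.source = source
        self.ckpt = ckpt
        self.sink = sink
        self.batch_size = batch_size

    def run_batch(self, crash_before_commit: bool = False) -> bool:
        """Run one micro-batch; return False when the source is exhausted."""
        last = max(self.ckpt.offsets, default=-1)
        if last >= 0 and last not in self.ckpt.commits:
            # Recovery: batch `last` was planned but never committed,
            # so replay it with exactly the same offsets.
            batch_id = last
            start, end = self.ckpt.offsets[batch_id]
        else:
            batch_id = last + 1
            start = self.ckpt.offsets[last][1] if last >= 0 else 0
            end = min(start + self.batch_size, len(self.source))
            if start == end:
                return False
            # Write-ahead: record the batch's offsets before processing it.
            self.ckpt.offsets[batch_id] = (start, end)
        # Load the state snapshot produced by the previous batch.
        state = dict(self.ckpt.state[batch_id])
        for word in self.source[start:end]:
            state[word] = state.get(word, 0) + 1
        self.ckpt.state[batch_id + 1] = state
        # Keying sink output by batch id makes a replay overwrite, not duplicate.
        self.sink[batch_id] = dict(state)
        if crash_before_commit:
            raise RuntimeError(f"simulated crash before committing batch {batch_id}")
        self.ckpt.commits.add(batch_id)
        return True
```

Because the replayed batch reads the same offsets and starts from the same state snapshot, a crash between the sink write and the commit does not double-count any records.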
Flow F: Spark Connect client query
10. Important code walkthroughs
Walkthrough 1: RDD five properties
File: core/src/main/scala/org/apache/spark/rdd/RDD.scala
The class docstring (lines 69–76) defines the contract every scheduler relies on: (1) a list of partitions, (2) compute(split, context) for each partition, (3) dependencies on parent RDDs, (4) an optional Partitioner, and (5) optional preferred locations for each partition.
map creates a MapPartitionsRDD with a OneToOneDependency, a narrow dependency that keeps both RDDs in the same stage. reduceByKey introduces a ShuffleDependency, which registers the shuffle with the ShuffleManager at construction time (Dependency.scala).
If changed incorrectly: Wrong partitions → incorrect results; missing shuffle registration → fetch failures; broken preferred locations → slow scans.
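The five properties map naturally onto a small class hierarchy. This is a hypothetical Python model (ToyRDD, ParallelCollection, MappedRDD, and the hosts parameter are illustrative names) showing how map() keeps the parent's partitions, records a one-to-one dependency, and pipelines compute() within one task.

```python
from typing import Callable, Iterator, List, Optional, Sequence


class OneToOneDependency:
    """Narrow dependency: child partition i reads only parent partition i."""

    def __init__(self, rdd: "ToyRDD") -> None:
        self.rdd = rdd

    def get_parents(self, partition_id: int) -> List[int]:
        return [partition_id]


class ToyRDD:
    """Hypothetical Python model of the five RDD properties."""

    partitioner: Optional[object] = None  # (4) only set for key-value RDDs, e.g. after a shuffle

    def partitions(self) -> List[int]:  # (1)
        raise NotImplementedError

    def compute(self, split: int) -> Iterator[object]:  # (2)
        raise NotImplementedError

    def dependencies(self) -> List[OneToOneDependency]:  # (3)
        return []

    def preferred_locations(self, split: int) -> List[str]:  # (5)
        return []

    def map(self, f: Callable[[object], object]) -> "ToyRDD":
        return MappedRDD(self, f)

    def collect(self) -> List[object]:
        return [x for split in self.partitions() for x in self.compute(split)]


class ParallelCollection(ToyRDD):
    """Source RDD: slices a list into contiguous partitions."""

    def __init__(self, data: Sequence[object], num_slices: int,
                 hosts: Optional[List[str]] = None) -> None:
        n = len(data)
        self._slices = [list(data[i * n // num_slices:(i + 1) * n // num_slices])
                        for i in range(num_slices)]
        self._hosts = hosts or []

    def partitions(self) -> List[int]:
        return list(range(len(self._slices)))

    def compute(self, split: int) -> Iterator[object]:
        return iter(self._slices[split])

    def preferred_locations(self, split: int) -> List[str]:
        return [self._hosts[split]] if split < len(self._hosts) else []


class MappedRDD(ToyRDD):
    """Like MapPartitionsRDD for map(): same partitions, one-to-one dependency."""

    def __init__(self, parent: ToyRDD, f: Callable[[object], object]) -> None:
        self.parent, self.f = parent, f

    def partitions(self) -> List[int]:
        return self.parent.partitions()

    def compute(self, split: int) -> Iterator[object]:
        # Pipelined: pulls lazily from the parent's iterator inside the same task.
        return (self.f(x) for x in self.parent.compute(split))

    def dependencies(self) -> List[OneToOneDependency]:
        return [OneToOneDependency(self.parent)]

    def preferred_locations(self, split: int) -> List[str]:
        # Spark's DAGScheduler finds these by walking narrow dependencies;
        # the toy simply forwards the parent's answer.
        return self.parent.preferred_locations(split)
```

Getting any one of these wrong shows up exactly as the failure list above describes: a bad partitions() or compute() changes results, while a bad preferred_locations() only costs locality.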
Walkthrough 2: DAGScheduler stage submission
File: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
The header comment (lines 58–123) is essential reading: it defines Jobs, Stages, Tasks, cache tracking, and the failure-recovery invariants.
submitStage walks backward through the lineage: it recursively submits any missing parent shuffle map stages and parks the current stage in waitingStages; only when no parents are missing does it call submitMissingTasks. Shuffle map stages produce output tracked by MapOutputTrackerMaster.
If changed incorrectly: Memory leaks in long sessions (failure to clear jobIdToStageIds); duplicate task submission; incorrect stage retry logic.
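Stage splitting and the submitStage recursion can be modeled in Python with stages that finish instantly. Node, Stage, build_stage, and ToyDAGScheduler are hypothetical; the real DAGScheduler is event-driven, also tracks running and failed stages, and keys shuffle map stages by shuffle id rather than by RDD as this toy does.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(eq=False)
class Node:
    """Toy RDD: narrow parents share its stage; shuffle parents end a stage."""
    name: str
    narrow: List["Node"] = field(default_factory=list)
    shuffle: List["Node"] = field(default_factory=list)


@dataclass(eq=False)
class Stage:
    rdd: Node
    parents: List["Stage"]
    available: bool = False  # for a shuffle map stage: all map outputs exist


def build_stage(rdd: Node, cache: Optional[Dict[int, Stage]] = None) -> Stage:
    """Walk narrow deps within the stage; each shuffle dep yields a parent stage."""
    cache = {} if cache is None else cache
    parents: List[Stage] = []
    stack, seen = [rdd], set()
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        for map_side in node.shuffle:
            # Reuse the stage if this map-side node was already turned into one.
            if id(map_side) not in cache:
                cache[id(map_side)] = build_stage(map_side, cache)
            if cache[id(map_side)] not in parents:
                parents.append(cache[id(map_side)])
        stack.extend(node.narrow)
    return Stage(rdd, parents)


class ToyDAGScheduler:
    """Mimics submitStage / submitMissingTasks with instantly finishing tasks."""

    def __init__(self) -> None:
        self.waiting: List[Stage] = []
        self.launched: List[str] = []  # order in which stages ran

    def missing_parents(self, stage: Stage) -> List[Stage]:
        return [p for p in stage.parents if not p.available]

    def submit_stage(self, stage: Stage) -> None:
        if stage.available or stage in self.waiting:
            return
        missing = self.missing_parents(stage)
        if not missing:
            self.submit_missing_tasks(stage)
            return
        # Park this stage first, then recurse into its missing parents.
        self.waiting.append(stage)
        for parent in missing:
            self.submit_stage(parent)

    def submit_missing_tasks(self, stage: Stage) -> None:
        # Spark would build a TaskSet here; the toy "completes" the stage at once.
        self.launched.append(stage.rdd.name)
        stage.available = True
        # Resubmit waiting stages whose parents are now all available.
        for child in list(self.waiting):
            if child in self.waiting and not self.missing_parents(child):
                self.waiting.remove(child)
                self.submit_stage(child)
```

For a textFile → flatMap → reduceByKey → repartition → mapValues lineage, the toy builds three stages, labeled by the RDD at their end: flatMap (writes the reduceByKey shuffle), reduceByKey (writes the repartition shuffle), and the result stage ending at mapValues. They launch in that order, and waiting is empty afterwards; leaving stages behind in such bookkeeping is the toy analogue of the leak risk noted above.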
Walkthrough 3: QueryExecution lazy phases
File: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
Each phase is a separate lazy val instrumented by QueryPlanningTracker. Transaction-aware analysis (lines 103–120) shows that nested queries must inherit the analyzer for connector transactions, a subtle coupling that is easy to miss.
executedPlan.execute() produces RDD[InternalRow], bridging SQL to core.
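The lazy-val pattern can be sketched in Python with functools.cached_property: each phase is computed on first access, forces the previous phase outside its own timer, and is cached afterwards. ToyQueryExecution and PlanningTracker are hypothetical stand-ins, the phase labels are illustrative, and the string "plans" only record the order of evaluation.

```python
import time
from functools import cached_property
from typing import Callable, Dict, List


class PlanningTracker:
    """Hypothetical stand-in for QueryPlanningTracker: times each phase."""

    def __init__(self) -> None:
        self.phases: Dict[str, float] = {}

    def measure(self, phase: str, fn: Callable[[], str]) -> str:
        start = time.perf_counter()
        try:
            return fn()
        finally:
            self.phases[phase] = time.perf_counter() - start


class ToyQueryExecution:
    """Each phase is computed once, on first access, like a Scala lazy val."""

    def __init__(self, logical: str) -> None:
        self.logical = logical
        self.tracker = PlanningTracker()
        self.evaluated: List[str] = []  # records which phases actually ran

    def _run(self, name: str, child: str) -> str:
        self.evaluated.append(name)
        return f"{name}({child})"

    @cached_property
    def analyzed(self) -> str:
        return self.tracker.measure("analysis", lambda: self._run("analyzed", self.logical))

    @cached_property
    def optimized_plan(self) -> str:
        child = self.analyzed  # force the earlier phase outside this phase's timer
        return self.tracker.measure("optimization", lambda: self._run("optimized", child))

    @cached_property
    def executed_plan(self) -> str:
        child = self.optimized_plan
        return self.tracker.measure("planning", lambda: self._run("executed", child))
```

Touching only `analyzed` (as an analysis-only check would) runs one phase; touching `executed_plan` pulls in the whole chain exactly once, and later accesses reuse the cached values.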
Walkthrough 4: Executor TaskRunner
File: core/src/main/scala/org/apache/spark/executor/Executor.scala (TaskRunner, from around line 806)
Per task, TaskRunner isolates the classloader for REPL/session artifacts, deserializes the task, updates the map-output tracker epoch so stale cached shuffle locations are dropped, calls task.run, and reports metrics. Kill requests are checked before and during execution and surface as TaskKilledException.
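The cooperative kill checks can be sketched as a toy runner that checks a kill flag before starting and again before each record, loosely like an interruptible iterator inside a Spark task. ToyTaskRunner and its methods are hypothetical; the local TaskKilledException only plays the role of Spark's exception of the same name.

```python
import threading
from typing import Callable, Iterable, List, Optional


class TaskKilledException(Exception):
    """Local stand-in playing the role of Spark's TaskKilledException."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


class ToyTaskRunner:
    """Runs `fn` over `records`, honoring kill requests cooperatively."""

    def __init__(self, task_id: int, records: Iterable[int],
                 fn: Callable[[int], int]) -> None:
        self.task_id = task_id
        self.records = records
        self.fn = fn
        self.status: Optional[str] = None
        self._kill_reason: Optional[str] = None
        self._lock = threading.Lock()  # kill() may be called from another thread

    def kill(self, reason: str) -> None:
        with self._lock:
            self._kill_reason = reason

    def _check_killed(self) -> None:
        with self._lock:
            reason = self._kill_reason
        if reason is not None:
            raise TaskKilledException(reason)

    def run(self) -> Optional[List[int]]:
        try:
            self._check_killed()  # killed before it even started?
            out: List[int] = []
            for record in self.records:
                self._check_killed()  # checked between records while running
                out.append(self.fn(record))
            self.status = "FINISHED"
            return out
        except TaskKilledException as e:
            # Partial output is discarded; only the kill reason is reported.
            self.status = f"KILLED ({e.reason})"
            return None
```

Because the check is cooperative, a kill takes effect at the next record boundary rather than instantly, which is why long-running user code that never yields back to the iterator can delay cancellation.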