The scheduling pipeline
action → sc.runJob
│
▼
DAGScheduler.submitJob ──► event loop ──► handleJobSubmitted
│                                           │
│          createResultStage + ancestor ShuffleMapStages (split at shuffles)
│                                           ▼
│                              submitStage (parents first)
│                                           ▼
│                                  submitMissingTasks
│                                           │ builds ShuffleMapTask / ResultTask
▼                                           ▼
TaskSchedulerImpl.submitTasks ──► TaskSetManager (one per stage attempt)
│
▼
SchedulerBackend.reviveOffers ──► resourceOffers ──► launchTasks ──► EXECUTOR
▲                                                                     │
└──────────────── statusUpdate / taskEnded ◄──────────────────────────┘
DAGScheduler — the stage-oriented planner
The class doc describes it precisely: it "computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job." It also chooses preferred locations for tasks based on cache and shuffle-output placement. Its event handlers run on a single-threaded event loop, so mutations of scheduling state are serialized and the handlers need no locks of their own.
submitJob is asynchronous; runJob blocks
submitJob validates the partitions, assigns a job id, posts a JobSubmitted event to the event loop, and immediately returns a JobWaiter. runJob is a thin wrapper that calls submitJob and then blocks on that waiter, re-throwing any failure with the driver's stack trace attached. This separation lets the event loop process scheduling decisions without holding up the calling thread.
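The split between the asynchronous and the blocking entry point can be sketched in a few lines. This is a toy model, not Spark's code: `SubmitJobSketch`, `ToyJobWaiter`, and `drainEventLoop` are hypothetical names, and to keep the sketch deterministic the event loop is a queue drained explicitly rather than a separate thread.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Toy model only: names echo the text, signatures are invented for illustration.
object SubmitJobSketch {
  final case class ToyJobWaiter[T](jobId: Int, result: Future[Seq[T]])

  private val eventQueue = mutable.Queue.empty[() => Unit]
  private var nextJobId = 0

  // Asynchronous half: validate, assign an id, post an event, return at once.
  def submitJob[T](partitions: Seq[Int], numPartitions: Int)(func: Int => T): ToyJobWaiter[T] = {
    require(partitions.forall(p => p >= 0 && p < numPartitions), "partition index out of range")
    val jobId = nextJobId
    nextJobId += 1
    val promise = Promise[Seq[T]]()
    eventQueue.enqueue(() => { promise.complete(Try(partitions.map(func))); () })
    ToyJobWaiter(jobId, promise.future)
  }

  // Stand-in for the event-loop thread: handle queued events in order.
  def drainEventLoop(): Unit =
    while (eventQueue.nonEmpty) eventQueue.dequeue()()

  // Blocking half: submit, wait for completion, rethrow any failure to the caller.
  def runJob[T](partitions: Seq[Int], numPartitions: Int)(func: Int => T): Seq[T] = {
    val waiter = submitJob(partitions, numPartitions)(func)
    drainEventLoop()
    waiter.result.value.get.get
  }
}
```

Calling `submitJob` leaves the returned future incomplete until the queue is drained, which is the property that keeps the caller from stalling the scheduler.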
Splitting the graph into stages
handleJobSubmitted builds a ResultStage for the action. To find its parents, the scheduler walks the RDD graph: when it hits a ShuffleDependency it records a parent-stage boundary and creates (or reuses) a ShuffleMapStage for that shuffle; narrow dependencies are followed straight through. Stages keyed by shuffle id are shared across jobs, so a reused shuffle is not recomputed.
getShuffleDependenciesAndResourceProfiles(rdd):
for each dependency while walking up the graph:
case shuffleDep: ShuffleDependency => record as parent boundary
case narrowDep: NarrowDependency => keep walking up
DAGScheduler.scala L801-L815 — finding shuffle boundaries
DAGScheduler.scala L528-L603 — getOrCreate / createShuffleMapStage
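As a rough illustration of the walk described above, the following sketch models an RDD graph with two hypothetical dependency types and collects the nearest shuffle edges. `Node`, `Narrow`, `Shuffle`, and both function names are assumptions made for this example; the real method also gathers resource profiles, which the toy ignores.

```scala
import scala.collection.mutable

// Toy RDD graph: not Spark's classes, just enough structure to show the walk.
object StageSplitSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(shuffleId: Int, parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep])

  // Walk up from `rdd`: record shuffle edges as boundaries, follow narrow edges.
  def directShuffleDeps(rdd: Node): List[Shuffle] = {
    val found = mutable.LinkedHashSet.empty[Shuffle]
    val visited = mutable.HashSet.empty[Node]
    var toVisit = List(rdd)
    while (toVisit.nonEmpty) {
      val node = toVisit.head
      toVisit = toVisit.tail
      if (visited.add(node)) {
        node.deps.foreach {
          case s: Shuffle     => found += s
          case Narrow(parent) => toVisit = parent :: toVisit
        }
      }
    }
    found.toList
  }

  // One map stage per distinct shuffle id, however many paths reach it.
  def ancestorShuffleIds(rdd: Node): Set[Int] =
    directShuffleDeps(rdd).flatMap(s => ancestorShuffleIds(s.parent) + s.shuffleId).toSet
}
```

For a chain with two shuffles, `ancestorShuffleIds` returns two ids, so the job would consist of two map stages plus the final result stage.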
ResultStage vs. ShuffleMapStage
Every job is exactly one ResultStage plus zero or more ancestor ShuffleMapStages. A ResultStage carries the action's function and the partition indices to compute; finishing all its tasks completes the job. A ShuffleMapStage is intermediate: its tasks write shuffle map output, and it is "available" only when every partition's output has been registered with the MapOutputTracker — not merely when its tasks finish.
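The availability rule can be made concrete with a small model. `ToyShuffleMapStage` and its methods are hypothetical; the sketch folds the tracker's bookkeeping into the stage object for brevity, which the real scheduler does not do.

```scala
import scala.collection.mutable

// Toy stand-in for a map stage together with the registered outputs of its shuffle.
object StageAvailabilitySketch {
  final class ToyShuffleMapStage(val numPartitions: Int) {
    // partition index -> executor currently holding that partition's map output
    private val registeredOutputs = mutable.Map.empty[Int, String]

    def registerMapOutput(partition: Int, executorId: String): Unit = {
      require(partition >= 0 && partition < numPartitions, "partition index out of range")
      registeredOutputs(partition) = executorId
    }

    // Losing an executor removes every output it held.
    def removeOutputsOn(executorId: String): Unit = {
      val lost = registeredOutputs.toList.collect { case (p, e) if e == executorId => p }
      registeredOutputs --= lost
    }

    def missingPartitions: Seq[Int] =
      (0 until numPartitions).filterNot(registeredOutputs.contains)

    // Available only when every partition has a registered output.
    def isAvailable: Boolean = missingPartitions.isEmpty
  }
}
```

A stage that was available can become unavailable again after `removeOutputsOn`, which is why availability is tied to registered outputs rather than to finished tasks.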
submitMissingTasks — stage becomes a TaskSet
Once a stage's parents are available, this method asks which partitions are still missing, computes preferred locations for each, serializes the task binary — (rdd, shuffleDep) for a map stage or (rdd, func) for a result stage — and constructs one ShuffleMapTask or ResultTask per partition. It then calls taskScheduler.submitTasks(new TaskSet(...)).
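A minimal sketch of that step, assuming the set of finished partitions and a map of cached locations are already known. All types here (`ToyTask`, `ToyTaskSet`, `buildTaskSet`) are hypothetical, and serialization of the task binary is left out.

```scala
// Toy version of turning a ready stage into a task set; not Spark's API.
object SubmitMissingTasksSketch {
  sealed trait ToyTask { def stageId: Int; def partition: Int; def preferredHosts: Seq[String] }
  final case class ToyShuffleMapTask(stageId: Int, partition: Int, preferredHosts: Seq[String]) extends ToyTask
  final case class ToyResultTask(stageId: Int, partition: Int, preferredHosts: Seq[String]) extends ToyTask
  final case class ToyTaskSet(stageId: Int, stageAttempt: Int, tasks: Seq[ToyTask])

  def buildTaskSet(
      stageId: Int,
      stageAttempt: Int,
      isMapStage: Boolean,
      numPartitions: Int,
      finished: Set[Int],
      cachedOn: Map[Int, Seq[String]]): Option[ToyTaskSet] = {
    // Only partitions without a result need a task.
    val missing = (0 until numPartitions).filterNot(finished)
    val tasks: Seq[ToyTask] = missing.map { p =>
      val hosts = cachedOn.getOrElse(p, Seq.empty[String])
      val task: ToyTask =
        if (isMapStage) ToyShuffleMapTask(stageId, p, hosts) else ToyResultTask(stageId, p, hosts)
      task
    }
    // Nothing missing means nothing to submit.
    if (tasks.isEmpty) None else Some(ToyTaskSet(stageId, stageAttempt, tasks))
  }
}
```

On a resubmitted stage attempt the same function naturally produces a smaller task set, because `finished` already covers the partitions that survived.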
The two task kinds
A ShuffleMapTask runs on a map stage: it iterates its partition and writes the records out through the shuffle writer, returning a MapStatus (the location of the output and its size for each reduce partition). A ResultTask runs on the final stage: it applies func(context, rdd.iterator(...)) and returns the value to the driver. Both extend Task, whose run sets up the TaskContext and memory manager before calling the subclass's runTask.
Task.run // sets TaskContext, memory, then →
ShuffleMapTask.runTask → writes shuffle data → MapStatus
ResultTask.runTask → func(ctx, rdd.iterator) → result value
Task.scala L34-L43 — the two kinds
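The relationship between `run` and `runTask` is a template method, which the sketch below reproduces with invented types. `ToyTask`, `ToyTaskContext`, and `ToyMapStatus` are assumptions for illustration; record counts stand in for byte sizes, and no shuffle files are written.

```scala
// Toy template-method model of the two task kinds; not Spark's classes.
object TaskKindsSketch {
  final case class ToyTaskContext(stageId: Int, partition: Int, attempt: Int)
  final case class ToyMapStatus(executorId: String, sizePerReducer: Vector[Long])

  abstract class ToyTask[T](val stageId: Int, val partition: Int) {
    // Shared setup first, then the subclass-specific work.
    final def run(attempt: Int): T = runTask(ToyTaskContext(stageId, partition, attempt))
    protected def runTask(context: ToyTaskContext): T
  }

  // Map side: bucket records by reducer and report only where the output is and how big.
  final class ToyShuffleMapTask(stage: Int, part: Int, records: Seq[(String, Int)],
      numReducers: Int, executorId: String) extends ToyTask[ToyMapStatus](stage, part) {
    protected def runTask(context: ToyTaskContext): ToyMapStatus = {
      val sizes = Array.fill(numReducers)(0L)
      records.foreach { case (key, _) =>
        val reducer = java.lang.Math.floorMod(key.hashCode, numReducers)
        sizes(reducer) += 1L // record count standing in for bytes
      }
      ToyMapStatus(executorId, sizes.toVector)
    }
  }

  // Result side: apply the action's function and hand the value back.
  final class ToyResultTask[T, U](stage: Int, part: Int, records: Seq[T],
      func: (ToyTaskContext, Iterator[T]) => U) extends ToyTask[U](stage, part) {
    protected def runTask(context: ToyTaskContext): U = func(context, records.iterator)
  }
}
```

The map task returns metadata about its output rather than the output itself, while the result task returns the computed value directly.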
TaskSchedulerImpl.resourceOffers — placing tasks
The backend periodically presents WorkerOffers (executor id, host, free cores). resourceOffers is the core assignment routine: it registers new executors, filters unhealthy ones, sorts the active task sets by scheduling pool priority (FIFO or fair), and for each task set walks the locality levels from tightest to loosest (PROCESS_LOCAL → NODE_LOCAL → NO_PREF → RACK_LOCAL → ANY), handing offers to the TaskSetManager until the offered cores are used up or no pending task fits. It returns a list of TaskDescriptions grouped per executor.
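The level-by-level walk can be sketched for a single task set as follows. Everything here is a simplification under stated assumptions: `PendingTask`, `Placement`, and `fits` are hypothetical, each task takes one core, there is no rack topology, and pool sorting, health filtering, and delay scheduling are omitted.

```scala
import scala.collection.mutable

// Toy assignment loop for one task set; not the real resourceOffers.
object ResourceOffersSketch {
  final case class WorkerOffer(executorId: String, host: String, cores: Int)
  final case class PendingTask(id: Int, preferredExecutor: Option[String], preferredHost: Option[String])
  final case class Placement(taskId: Int, executorId: String, locality: String)

  // Levels in the order given in the text, tightest first.
  val levels: List[String] = List("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  private def fits(task: PendingTask, offer: WorkerOffer, level: String): Boolean = level match {
    case "PROCESS_LOCAL" => task.preferredExecutor.contains(offer.executorId)
    case "NODE_LOCAL"    => task.preferredHost.contains(offer.host)
    case "NO_PREF"       => task.preferredExecutor.isEmpty && task.preferredHost.isEmpty
    case "RACK_LOCAL"    => false // the toy has no rack information
    case _               => true  // ANY
  }

  def resourceOffers(offers: Seq[WorkerOffer], tasks: Seq[PendingTask]): Seq[Placement] = {
    val freeCores = mutable.Map.empty[String, Int]
    offers.foreach(o => freeCores(o.executorId) = o.cores)
    val pending = mutable.ListBuffer.empty[PendingTask]
    pending ++= tasks
    val placed = mutable.ArrayBuffer.empty[Placement]
    for (level <- levels; offer <- offers) {
      // Fill this offer with tasks that qualify at the current level.
      var idx = pending.indexWhere(fits(_, offer, level))
      while (freeCores(offer.executorId) > 0 && idx >= 0) {
        val task = pending.remove(idx)
        freeCores(offer.executorId) -= 1
        placed += Placement(task.id, offer.executorId, level)
        idx = pending.indexWhere(fits(_, offer, level))
      }
    }
    placed.toList
  }
}
```

Because the outer loop is over levels, every offer gets a chance at a tight match before any task is placed at a looser level.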
TaskSetManager — locality, delay scheduling, retries
One TaskSetManager owns a single stage attempt. It buckets pending tasks by executor, host, and rack so it can answer offers at the tightest possible locality. Delay scheduling means it will briefly reject a less-local offer, waiting for a better one, before falling back. On failure it retries a task up to maxTaskFailures times by re-queuing it. A FetchFailed is special: it is not retried inside the task set. Instead it makes the set a "zombie" (no further tasks are launched from it) and leaves recovery to the DAGScheduler, which resubmits the affected stages.
TaskSetManager.scala L40-L61 — role & class
TaskSetManager.scala L461-L525 — resourceOffer / delay scheduling
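Delay scheduling and the retry limit can each be reduced to a few lines. The sketch assumes a single fixed wait per level and drops NO_PREF for simplicity; `allowedLevel`, `accepts`, and `RetryCounter` are hypothetical names, and the real manager tracks more state than elapsed time alone.

```scala
import scala.collection.mutable

// Toy model of delay scheduling and per-task retry counting; not Spark's code.
object DelaySchedulingSketch {
  val levels: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Loosest level accepted after waiting `elapsedMs`, assuming the manager
  // relaxes by one level each time `waitMs` passes without a launch.
  def allowedLevel(elapsedMs: Long, waitMs: Long): String = {
    require(waitMs > 0, "waitMs must be positive")
    val steps = math.min(math.max(0L, elapsedMs / waitMs), (levels.size - 1).toLong).toInt
    levels(steps)
  }

  // An offer is taken only if it is at least as local as the current allowance.
  def accepts(offerLevel: String, elapsedMs: Long, waitMs: Long): Boolean =
    levels.indexOf(offerLevel) <= levels.indexOf(allowedLevel(elapsedMs, waitMs))

  final class RetryCounter(maxTaskFailures: Int) {
    private val failures = mutable.Map.empty[Int, Int].withDefaultValue(0)

    // Returns true if the task should be re-queued, false if the limit is reached.
    def recordFailure(taskIndex: Int): Boolean = {
      failures(taskIndex) += 1
      failures(taskIndex) < maxTaskFailures
    }
  }
}
```

With a wait of 3000 ms, an ANY-level offer arriving after 1000 ms is rejected, while a NODE_LOCAL offer arriving after 4000 ms is accepted.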
SchedulerBackend — launching on executors
The CoarseGrainedSchedulerBackend keeps a registry of live executors. reviveOffers (called from submitTasks and on a timer) triggers makeOffers, which builds a WorkerOffer per executor, calls resourceOffers, and then launchTasks. launchTasks reserves cores on the executor's record and sends a LaunchTask RPC carrying the serialized TaskDescription. When a task finishes, the status update frees those cores and re-offers them.
submitTasks → reviveOffers → makeOffers → resourceOffers
→ launchTasks → (RPC) LaunchTask → executor
CoarseGrainedSchedulerBackend.scala L372-L456 — makeOffers & launchTasks
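The core accounting in this loop is easy to model. In the sketch below `ToyBackend`, `statusUpdateFinished`, and the `sentMessages` buffer (a stand-in for RPC sends) are hypothetical, and the number of cores per task is passed in as an assumption.

```scala
import scala.collection.mutable

// Toy executor registry with core reservation; not the real backend.
object BackendSketch {
  final case class WorkerOffer(executorId: String, host: String, cores: Int)
  final case class TaskDescription(taskId: Long, executorId: String)

  final class ToyBackend(cpusPerTask: Int) {
    private final class ExecutorData(val host: String, var freeCores: Int)
    private val executors = mutable.LinkedHashMap.empty[String, ExecutorData]
    val sentMessages = mutable.ArrayBuffer.empty[String] // stand-in for RPC sends

    def registerExecutor(id: String, host: String, cores: Int): Unit =
      executors(id) = new ExecutorData(host, cores)

    // One offer per live executor, advertising its currently free cores.
    def makeOffers(): Seq[WorkerOffer] =
      executors.toList.map { case (id, data) => WorkerOffer(id, data.host, data.freeCores) }

    // Reserve cores on the executor's record, then "send" the launch message.
    def launchTasks(tasks: Seq[TaskDescription]): Unit = tasks.foreach { task =>
      executors(task.executorId).freeCores -= cpusPerTask
      sentMessages += s"LaunchTask(${task.taskId}) -> ${task.executorId}"
    }

    // A finished task gives its cores back so they can be offered again.
    def statusUpdateFinished(task: TaskDescription): Unit =
      executors(task.executorId).freeCores += cpusPerTask

    def freeCores(executorId: String): Int = executors(executorId).freeCores
  }
}
```

Reserving cores at launch time, before the executor acknowledges anything, is what stops the next round of offers from double-booking the same cores.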
Completion, the MapOutputTracker, and recovery
When a ShuffleMapTask succeeds, the DAGScheduler registers its MapStatus with the MapOutputTrackerMaster; when the whole map stage finishes it bumps the tracker's epoch and submits the waiting child stages. If a reduce task reports a FetchFailed, the scheduler unregisters the lost map outputs and resubmits both the failed map stage and the stage that depends on it. Every task is stamped with the epoch at which it was created, so a completion that arrives from before a failure can be recognized as stale and ignored.
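How an epoch lets the scheduler discard stale completions can be shown with a small model. `ToyMapOutputTracker`, `ToyScheduler`, and their methods are hypothetical, and the comparison rule used here (ignore a completion whose task epoch is not newer than the executor's recorded failure epoch) is a simplifying assumption for this sketch.

```scala
import scala.collection.mutable

// Toy epoch bookkeeping; not Spark's MapOutputTracker or DAGScheduler.
object EpochSketch {
  final class ToyMapOutputTracker {
    private var currentEpoch = 0L
    // (shuffleId, mapPartition) -> executor holding the output
    private val outputs = mutable.Map.empty[(Int, Int), String]

    def epoch: Long = currentEpoch

    def register(shuffleId: Int, mapPartition: Int, executorId: String): Unit =
      outputs((shuffleId, mapPartition)) = executorId

    // Executor loss: drop its outputs and advance the clock.
    def removeOutputsOnExecutor(executorId: String): Unit = {
      val lost = outputs.toList.collect { case (key, exec) if exec == executorId => key }
      outputs --= lost
      currentEpoch += 1
    }

    def has(shuffleId: Int, mapPartition: Int): Boolean =
      outputs.contains((shuffleId, mapPartition))
  }

  final class ToyScheduler(tracker: ToyMapOutputTracker) {
    private val failedEpoch = mutable.Map.empty[String, Long]

    def executorLost(executorId: String): Unit = {
      failedEpoch(executorId) = tracker.epoch
      tracker.removeOutputsOnExecutor(executorId)
    }

    // Accept a completion only if the task was created after the executor's last failure.
    def handleMapTaskSuccess(shuffleId: Int, mapPartition: Int,
        executorId: String, taskEpoch: Long): Boolean = {
      val stale = failedEpoch.get(executorId).exists(taskEpoch <= _)
      if (!stale) tracker.register(shuffleId, mapPartition, executorId)
      !stale
    }
  }
}
```

A task created at epoch 0 that reports success after its executor was lost at epoch 0 is rejected, whereas a task created at epoch 1 on the same executor id is accepted.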
Who owns what
| Layer | Owns | Hands off |
|---|---|---|
| DAGScheduler | Job DAG, stage boundaries, shuffle bookkeeping, stage retries | A TaskSet per ready stage |
| TaskSchedulerImpl | Task-set priority, executor offers | TaskDescription lists |
| TaskSetManager | Per-stage task state, locality, speculation, within-stage task retries | Serialized task + launch metadata |
| SchedulerBackend | Executor registry, RPC to executors | LaunchTask messages |
| MapOutputTracker | Shuffle output locations and the epoch | Lookups for reduce-side fetch |
Key takeaways
- Stages are cut at shuffle boundaries; the last stage of a job is always a ResultStage.
- A map stage is "done" only when its outputs are registered, not when its tasks return.
- Locality and delay scheduling try hard to run a task where its data already lives.
- Within-stage retries are the TaskSetManager's job; stage-level retries are the DAGScheduler's.
- The map output tracker's epoch is the clock that makes fault recovery consistent.