The micro-batch loop
df.writeStream...start()
│
▼
StreamingQueryManager.createQuery → MicroBatchExecution → StreamExecution.start()
│
▼ dedicated query-execution thread runs runActivatedStream()
│
╭───────────────────────── per trigger ─────────────────────────╮
│ 1. poll each Source for its latest offset                     │
│ 2. write those offsets to the OFFSET LOG (write-ahead)        │
│ 3. build a normal QueryExecution over [lastOffset, newOffset) │
│ 4. run the batch → write results to the Sink                  │
│ 5. write the batch id to the COMMIT LOG                       │
│ 6. tell sources they may discard committed data               │
╰───────────────────────────────────────────────────────────────╯
(repeat until stopped)
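The six steps can be sketched in plain Python. This is an illustrative toy, not Spark code: ListSource, run_one_trigger, the dict used as the offset log, and the set used as the commit log are all hypothetical stand-ins, and the "query" is just an upper-casing of each record.

```python
from dataclasses import dataclass, field


@dataclass
class ListSource:
    """Hypothetical in-memory source; its offset is an index into `records`."""
    records: list = field(default_factory=list)
    discarded_up_to: int = 0

    def latest_offset(self) -> int:
        return len(self.records)

    def get_batch(self, start: int, end: int) -> list:
        return self.records[start:end]

    def commit(self, end: int) -> None:
        # The engine no longer needs anything before `end`.
        self.discarded_up_to = end


def run_one_trigger(source, offset_log, commit_log, sink, batch_id, last_offset):
    """Run one pass of the six-step loop; return (next_batch_id, new_last_offset)."""
    new_offset = source.latest_offset()                # 1. poll the source
    if new_offset == last_offset:
        return batch_id, last_offset                   # nothing new: skip this trigger
    offset_log[batch_id] = new_offset                  # 2. write-ahead: record the plan
    rows = source.get_batch(last_offset, new_offset)   # 3. the range [last, new)
    sink[batch_id] = [r.upper() for r in rows]         # 4. run the "query", write output
    commit_log.add(batch_id)                           # 5. mark the batch complete
    source.commit(new_offset)                          # 6. source may discard the data
    return batch_id + 1, new_offset


# Two triggers: the first sees "a" and "b", the second sees only "c".
source = ListSource(["a", "b"])
offset_log, commit_log, sink = {}, set(), {}
batch_id, last = run_one_trigger(source, offset_log, commit_log, sink, 0, 0)
source.records.append("c")
batch_id, last = run_one_trigger(source, offset_log, commit_log, sink, batch_id, last)
```

Each trigger touches only the slice between the previous offset and the newly polled one, which is why the second batch contains a single row.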
From writeStream to a running query
Calling start() on a DataStreamWriter resolves the checkpoint location and trigger, then asks the StreamingQueryManager to create the query. The manager picks the engine: a ContinuousExecution for a continuous trigger with a v2 sink, otherwise a MicroBatchExecution (the default). It then starts the query's dedicated thread.
DataStreamWriter.scala L304-L333 — startQuery
StreamingQueryManager.scala L177-L257 — createQuery picks the engine
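The engine choice described above reduces to a single condition. The sketch below only mirrors that rule as stated here; Trigger, its kind labels, and choose_engine are hypothetical names, and what the real engine does with an unsupported trigger and sink combination is not modeled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Trigger:
    # Labels invented for this sketch: "processing_time", "once", "continuous".
    kind: str


def choose_engine(trigger: Trigger, sink_is_v2: bool) -> str:
    """Return the name of the engine the manager would pick for this query."""
    if trigger.kind == "continuous" and sink_is_v2:
        return "ContinuousExecution"
    return "MicroBatchExecution"  # the default for everything else


default_engine = choose_engine(Trigger("processing_time"), sink_is_v2=True)
continuous_engine = choose_engine(Trigger("continuous"), sink_is_v2=True)
```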
StreamExecution — the engine base
StreamExecution is the abstract base for streaming query execution. It owns the checkpoint metadata, the offset and commit logs, and the lifecycle state. Crucially, it runs the query on its own UninterruptibleThread (queryExecutionThread), so that an interrupt, for example one issued while the query is being stopped, cannot land in the middle of a source-client call that might hang when interrupted. That thread keeps attempting batches until the query is stopped.
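The "own thread, loop until stopped" lifecycle can be illustrated with Python's threading module. QuerySketch is a hypothetical class, not a port of StreamExecution, and it models only the dedicated thread and the stop flag; Python has no equivalent of an uninterruptible thread, so that aspect is left out.

```python
import threading
import time


class QuerySketch:
    """Hypothetical stand-in for a streaming query's lifecycle."""

    def __init__(self, run_batch):
        self._run_batch = run_batch  # callable that takes the batch id
        self._stop = threading.Event()
        self.batches_run = 0
        self._thread = threading.Thread(
            target=self._loop, name="query-execution", daemon=True
        )

    def start(self):
        self._thread.start()

    def _loop(self):
        # Keep attempting batches until someone asks the query to stop.
        while not self._stop.is_set():
            self._run_batch(self.batches_run)
            self.batches_run += 1

    def stop(self):
        # Signal with a flag and wait for the current batch to finish.
        self._stop.set()
        self._thread.join()


seen = []
three_done = threading.Event()


def record(batch_id):
    seen.append(batch_id)
    time.sleep(0.001)  # pretend the batch does some work
    if len(seen) >= 3:
        three_done.set()


query = QuerySketch(record)
query.start()
three_done.wait(timeout=5)  # let at least three batches run
query.stop()
```

Stopping through a flag means the loop always exits between batches, never in the middle of one.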
MicroBatchExecution — one trigger at a time
runActivatedStream drives a TriggerExecutor that calls executeOneBatch on each tick. Each call constructs the next batch's offsets, runs the batch if there is new data or pending state cleanup, and then advances the batch id. The default ProcessingTimeExecutor simply loops: it runs a batch, then sleeps until the next interval.
MicroBatchExecution.scala L592-L687 — runActivatedStream & executeOneBatch
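A simplified version of the run-then-sleep loop is sketched below. It is an assumption-laden toy: run_processing_time_trigger and FakeClock are hypothetical, the clock and sleep functions are injected so the example is deterministic, and the sleep is computed as "whatever is left of the interval", which may differ from how the real executor aligns batches to interval boundaries.

```python
def run_processing_time_trigger(run_one_batch, interval_s, now, sleep):
    """Call run_one_batch repeatedly until it returns False.

    `now` and `sleep` are injected so the loop can be driven by a fake clock.
    """
    while True:
        started = now()
        if not run_one_batch():
            return
        # Sleep only for what is left of the interval; if the batch overran
        # the interval, start the next one immediately.
        remaining = interval_s - (now() - started)
        if remaining > 0:
            sleep(remaining)


class FakeClock:
    """Hypothetical clock that advances only when told to."""

    def __init__(self):
        self.t = 0.0
        self.sleeps = []

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.sleeps.append(seconds)
        self.t += seconds


clock = FakeClock()
durations = iter([2.0, 12.0, 1.0])  # simulated batch run times in seconds
started_at = []


def one_batch():
    started_at.append(clock.now())
    try:
        clock.t += next(durations)  # the batch "takes" this long
    except StopIteration:
        return False  # no more simulated work: stop the loop
    return True


run_processing_time_trigger(one_batch, 10.0, clock.now, clock.sleep)
```

With a 10-second interval, the 2-second batch is followed by an 8-second sleep, while the 12-second batch overruns its interval and the next batch starts without sleeping.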
runBatch — a normal query over new data
The heart of the design: runBatch asks each source for the records between the committed offset and the new offset, rewires those into the logical plan, builds an IncrementalExecution (the same Catalyst pipeline, extended with stateful-operator rules), and writes the output to the sink. The streaming "magic" is mostly bookkeeping around an otherwise ordinary batch query.
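To make "an ordinary batch query plus state" concrete, here is a toy running word count. The run_batch function below is hypothetical and unrelated to Spark's method of the same name beyond the idea: it aggregates only the new slice with a plain batch operation, then folds the result into state carried over from earlier batches. Emitting only the keys touched by the batch is a choice made for this sketch.

```python
from collections import Counter


def run_batch(records, start, end, state):
    """Treat records[start:end] as an ordinary batch and fold it into `state`.

    Returns the new state and the rows to send to the sink (updated keys only).
    """
    new_rows = records[start:end]       # the slice this batch owns
    batch_counts = Counter(new_rows)    # a plain, stateless batch aggregation
    new_state = Counter(state)          # copy, so the old version stays intact
    new_state.update(batch_counts)      # the stateful step: merge with history
    output = {word: new_state[word] for word in batch_counts}
    return new_state, output


records = ["a", "b", "a", "c", "a"]
state, out0 = run_batch(records, 0, 3, Counter())  # batch 0 covers [0, 3)
state, out1 = run_batch(records, 3, 5, state)      # batch 1 covers [3, 5)
```

The aggregation itself never knows it is part of a stream; only the slice boundaries and the carried state make it incremental.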
Offsets, the WAL, and exactly-once
Fault tolerance comes from two append-only logs in the checkpoint directory. The offset log records, before a batch runs, exactly which input range that batch will process (a write-ahead log). The commit log records, after a batch succeeds, that the batch is durably complete. On restart the engine compares the two: if the last batch was written to the offset log but not the commit log, it re-runs that batch over the same logged range. Combined with replayable sources and idempotent sinks, this yields end-to-end exactly-once processing.
offsets/   ← OffsetSeqLog : "batch N will read up to these offsets" (BEFORE)
commits/   ← CommitLog    : "batch N completed" (AFTER)
state/     ← versioned state store for stateful operators
metadata   ← stream id (a single file, not a directory)
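The restart decision can be sketched as a comparison of the two logs. plan_restart is a hypothetical helper; the offset log is modeled as a dict from batch id to planned end offset and the commit log as a set of completed batch ids, which is far simpler than the real on-disk format.

```python
def plan_restart(offset_log, commit_log):
    """Decide what to run after a restart.

    offset_log: dict of batch id -> planned end offset (written BEFORE a batch).
    commit_log: set of batch ids that completed (written AFTER a batch).
    Returns (batch_id_to_run, start_offset, planned_end_offset_or_None).
    """
    if not offset_log:
        return 0, 0, None  # fresh query: nothing was ever planned
    last_planned = max(offset_log)
    if last_planned in commit_log:
        # The last planned batch finished; continue with a brand-new batch
        # whose end offset has not been decided yet.
        return last_planned + 1, offset_log[last_planned], None
    # Planned but never committed: replay it over exactly the logged range.
    start = offset_log.get(last_planned - 1, 0)
    return last_planned, start, offset_log[last_planned]


crashed_mid_batch = plan_restart({0: 5, 1: 9}, {0})
clean_shutdown = plan_restart({0: 5, 1: 9}, {0, 1})
```

Because the replayed batch reuses the logged end offset rather than polling the source again, it processes the same input range it would have processed before the crash.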
MicroBatchExecution.scala L1257-L1275 — markMicroBatchStart (offset WAL)
HDFSMetadataLog.scala L205-L246 — atomic write (temp-then-rename)
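The temp-then-rename pattern is easy to show on a local file system. write_batch_file is a hypothetical function, and this sketch assumes a single writer: the existence check followed by os.replace is not safe against concurrent writers, which a real metadata log also has to handle.

```python
import os
import tempfile


def write_batch_file(log_dir, batch_id, payload):
    """Write log_dir/<batch_id> atomically; return False if it already exists."""
    final_path = os.path.join(log_dir, str(batch_id))
    if os.path.exists(final_path):
        return False  # an entry for this batch is already published
    fd, tmp_path = tempfile.mkstemp(
        dir=log_dir, prefix=f".{batch_id}.", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before publishing
        # Readers see either no file or the complete file, never a partial one.
        os.replace(tmp_path, final_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # do not leave a stray temp file behind
        raise
    return True


with tempfile.TemporaryDirectory() as log_dir:
    first = write_batch_file(log_dir, 0, '{"end": 5}')
    second = write_batch_file(log_dir, 0, '{"end": 99}')  # duplicate batch id
    with open(os.path.join(log_dir, "0")) as f:
        content = f.read()
    leftovers = sorted(os.listdir(log_dir))
```

Writing to a temporary name in the same directory keeps the final rename on one file system, which is what makes it atomic on POSIX systems.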
Sources and sinks
A Source exposes getOffset (the latest available position), getBatch(start, end) (the data in a range), and commit(end) (permission to discard everything up to end). A Sink exposes addBatch(batchId, data), which must be idempotent so that a replayed batch id does not duplicate output. These small contracts (and their v2 equivalents) are what let connectors like Kafka, files, and Delta plug into the same execution loop with consistent recovery semantics.
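The two contracts can be written down as Python protocols, together with one way a sink might achieve idempotence. The protocol method names are snake_case renderings of the names above; IdempotentListSink and its "remember the last batch id" strategy are assumptions made for this sketch, not how any particular connector works.

```python
from typing import Protocol


class Source(Protocol):
    def get_offset(self) -> int: ...                         # latest position
    def get_batch(self, start: int, end: int) -> list: ...   # data in a range
    def commit(self, end: int) -> None: ...                  # may discard up to end


class Sink(Protocol):
    def add_batch(self, batch_id: int, data: list) -> None: ...


class IdempotentListSink:
    """Hypothetical sink that ignores a batch id it has already written."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1

    def add_batch(self, batch_id: int, data: list) -> None:
        if batch_id <= self.last_batch_id:
            return  # replayed batch: its output is already in place
        self.rows.extend(data)
        self.last_batch_id = batch_id


sink: Sink = IdempotentListSink()
sink.add_batch(0, [1, 2])
sink.add_batch(1, [3])
sink.add_batch(1, [3])  # replay after a simulated crash before the commit log write
```

The replayed call is a no-op, so re-running an uncommitted batch after a restart leaves the output unchanged.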
Continuous processing (the other engine)
For the lowest latency, a ContinuousExecution runs long-lived tasks that process records as they arrive, checkpointing progress by epoch rather than by discrete batches. It is a separate code path selected only for a continuous trigger with a compatible v2 sink; the micro-batch engine remains the default and the most widely used.
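Epoch-based checkpointing can be illustrated with a toy coordinator that collects per-partition progress reports. Everything here is an assumption for illustration: EpochCoordinatorSketch, its report method, and the rule that an epoch commits only once every partition has reported and all earlier epochs have committed are modeling choices of this sketch, not a description of Spark's internals.

```python
class EpochCoordinatorSketch:
    """Hypothetical coordinator that commits epochs in order."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.reports = {}    # epoch -> {partition: offset reached}
        self.committed = []  # (epoch, {partition: offset}) in commit order

    def report(self, epoch, partition, offset):
        """A long-running task reports how far it got when an epoch ended."""
        self.reports.setdefault(epoch, {})[partition] = offset
        self._commit_ready_epochs()

    def _commit_ready_epochs(self):
        next_epoch = self.committed[-1][0] + 1 if self.committed else 0
        # Commit strictly in order: a complete epoch waits for earlier ones.
        while len(self.reports.get(next_epoch, {})) == self.num_partitions:
            self.committed.append((next_epoch, self.reports.pop(next_epoch)))
            next_epoch += 1


coordinator = EpochCoordinatorSketch(num_partitions=2)
coordinator.report(epoch=0, partition=0, offset=10)
coordinator.report(epoch=1, partition=0, offset=20)
coordinator.report(epoch=1, partition=1, offset=18)  # epoch 1 complete, epoch 0 is not
committed_before = list(coordinator.committed)
coordinator.report(epoch=0, partition=1, offset=9)   # unblocks epochs 0 and 1
```

Unlike the micro-batch loop, nothing here starts or stops per batch: the tasks keep running and only the progress markers are grouped into epochs.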
Key takeaways
- A streaming query is a loop of ordinary batch queries over new input ranges.
- The offset log is written before a batch; the commit log after — this ordering enables replay.
- Replayable sources, the two logs, and idempotent sinks together give end-to-end exactly-once semantics.
- Continuous mode trades the batch model for epoch-based low-latency processing.