
Fix dbsp storage durability bugs/hardening #6260

Open
gz wants to merge 11 commits into main from fix/dbsp-storage-durability-bugs

Conversation


@gz commented May 17, 2026

See commit messages for more details. Bugs and hardening suggestions from @abhirag

Checklist

  • Unit tests added/updated
  • Integration tests added/updated

Breaking Changes?

Hopefully none; please look carefully at the `dependencies.json` format change.

gz added 3 commits May 16, 2026 23:51
`Checkpointer::commit` used to push the new entry into
`self.checkpoint_list` before persisting `checkpoints.feldera`. When
the catalog write failed, the caller saw `Err` but the in-memory list
silently retained a checkpoint that no restart would ever recognize.

Pop the entry back off if `update_checkpoint_file` fails so the
in-memory and on-disk views stay consistent.

Adds a regression test that wraps `PosixBackend` to inject a write
failure at the catalog path and asserts the failed UUID is absent from
the in-memory list.
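The rollback pattern the commit describes can be sketched as below. `Checkpointer`, `CheckpointMetadata`, and `update_checkpoint_file` are simplified stand-ins for the dbsp types, with a test hook in place of a real storage backend:

```rust
// Hypothetical sketch: mutate in-memory state, but undo the mutation when
// the durable write fails, so memory and disk stay consistent.

#[derive(Debug, Clone, PartialEq)]
struct CheckpointMetadata {
    uuid: u64,
}

struct Checkpointer {
    checkpoint_list: Vec<CheckpointMetadata>,
    fail_catalog_write: bool, // test hook simulating an IO error
}

impl Checkpointer {
    fn update_checkpoint_file(&self) -> Result<(), String> {
        if self.fail_catalog_write {
            Err("injected catalog write failure".to_string())
        } else {
            Ok(())
        }
    }

    fn commit(&mut self, md: CheckpointMetadata) -> Result<(), String> {
        self.checkpoint_list.push(md);
        // If persisting the catalog fails, pop the entry back off so the
        // in-memory list matches what a restart would actually find on disk.
        if let Err(e) = self.update_checkpoint_file() {
            self.checkpoint_list.pop();
            return Err(e);
        }
        Ok(())
    }
}
```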
`gc_startup` used `.expect("Batches for a checkpoint should be
discoverable")` on the result of `gather_batches_for_checkpoint`, which
panics whenever `pspine-batches-*.dat` is malformed (truncated write,
disk corruption, partial copy, etc.). The process aborts with no path
to automatic recovery.

Replace the `.expect` with `?` so the error bubbles up through
`init_storage` to `Checkpointer::new`. Callers can then decide whether
to refuse startup or attempt repair.

Adds a regression test that writes invalid JSON into the metadata file
and asserts `Checkpointer::new` returns an error rather than panicking.
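The shape of the change, reduced to a toy: a parse failure becomes a propagated `Err` rather than a process abort. `gather_batches` and `gc_startup` here are illustrative stand-ins with a deliberately trivial "parser", not the dbsp functions:

```rust
// Stand-in for gather_batches_for_checkpoint: malformed metadata yields Err.
fn gather_batches(raw: &str) -> Result<Vec<String>, String> {
    if raw.starts_with('{') && raw.ends_with('}') {
        Ok(raw
            .trim_matches(|c: char| c == '{' || c == '}')
            .split(',')
            .filter(|s| !s.is_empty())
            .map(str::to_string)
            .collect())
    } else {
        Err("Error deserializing JSON".to_string())
    }
}

// Before: panics on corrupt metadata, aborting the whole process.
//   let batches = gather_batches(raw)
//       .expect("Batches for a checkpoint should be discoverable");

// After: `?` bubbles the error up so the caller can refuse startup
// or attempt repair.
fn gc_startup(raw: &str) -> Result<usize, String> {
    let batches = gather_batches(raw)?;
    Ok(batches.len())
}
```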

Live reproduction notes:

Setup:
- One pipeline named `bug` with CREATE TABLE t / CREATE MATERIALIZED VIEW v
  AS SELECT val, COUNT(*) GROUP BY val. The aggregation creates a
  Spine and therefore pspine-batches-*.dat files in the checkpoint
  dir.
- Inserted 5 rows, took checkpoint `019e3...`, verified state via
  fda exec bug 'SELECT * FROM v'.
- Overwrote pspine-batches-0-1dca...integral.dat with `{not valid json`.

Before fix:
- Started pipeline. Operation failed because the pipeline failed to
  initialize. Error details: Panic inside the DBSP runtime.
- Server log: thread 'circuit-thread' panicked at
  crates/dbsp/src/circuit/checkpointer.rs:121:18: Batches for a
  checkpoint should be discoverable: JsonError("key must be a string
  at line 1 column 2").
- This is exactly the `.expect()` line removed by this commit.

After fix (this commit):
- Same corrupt file still on disk.
- Started pipeline with --recompile so the runtime picked up the new
  dbsp code.
- Operation failed because the pipeline failed to initialize. Error
  details: DBSP error: storage error: Error deserializing JSON: key
  must be a string at line 1 column 2.
- No panic in the server log. The runtime returned a structured
  InitializationError with JsonError details. Pipeline transitioned
  to Stopped with deployment_error, ready for operator intervention.
`gc_checkpoint` used `.filter_map(...ok())` when collecting the
batch files to keep: those referenced by the checkpoints in `except`,
plus those referenced by the newest retained checkpoint not in
`except`. Any read error (corrupt JSON, IO failure) was
silently dropped, leaving those batches unprotected. The subsequent
loop could then delete batches that the retained checkpoint still
needed.

Replace both swallowing pipelines with explicit `?` propagation so GC
aborts before deleting anything it cannot prove safe. Errors flow up
to the controller, which logs them as an ERROR; the pipeline itself
keeps running, so a transient gc problem never tears down the
pipeline, but it is now visible to the operator.

Adds a regression test that corrupts the protected checkpoint's
metadata and asserts `gc_checkpoint` returns Err instead of Ok.
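The difference between the two collection shapes can be sketched as follows; `gather_batches` is a stand-in that injects a read failure for checkpoint "C", and the function names are illustrative, not the dbsp API:

```rust
use std::collections::HashSet;

// Stand-in reader: checkpoint "C" has corrupt metadata.
fn gather_batches(cp: &str) -> Result<HashSet<String>, String> {
    match cp {
        "C" => Err("corrupt metadata".to_string()),
        _ => Ok(HashSet::from([format!("{cp}-batch")])),
    }
}

// Buggy shape: read errors vanish, so batches a retained checkpoint still
// needs are missing from the keep-set and may later be deleted.
fn keep_set_lossy(retain: &[&str]) -> HashSet<String> {
    retain
        .iter()
        .filter_map(|cp| gather_batches(cp).ok())
        .flatten()
        .collect()
}

// Fixed shape: the first error aborts GC before anything is deleted.
fn keep_set_strict(retain: &[&str]) -> Result<HashSet<String>, String> {
    let mut keep = HashSet::new();
    for cp in retain {
        keep.extend(gather_batches(cp)?);
    }
    Ok(keep)
}
```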

Live reproduction (silent data loss):

Take four checkpoints A, B, C, D in chronological order. After C is
committed, auto-gc removes A as the oldest, leaving the catalog with
{B, C}. With dependencies.json from each retained checkpoint:

    |B| = 12 batches, |C| = 18 batches
    |B intersect C| = 12 (every batch in B is also referenced by C)
    |B - C|         = 0  (nothing is unique to B)

Snapshot of disk: all 12 shared files present.

Overwrite every C/pspine-batches-*.dat with `{not valid json`, then
commit D. The controller calls gc_checkpoint with except = {} (no
sync configured). list = {B, C, D}:

  - to_remove = {B}, retain = {C, D}.
  - The two .filter_map(...ok()) sites swallow C's gather_batches
    error. batch_files_to_keep stays empty.
  - to_remove loop: B's gather_batches succeeds (B's metadata fine).
    B.difference(empty) = all 12 of B's batches. remove_batch_file
    deletes each.
  - gc returns Ok; no warning, no error log line.
  - Pipeline keeps running; the next checkpoint commits fine.

After the buggy gc:
  - Catalog still advertises C with its original dependencies.json.
  - Of C's 18 referenced batches, 12 are MISSING from disk (exactly
    B intersect C). C is silently destroyed: still listed, but its
    batch files are gone.
  - Restoring from C would now NotFound on 12 batches.
  - Operator has no signal anything went wrong.

With the fix:
  - Same scenario, same pipeline behaviour at the surface (still
    running, no panic, no failed-checkpoint exposure).
  - gather_batches(C) returns Err. gc_checkpoint returns Err before
    the delete loop runs.
  - Controller logs:
      ERROR dbsp_adapters::controller: error removing old checkpoints:
      storage error: Error deserializing JSON: key must be a string
      at line 1 column 2
  - No batches deleted; C's referenced files remain on disk, so a
    manual repair of C's pspine-batches is still possible.
@gz requested review from abhizer and blp May 17, 2026 07:06
@gz changed the title Fix/dbsp storage durability bugs Fix dbsp storage durability bugs/hardening May 17, 2026
@gz requested review from abhizer and removed request for abhizer May 17, 2026 07:11

@mythical-fred left a comment


Nice durability hardening pass — clean separation of GC truth (dependencies.json), intactness verification, fsync barriers, and rkyv validation. Good test coverage that actually injects the failure modes rather than just exercising the happy path. A few things to consider inline; none are hard blockers.

Comment thread crates/feldera-types/src/checkpoint.rs
Comment thread crates/feldera-types/src/checkpoint.rs Outdated
Comment thread crates/dbsp/src/circuit/checkpointer.rs
Comment thread crates/dbsp/src/storage/backend/posixio_impl.rs Outdated
Comment thread crates/dbsp/src/trace/spine_async.rs Outdated

@mythical-fred left a comment


Solid bug hunt — every fix lands with a regression test and the commit messages document each scenario with live repro. A few observations inline; nothing I'd block on.

Comment thread crates/dbsp/src/circuit/checkpointer.rs Outdated
Comment thread crates/dbsp/src/circuit/circuit_builder.rs Outdated
Comment thread crates/dbsp/src/storage/backend/posixio_impl.rs Outdated
Comment thread crates/feldera-types/src/checkpoint.rs Outdated
Comment thread crates/dbsp/src/circuit/checkpointer.rs Outdated
Comment thread crates/dbsp/src/circuit/checkpointer.rs

@mythical-fred left a comment


LGTM

gz added 7 commits May 17, 2026 02:42
Treat a missing `checkpoints.feldera` alongside surviving UUID-shaped
subdirectories as a corrupt state instead of a clean slate.

Previously `Checkpointer::new` accepted a missing catalog as "no
checkpoints yet" and proceeded. With an empty checkpoint list,
`gc_startup` then recursively deleted every directory in the storage
root, including UUID-named directories holding otherwise-intact
checkpoints.

`read_checkpoints` now scans the storage root when the catalog is
missing. If any UUID-shaped subdirectories exist, it returns
`InvalidData` so the caller can refuse to start and an operator can
recover manually. The original behavior (empty list, no error) is
preserved only when the directory has no orphan UUID dirs.
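The guard can be sketched as below. A real implementation scans the filesystem; here the directory listing is passed in, and `looks_like_uuid` is a simplified shape check. Names are illustrative stand-ins for the dbsp code:

```rust
// Is this name shaped like a checkpoint UUID directory?
// e.g. "019e3f00-1d2a-7c44-9b1e-0a1b2c3d4e5f": 36 chars, hyphens at
// fixed positions, hex digits elsewhere.
fn looks_like_uuid(name: &str) -> bool {
    name.len() == 36
        && name.chars().enumerate().all(|(i, c)| match i {
            8 | 13 | 18 | 23 => c == '-',
            _ => c.is_ascii_hexdigit(),
        })
}

fn read_checkpoints(
    catalog_present: bool,
    root_entries: &[&str],
) -> Result<Vec<String>, String> {
    if catalog_present {
        // Normal path: parse checkpoints.feldera (elided in this sketch).
        return Ok(Vec::new());
    }
    if root_entries.iter().any(|e| looks_like_uuid(e)) {
        // Refuse to start: gc_startup would otherwise wipe these dirs.
        return Err(
            "checkpoint catalog missing while UUID directories remain on disk".into(),
        );
    }
    Ok(Vec::new()) // genuinely empty storage root: clean slate
}
```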

Live reproduction:

Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) GROUP BY val. Inserted rows, took two
fda checkpoints. Storage dir contained:

  - 2 UUID checkpoint dirs (the two retained checkpoints)
  - 18 w*.feldera batch files at the root, referenced by both
    checkpoints' dependencies.json
  - checkpoints.feldera catalog listing the two UUIDs

Without the fix:
  - fda stop bug3
  - rm checkpoints.feldera
  - fda start bug3
  - gc_startup runs with an empty checkpoint_list. Every directory
    in the storage root that doesn't appear in in_use_paths is
    delete_recursive'd; every w*.feldera not referenced (none, since
    in_use_paths has no batches) is unlinked.
  - After startup: 0 UUID dirs, 0 batch files. The pipeline then
    fails to bootstrap because there's no state.

With the fix:
  - Same scenario.
  - read_checkpoints scans the root, finds the 2 orphan UUID-shaped
    subdirectories, and returns InvalidData before gc_startup runs.
  - Pipeline initialization fails with:
      Error details: DBSP error: storage error: checkpoints.feldera:
      checkpoint catalog missing while UUID directories remain on
      disk failed: invalid data.
  - All 2 UUID dirs and 18 batch files survive intact. An operator
    can restore the catalog from backup or reconstruct it from the
    surviving checkpoint directories.
A checkpoint records its batch list in two places: scattered across
per-spine `pspine-batches-<persistent_id>.dat` files (one per spine
x worker), and consolidated into a single `dependencies.json`
written at the end of `Checkpointer::commit`. Both files hold the
same information.

`gather_batches_for_checkpoint_uuid` walked the per-spine files,
enumerating every `pspine-batches-*.dat` in the checkpoint dir and
unioning the batches each lists. Switch to reading the consolidated
`dependencies.json` instead: one file. Fall back to the per-spine
scan only when `dependencies.json` is missing, preserving
compatibility with checkpoints written before that file existed.
The per-spine files can be retired later once we are sure no
in-flight checkpoints predate `dependencies.json`.

A side effect of consolidating onto the single file is that GC now
survives loss of any individual per-spine file. The regression
tests cover both directions:

  * `missing_pspine_batches_preserves_batches`: drops the per-spine
    files between commits; `dependencies.json` carries the batch
    list, GC keeps the referenced batches.
  * `legacy_checkpoint_without_dependencies_json_preserves_batches`:
    drops `dependencies.json`, leaves the per-spine files; the
    fallback path still gathers the batch list correctly.
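The lookup order can be sketched as below, with file contents modeled as in-memory sets rather than actual disk reads; the function name mirrors the dbsp one but the signature is illustrative:

```rust
use std::collections::{BTreeSet, HashMap};

// Prefer the consolidated dependencies.json; fall back to unioning the
// per-spine pspine-batches-*.dat files only for legacy checkpoints that
// predate it.
fn gather_batches_for_checkpoint(
    dependencies_json: Option<&BTreeSet<String>>,
    pspine_files: &HashMap<String, BTreeSet<String>>,
) -> BTreeSet<String> {
    if let Some(batches) = dependencies_json {
        // One consolidated file: loss of any individual per-spine file
        // no longer affects GC.
        return batches.clone();
    }
    // Legacy fallback: union the batch list of every per-spine file.
    pspine_files.values().flatten().cloned().collect()
}
```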

Live walkthrough:

Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) GROUP BY val. Inserted five rows
((1,10), (2,10), (3,20), (4,30), (5,30)) and took one fda
checkpoint. Storage dir contained:

  - 1 UUID checkpoint dir holding 32 pspine-batches-*.dat per-spine
    files (one per spine x worker) and a dependencies.json listing
    14 batches
  - 14 w*.feldera batch files at the storage root
  - checkpoints.feldera catalog listing the UUID

  - fda stop bug4
  - rm <checkpoint-dir>/pspine-batches-*.dat
  - fda start bug4
  - gather_batches_for_checkpoint_uuid reads dependencies.json,
    gets all 14 batch names, and adds them to in_use_paths. The
    root sweep skips them. The fallback per-spine scan is not
    consulted because dependencies.json is present.
  - After startup: 14 w*.feldera at root, pipeline status Running.
  - The materialized view restored from the checkpoint reflects all
    5 rows:
        val=10 -> n=2 (ids 1, 2)
        val=20 -> n=1 (id 3)
        val=30 -> n=2 (ids 4, 5)
An operator's `restore` returning NotFound was indistinguishable
from "this operator is new since the last checkpoint". Storage
corruption (a state file that was committed
but vanished from disk) and circuit evolution (an operator that
didn't exist when the checkpoint was taken) both routed to the
same backfill path, producing this opaque error after restart:

    The pipeline's query plan was modified since the last
    checkpoint and requires bootstrapping; however we were not
    able to detect the changes in the pipeline metadata.

`dependencies.json` previously listed only the w*.feldera batch
files referenced at the storage root. Record every per-operator
state file in the checkpoint dir alongside the batches, in a new
backward-compatible format. Restore now diffs the recorded
state-file list against the actual directory contents and refuses
to start when any are missing, instead of falling through to
backfill.

Live reproduction:

Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) GROUP BY val. Inserted 5 rows, took a
checkpoint, stopped. Deleted the per-worker integral spine state
file across all 8 workers (the same logical spine, every shard):

    rm <cp>/pspine-{0..7}-1dca...integral.dat

Without the fix:
  - Each worker independently sees NotFound on its share, adds
    the integral node to need_backfill. All 8 agree, so no
    scheduler conflict fires.
  - The bootstrap layer treats unanimous backfill as "circuit was
    modified" and fails initialization with the misleading "we
    were not able to detect the changes in the pipeline metadata"
    error. An operator chases a non-existent code bug instead of
    realizing 8 state files are missing.

With the fix:
  - dependencies.json's state-file list named those 8 files at
    commit time. The restore-time check lists the dir, finds them
    missing, returns InvalidData before restore runs.
  - Pipeline initialization fails with:
      Error details: DBSP error: storage error: <cp_uuid>:
      pspine-0-1dca...integral.dat, pspine-1-1dca...integral.dat,
      ..., pspine-7-1dca...integral.dat: checkpoint state files
      missing from checkpoint dir failed: invalid data.
Operator restore paths read a state file off disk and reinterpret
its bytes as a typed rkyv archive (`CommittedZ1`, `CommittedSpine`,
`CommittedTransactionZ1`, `CommittedWindow`). The reinterpret used
`unsafe { rkyv::archived_root::<T>(&bytes) }`, which does no
validation: the byte buffer is cast to `&Archived<T>` and accessed
directly. If the file is corrupt, the access can lead to undefined
behavior.

Switch every restore site to `rkyv::check_archived_root::<T>`.
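rkyv isn't pulled in here, so this is a minimal stand-in showing what validation buys: the difference between trusting length/offset fields in a byte buffer (as `archived_root` trusts archived pointers) and bounds-checking them first (as `check_archived_root` does). The toy format is a 4-byte little-endian length followed by that many payload bytes:

```rust
// Trusts the header: a corrupt length field indexes out of bounds
// (here a panic; with raw pointer arithmetic, undefined behavior).
fn read_payload_unchecked(bytes: &[u8]) -> &[u8] {
    let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
    &bytes[4..4 + len]
}

// Validates the header against the buffer before handing out a slice.
fn read_payload_checked(bytes: &[u8]) -> Result<&[u8], String> {
    if bytes.len() < 4 {
        return Err("truncated header".to_string());
    }
    let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
    bytes
        .get(4..4 + len)
        .ok_or_else(|| format!("payload length {len} out of bounds"))
}
```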

Live reproduction:

Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) GROUP BY val. Inserted 5 rows, took a
checkpoint, stopped. Picked one pspine-<W>-<pid>.dat state file
and overwrote its 48 bytes with /dev/urandom. The file still
exists on disk at the right size, so the manifest check sees no
missing entries; only the content is garbage.

Without the fix:
  - Spine::restore calls archived_root on the garbage.
  - rkyv reads pointer/length fields from the garbage as if they
    were real, computes offsets out of bounds, hands the resulting
    slices to deserialize_unsized, which trips a downstream
    .unwrap() on a LayoutError.
  - Foreground worker thread 0 panics:
      panic message: called `Result::unwrap()` on an `Err` value:
      LayoutError
      panic location: rkyv-0.7.45/src/impls/core/mod.rs:266:67
         7: rkyv::impls::core::DeserializeUnsized for [T]>::deserialize_unsized
         8: Spine::restore
  - Pipeline-manager reports it as WorkerPanic. An operator sees a
    runtime crash and can't distinguish corrupt input from a
    real code bug.

With the fix:
  - check_archived_root validates the archive against the 48-byte
    buffer, finds the batches-list pointer out of range, returns
    Err.
  - Pipeline initialization fails with:
      Error details: DBSP error: storage error: <uuid>/pspine-0-...
      integral.dat: check bytes error: check failed for struct
      member batches: pointer out of bounds: base 0x... offset
      -7446907828345682421 not in range 0x...0x...: Spine checkpoint
      validation failed failed: invalid data.
  - No panic in the server log. Pipeline transitions to Stopped
    with a structured InitializationError naming the file, the
    failing field, and the bounds violation.
POSIX guarantees a file's data is durable after `fsync(file)` but says
nothing about the directory entry that points to it. `PosixWriter`
fsync'd each file and then renamed `<name>.mut` -> `<name>` without
fsyncing the parent. A crash between the rename and the kernel's
periodic directory writeback could lose the rename: the file data is
on disk but its directory entry is not, and the checkpoint vanishes.

Add an `fsync_dir` method on `StorageBackend`. `Checkpointer::commit`
invokes it at two barrier points:

  - after writing dependencies.json into the checkpoint dir, flushing
    every rename the per-operator state files made into that dir.
  - after updating checkpoints.feldera, flushing the catalog rename
    and the checkpoint dir's own entry under the storage root.

Newly-created directory hierarchies (when an operator writes the
first file into a fresh checkpoint UUID dir) still fsync the created
chain immediately, since the dir must exist durably before any rename
into it can be made durable.
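A minimal sketch of the barrier on Linux/POSIX: open the directory itself and fsync it so renames created inside it become durable. `fsync_dir` here is a free function mirroring the new `StorageBackend::fsync_dir` method described above, and `durable_replace` is a hypothetical caller, not the dbsp code:

```rust
use std::fs::File;
use std::io;
use std::path::Path;

// Opening a directory read-only is allowed on POSIX; sync_all issues
// fsync(2) on the directory fd, flushing its entries (renames included).
fn fsync_dir(dir: &Path) -> io::Result<()> {
    File::open(dir)?.sync_all()
}

// The write-fsync-rename-fsync_dir sequence from the commit message.
fn durable_replace(dir: &Path, name: &str, contents: &[u8]) -> io::Result<()> {
    let tmp = dir.join(format!("{name}.mut"));
    let dst = dir.join(name);
    std::fs::write(&tmp, contents)?;
    File::open(&tmp)?.sync_all()?; // file data durable
    std::fs::rename(&tmp, &dst)?;  // atomic replace
    fsync_dir(dir)                 // directory entry durable
}
```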
Two writers were not committed alongside the rest of the checkpoint:

* `Checkpointer::commit` wrote the per-UUID `CHECKPOINT` marker via
  `backend.write(...)` and dropped the returned `FileCommitter`.
* `Spine::save` wrote `pspine-<id>.dat` the same way; the per-spine
  state file's committer was dropped instead of being pushed into the
  shared `files` vec processed by `CheckpointCommitter::commit`.

Both files were still fsynced inside `complete()`, so the data hit
the disk, but their durability was tied to an implementation detail
of `complete` rather than the explicit `.commit()` path the rest of
the checkpoint uses.

I'm guessing the double `fsync` is the bug @blp hinted at fixing last week. Either way, it seems better to explicitly commit the `CHECKPOINT` marker and `pspine*` files than to rely on `complete()` fsyncing them as a side effect.
@gz force-pushed the fix/dbsp-storage-durability-bugs branch from 988f7ed to 8834ad4 May 17, 2026 09:46
Signed-off-by: feldera-bot <feldera-bot@feldera.com>