Fix dbsp storage durability bugs/hardening #6260
Open
gz wants to merge 11 commits
Conversation
`Checkpointer::commit` used to push the new entry into `self.checkpoint_list` before persisting `checkpoints.feldera`. When the catalog write failed, the caller saw `Err` but the in-memory list silently retained a checkpoint that no restart would ever recognize. Pop the entry back off if `update_checkpoint_file` fails so the in-memory and on-disk views stay consistent. Adds a regression test that wraps `PosixBackend` to inject a write failure at the catalog path and asserts the failed UUID is absent from the in-memory list.
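A minimal sketch of the fixed ordering, with stub types standing in for the real `Checkpointer` internals (the `update_checkpoint_file` body here is a hypothetical placeholder):

```rust
use std::io;

struct CheckpointMetadata {
    uuid: String,
}

struct Checkpointer {
    checkpoint_list: Vec<CheckpointMetadata>,
}

impl Checkpointer {
    // Stand-in for persisting checkpoints.feldera; the real method
    // writes the catalog through the storage backend.
    fn update_checkpoint_file(&self) -> io::Result<()> {
        Ok(())
    }

    fn commit(&mut self, entry: CheckpointMetadata) -> io::Result<()> {
        self.checkpoint_list.push(entry);
        if let Err(e) = self.update_checkpoint_file() {
            // Catalog write failed: pop the entry back off so the
            // in-memory list matches what a restart would see on disk.
            self.checkpoint_list.pop();
            return Err(e);
        }
        Ok(())
    }
}
```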
`gc_startup` used `.expect("Batches for a checkpoint should be
discoverable")` on the result of `gather_batches_for_checkpoint`, which
panics whenever `pspine-batches-*.dat` is malformed (truncated write,
disk corruption, partial copy, etc.). The process aborts with no path
to automatic recovery.
Replace the `.expect` with `?` so the error bubbles up through
`init_storage` to `Checkpointer::new`. Callers can then decide whether
to refuse startup or attempt repair.
Adds a regression test that writes invalid JSON into the metadata file
and asserts `Checkpointer::new` returns an error rather than panicking.
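In sketch form (stub function, simplified signatures), the change is just error propagation instead of a panic:

```rust
use std::io;

// Hypothetical stand-in for the real metadata read, which can fail on
// a malformed pspine-batches-*.dat file.
fn gather_batches_for_checkpoint(dir: &str) -> io::Result<Vec<String>> {
    let _ = dir;
    Ok(Vec::new())
}

// Before: any malformed metadata file aborted the process.
//   let batches = gather_batches_for_checkpoint(dir)
//       .expect("Batches for a checkpoint should be discoverable");

// After: `gc_startup` is fallible, and the error bubbles up through
// `init_storage` to `Checkpointer::new`.
fn gc_startup(dir: &str) -> io::Result<()> {
    let _batches = gather_batches_for_checkpoint(dir)?;
    Ok(())
}
```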
Live reproduction notes:
Setup:
- One pipeline named `bug` with CREATE TABLE t / CREATE MATERIALIZED
  VIEW v AS SELECT val, COUNT(*) FROM t GROUP BY val. The aggregation
  creates a Spine and therefore pspine-batches-*.dat files in the
  checkpoint dir.
- Inserted 5 rows, took checkpoint `019e3...`, verified state via
fda exec bug 'SELECT * FROM v'.
- Overwrote pspine-batches-0-1dca...integral.dat with `{not valid json`.
Before fix:
- Started pipeline. Operation failed because the pipeline failed to
initialize. Error details: Panic inside the DBSP runtime.
- Server log: thread 'circuit-thread' panicked at
crates/dbsp/src/circuit/checkpointer.rs:121:18: Batches for a
checkpoint should be discoverable: JsonError("key must be a string
at line 1 column 2").
- This is the exact `.expect()` line removed by this commit.
After fix (this commit):
- Same corrupt file still on disk.
- Started pipeline with --recompile so the runtime picked up the new
dbsp code.
- Operation failed because the pipeline failed to initialize. Error
details: DBSP error: storage error: Error deserializing JSON: key
must be a string at line 1 column 2.
- No panic in the server log. The runtime returned a structured
InitializationError with JsonError details. Pipeline transitioned
to Stopped with deployment_error, ready for operator intervention.
`gc_checkpoint` used `.filter_map(...ok())` when collecting the
batch files to keep: those referenced by the checkpoints in `except`,
plus those referenced by the newest retained checkpoint not in
`except`. Any read error (corrupt JSON, IO failure) was
silently dropped, leaving those batches unprotected. The subsequent
loop could then delete batches that the retained checkpoint still
needed.
Replace both error-swallowing chains with explicit `?` propagation so GC
aborts before deleting anything it cannot prove safe. Errors flow up
to the controller, which logs them as an ERROR; the pipeline itself
keeps running, so a transient gc problem never tears down the
pipeline, but it is now visible to the operator.
Adds a regression test that corrupts the protected checkpoint's
metadata and asserts `gc_checkpoint` returns Err instead of Ok.
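A sketch of the keep-set change, with `gather_batches` as a hypothetical stand-in for the real per-checkpoint read:

```rust
use std::collections::HashSet;
use std::io;

struct Checkpoint;

// Stand-in: reads a checkpoint's batch list; fails on corrupt JSON or
// IO errors.
fn gather_batches(_cp: &Checkpoint) -> io::Result<HashSet<String>> {
    Ok(HashSet::new())
}

// Before: a failed read silently dropped that checkpoint's batches
// from the keep-set, leaving them unprotected.
fn batch_files_to_keep_buggy(retained: &[Checkpoint]) -> HashSet<String> {
    retained
        .iter()
        .filter_map(|cp| gather_batches(cp).ok())
        .flatten()
        .collect()
}

// After: any read error aborts GC before the delete loop runs.
fn batch_files_to_keep(retained: &[Checkpoint]) -> io::Result<HashSet<String>> {
    let mut keep = HashSet::new();
    for cp in retained {
        keep.extend(gather_batches(cp)?);
    }
    Ok(keep)
}
```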
Live reproduction (silent data loss):
Take four checkpoints A, B, C, D in chronological order. After C is
committed, auto-gc removes A as the oldest, leaving the catalog with
{B, C}. With dependencies.json from each retained checkpoint:
|B| = 12 batches, |C| = 18 batches
|B intersect C| = 12 (every batch in B is also referenced by C)
|B - C| = 0 (nothing is unique to B)
Snapshot of disk: all 12 shared files present.
Overwrite every C/pspine-batches-*.dat with `{not valid json`, then
commit D. The controller calls gc_checkpoint with except = {} (no
sync configured). list = {B, C, D}:
- to_remove = {B}, retain = {C, D}.
- The two .filter_map(...ok()) sites swallow C's gather_batches
error. batch_files_to_keep stays empty.
- to_remove loop: B's gather_batches succeeds (B's metadata fine).
B.difference(empty) = all 12 of B's batches. remove_batch_file
deletes each.
- gc returns Ok; no warning, no error log line.
- Pipeline keeps running; the next checkpoint commits fine.
After the buggy gc:
- Catalog still advertises C with its original dependencies.json.
- Of C's 18 referenced batches, 12 are MISSING from disk (exactly
B intersect C). C is silently destroyed: still listed, but its
batch files are gone.
- Restoring from C would now NotFound on 12 batches.
- Operator has no signal anything went wrong.
With the fix:
- Same scenario, same pipeline behaviour at the surface (still
running, no panic, no failed-checkpoint exposure).
- gather_batches(C) returns Err. gc_checkpoint returns Err before
the delete loop runs.
- Controller logs:
ERROR dbsp_adapters::controller: error removing old checkpoints:
storage error: Error deserializing JSON: key must be a string
at line 1 column 2
- No batches deleted; C's referenced files remain on disk, so a
manual repair of C's pspine-batches is still possible.
mythical-fred left a comment:
Nice durability hardening pass — clean separation of GC truth (dependencies.json), intactness verification, fsync barriers, and rkyv validation. Good test coverage that actually injects the failure modes rather than just exercising the happy path. A few things to consider inline; none are hard blockers.
mythical-fred left a comment:
Solid bug hunt — every fix lands with a regression test and the commit messages document each scenario with live repro. A few observations inline; nothing I'd block on.
Treat a missing `checkpoints.feldera` alongside surviving UUID-shaped
subdirectories as a corrupt state instead of a clean slate.
Previously `Checkpointer::new` accepted a missing catalog as "no
checkpoints yet" and proceeded. With an empty checkpoint list,
`gc_startup` then recursively deleted every directory in the storage
root, including UUID-named directories holding otherwise-intact
checkpoints.
`read_checkpoints` now scans the storage root when the catalog is
missing. If any UUID-shaped subdirectories exist, it returns
`InvalidData` so the caller can refuse to start and an operator can
recover manually. The original behavior (empty list, no error) is
preserved only when the directory has no orphan UUID dirs.
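A sketch of the new scan, assuming the `uuid` crate for name parsing (the real code goes through the storage backend rather than `std::fs`):

```rust
use std::fs;
use std::io;
use std::path::Path;

// Called when checkpoints.feldera is missing: treating that as a clean
// slate is only safe if no UUID-shaped subdirectories survive.
fn refuse_if_orphan_uuid_dirs(root: &Path) -> io::Result<()> {
    for entry in fs::read_dir(root)? {
        let entry = entry?;
        let name = entry.file_name();
        let is_uuid = name
            .to_str()
            .map(|s| uuid::Uuid::parse_str(s).is_ok())
            .unwrap_or(false);
        if entry.file_type()?.is_dir() && is_uuid {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "checkpoint catalog missing while UUID directories remain on disk",
            ));
        }
    }
    // No orphan UUID dirs: genuinely no checkpoints yet.
    Ok(())
}
```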
Live reproduction:
Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) FROM t GROUP BY val. Inserted rows, took two
fda checkpoints. Storage dir contained:
- 2 UUID checkpoint dirs (the two retained checkpoints)
- 18 w*.feldera batch files at the root, referenced by both
checkpoints' dependencies.json
- checkpoints.feldera catalog listing the two UUIDs
Without the fix:
- fda stop bug3
- rm checkpoints.feldera
- fda start bug3
- gc_startup runs with an empty checkpoint_list. Every directory
in the storage root that doesn't appear in in_use_paths is
delete_recursive'd; every w*.feldera not referenced (none, since
in_use_paths has no batches) is unlinked.
- After startup: 0 UUID dirs, 0 batch files. The pipeline then
fails to bootstrap because there's no state.
With the fix:
- Same scenario.
- read_checkpoints scans the root, finds the 2 orphan UUID-shaped
subdirectories, and returns InvalidData before gc_startup runs.
- Pipeline initialization fails with:
Error details: DBSP error: storage error: checkpoints.feldera:
checkpoint catalog missing while UUID directories remain on
disk failed: invalid data.
- All 2 UUID dirs and 18 batch files survive intact. An operator
can restore the catalog from backup or reconstruct it from the
surviving checkpoint directories.
A checkpoint records its batch list in two places: scattered across
per-spine `pspine-batches-<persistent_id>.dat` files (one per spine
x worker), and consolidated into a single `dependencies.json`
written at the end of `Checkpointer::commit`. Both files hold the
same information.
`gather_batches_for_checkpoint_uuid` walked the per-spine files,
enumerating every `pspine-batches-*.dat` in the checkpoint dir and
unioning the batches each lists. Switch to reading the consolidated
`dependencies.json` instead: one file. Fall back to the per-spine
scan only when `dependencies.json` is missing, preserving
compatibility with checkpoints written before that file existed.
The per-spine files can be retired later once we are sure no
in-flight checkpoints predate `dependencies.json`.
A side effect of consolidating onto the single file is that GC now
survives loss of any individual per-spine file. The regression
tests cover both directions:
* `missing_pspine_batches_preserves_batches`: drops the per-spine
files between commits; `dependencies.json` carries the batch
list, GC keeps the referenced batches.
* `legacy_checkpoint_without_dependencies_json_preserves_batches`:
drops `dependencies.json`, leaves the per-spine files; the
fallback path still gathers the batch list correctly.
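A sketch of the consolidated read with the legacy fallback, assuming `serde_json` and simplifying the `dependencies.json` schema to a flat JSON list of batch names:

```rust
use std::fs;
use std::io;
use std::path::Path;

fn gather_batches_for_checkpoint_uuid(cp_dir: &Path) -> io::Result<Vec<String>> {
    let deps = cp_dir.join("dependencies.json");
    if deps.exists() {
        // Preferred path: one consolidated file, written at the end of
        // Checkpointer::commit.
        let bytes = fs::read(&deps)?;
        return serde_json::from_slice(&bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
    }
    // Legacy fallback: union the per-spine pspine-batches-*.dat files,
    // for checkpoints written before dependencies.json existed.
    let mut batches = Vec::new();
    for entry in fs::read_dir(cp_dir)? {
        let path = entry?.path();
        let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        if name.starts_with("pspine-batches-") && name.ends_with(".dat") {
            let bytes = fs::read(&path)?;
            let spine: Vec<String> = serde_json::from_slice(&bytes)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
            batches.extend(spine);
        }
    }
    Ok(batches)
}
```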
Live walkthrough:
Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) FROM t GROUP BY val. Inserted five rows
((1,10), (2,10), (3,20), (4,30), (5,30)) and took one fda
checkpoint. Storage dir contained:
- 1 UUID checkpoint dir holding 32 pspine-batches-*.dat per-spine
files (one per spine x worker) and a dependencies.json listing
14 batches
- 14 w*.feldera batch files at the storage root
- checkpoints.feldera catalog listing the UUID
- fda stop bug4
- rm <checkpoint-dir>/pspine-batches-*.dat
- fda start bug4
- gather_batches_for_checkpoint_uuid reads dependencies.json,
gets all 14 batch names, and adds them to in_use_paths. The
root sweep skips them. The fallback per-spine scan is not
consulted because dependencies.json is present.
- After startup: 14 w*.feldera at root, pipeline status Running.
- The materialized view restored from the checkpoint reflects all
5 rows:
val=10 -> n=2 (ids 1, 2)
val=20 -> n=1 (id 3)
val=30 -> n=2 (ids 4, 5)
An operator's `restore` returning NotFound was indistinguishable
from "this operator is new since the last checkpoint". Storage
corruption (a state file that was committed
but vanished from disk) and circuit evolution (an operator that
didn't exist when the checkpoint was taken) both routed to the
same backfill path, producing this opaque error after restart:
The pipeline's query plan was modified since the last
checkpoint and requires bootstrapping; however we were not
able to detect the changes in the pipeline metadata.
`dependencies.json` previously listed only the w*.feldera batch
files referenced at the storage root. Record every per-operator
state file in the checkpoint dir alongside the batches, in a new
backward-compatible format. Restore now diffs the recorded
state-file list against the actual directory contents and refuses
to start when any are missing, instead of falling through to
backfill.
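A sketch of the restore-time intactness check (names hypothetical; `recorded` stands for the state-file list captured in `dependencies.json` at commit time):

```rust
use std::collections::BTreeSet;
use std::fs;
use std::io;
use std::path::Path;

// Any recorded state file absent from the checkpoint dir is storage
// corruption, not circuit evolution: refuse to start instead of
// falling through to backfill.
fn verify_state_files(cp_dir: &Path, recorded: &BTreeSet<String>) -> io::Result<()> {
    let mut present = BTreeSet::new();
    for entry in fs::read_dir(cp_dir)? {
        let name = entry?.file_name();
        if let Some(name) = name.to_str() {
            present.insert(name.to_owned());
        }
    }
    let missing: Vec<String> = recorded.difference(&present).cloned().collect();
    if !missing.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "checkpoint state files missing from checkpoint dir: {}",
                missing.join(", ")
            ),
        ));
    }
    Ok(())
}
```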
Live reproduction:
Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) FROM t GROUP BY val. Inserted 5 rows, took a
checkpoint, stopped. Deleted the per-worker integral spine state
file across all 8 workers (the same logical spine, every shard):
rm <cp>/pspine-{0..7}-1dca...integral.dat
Without the fix:
- Each worker independently sees NotFound on its share, adds
the integral node to need_backfill. All 8 agree, so no
scheduler conflict fires.
- The bootstrap layer treats unanimous backfill as "circuit was
modified" and fails initialization with the misleading "we
were not able to detect the changes in the pipeline metadata"
error. An operator chases a non-existent code bug instead of
realizing 8 state files are missing.
With the fix:
- dependencies.json's state-file list named those 8 files at
commit time. The restore-time check lists the dir, finds them
missing, returns InvalidData before restore runs.
- Pipeline initialization fails with:
Error details: DBSP error: storage error: <cp_uuid>:
pspine-0-1dca...integral.dat, pspine-1-1dca...integral.dat,
..., pspine-7-1dca...integral.dat: checkpoint state files
missing from checkpoint dir failed: invalid data.
Operator restore paths read a state file off disk and reinterpret
its bytes as a typed rkyv archive (`CommittedZ1`, `CommittedSpine`,
`CommittedTransactionZ1`, `CommittedWindow`). The reinterpret used
`unsafe { rkyv::archived_root::<T>(&bytes) }`, which does no
validation: the byte buffer is cast to `&Archived<T>` and accessed
directly. If the file is corrupt, the access can lead to undefined
behavior.
Switch every restore site to `rkyv::check_archived_root::<T>`.
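In sketch form (the struct is illustrative, not the real `CommittedSpine`; this assumes rkyv 0.7 with its `validation` feature, where `#[archive(check_bytes)]` derives the `CheckBytes` impl that `check_archived_root` requires):

```rust
use rkyv::{Archive, Deserialize, Serialize};

#[derive(Archive, Serialize, Deserialize)]
#[archive(check_bytes)]
struct Committed {
    batches: Vec<String>,
}

fn validate(bytes: &[u8]) -> Result<(), String> {
    // Before: no validation; garbage bytes were reinterpreted blindly.
    //   let archived = unsafe { rkyv::archived_root::<Committed>(bytes) };

    // After: the buffer is checked before any field is dereferenced, so
    // a corrupt file becomes a structured Err instead of UB or a panic.
    rkyv::check_archived_root::<Committed>(bytes)
        .map(|_archived| ())
        .map_err(|e| format!("Spine checkpoint validation failed: {e}"))
}
```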
Live reproduction:
Pipeline with CREATE TABLE t / CREATE MATERIALIZED VIEW v AS
SELECT val, COUNT(*) FROM t GROUP BY val. Inserted 5 rows, took a
checkpoint, stopped. Picked one pspine-<W>-<pid>.dat state file
and overwrote its 48 bytes with /dev/urandom. The file still
exists on disk at the right size, so the manifest check sees no
missing entries; only the content is garbage.
Without the fix:
- Spine::restore calls archived_root on the garbage.
- rkyv reads pointer/length fields from the garbage as if they
were real, computes offsets out of bounds, hands the resulting
slices to deserialize_unsized, which trips a downstream
.unwrap() on a LayoutError.
- Foreground worker thread 0 panics:
panic message: called `Result::unwrap()` on an `Err` value:
LayoutError
panic location: rkyv-0.7.45/src/impls/core/mod.rs:266:67
7: rkyv::impls::core::DeserializeUnsized for [T]>::deserialize_unsized
8: Spine::restore
- Pipeline-manager reports it as WorkerPanic. An operator sees a
runtime crash and can't distinguish corrupt input from a
real code bug.
With the fix:
- check_archived_root validates the archive against the 48-byte
buffer, finds the batches-list pointer out of range, returns
Err.
- Pipeline initialization fails with:
Error details: DBSP error: storage error: <uuid>/pspine-0-...
integral.dat: check bytes error: check failed for struct
member batches: pointer out of bounds: base 0x... offset
-7446907828345682421 not in range 0x...0x...: Spine checkpoint
validation failed failed: invalid data.
- No panic in the server log. Pipeline transitions to Stopped
with a structured InitializationError naming the file, the
failing field, and the bounds violation.
POSIX guarantees a file's data is durable after `fsync(file)` but says
nothing about the directory entry that points to it. `PosixWriter`
fsync'd each file and then renamed `<name>.mut` -> `<name>` without
fsyncing the parent. A crash between the rename and the kernel's
periodic directory writeback could lose the rename: the file data is
on disk but its directory entry is not, and the checkpoint vanishes.
Add an `fsync_dir` method on `StorageBackend`. `Checkpointer::commit`
invokes it at two barrier points:
- after writing dependencies.json into the checkpoint dir, flushing
every rename the per-operator state files made into that dir.
- after updating checkpoints.feldera, flushing the catalog rename
and the checkpoint dir's own entry under the storage root.
Newly-created directory hierarchies (when an operator writes the
first file into a fresh checkpoint UUID dir) still fsync the created
chain immediately, since the dir must exist durably before any rename
into it can be made durable.
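A minimal sketch of the POSIX idiom behind `fsync_dir`: open the directory read-only and fsync its descriptor, which flushes the directory's entries (renames, creations) to disk:

```rust
use std::fs::File;
use std::io;
use std::path::Path;

// On Unix, File::open works on directories and sync_all maps to
// fsync(2); this makes renames inside `dir` durable.
fn fsync_dir(dir: &Path) -> io::Result<()> {
    File::open(dir)?.sync_all()
}
```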
Two writers were not committed alongside the rest of the checkpoint:

* `Checkpointer::commit` wrote the per-UUID `CHECKPOINT` marker via
  `backend.write(...)` and dropped the returned `FileCommitter`.
* `Spine::save` wrote `pspine-<id>.dat` the same way; the per-spine
  state file's committer was dropped instead of being pushed into the
  shared `files` vec processed by `CheckpointCommitter::commit`.

Both files were still fsynced inside `complete()`, so the data hit the
disk, but their durability was tied to an implementation detail of
`complete` rather than the explicit `.commit()` path the rest of the
checkpoint uses. I'm guessing that the double `fsync` is a bug that
@blp hinted at fixing last week. In this case, it's probably better to
just explicitly commit these `CHECKPOINT` and `pspine*` files.
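A sketch of the committer change, with stub types named per the commit message (not the real API surface):

```rust
use std::io;

struct FileCommitter;
impl FileCommitter {
    fn commit(self) -> io::Result<()> {
        Ok(())
    }
}

// Stand-in for backend.write(...), which returns a committer for the
// newly written file.
fn backend_write(_path: &str) -> io::Result<FileCommitter> {
    Ok(FileCommitter)
}

fn write_checkpoint_marker(files: &mut Vec<FileCommitter>) -> io::Result<()> {
    // Before: the returned committer was dropped, leaving durability to
    // an fsync hidden inside `complete()`.
    //   let _ = backend_write("CHECKPOINT")?;

    // After: push it into the shared vec that
    // CheckpointCommitter::commit drains with the rest of the files.
    files.push(backend_write("CHECKPOINT")?);
    Ok(())
}
```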
See the commit messages for more details. Bugs and hardening suggestions are from @abhirag.
Checklist
Breaking Changes?
Hopefully none; please look carefully at the dependencies.json format change.