Journal replay fix#6566
Conversation
…n test Add `tests/platform/test_fault_tolerance.py::test_ft_input_replay_determinism`, an enterprise-only regression test for the intermittent fault-tolerant restart-recovery failures seen in CI. It creates a single-host fault-tolerant pipeline (`fault_tolerance: latest_checkpoint`, datagen input, materialized table + view) and repeatedly force-stops (crashes) and resumes it while data is flowing. Each resume restores the latest checkpoint and replays the input log; replay must be deterministic, so the pipeline must return to Running every cycle. On a buggy build the input is re-sharded across workers non-deterministically on replay and the engine self-terminates with "Logged and replayed step N contained different numbers of records or hashes" (PipelineTerminated), which the test reports as a recovery failure. The bug requires the input sharded across >= 2 workers (a single worker replays deterministically); the test pins 2 workers as the most reliable trigger and crashes for 15 cycles. Reproduces within the first few cycles. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
mihaibudiu
left a comment
There was a problem hiding this comment.
The comments explain what is going on, but the state machine is spread over a large code fragment, so it's not really easy to ascertain that there are no gaps.
| build the input is re-sharded across workers non-deterministically on replay; the | ||
| engine detects | ||
|
|
||
| Logged and replayed step N contained different numbers of records or hashes |
There was a problem hiding this comment.
This seems in the wrong place
| /// transaction must commit (with no new input) before that step is fed. | ||
| /// `input_step` suppresses input on such steps and | ||
| /// `advance_transaction_state` commits the open transaction. | ||
| fn replay_at_boundary(&self) -> bool { |
There was a problem hiding this comment.
this stuff is quite complicated. I wonder if this can be simplified in a future design
| /// have to reproduce both the exact total record count and the XOR of every | ||
| /// per-step 64-bit hash while still ingesting different data. | ||
| #[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] | ||
| struct ReplayChecksumAggregate { |
There was a problem hiding this comment.
Are these #[test] objects?
There was a problem hiding this comment.
nope, these are always on
| // step, so a host must not open a transaction on its own. Fixing | ||
| // multihost FT replay the same way needs the coordinator to drive one | ||
| // replay transaction across all hosts -- a separate change (multihost + | ||
| // fault tolerance is currently untested). |
There was a problem hiding this comment.
does this need an issue? you could refer to it here
ac49dd0 to
67ae582
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
Coming to this after mihaibudiu's approve and ryzhyk's ping to blp. I share mihaibudiu's concern about the state machine being spread out, so I traced it end-to-end. Overall it holds together, but a few things below are worth another look before this lands (leaving as COMMENT so blp can be the final signoff on FT).
State-machine trace (single-host):
FtState::opensetsreplay_stepsfromJournal::list_stepsand primesreplay_step+replay_transaction_id.- Per step:
input_step(checksreplay_at_boundary) →step_circuit→ journalwrite_step(oraccumulate_replay_check) →push_output→next_step(replay_consumed). - Transaction advance:
advance_transaction_statereadsreplay_transaction_idvsreplay_open_transaction_idand drivesNone → Started → Committing → None.
The three invariants that make this work are: (a) replay_at_boundary suppresses input while a boundary commit is pending, (b) next_step refuses to advance the cursor when the step wasn't consumed, and (c) advance_transaction_state treats next != open (including next.is_none()) as "commit and drain". They all line up. Where I'd like a second pair of eyes:
- What happens when the last recorded transaction has no closing boundary in the journal (crash mid-transaction, which is exactly the motivating case). See inline on
next_step. The current code commits that partial transaction afterfinish_replayingclearsrestoringand control falls through to the normal (non-replay) commit path. That's likely the intended behavior, but the changelog wording ("commits each transaction at exactly the recorded boundary") doesn't quite describe it. Worth a sentence. - Interaction between checkpointing and replay: is a checkpoint permitted mid-transaction during replay?
RunningCheckpointsnapshotsget_transaction_number(); if a checkpoint can land mid-tx,transaction_numberin the checkpoint is off-by-one with respect tolast_transaction_id. Everywhere I looked, checkpoints seem to requireTransactionState::None, but a one-line assertion or comment naming that invariant would defend the state machine against future changes.
Journal-format compat: no #[serde(default)] on StepMetadata::transaction_id — an old journal will fail to deserialize (via rmp_serde). That matches the changelog. Fine, just noting it is a hard break, not a graceful one.
Checkpoint compat: Checkpoint::transaction_number is #[serde(default)] — old checkpoints decode with transaction_number = 0. Good.
Kafka output changes: the "exactly one non-empty batch per transaction" contract in batch_start depends on the controller skipping empty batches (the num_records > 0 guard in output_worker). That's now the correctness pivot for FT dedup — worth an inline comment on the guard tying it to the unreachable! in batch_start.
Test: test_ft_input_replay_determinism is a solid stress for the checksum-mismatch bug, but it doesn't force a mid-transaction crash across a transaction boundary (which is the state machine's hardest case). A follow-up test that pins transactions with a small commit interval and crashes mid-transaction would exercise the actual boundary logic, not just "some step got interrupted."
Small stuff inline. Deferring the final call to blp as ryzhyk requested.
| /// many physical steps each commit took during recording. Fault-tolerant | ||
| /// output connectors key their exactly-once dedup on this id, which (unlike | ||
| /// the physical step number) is deterministic across replay. | ||
| pub transaction_id: TransactionId, |
There was a problem hiding this comment.
No #[serde(default)] here (unlike Checkpoint::transaction_number), so rmp_serde will hard-fail on any pre-existing journal record. That matches the changelog's incompatible-change note, so this is by design — flagging so the choice is explicit: there is intentionally zero graceful path for an old journal, even though the checkpoint side got a default. Fine either way, but worth confirming.
| // No more steps to replay. The boundary check in | ||
| // `accumulate_replay_check` only fires when a later | ||
| // transaction begins, so verify the last one here. | ||
| self.verify_replay_transaction()?; |
There was a problem hiding this comment.
This handles "journal exhausted mid-transaction" (crash before the boundary was recorded — exactly the motivating case) by verifying the last transaction here and then letting finish_replaying clear restoring; the commit itself then falls through to the normal non-replay branch of advance_transaction_state. That is the correct behavior, but the changelog wording ("commits each transaction at exactly the recorded boundary") reads as if every replayed transaction has a recorded closing boundary, which the last one doesn't. Worth a sentence in the doc-comment on advance_transaction_state (or a comment right here) that spells out this final-transaction case — otherwise a future reader has to reconstruct it from three different places. This is the state-machine "gap" mihaibudiu was worried about, and it's the one I'd most like documented.
| // the transaction and requires strictly increasing ids. Empty | ||
| // in-progress steps of a transaction produce no output. | ||
| if let Some(data) = data | ||
| && num_records > 0 |
There was a problem hiding this comment.
The num_records > 0 guard is now load-bearing for exactly-once FT: batch_start in the Kafka FT output panics via unreachable! if a transaction number does not strictly increase, which requires "exactly one open batch per transaction", which requires this skip. Please add a cross-reference to KafkaOutputEndpoint::batch_start's comment (or vice versa), and consider an assert!(num_records > 0 || ...) invariant so a future refactor doesn't silently open an empty batch and later blow up in the Kafka producer path.
| // path does. | ||
| transaction_info.replay_open_transaction_id = Some(tid); | ||
| transaction_info.last_transaction_id += 1; | ||
| let engine_tid = transaction_info.last_transaction_id; |
There was a problem hiding this comment.
Two different counters get conflated in this log line: engine_tid here comes from last_transaction_id (fresh from 0 every restart), while the FT dedup key and the value stamped into journal records comes from self.transaction_number (restored from checkpoint). During replay these will typically not be equal — e.g. a pipeline that restored transaction_number = 42 will log "Transaction 1: replaying journaled transaction 43". Not a bug, but the log will confuse anyone triaging FT issues. Consider using the restored engine transaction_number as the display id here too, or renaming the log field.
| let Some(check) = self.replay_check.take() else { | ||
| return Ok(()); | ||
| }; | ||
| if check.replayed != check.recorded { |
There was a problem hiding this comment.
Aggregating over wrapping_add + XOR is fine for collision resistance across the aggregate, but worth a debug_assert! that check.recorded and check.replayed have the same key set before comparing — right now a divergence in which endpoints reported checksums would surface as a giant Debug-format diff in the error message rather than a targeted "endpoint X ingested but wasn't recorded". Cheap to add and much better error message.
| // and requires all hosts to report the same transaction state each | ||
| // step, so a host must not open a transaction on its own. Fixing | ||
| // multihost FT replay the same way needs the coordinator to drive one | ||
| // replay transaction across all hosts -- a separate change (multihost + |
There was a problem hiding this comment.
The !is_multihost gate silently falls through to the API/connector branch, which for a multihost FT pipeline in restoring state means the replay never starts a transaction and journaled input is fed under TransactionState::None. That is presumably safe because "multihost + FT is currently untested," but a warn! (or a hard ControllerError) when a multihost pipeline enters this code path with a non-empty journal would fail loud rather than silently doing the wrong thing when someone eventually enables it. At minimum, please open a tracking issue and link it here so this TODO doesn't get lost.
| _SQL = f""" | ||
| CREATE TABLE example1 ( | ||
| id BIGINT NOT NULL PRIMARY KEY, | ||
| name VARCHAR NOT NULL, |
There was a problem hiding this comment.
This test hits the checksum-comparison path but not the transaction-boundary path — it doesn't force a crash between transactions, and the datagen config doesn't pin a small transaction size, so most cycles will replay steps within a single implicit transaction. A companion test that (a) forces auto-transactions with a small interval / batch size, and (b) crashes with a partial transaction pending would exercise advance_transaction_state's Commit-on-boundary path directly, which is the code the PR actually adds. As-is, the state machine's core new behavior is only tested by higher-level behavioral tests. Follow-up is fine.
| # The bug needs the input sharded across >= 2 workers (a single worker replays | ||
| # deterministically). 2 workers is the most reliable trigger -- it reproduces | ||
| # within a few cycles, whereas higher worker counts need more cycles -- so the | ||
| # test pins 2 rather than using FELDERA_TEST_NUM_WORKERS. |
There was a problem hiding this comment.
"intermittency" → "nondeterminism" (per mihaibudiu's earlier comment). Also, the assertion inside recovered() will fire on the first cycle only if the pipeline enters Stopped with an error — but a healthy resume goes Initializing → Running and never enters Stopped. Might be clearer to fail explicitly on any Stopped deployment_status even without a deployment_error, since a Stopped under FT replay is by definition a bug.
With the introduction of transactions, the model of computation changed so that the circuit now produces one output per transaction rather than per step. As a result the existing journal replay mechanism that runs each input step in a separate transaction is only correct assuming that the original inputs were also ingested with a step per transaction. But even that is no longer true after the introduction of streaming exchange, which can cause a transaction processing the exact same input take more or fewer steps than the original run of the program. This broke the validation mechanism that assumed that step numbers during recording and replay should match precisely. The solution that this commit implements has repa - The journal records transaction boundaries in addition to individual steps. - The replay mechanisms breaks up the sequence of replayed input steps into transactions and commits each transaction at exactly the recorded boundary. This restores the invariant the the outputs of the circuit during replay are identical to the outputs produced while processing the same inputs before the failure. As a side effect of this fix, we also save the last transaction number as part of the checkpoint, so after restart the circuit will resume contigous transaction count. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
67ae582 to
592d215
Compare
blp
left a comment
There was a problem hiding this comment.
This is a huge change and I'm sorry you had to do it @ryzhyk. I honestly hadn't realized that our model for journaling was wrong in the world with transactions, but this PR makes it clear that it was. Ordinarily I'd spend a lot of time on a review of this nature because it's worthwhile to do a few experiments, etc. or at least to spot-check that I can't see problems in the corner cases that I can think of. In the world that I actually live in, I did read it as carefully as I could and I didn't spot problems.
mythical-fred
left a comment
There was a problem hiding this comment.
Rebase + follow-up commit 592d2157. My earlier COMMENT (4606622457) points, revisited against the new tip:
Addressed:
num_records > 0guard is now explicitly documented at the call site as load-bearing for FT dedup ("each transaction must open exactly one output batch... requires strictly increasing ids"). Good.!is_multihostgate has a proper "why" comment explaining that multihost FT replay needs coordinator-driven cross-host transactions and is a separate change (with "multihost + FT is currently untested" called out).- Kafka FT output was renamed end-to-end (
step/next_step→transaction/next_transaction,OutputPosition::step→.transaction,BatchClosed(Step)→BatchClosed(u64)), so the on-wire dedup key matches the invariant instead of relying on the caller happening to pass a transaction number in a field namedstep. Much clearer. - Test docstring rewritten; "intermittency" gone.
Still open (all nits, none blocking given blp's APPROVE):
- State-machine doc gap: the "journal exhausted mid-transaction" case (crash before the closing boundary was recorded) is still handled implicitly by
verify_replay_transaction()in theNonearm ofnext_step+finish_replaying. Correct behavior, but the invariant "last recorded transaction may have no closing boundary; it is verified on journal-exhaustion, not on a boundary transition" isn't spelled out where a future reader would look for it. - Log line at
advance_transaction_state:Transaction {engine_tid}: replaying journaled transaction {tid}still conflates two counters (engine_tidrestarts at 0 every replay,tidis the journaled/dedup id). Not wrong, but will confuse triage. ReplayTransactionCheck::verify_replay_transactioncompares the twoHashMaps directly; adebug_assert!thatrecorded.keys()==replayed.keys()would surface "endpoint present in one but not the other" as its own message instead of buried inside a{:?}diff.test_ft_input_replay_determinismstill hits the checksum path but not the transaction-boundary path directly. A follow-up test that pins a smallMaxTransactionLatency+ forces a crash mid-transaction would give real coverage of the new boundary handling. Fine as a follow-up.
APPROVE.
Yes, I agree, this could use more validation. |
Describe Manual Test Plan
With the introduction of transactions, the model of computation changed so that
the circuit now produces one output per transaction rather than per step. As a
result the existing journal replay mechanism that runs each input step in a
separate transaction is only correct assuming that the original inputs were
also ingested with a step per transaction. But even that is no longer true
after the introduction of streaming exchange, which can cause a transaction
processing the exact same input take more or fewer steps than the original run
of the program. This broke the validation mechanism that assumed that step
numbers during recording and replay should match precisely.
The solution that this commit implements has repa
transactions and commits each transaction at exactly the recorded boundary.
This restores the invariant the the outputs of the circuit during replay are
identical to the outputs produced while processing the same inputs before the
failure.
As a side effect of this fix, we also save the last transaction number as part
of the checkpoint, so after restart the circuit will resume contigous
transaction count.
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes
This PR introduces an incompatible change to the replay journal format; however it only affects existing pipelines in the following rare situatiions:
@blp , we will likely merge this before you come back from vacation. This could still use your review when you're back