Skip to content

Journal replay fix#6566

Merged
ryzhyk merged 2 commits into
mainfrom
journal-replay-fix
Jul 1, 2026
Merged

Journal replay fix#6566
ryzhyk merged 2 commits into
mainfrom
journal-replay-fix

Conversation

@ryzhyk

@ryzhyk ryzhyk commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Describe Manual Test Plan

With the introduction of transactions, the model of computation changed so that
the circuit now produces one output per transaction rather than per step. As a
result the existing journal replay mechanism that runs each input step in a
separate transaction is only correct assuming that the original inputs were
also ingested with a step per transaction. But even that is no longer true
after the introduction of streaming exchange, which can cause a transaction
processing the exact same input take more or fewer steps than the original run
of the program. This broke the validation mechanism that assumed that step
numbers during recording and replay should match precisely.

The solution that this commit implements has repa

  • The journal records transaction boundaries in addition to individual steps.
  • The replay mechanisms breaks up the sequence of replayed input steps into
    transactions and commits each transaction at exactly the recorded boundary.

This restores the invariant the the outputs of the circuit during replay are
identical to the outputs produced while processing the same inputs before the
failure.

As a side effect of this fix, we also save the last transaction number as part
of the checkpoint, so after restart the circuit will resume contigous
transaction count.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

This PR introduces an incompatible change to the replay journal format; however it only affects existing pipelines in the following rare situatiions:

  • The pipeline is configured with exactly-once FT
  • The pipeline crashed with a non-empty journal and for some reason did not restart automatically
  • The user upgraded the pipeline's runtime before restarting the failed pipeline.

@blp , we will likely merge this before you come back from vacation. This could still use your review when you're back

…n test

Add `tests/platform/test_fault_tolerance.py::test_ft_input_replay_determinism`,
an enterprise-only regression test for the intermittent fault-tolerant
restart-recovery failures seen in CI.

It creates a single-host fault-tolerant pipeline (`fault_tolerance:
latest_checkpoint`, datagen input, materialized table + view) and repeatedly
force-stops (crashes) and resumes it while data is flowing. Each resume restores
the latest checkpoint and replays the input log; replay must be deterministic,
so the pipeline must return to Running every cycle. On a buggy build the input
is re-sharded across workers non-deterministically on replay and the engine
self-terminates with "Logged and replayed step N contained different numbers of
records or hashes" (PipelineTerminated), which the test reports as a recovery
failure.

The bug requires the input sharded across >= 2 workers (a single worker replays
deterministically); the test pins 2 workers as the most reliable trigger and
crashes for 15 cycles. Reproduces within the first few cycles.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
@ryzhyk ryzhyk requested a review from blp July 1, 2026 04:54
@ryzhyk ryzhyk added the ft Fault tolerant, distributed, and scale-out implementation label Jul 1, 2026

@mihaibudiu mihaibudiu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments explain what is going on, but the state machine is spread over a large code fragment, so it's not really easy to ascertain that there are no gaps.

Comment thread crates/adapters/src/controller/checkpoint.rs Outdated
build the input is re-sharded across workers non-deterministically on replay; the
engine detects

Logged and replayed step N contained different numbers of records or hashes

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems in the wrong place

Comment thread python/tests/platform/test_fault_tolerance.py
Comment thread crates/adapters/src/controller.rs Outdated
Comment thread crates/adapters/src/controller.rs Outdated
Comment thread crates/adapters/src/controller.rs
Comment thread crates/adapters/src/controller.rs Outdated
/// transaction must commit (with no new input) before that step is fed.
/// `input_step` suppresses input on such steps and
/// `advance_transaction_state` commits the open transaction.
fn replay_at_boundary(&self) -> bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this stuff is quite complicated. I wonder if this can be simplified in a future design

/// have to reproduce both the exact total record count and the XOR of every
/// per-step 64-bit hash while still ingesting different data.
#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
struct ReplayChecksumAggregate {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these #[test] objects?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, these are always on

// step, so a host must not open a transaction on its own. Fixing
// multihost FT replay the same way needs the coordinator to drive one
// replay transaction across all hosts -- a separate change (multihost +
// fault tolerance is currently untested).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need an issue? you could refer to it here

@ryzhyk ryzhyk force-pushed the journal-replay-fix branch 3 times, most recently from ac49dd0 to 67ae582 Compare July 1, 2026 07:09

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming to this after mihaibudiu's approve and ryzhyk's ping to blp. I share mihaibudiu's concern about the state machine being spread out, so I traced it end-to-end. Overall it holds together, but a few things below are worth another look before this lands (leaving as COMMENT so blp can be the final signoff on FT).

State-machine trace (single-host):

  • FtState::open sets replay_steps from Journal::list_steps and primes replay_step + replay_transaction_id.
  • Per step: input_step (checks replay_at_boundary) → step_circuit → journal write_step (or accumulate_replay_check) → push_outputnext_step(replay_consumed).
  • Transaction advance: advance_transaction_state reads replay_transaction_id vs replay_open_transaction_id and drives None → Started → Committing → None.

The three invariants that make this work are: (a) replay_at_boundary suppresses input while a boundary commit is pending, (b) next_step refuses to advance the cursor when the step wasn't consumed, and (c) advance_transaction_state treats next != open (including next.is_none()) as "commit and drain". They all line up. Where I'd like a second pair of eyes:

  • What happens when the last recorded transaction has no closing boundary in the journal (crash mid-transaction, which is exactly the motivating case). See inline on next_step. The current code commits that partial transaction after finish_replaying clears restoring and control falls through to the normal (non-replay) commit path. That's likely the intended behavior, but the changelog wording ("commits each transaction at exactly the recorded boundary") doesn't quite describe it. Worth a sentence.
  • Interaction between checkpointing and replay: is a checkpoint permitted mid-transaction during replay? RunningCheckpoint snapshots get_transaction_number(); if a checkpoint can land mid-tx, transaction_number in the checkpoint is off-by-one with respect to last_transaction_id. Everywhere I looked, checkpoints seem to require TransactionState::None, but a one-line assertion or comment naming that invariant would defend the state machine against future changes.

Journal-format compat: no #[serde(default)] on StepMetadata::transaction_id — an old journal will fail to deserialize (via rmp_serde). That matches the changelog. Fine, just noting it is a hard break, not a graceful one.

Checkpoint compat: Checkpoint::transaction_number is #[serde(default)] — old checkpoints decode with transaction_number = 0. Good.

Kafka output changes: the "exactly one non-empty batch per transaction" contract in batch_start depends on the controller skipping empty batches (the num_records > 0 guard in output_worker). That's now the correctness pivot for FT dedup — worth an inline comment on the guard tying it to the unreachable! in batch_start.

Test: test_ft_input_replay_determinism is a solid stress for the checksum-mismatch bug, but it doesn't force a mid-transaction crash across a transaction boundary (which is the state machine's hardest case). A follow-up test that pins transactions with a small commit interval and crashes mid-transaction would exercise the actual boundary logic, not just "some step got interrupted."

Small stuff inline. Deferring the final call to blp as ryzhyk requested.

/// many physical steps each commit took during recording. Fault-tolerant
/// output connectors key their exactly-once dedup on this id, which (unlike
/// the physical step number) is deterministic across replay.
pub transaction_id: TransactionId,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No #[serde(default)] here (unlike Checkpoint::transaction_number), so rmp_serde will hard-fail on any pre-existing journal record. That matches the changelog's incompatible-change note, so this is by design — flagging so the choice is explicit: there is intentionally zero graceful path for an old journal, even though the checkpoint side got a default. Fine either way, but worth confirming.

// No more steps to replay. The boundary check in
// `accumulate_replay_check` only fires when a later
// transaction begins, so verify the last one here.
self.verify_replay_transaction()?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handles "journal exhausted mid-transaction" (crash before the boundary was recorded — exactly the motivating case) by verifying the last transaction here and then letting finish_replaying clear restoring; the commit itself then falls through to the normal non-replay branch of advance_transaction_state. That is the correct behavior, but the changelog wording ("commits each transaction at exactly the recorded boundary") reads as if every replayed transaction has a recorded closing boundary, which the last one doesn't. Worth a sentence in the doc-comment on advance_transaction_state (or a comment right here) that spells out this final-transaction case — otherwise a future reader has to reconstruct it from three different places. This is the state-machine "gap" mihaibudiu was worried about, and it's the one I'd most like documented.

// the transaction and requires strictly increasing ids. Empty
// in-progress steps of a transaction produce no output.
if let Some(data) = data
&& num_records > 0

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The num_records > 0 guard is now load-bearing for exactly-once FT: batch_start in the Kafka FT output panics via unreachable! if a transaction number does not strictly increase, which requires "exactly one open batch per transaction", which requires this skip. Please add a cross-reference to KafkaOutputEndpoint::batch_start's comment (or vice versa), and consider an assert!(num_records > 0 || ...) invariant so a future refactor doesn't silently open an empty batch and later blow up in the Kafka producer path.

// path does.
transaction_info.replay_open_transaction_id = Some(tid);
transaction_info.last_transaction_id += 1;
let engine_tid = transaction_info.last_transaction_id;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two different counters get conflated in this log line: engine_tid here comes from last_transaction_id (fresh from 0 every restart), while the FT dedup key and the value stamped into journal records comes from self.transaction_number (restored from checkpoint). During replay these will typically not be equal — e.g. a pipeline that restored transaction_number = 42 will log "Transaction 1: replaying journaled transaction 43". Not a bug, but the log will confuse anyone triaging FT issues. Consider using the restored engine transaction_number as the display id here too, or renaming the log field.

let Some(check) = self.replay_check.take() else {
return Ok(());
};
if check.replayed != check.recorded {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aggregating over wrapping_add + XOR is fine for collision resistance across the aggregate, but worth a debug_assert! that check.recorded and check.replayed have the same key set before comparing — right now a divergence in which endpoints reported checksums would surface as a giant Debug-format diff in the error message rather than a targeted "endpoint X ingested but wasn't recorded". Cheap to add and much better error message.

// and requires all hosts to report the same transaction state each
// step, so a host must not open a transaction on its own. Fixing
// multihost FT replay the same way needs the coordinator to drive one
// replay transaction across all hosts -- a separate change (multihost +

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The !is_multihost gate silently falls through to the API/connector branch, which for a multihost FT pipeline in restoring state means the replay never starts a transaction and journaled input is fed under TransactionState::None. That is presumably safe because "multihost + FT is currently untested," but a warn! (or a hard ControllerError) when a multihost pipeline enters this code path with a non-empty journal would fail loud rather than silently doing the wrong thing when someone eventually enables it. At minimum, please open a tracking issue and link it here so this TODO doesn't get lost.

_SQL = f"""
CREATE TABLE example1 (
id BIGINT NOT NULL PRIMARY KEY,
name VARCHAR NOT NULL,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test hits the checksum-comparison path but not the transaction-boundary path — it doesn't force a crash between transactions, and the datagen config doesn't pin a small transaction size, so most cycles will replay steps within a single implicit transaction. A companion test that (a) forces auto-transactions with a small interval / batch size, and (b) crashes with a partial transaction pending would exercise advance_transaction_state's Commit-on-boundary path directly, which is the code the PR actually adds. As-is, the state machine's core new behavior is only tested by higher-level behavioral tests. Follow-up is fine.

# The bug needs the input sharded across >= 2 workers (a single worker replays
# deterministically). 2 workers is the most reliable trigger -- it reproduces
# within a few cycles, whereas higher worker counts need more cycles -- so the
# test pins 2 rather than using FELDERA_TEST_NUM_WORKERS.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"intermittency" → "nondeterminism" (per mihaibudiu's earlier comment). Also, the assertion inside recovered() will fire on the first cycle only if the pipeline enters Stopped with an error — but a healthy resume goes Initializing → Running and never enters Stopped. Might be clearer to fail explicitly on any Stopped deployment_status even without a deployment_error, since a Stopped under FT replay is by definition a bug.

With the introduction of transactions, the model of computation changed so that
the circuit now produces one output per transaction rather than per step. As a
result the existing journal replay mechanism that runs each input step in a
separate transaction is only correct assuming that the original inputs were
also ingested with a step per transaction. But even that is no longer true
after the introduction of streaming exchange, which can cause a transaction
processing the exact same input take more or fewer steps than the original run
of the program. This broke the validation mechanism that assumed that step
numbers during recording and replay should match precisely.

The solution that this commit implements has repa

- The journal records transaction boundaries in addition to individual steps.
- The replay mechanisms breaks up the sequence of replayed input steps into
  transactions and commits each transaction at exactly the recorded boundary.

This restores the invariant the the outputs of the circuit during replay are
identical to the outputs produced while processing the same inputs before the
failure.

As a side effect of this fix, we also save the last transaction number as part
of the checkpoint, so after restart the circuit will resume contigous
transaction count.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the journal-replay-fix branch from 67ae582 to 592d215 Compare July 1, 2026 07:19

@blp blp left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a huge change and I'm sorry you had to do it @ryzhyk. I honestly hadn't realized that our model for journaling was wrong in the world with transactions, but this PR makes it clear that it was. Ordinarily I'd spend a lot of time on a review of this nature because it's worthwhile to do a few experiments, etc. or at least to spot-check that I can't see problems in the corner cases that I can think of. In the world that I actually live in, I did read it as carefully as I could and I didn't spot problems.

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rebase + follow-up commit 592d2157. My earlier COMMENT (4606622457) points, revisited against the new tip:

Addressed:

  • num_records > 0 guard is now explicitly documented at the call site as load-bearing for FT dedup ("each transaction must open exactly one output batch... requires strictly increasing ids"). Good.
  • !is_multihost gate has a proper "why" comment explaining that multihost FT replay needs coordinator-driven cross-host transactions and is a separate change (with "multihost + FT is currently untested" called out).
  • Kafka FT output was renamed end-to-end (step/next_steptransaction/next_transaction, OutputPosition::step.transaction, BatchClosed(Step)BatchClosed(u64)), so the on-wire dedup key matches the invariant instead of relying on the caller happening to pass a transaction number in a field named step. Much clearer.
  • Test docstring rewritten; "intermittency" gone.

Still open (all nits, none blocking given blp's APPROVE):

  • State-machine doc gap: the "journal exhausted mid-transaction" case (crash before the closing boundary was recorded) is still handled implicitly by verify_replay_transaction() in the None arm of next_step + finish_replaying. Correct behavior, but the invariant "last recorded transaction may have no closing boundary; it is verified on journal-exhaustion, not on a boundary transition" isn't spelled out where a future reader would look for it.
  • Log line at advance_transaction_state: Transaction {engine_tid}: replaying journaled transaction {tid} still conflates two counters (engine_tid restarts at 0 every replay, tid is the journaled/dedup id). Not wrong, but will confuse triage.
  • ReplayTransactionCheck::verify_replay_transaction compares the two HashMaps directly; a debug_assert! that recorded.keys() == replayed.keys() would surface "endpoint present in one but not the other" as its own message instead of buried inside a {:?} diff.
  • test_ft_input_replay_determinism still hits the checksum path but not the transaction-boundary path directly. A follow-up test that pins a small MaxTransactionLatency + forces a crash mid-transaction would give real coverage of the new boundary handling. Fine as a follow-up.

APPROVE.

@ryzhyk ryzhyk added this pull request to the merge queue Jul 1, 2026
@ryzhyk

ryzhyk commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

This is a huge change and I'm sorry you had to do it @ryzhyk. I honestly hadn't realized that our model for journaling was wrong in the world with transactions, but this PR makes it clear that it was. Ordinarily I'd spend a lot of time on a review of this nature because it's worthwhile to do a few experiments, etc. or at least to spot-check that I can't see problems in the corner cases that I can think of. In the world that I actually live in, I did read it as carefully as I could and I didn't spot problems.

Yes, I agree, this could use more validation.

@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Jul 1, 2026
@ryzhyk ryzhyk added this pull request to the merge queue Jul 1, 2026
Merged via the queue into main with commit 3101ecd Jul 1, 2026
1 check passed
@ryzhyk ryzhyk deleted the journal-replay-fix branch July 1, 2026 18:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ft Fault tolerant, distributed, and scale-out implementation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants