diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 49ce170204b..855b95423c1 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -3418,6 +3418,10 @@ impl CircuitThread { // involve ingesting some inputs from connectors. By pausing those inputs // we may prevent the transaction from ever completing. // + // Don't pause inputs if a checkpoint has already started: once it has started, + // the checkpoint may be processing in a background thread but this shouldn't + // stop the pipeline from processing inputs. + // // FIXME: the last point means that checkpoints can get delayed indefinitely // if the user runs end-to-end transactions. One possible way to solve this // in the future is to remove the notion of barriers altogether, making input @@ -3425,8 +3429,12 @@ impl CircuitThread { let coordination_request = self.controller.coordination_request.lock().unwrap().clone(); let inputs = if self.checkpoint_requested() && self.ft.is_none() + && self.running_checkpoint.is_none() && self.controller.get_transaction_state() == TransactionState::None { + tracing::debug!( + "checkpoint requested: only CheckpointBarrier inputs will be processed" + ); StepInputs::CheckpointBarriers } else if let Some(coordination_request) = &coordination_request { coordination_request.inputs diff --git a/python/tests/platform/test_output_buffer_checkpoint.py b/python/tests/platform/test_output_buffer_checkpoint.py new file mode 100644 index 00000000000..e04fba73872 --- /dev/null +++ b/python/tests/platform/test_output_buffer_checkpoint.py @@ -0,0 +1,114 @@ +""" +Regression test for issue #6100: a pipeline with enable_output_buffer=true and +max_output_buffer_time_millis=60000 stalls processing for ~60 seconds on every +checkpoint when checkpoint_interval_secs=5. 
+
+The root cause: while a checkpoint is in progress, the circuit skips non-barrier
+inputs (datagen, ad-hoc queries, etc.), so no records are processed until the
+checkpoint completes. The checkpoint waits for output connectors to finish
+transmitting records up to the checkpoint threshold, but with output buffering
+enabled the connector holds records in memory until the buffer timeout expires.
+The fix lets the pipeline continue processing inputs while the checkpoint runs
+in the background.
+"""
+
+import sys
+import time
+import uuid
+
+from feldera import PipelineBuilder
+from feldera.enums import FaultToleranceModel
+from feldera.runtime_config import RuntimeConfig
+from feldera.testutils import (
+    FELDERA_TEST_NUM_HOSTS,
+    FELDERA_TEST_NUM_WORKERS,
+    enterprise_only,
+    single_host_only,
+)
+from tests import TEST_CLIENT
+
+from .helper import gen_pipeline_name
+
+# Checkpoint period configured on the pipeline under test.
+CHECKPOINT_INTERVAL_SECS = 5
+# Length of one sampling window; tied to the checkpoint period so the two
+# cannot drift apart.
+SAMPLE_INTERVAL_S = float(CHECKPOINT_INTERVAL_SECS)
+# Number of consecutive sampling windows that must each show progress.
+NUM_INTERVALS = 3
+# How long the pipeline may take to process its first record after starting.
+STARTUP_TIMEOUT_S = SAMPLE_INTERVAL_S * NUM_INTERVALS
+
+
+def _processed_records(pipeline):
+    """Return the pipeline's total processed record count, treating None as 0."""
+    return pipeline.stats().global_metrics.total_processed_records or 0
+
+
+@enterprise_only
+@single_host_only
+@gen_pipeline_name
+def test_output_buffer_does_not_stall_checkpoint(pipeline_name):
+    """Throughput must not drop to zero while automated checkpointing is active.
+
+    Runs a datagen -> buffered file_output pipeline with periodic checkpoints
+    and samples ``total_processed_records`` once per checkpoint interval. Every
+    window must show progress; a window with no new records fails the test.
+    """
+    output_path = f"/tmp/feldera_ob_{uuid.uuid4().hex}.jsonl"
+
+    sql = f"""
+CREATE TABLE t (id BIGINT NOT NULL PRIMARY KEY)
+WITH (
+    'connectors' = '[{{
+        "transport": {{
+            "name": "datagen",
+            "config": {{"plan": [{{"rate": 10000}}]}}
+        }}
+    }}]'
+);
+
+CREATE MATERIALIZED VIEW v
+WITH (
+    'connectors' = '[{{
+        "transport": {{
+            "name": "file_output",
+            "config": {{"path": "{output_path}"}}
+        }},
+        "format": {{"name": "json"}},
+        "enable_output_buffer": true,
+        "max_output_buffer_time_millis": 60000
+    }}]'
+) AS SELECT * FROM t;
+""".strip()
+
+    pipeline = PipelineBuilder(
+        TEST_CLIENT,
+        pipeline_name,
+        sql,
+        runtime_config=RuntimeConfig(
+            workers=FELDERA_TEST_NUM_WORKERS,
+            hosts=FELDERA_TEST_NUM_HOSTS,
+            fault_tolerance_model=FaultToleranceModel.AtLeastOnce,
+            checkpoint_interval_secs=CHECKPOINT_INTERVAL_SECS,
+        ),
+    ).create_or_replace()
+
+    try:
+        # Start inside the try block so that a failed or partial start still
+        # reaches the cleanup in the finally clause.
+        pipeline.start()
+
+        # Wait for the pipeline to start processing. Fail explicitly on
+        # timeout: otherwise the zero deltas below would be blamed on the
+        # output buffer even though the pipeline never produced anything.
+        deadline = time.monotonic() + STARTUP_TIMEOUT_S
+        while _processed_records(pipeline) == 0:
+            assert time.monotonic() < deadline, (
+                f"Pipeline processed no records within {STARTUP_TIMEOUT_S}s "
+                f"of starting."
+            )
+            time.sleep(0.5)
+
+        # Sample total_processed_records once per checkpoint interval.
+        # Checkpoints fire roughly every CHECKPOINT_INTERVAL_SECS, so at least
+        # 2 checkpoints occur. Without the fix, each checkpoint stalls the
+        # circuit for ~60 s, and some windows show zero new records.
+        samples = [_processed_records(pipeline)]
+        for _ in range(NUM_INTERVALS):
+            time.sleep(SAMPLE_INTERVAL_S)
+            samples.append(_processed_records(pipeline))
+
+        print(
+            f"\nprocessed_records samples (one per {SAMPLE_INTERVAL_S}s): {samples}",
+            file=sys.stderr,
+        )
+
+        for i in range(1, len(samples)):
+            delta = samples[i] - samples[i - 1]
+            assert delta > 0, (
+                f"Throughput dropped to zero in window {i - 1}→{i} "
+                f"(processed_records: {samples[i - 1]} → {samples[i]}). "
+                f"The output buffer likely stalled the checkpoint (issue #6100)."
+            )
+    finally:
+        pipeline.stop(force=True)