From 3bc1d14da85ed99c7139e19ddfd99d5567687de8 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Tue, 9 Jun 2026 21:39:19 -0700 Subject: [PATCH 1/3] [adaprers] Reduce merge backpressure in output buffers. This commit reduces the likelihood of the following situation: * A large transaction (e.g., backfill) produces a large output batch, say 1B records. This batch consists of `workers` spines, each with potentially dozens of batches. * These batches are pushed into a single spine inside the output buffer. Once the number of batches in the spine exceeds 128, backpressure kicks in. We've seen pipelines with 32 workers spend >1hr waiting for backpressure. We address this in two ways. First we insert all batches into the spine before waiting for backpressure. This is likely to trigger fewer larger merges. Second we postpone checking for backpressure until additional batches retrieved from the output queue are added to the buffer. This avoids waiting for the merger when the buffer is large enough to be sent to the connector. Signed-off-by: Leonid Ryzhyk --- crates/adapterlib/src/catalog.rs | 4 ++ crates/adapters/src/controller.rs | 10 +++- .../adapters/src/static_compile/seroutput.rs | 20 +++++++ crates/dbsp/src/trace.rs | 14 +++++ crates/dbsp/src/trace/spine_async.rs | 56 +++++++++---------- crates/dbsp/src/trace/test/test_batch.rs | 14 +++++ 6 files changed, 88 insertions(+), 30 deletions(-) diff --git a/crates/adapterlib/src/catalog.rs b/crates/adapterlib/src/catalog.rs index 7c0864a9d6f..8c9713dc575 100644 --- a/crates/adapterlib/src/catalog.rs +++ b/crates/adapterlib/src/catalog.rs @@ -359,6 +359,10 @@ pub trait SerTrace: SerBatchReader { /// Insert a batch into the trace. fn insert(&mut self, batch: Arc); + fn insert_without_blocking(&mut self, batch: Arc) -> bool; + + fn backpressure_wait(&self); + fn as_batch_reader(&self) -> &dyn SerBatchReader; } diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 8e26b4895ec..d730b5cbf20 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -5345,13 +5345,19 @@ impl OutputBuffer { ) { if let Some(batch) = batch { if let Some(buffer) = &mut self.buffer { + buffer.backpressure_wait(); + + // Insert all batches at once without blocking. This will help trigger fewer + // larger merges when producing a large output batch. In addition, we postpone + // waiting for backpressure until the next iteration. This increases the likelihood + // that a large batch will be sent to the connector without stalling for backpressure. for batch in batch.batches() { - buffer.insert(batch); + buffer.insert_without_blocking(batch); } } else { for batch in batch.batches() { if let Some(buffer) = self.buffer.as_mut() { - buffer.insert(batch); + buffer.insert_without_blocking(batch); } else { self.buffer = Some(batch.into_trace()); }; diff --git a/crates/adapters/src/static_compile/seroutput.rs b/crates/adapters/src/static_compile/seroutput.rs index 2fca3090e54..27bc0ea43f7 100644 --- a/crates/adapters/src/static_compile/seroutput.rs +++ b/crates/adapters/src/static_compile/seroutput.rs @@ -580,6 +580,26 @@ where TOKIO.block_on(self.batch.inner_mut().insert(batch.batch.into_inner())); } + fn insert_without_blocking(&mut self, batch: Arc) -> bool { + let batch = Arc::unwrap_or_clone( + batch + .as_any() + .downcast::::Batch>, + KD, + VD, + >>() + .unwrap(), + ); + self.batch + .inner_mut() + .insert_without_blocking(batch.batch.into_inner()) + } + + fn backpressure_wait(&self) { + TOKIO.block_on(self.batch.inner().backpressure_wait()); + } + fn as_batch_reader(&self) -> &dyn SerBatchReader { self } diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 5900a37c3ed..34afb765769 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -261,6 +261,20 @@ pub trait Trace: BatchReader { /// (asynchronously) until some of them have been merged. fn insert(&mut self, batch: impl Into>) -> impl Future; + /// Inserts a batch into the spine without blocking. Thus, this omits: + /// + /// - Spilling the batch to storage when that is a good idea. + /// + /// - Waiting until the number of batches in the spine falls below the level + /// at which we impose backpressure. The function returns true if + /// backpressure is warranted. The caller may do so afterward by calling + /// [Self::backpressure_wait]. + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool; + + /// Waits for the number of batches in the spine to fall below the level at + /// which we impose backpressure. + fn backpressure_wait(&self) -> impl Future; + /// Clears the value of the "dirty" flag to `false`. /// /// The "dirty" flag is used to efficiently track changes to the trace, diff --git a/crates/dbsp/src/trace/spine_async.rs b/crates/dbsp/src/trace/spine_async.rs index a87ec98d221..692756c76fe 100644 --- a/crates/dbsp/src/trace/spine_async.rs +++ b/crates/dbsp/src/trace/spine_async.rs @@ -1954,6 +1954,34 @@ where } } + /// Inserts a batch into the spine without blocking. Thus, this omits: + /// + /// - Spilling the batch to storage when that is a good idea. The caller + /// may do this beforehand by calling [Spine::maybe_flush_batch]. + /// + /// - Waiting until the number of batches in the spine falls below the level + /// at which we impose backpressure. The function returns true if + /// backpressure is warranted. The caller may do so afterward by calling + /// [Spine::backpressure_wait]. + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { + let batch = batch.into(); + if !batch.is_empty() { + self.dirty = true; + self.merger + .add_batch(batch, false) + .batch_count() + .should_apply_backpressure() + } else { + false + } + } + + /// Waits for the number of batches in the spine to fall below the level at + /// which we impose backpressure. + async fn backpressure_wait(&self) { + self.merger.backpressure_wait().await; + } + fn clear_dirty_flag(&mut self) { self.dirty = false; } @@ -2222,34 +2250,6 @@ where } } - /// Inserts a batch into the spine without blocking. Thus, this omits: - /// - /// - Spilling the batch to storage when that is a good idea. The caller - /// may do this beforehand by calling [Spine::maybe_flush_batch]. - /// - /// - Waiting until the number of batches in the spine falls below the level - /// at which we impose backpressure. The function returns true if - /// backpressure is warranted. The caller may do so afterward by calling - /// [Spine::backpressure_wait]. - pub fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { - let batch = batch.into(); - if !batch.is_empty() { - self.dirty = true; - self.merger - .add_batch(batch, false) - .batch_count() - .should_apply_backpressure() - } else { - false - } - } - - /// Waits for the number of batches in the spine to fall below the level at - /// which we impose backpressure. - pub async fn backpressure_wait(&self) { - self.merger.backpressure_wait().await; - } - /// Returns an object that can be used to wait for backpressure to be /// relieved. Returns `None` if no backpressure is needed. pub fn backpressure_waiter(&self) -> Option { diff --git a/crates/dbsp/src/trace/test/test_batch.rs b/crates/dbsp/src/trace/test/test_batch.rs index 62cca06f11f..5fb5677416f 100644 --- a/crates/dbsp/src/trace/test/test_batch.rs +++ b/crates/dbsp/src/trace/test/test_batch.rs @@ -1349,6 +1349,20 @@ where .data; } + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { + self.data = Self::merge( + self, + batch.into().as_ref(), + &self.key_filter, + &self.value_filter, + ) + .data; + + false + } + + async fn backpressure_wait(&self) {} + fn clear_dirty_flag(&mut self) {} fn dirty(&self) -> bool { From db5a23fa2267079bfb045f5a6690f53e1df53204 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Wed, 10 Jun 2026 00:00:24 -0700 Subject: [PATCH 2/3] [adapters] Change default max_output_buffer_size_records to 10M. In the past, if the user set max_output_buffer_time_millis, but not max_output_buffer_size_records, then the output buffer would hold data for the specified duration regardless of its size. The user was required to also configure max_output_buffer_size_records to force a large buffer to be sent immediately. The downside of not setting max_output_buffer_size_records is that quickly adding more batches to an already large output buffer could cause expensive backpressure stalls. This commit changes the default to 10,000,000 records, meaning that once the buffer reaches this size it will be send immediately. The purpose of the output buffer is to avoid the small file problem for connectors such as Delta. In this type of use case, waiting for exactly max_output_buffer_time_millis is not a hard requirement, so the new default should be harmless. However it is a behavioral change, which I documented in the changelog. Signed-off-by: Leonid Ryzhyk --- crates/adapters/src/controller.rs | 6 - crates/feldera-types/src/config.rs | 24 +--- docs.feldera.com/docs/changelog.md | 5 + docs.feldera.com/docs/connectors/index.mdx | 11 +- .../platform/test_output_buffer_size_limit.py | 113 ++++++++++++++++++ 5 files changed, 124 insertions(+), 35 deletions(-) create mode 100644 python/tests/platform/test_output_buffer_size_limit.py diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index d730b5cbf20..7bd0745e4bf 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -6612,12 +6612,6 @@ impl ControllerInner { let self_weak = Arc::downgrade(self); - endpoint_config - .connector_config - .output_buffer_config - .validate() - .map_err(|e| ControllerError::invalid_output_buffer_configuration(endpoint_name, &e))?; - // Initialize endpoint stats early so that connectors can register // batch-progress counters (or other metrics) during construction. self.status.add_output( diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs index 264db56be07..6a2c56dcc68 100644 --- a/crates/feldera-types/src/config.rs +++ b/crates/feldera-types/src/config.rs @@ -43,6 +43,9 @@ pub use dev_tweaks::DevTweaks; const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10; +/// Default maximum number of updates to be kept in the output buffer. +const DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS: usize = 10_000_000; + /// Default value of `ConnectorConfig::max_queued_records`. pub const fn default_max_queued_records() -> u64 { 1_000_000 @@ -1762,8 +1765,7 @@ pub struct OutputBufferConfig { /// total number of updates output by the pipeline. Updates to the /// same record can overwrite or cancel previous updates. /// - /// By default, the buffer can grow indefinitely until one of - /// the other output conditions is satisfied. + /// The default is 10,000,000. /// /// NOTE: this configuration option requires the `enable_output_buffer` flag /// to be set. @@ -1774,28 +1776,12 @@ impl Default for OutputBufferConfig { fn default() -> Self { Self { enable_output_buffer: false, - max_output_buffer_size_records: usize::MAX, + max_output_buffer_size_records: DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS, max_output_buffer_time_millis: usize::MAX, } } } -impl OutputBufferConfig { - pub fn validate(&self) -> Result<(), String> { - if self.enable_output_buffer - && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records - && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis - { - return Err( - "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified" - .to_string(), - ); - } - - Ok(()) - } -} - /// Describes an output connector configuration #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)] pub struct OutputEndpointConfig { diff --git a/docs.feldera.com/docs/changelog.md b/docs.feldera.com/docs/changelog.md index 73375353805..c533011cbd2 100644 --- a/docs.feldera.com/docs/changelog.md +++ b/docs.feldera.com/docs/changelog.md @@ -14,11 +14,16 @@ import TabItem from '@theme/TabItem'; ## Unreleased + - The default value of `max_output_buffer_size_records` is now 10,000,000 + instead of unbounded. + - Casts of strings to Boolean and floating point values will produce runtime errors instead of legal values for illegal string values. The set of strings that can be legally converted to Booleans has been changed. + ## v0.306.0 + - No longer allowed to edit `runtime_config.resources.storage_class` if the pipeline storage is not cleared. - Calling `/start` on a pipeline that already failed to compile will directly return an error instead of diff --git a/docs.feldera.com/docs/connectors/index.mdx b/docs.feldera.com/docs/connectors/index.mdx index c2129d49731..63fc12d9832 100644 --- a/docs.feldera.com/docs/connectors/index.mdx +++ b/docs.feldera.com/docs/connectors/index.mdx @@ -209,20 +209,11 @@ output buffer: total number of updates output by the pipeline. Updates to the same record can overwrite or cancel previous updates. - When not specified, the buffer can grow indefinitely until one of - the other trigger conditions is satisfied. + When not specified, defaults to 10,000,000. This configuration option requires the `enable_output_buffer` flag to be set. -:::note - -When the `enable_output_buffer` flag is set, at least one of -`max_output_buffer_time_millis` or `max_output_buffer_size_records` must be -specified. - -::: - See [Delta Lake output connector documentation](/connectors/sinks/delta) for an example of configuring the output buffer. diff --git a/python/tests/platform/test_output_buffer_size_limit.py b/python/tests/platform/test_output_buffer_size_limit.py new file mode 100644 index 00000000000..41a320fb438 --- /dev/null +++ b/python/tests/platform/test_output_buffer_size_limit.py @@ -0,0 +1,113 @@ +""" +Test for the default value of ``max_output_buffer_size_records``. + +Output buffering decouples the rate at which the pipeline produces changes from +the rate at which they are pushed to an output connector. The buffer is flushed +when either of two thresholds is crossed: it has held data for longer than +``max_output_buffer_time_millis`` or it has accumulated more than +``max_output_buffer_size_records`` records. + +``max_output_buffer_size_records`` defaults to 10,000,000 records, which bounds +the buffer and guarantees it is sent once it grows past that size even when no +time limit is configured. + +This test enables output buffering on a Delta Lake sink without setting a time +limit, feeds it more than 10,000,000 records, and verifies that records are +written out: the pipeline's completed-record count advances past the default +size cap. +""" + +import json + +from feldera import Pipeline, PipelineBuilder +from feldera.runtime_config import RuntimeConfig +from feldera.testutils import FELDERA_TEST_NUM_WORKERS +from tests import TEST_CLIENT +from tests.utils import DeltaTestLocation, wait_for_condition + +from .helper import gen_pipeline_name + +# Default value of ``max_output_buffer_size_records``. The buffer must flush +# once it grows past this many records, even without a time limit. +_DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS = 10_000_000 + +# Number of records to generate. Chosen comfortably above the default size cap +# so that the buffer is forced to flush at least once. +_NUM_RECORDS = 12_000_000 + + +@gen_pipeline_name +def test_output_buffer_flushes_at_default_size_limit(pipeline_name): + """A buffered Delta sink with no time limit still flushes at 10M records.""" + + location = DeltaTestLocation.create(pipeline_name) + + # Use 8 writer threads so the Delta sink keeps up with the large flush. + delta_config = dict(location.connector_config) + delta_config["threads"] = 8 + + delta_connector = { + "name": "delta_out", + "transport": { + "name": "delta_table_output", + "config": delta_config, + }, + # The Delta sink needs an index on the view to write with threads > 1. + "index": "v_idx", + # Enable buffering but set neither ``max_output_buffer_time_millis`` + # nor ``max_output_buffer_size_records``, so the buffer relies on the + # default 10M size cap to flush. + "enable_output_buffer": True, + } + + # Generate incrementing primary keys so that every record is distinct and + # the buffer cannot shrink by consolidating updates to the same key. + datagen_connector = { + "transport": { + "name": "datagen", + "config": { + "workers": 4, + "plan": [ + { + "limit": _NUM_RECORDS, + "fields": {"id": {"range": [0, _NUM_RECORDS]}}, + } + ], + }, + } + } + + sql = f""" +CREATE TABLE t (id BIGINT NOT NULL PRIMARY KEY) WITH ( + 'connectors' = '{json.dumps([datagen_connector])}' +); + +CREATE MATERIALIZED VIEW v WITH ( + 'connectors' = '{json.dumps([delta_connector])}' +) AS SELECT * FROM t; + +CREATE INDEX v_idx ON v(id); +""".strip() + + pipeline: Pipeline = PipelineBuilder( + TEST_CLIENT, + name=pipeline_name, + sql=sql, + runtime_config=RuntimeConfig(workers=FELDERA_TEST_NUM_WORKERS), + ).create_or_replace() + + pipeline.start() + + try: + # The buffer flushes once it crosses the 10M default size cap, pushing + # those records through the Delta sink and advancing the completed + # count past the cap. + wait_for_condition( + "completed-record count advances past the default buffer size cap", + lambda: (pipeline.stats().global_metrics.total_completed_records or 0) + >= _DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS, + timeout_s=600.0, + poll_interval_s=2.0, + ) + finally: + pipeline.stop(force=True) From a9446f95982e6fa9b07e43116539c0a0bb7e05c9 Mon Sep 17 00:00:00 2001 From: feldera-bot Date: Wed, 10 Jun 2026 08:52:48 +0000 Subject: [PATCH 3/3] [ci] apply automatic fixes Signed-off-by: feldera-bot --- openapi.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openapi.json b/openapi.json index 858c28cf589..04f1f86a63f 100644 --- a/openapi.json +++ b/openapi.json @@ -10387,8 +10387,8 @@ }, "max_output_buffer_size_records": { "type": "integer", - "description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nBy default, the buffer can grow indefinitely until one of\nthe other output conditions is satisfied.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.", - "default": 18446744073709551615, + "description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nThe default is 10,000,000.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.", + "default": 10000000, "minimum": 0 }, "max_output_buffer_time_millis": {