diff --git a/crates/adapterlib/src/catalog.rs b/crates/adapterlib/src/catalog.rs index 7c0864a9d6f..8c9713dc575 100644 --- a/crates/adapterlib/src/catalog.rs +++ b/crates/adapterlib/src/catalog.rs @@ -359,6 +359,10 @@ pub trait SerTrace: SerBatchReader { /// Insert a batch into the trace. fn insert(&mut self, batch: Arc); + fn insert_without_blocking(&mut self, batch: Arc) -> bool; + + fn backpressure_wait(&self); + fn as_batch_reader(&self) -> &dyn SerBatchReader; } diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 8e26b4895ec..7bd0745e4bf 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -5345,13 +5345,19 @@ impl OutputBuffer { ) { if let Some(batch) = batch { if let Some(buffer) = &mut self.buffer { + buffer.backpressure_wait(); + + // Insert all batches at once without blocking. This will help trigger fewer + // larger merges when producing a large output batch. In addition, we postpone + // waiting for backpressure until the next iteration. This increases the likelihood + // that a large batch will be sent to the connector without stalling for backpressure. for batch in batch.batches() { - buffer.insert(batch); + buffer.insert_without_blocking(batch); } } else { for batch in batch.batches() { if let Some(buffer) = self.buffer.as_mut() { - buffer.insert(batch); + buffer.insert_without_blocking(batch); } else { self.buffer = Some(batch.into_trace()); }; @@ -6606,12 +6612,6 @@ impl ControllerInner { let self_weak = Arc::downgrade(self); - endpoint_config - .connector_config - .output_buffer_config - .validate() - .map_err(|e| ControllerError::invalid_output_buffer_configuration(endpoint_name, &e))?; - // Initialize endpoint stats early so that connectors can register // batch-progress counters (or other metrics) during construction. self.status.add_output( diff --git a/crates/adapters/src/static_compile/seroutput.rs b/crates/adapters/src/static_compile/seroutput.rs index 2fca3090e54..27bc0ea43f7 100644 --- a/crates/adapters/src/static_compile/seroutput.rs +++ b/crates/adapters/src/static_compile/seroutput.rs @@ -580,6 +580,26 @@ where TOKIO.block_on(self.batch.inner_mut().insert(batch.batch.into_inner())); } + fn insert_without_blocking(&mut self, batch: Arc) -> bool { + let batch = Arc::unwrap_or_clone( + batch + .as_any() + .downcast::::Batch>, + KD, + VD, + >>() + .unwrap(), + ); + self.batch + .inner_mut() + .insert_without_blocking(batch.batch.into_inner()) + } + + fn backpressure_wait(&self) { + TOKIO.block_on(self.batch.inner().backpressure_wait()); + } + fn as_batch_reader(&self) -> &dyn SerBatchReader { self } diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 5900a37c3ed..34afb765769 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -261,6 +261,20 @@ pub trait Trace: BatchReader { /// (asynchronously) until some of them have been merged. fn insert(&mut self, batch: impl Into>) -> impl Future; + /// Inserts a batch into the spine without blocking. Thus, this omits: + /// + /// - Spilling the batch to storage when that is a good idea. + /// + /// - Waiting until the number of batches in the spine falls below the level + /// at which we impose backpressure. The function returns true if + /// backpressure is warranted. The caller may do so afterward by calling + /// [Self::backpressure_wait]. + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool; + + /// Waits for the number of batches in the spine to fall below the level at + /// which we impose backpressure. + fn backpressure_wait(&self) -> impl Future; + /// Clears the value of the "dirty" flag to `false`. /// /// The "dirty" flag is used to efficiently track changes to the trace, diff --git a/crates/dbsp/src/trace/spine_async.rs b/crates/dbsp/src/trace/spine_async.rs index a87ec98d221..692756c76fe 100644 --- a/crates/dbsp/src/trace/spine_async.rs +++ b/crates/dbsp/src/trace/spine_async.rs @@ -1954,6 +1954,34 @@ where } } + /// Inserts a batch into the spine without blocking. Thus, this omits: + /// + /// - Spilling the batch to storage when that is a good idea. The caller + /// may do this beforehand by calling [Spine::maybe_flush_batch]. + /// + /// - Waiting until the number of batches in the spine falls below the level + /// at which we impose backpressure. The function returns true if + /// backpressure is warranted. The caller may do so afterward by calling + /// [Spine::backpressure_wait]. + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { + let batch = batch.into(); + if !batch.is_empty() { + self.dirty = true; + self.merger + .add_batch(batch, false) + .batch_count() + .should_apply_backpressure() + } else { + false + } + } + + /// Waits for the number of batches in the spine to fall below the level at + /// which we impose backpressure. + async fn backpressure_wait(&self) { + self.merger.backpressure_wait().await; + } + fn clear_dirty_flag(&mut self) { self.dirty = false; } @@ -2222,34 +2250,6 @@ where } } - /// Inserts a batch into the spine without blocking. Thus, this omits: - /// - /// - Spilling the batch to storage when that is a good idea. The caller - /// may do this beforehand by calling [Spine::maybe_flush_batch]. - /// - /// - Waiting until the number of batches in the spine falls below the level - /// at which we impose backpressure. The function returns true if - /// backpressure is warranted. The caller may do so afterward by calling - /// [Spine::backpressure_wait]. - pub fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { - let batch = batch.into(); - if !batch.is_empty() { - self.dirty = true; - self.merger - .add_batch(batch, false) - .batch_count() - .should_apply_backpressure() - } else { - false - } - } - - /// Waits for the number of batches in the spine to fall below the level at - /// which we impose backpressure. - pub async fn backpressure_wait(&self) { - self.merger.backpressure_wait().await; - } - /// Returns an object that can be used to wait for backpressure to be /// relieved. Returns `None` if no backpressure is needed. pub fn backpressure_waiter(&self) -> Option { diff --git a/crates/dbsp/src/trace/test/test_batch.rs b/crates/dbsp/src/trace/test/test_batch.rs index 62cca06f11f..5fb5677416f 100644 --- a/crates/dbsp/src/trace/test/test_batch.rs +++ b/crates/dbsp/src/trace/test/test_batch.rs @@ -1349,6 +1349,20 @@ where .data; } + fn insert_without_blocking(&mut self, batch: impl Into>) -> bool { + self.data = Self::merge( + self, + batch.into().as_ref(), + &self.key_filter, + &self.value_filter, + ) + .data; + + false + } + + async fn backpressure_wait(&self) {} + fn clear_dirty_flag(&mut self) {} fn dirty(&self) -> bool { diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs index 264db56be07..6a2c56dcc68 100644 --- a/crates/feldera-types/src/config.rs +++ b/crates/feldera-types/src/config.rs @@ -43,6 +43,9 @@ pub use dev_tweaks::DevTweaks; const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10; +/// Default maximum number of updates to be kept in the output buffer. +const DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS: usize = 10_000_000; + /// Default value of `ConnectorConfig::max_queued_records`. pub const fn default_max_queued_records() -> u64 { 1_000_000 @@ -1762,8 +1765,7 @@ pub struct OutputBufferConfig { /// total number of updates output by the pipeline. Updates to the /// same record can overwrite or cancel previous updates. /// - /// By default, the buffer can grow indefinitely until one of - /// the other output conditions is satisfied. + /// The default is 10,000,000. /// /// NOTE: this configuration option requires the `enable_output_buffer` flag /// to be set. @@ -1774,28 +1776,12 @@ impl Default for OutputBufferConfig { fn default() -> Self { Self { enable_output_buffer: false, - max_output_buffer_size_records: usize::MAX, + max_output_buffer_size_records: DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS, max_output_buffer_time_millis: usize::MAX, } } } -impl OutputBufferConfig { - pub fn validate(&self) -> Result<(), String> { - if self.enable_output_buffer - && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records - && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis - { - return Err( - "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified" - .to_string(), - ); - } - - Ok(()) - } -} - /// Describes an output connector configuration #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)] pub struct OutputEndpointConfig { diff --git a/docs.feldera.com/docs/changelog.md b/docs.feldera.com/docs/changelog.md index 73375353805..c533011cbd2 100644 --- a/docs.feldera.com/docs/changelog.md +++ b/docs.feldera.com/docs/changelog.md @@ -14,11 +14,16 @@ import TabItem from '@theme/TabItem'; ## Unreleased + - The default value of `max_output_buffer_size_records` is now 10,000,000 + instead of unbounded. + - Casts of strings to Boolean and floating point values will produce runtime errors instead of legal values for illegal string values. The set of strings that can be legally converted to Booleans has been changed. + ## v0.306.0 + - No longer allowed to edit `runtime_config.resources.storage_class` if the pipeline storage is not cleared. - Calling `/start` on a pipeline that already failed to compile will directly return an error instead of diff --git a/docs.feldera.com/docs/connectors/index.mdx b/docs.feldera.com/docs/connectors/index.mdx index c2129d49731..63fc12d9832 100644 --- a/docs.feldera.com/docs/connectors/index.mdx +++ b/docs.feldera.com/docs/connectors/index.mdx @@ -209,20 +209,11 @@ output buffer: total number of updates output by the pipeline. Updates to the same record can overwrite or cancel previous updates. - When not specified, the buffer can grow indefinitely until one of - the other trigger conditions is satisfied. + When not specified, defaults to 10,000,000. This configuration option requires the `enable_output_buffer` flag to be set. -:::note - -When the `enable_output_buffer` flag is set, at least one of -`max_output_buffer_time_millis` or `max_output_buffer_size_records` must be -specified. - -::: - See [Delta Lake output connector documentation](/connectors/sinks/delta) for an example of configuring the output buffer. diff --git a/openapi.json b/openapi.json index 858c28cf589..04f1f86a63f 100644 --- a/openapi.json +++ b/openapi.json @@ -10387,8 +10387,8 @@ }, "max_output_buffer_size_records": { "type": "integer", - "description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nBy default, the buffer can grow indefinitely until one of\nthe other output conditions is satisfied.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.", - "default": 18446744073709551615, + "description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nThe default is 10,000,000.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.", + "default": 10000000, "minimum": 0 }, "max_output_buffer_time_millis": { diff --git a/python/tests/platform/test_output_buffer_size_limit.py b/python/tests/platform/test_output_buffer_size_limit.py new file mode 100644 index 00000000000..41a320fb438 --- /dev/null +++ b/python/tests/platform/test_output_buffer_size_limit.py @@ -0,0 +1,113 @@ +""" +Test for the default value of ``max_output_buffer_size_records``. + +Output buffering decouples the rate at which the pipeline produces changes from +the rate at which they are pushed to an output connector. The buffer is flushed +when either of two thresholds is crossed: it has held data for longer than +``max_output_buffer_time_millis`` or it has accumulated more than +``max_output_buffer_size_records`` records. + +``max_output_buffer_size_records`` defaults to 10,000,000 records, which bounds +the buffer and guarantees it is sent once it grows past that size even when no +time limit is configured. + +This test enables output buffering on a Delta Lake sink without setting a time +limit, feeds it more than 10,000,000 records, and verifies that records are +written out: the pipeline's completed-record count advances past the default +size cap. +""" + +import json + +from feldera import Pipeline, PipelineBuilder +from feldera.runtime_config import RuntimeConfig +from feldera.testutils import FELDERA_TEST_NUM_WORKERS +from tests import TEST_CLIENT +from tests.utils import DeltaTestLocation, wait_for_condition + +from .helper import gen_pipeline_name + +# Default value of ``max_output_buffer_size_records``. The buffer must flush +# once it grows past this many records, even without a time limit. +_DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS = 10_000_000 + +# Number of records to generate. Chosen comfortably above the default size cap +# so that the buffer is forced to flush at least once. +_NUM_RECORDS = 12_000_000 + + +@gen_pipeline_name +def test_output_buffer_flushes_at_default_size_limit(pipeline_name): + """A buffered Delta sink with no time limit still flushes at 10M records.""" + + location = DeltaTestLocation.create(pipeline_name) + + # Use 8 writer threads so the Delta sink keeps up with the large flush. + delta_config = dict(location.connector_config) + delta_config["threads"] = 8 + + delta_connector = { + "name": "delta_out", + "transport": { + "name": "delta_table_output", + "config": delta_config, + }, + # The Delta sink needs an index on the view to write with threads > 1. + "index": "v_idx", + # Enable buffering but set neither ``max_output_buffer_time_millis`` + # nor ``max_output_buffer_size_records``, so the buffer relies on the + # default 10M size cap to flush. + "enable_output_buffer": True, + } + + # Generate incrementing primary keys so that every record is distinct and + # the buffer cannot shrink by consolidating updates to the same key. + datagen_connector = { + "transport": { + "name": "datagen", + "config": { + "workers": 4, + "plan": [ + { + "limit": _NUM_RECORDS, + "fields": {"id": {"range": [0, _NUM_RECORDS]}}, + } + ], + }, + } + } + + sql = f""" +CREATE TABLE t (id BIGINT NOT NULL PRIMARY KEY) WITH ( + 'connectors' = '{json.dumps([datagen_connector])}' +); + +CREATE MATERIALIZED VIEW v WITH ( + 'connectors' = '{json.dumps([delta_connector])}' +) AS SELECT * FROM t; + +CREATE INDEX v_idx ON v(id); +""".strip() + + pipeline: Pipeline = PipelineBuilder( + TEST_CLIENT, + name=pipeline_name, + sql=sql, + runtime_config=RuntimeConfig(workers=FELDERA_TEST_NUM_WORKERS), + ).create_or_replace() + + pipeline.start() + + try: + # The buffer flushes once it crosses the 10M default size cap, pushing + # those records through the Delta sink and advancing the completed + # count past the cap. + wait_for_condition( + "completed-record count advances past the default buffer size cap", + lambda: (pipeline.stats().global_metrics.total_completed_records or 0) + >= _DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS, + timeout_s=600.0, + poll_interval_s=2.0, + ) + finally: + pipeline.stop(force=True)