Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/adapterlib/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ pub trait SerTrace: SerBatchReader {
/// Insert a batch into the trace.
fn insert(&mut self, batch: Arc<dyn SerBatch>);

/// Insert a batch into the trace without waiting for backpressure.
///
/// Returns the flag produced by the underlying trace's
/// `insert_without_blocking`: `true` if backpressure is warranted, in which
/// case the caller may apply it afterward by calling
/// [Self::backpressure_wait].
fn insert_without_blocking(&mut self, batch: Arc<dyn SerBatch>) -> bool;

/// Wait (synchronously) until backpressure on the trace is relieved.
///
/// Intended as the companion of [Self::insert_without_blocking], which
/// inserts without performing this wait.
fn backpressure_wait(&self);

fn as_batch_reader(&self) -> &dyn SerBatchReader;
}

Expand Down
16 changes: 8 additions & 8 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5345,13 +5345,19 @@ impl OutputBuffer {
) {
if let Some(batch) = batch {
if let Some(buffer) = &mut self.buffer {
buffer.backpressure_wait();

// Insert all batches at once without blocking. This helps trigger fewer,
// larger merges when producing a large output batch. In addition, we postpone
// waiting for backpressure until the next iteration, which increases the likelihood
// that a large batch will be sent to the connector without stalling for backpressure.
for batch in batch.batches() {
buffer.insert(batch);
buffer.insert_without_blocking(batch);
}
} else {
for batch in batch.batches() {
if let Some(buffer) = self.buffer.as_mut() {
buffer.insert(batch);
buffer.insert_without_blocking(batch);
} else {
self.buffer = Some(batch.into_trace());
};
Expand Down Expand Up @@ -6606,12 +6612,6 @@ impl ControllerInner {

let self_weak = Arc::downgrade(self);

endpoint_config
.connector_config
.output_buffer_config
.validate()
.map_err(|e| ControllerError::invalid_output_buffer_configuration(endpoint_name, &e))?;

// Initialize endpoint stats early so that connectors can register
// batch-progress counters (or other metrics) during construction.
self.status.add_output(
Expand Down
20 changes: 20 additions & 0 deletions crates/adapters/src/static_compile/seroutput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,26 @@ where
TOKIO.block_on(self.batch.inner_mut().insert(batch.batch.into_inner()));
}

fn insert_without_blocking(&mut self, batch: Arc<dyn SerBatch>) -> bool {
let batch = Arc::unwrap_or_clone(
batch
.as_any()
.downcast::<SerBatchImpl<
TypedBatch<T::Key, T::Val, T::R, <T::InnerTrace as DynTrace>::Batch>,
KD,
VD,
>>()
.unwrap(),
);
self.batch
.inner_mut()
.insert_without_blocking(batch.batch.into_inner())
Comment on lines +584 to +596

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost the same as insert(), so it might be worthwhile to factor out the common code.

}

/// Blocks the calling thread on the inner trace's asynchronous
/// `backpressure_wait`, driving it to completion on the `TOKIO` runtime.
fn backpressure_wait(&self) {
    // Bind the inner trace first so the future below borrows from a named local.
    let trace = self.batch.inner();
    TOKIO.block_on(trace.backpressure_wait());
}

/// Returns this trace viewed through the `SerBatchReader` interface.
fn as_batch_reader(&self) -> &dyn SerBatchReader {
    let reader: &dyn SerBatchReader = self;
    reader
}
Expand Down
14 changes: 14 additions & 0 deletions crates/dbsp/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ pub trait Trace: BatchReader {
/// (asynchronously) until some of them have been merged.
fn insert(&mut self, batch: impl Into<Arc<Self::Batch>>) -> impl Future<Output = ()>;

/// Inserts a batch into the spine without blocking. Thus, this omits:
///
/// - Spilling the batch to storage when that is a good idea.
///
/// - Waiting until the number of batches in the spine falls below the level
/// at which we impose backpressure.
///
/// Returns `true` if backpressure is warranted. The caller may then apply
/// it afterward by calling [Self::backpressure_wait].
fn insert_without_blocking(&mut self, batch: impl Into<Arc<Self::Batch>>) -> bool;

/// Waits for the number of batches in the spine to fall below the level at
/// which we impose backpressure.
///
/// This is the wait that [Self::insert_without_blocking] skips; call it
/// after that method returns `true`.
fn backpressure_wait(&self) -> impl Future<Output = ()>;

/// Clears the value of the "dirty" flag to `false`.
///
/// The "dirty" flag is used to efficiently track changes to the trace,
Expand Down
56 changes: 28 additions & 28 deletions crates/dbsp/src/trace/spine_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,34 @@ where
}
}

/// Adds `batch` to the spine and returns immediately. Because it never
/// blocks, this omits:
///
/// - Spilling the batch to storage when that is a good idea. The caller
///   may do this beforehand by calling [Spine::maybe_flush_batch].
///
/// - Waiting until the number of batches in the spine falls below the level
///   at which we impose backpressure. The caller may do this afterward by
///   calling [Spine::backpressure_wait].
///
/// Returns `true` if backpressure is warranted. An empty batch is ignored:
/// the dirty flag is left untouched and `false` is returned.
fn insert_without_blocking(&mut self, batch: impl Into<Arc<B>>) -> bool {
    let incoming: Arc<B> = batch.into();
    if incoming.is_empty() {
        return false;
    }

    self.dirty = true;
    self.merger
        .add_batch(incoming, false)
        .batch_count()
        .should_apply_backpressure()
}

/// Awaits the merger's `backpressure_wait`, i.e. until the number of batches
/// in the spine falls below the level at which we impose backpressure.
async fn backpressure_wait(&self) {
    let relieved = self.merger.backpressure_wait();
    relieved.await;
}

/// Resets the spine's "dirty" flag to `false`.
fn clear_dirty_flag(&mut self) {
self.dirty = false;
}
Expand Down Expand Up @@ -2222,34 +2250,6 @@ where
}
}

/// Adds `batch` to the spine and returns immediately. Because it never
/// blocks, this omits:
///
/// - Spilling the batch to storage when that is a good idea. The caller
///   may do this beforehand by calling [Spine::maybe_flush_batch].
///
/// - Waiting until the number of batches in the spine falls below the level
///   at which we impose backpressure. The caller may do this afterward by
///   calling [Spine::backpressure_wait].
///
/// Returns `true` if backpressure is warranted. An empty batch is ignored:
/// the dirty flag is left untouched and `false` is returned.
pub fn insert_without_blocking(&mut self, batch: impl Into<Arc<B>>) -> bool {
    let incoming: Arc<B> = batch.into();
    if incoming.is_empty() {
        return false;
    }

    self.dirty = true;
    self.merger
        .add_batch(incoming, false)
        .batch_count()
        .should_apply_backpressure()
}

/// Awaits the merger's `backpressure_wait`, i.e. until the number of batches
/// in the spine falls below the level at which we impose backpressure.
pub async fn backpressure_wait(&self) {
    let relieved = self.merger.backpressure_wait();
    relieved.await;
}

/// Returns an object that can be used to wait for backpressure to be
/// relieved. Returns `None` if no backpressure is needed.
pub fn backpressure_waiter(&self) -> Option<OwnedNotified> {
Expand Down
14 changes: 14 additions & 0 deletions crates/dbsp/src/trace/test/test_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,20 @@ where
.data;
}

/// Merges `batch` into this test batch in place, applying the stored key and
/// value filters.
///
/// Always returns `false`: this implementation never asks the caller to
/// apply backpressure.
fn insert_without_blocking(&mut self, batch: impl Into<Arc<Self::Batch>>) -> bool {
    let incoming: Arc<Self::Batch> = batch.into();
    let merged = Self::merge(
        self,
        incoming.as_ref(),
        &self.key_filter,
        &self.value_filter,
    );
    self.data = merged.data;

    false
}
Comment on lines +1352 to +1362

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would change insert() to a call to this new function to avoid redundant code.


/// No-op: `insert_without_blocking` on this test batch always returns
/// `false`, so there is no backpressure to wait for.
async fn backpressure_wait(&self) {}

/// Does nothing in this implementation (empty body).
fn clear_dirty_flag(&mut self) {}

fn dirty(&self) -> bool {
Expand Down
24 changes: 5 additions & 19 deletions crates/feldera-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub use dev_tweaks::DevTweaks;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default maximum number of updates to be kept in the output buffer.
///
/// Supplies the `max_output_buffer_size_records` value in
/// `OutputBufferConfig::default()`.
const DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS: usize = 10_000_000;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
1_000_000
Expand Down Expand Up @@ -1762,8 +1765,7 @@ pub struct OutputBufferConfig {
/// total number of updates output by the pipeline. Updates to the
/// same record can overwrite or cancel previous updates.
///
/// By default, the buffer can grow indefinitely until one of
/// the other output conditions is satisfied.
/// The default is 10,000,000.
///
/// NOTE: this configuration option requires the `enable_output_buffer` flag
/// to be set.
Expand All @@ -1774,28 +1776,12 @@ impl Default for OutputBufferConfig {
fn default() -> Self {
Self {
enable_output_buffer: false,
max_output_buffer_size_records: usize::MAX,
max_output_buffer_size_records: DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS,
max_output_buffer_time_millis: usize::MAX,
}
}
}

impl OutputBufferConfig {
    /// Checks that an enabled output buffer has at least one flush trigger.
    ///
    /// # Errors
    ///
    /// Returns a message when `enable_output_buffer` is set while both
    /// `max_output_buffer_size_records` and `max_output_buffer_time_millis`
    /// still equal the values produced by `Self::default()`. A limit that is
    /// explicitly set to its default value is indistinguishable from one that
    /// was never specified.
    pub fn validate(&self) -> Result<(), String> {
        // Without buffering there is nothing to validate.
        if !self.enable_output_buffer {
            return Ok(());
        }

        let defaults = Self::default();
        let limits_untouched = (
            self.max_output_buffer_size_records,
            self.max_output_buffer_time_millis,
        ) == (
            defaults.max_output_buffer_size_records,
            defaults.max_output_buffer_time_millis,
        );

        if limits_untouched {
            Err(String::from(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified",
            ))
        } else {
            Ok(())
        }
    }
}

/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
Expand Down
5 changes: 5 additions & 0 deletions docs.feldera.com/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ import TabItem from '@theme/TabItem';

## Unreleased

- The default value of `max_output_buffer_size_records` is now 10,000,000

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were moving towards a world where sizes are expressed in bytes

instead of unbounded.

- Casts of strings to Boolean and floating point values will
produce runtime errors instead of legal values for illegal string
values. The set of strings that can be legally converted to
Booleans has been changed.

## v0.306.0

- No longer allowed to edit `runtime_config.resources.storage_class` if the pipeline storage is not cleared.

- Calling `/start` on a pipeline that already failed to compile will directly return an error instead of
Expand Down
11 changes: 1 addition & 10 deletions docs.feldera.com/docs/connectors/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -209,20 +209,11 @@ output buffer:
total number of updates output by the pipeline. Updates to the
same record can overwrite or cancel previous updates.

When not specified, the buffer can grow indefinitely until one of
the other trigger conditions is satisfied.
When not specified, defaults to 10,000,000.

This configuration option requires the `enable_output_buffer` flag
to be set.

:::note

When the `enable_output_buffer` flag is set, at least one of
`max_output_buffer_time_millis` or `max_output_buffer_size_records` must be
specified.

:::

See [Delta Lake output connector documentation](/connectors/sinks/delta)
for an example of configuring the output buffer.

Expand Down
4 changes: 2 additions & 2 deletions openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -10387,8 +10387,8 @@
},
"max_output_buffer_size_records": {
"type": "integer",
"description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nBy default, the buffer can grow indefinitely until one of\nthe other output conditions is satisfied.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.",
"default": 18446744073709551615,
"description": "Maximum number of updates to be kept in the output buffer.\n\nThis parameter bounds the maximal size of the buffer.\nNote that the size of the buffer is not always equal to the\ntotal number of updates output by the pipeline. Updates to the\nsame record can overwrite or cancel previous updates.\n\nThe default is 10,000,000.\n\nNOTE: this configuration option requires the `enable_output_buffer` flag\nto be set.",
"default": 10000000,
"minimum": 0
},
"max_output_buffer_time_millis": {
Expand Down
113 changes: 113 additions & 0 deletions python/tests/platform/test_output_buffer_size_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Test for the default value of ``max_output_buffer_size_records``.

Output buffering decouples the rate at which the pipeline produces changes from
the rate at which they are pushed to an output connector. The buffer is flushed
when either of two thresholds is crossed: it has held data for longer than
``max_output_buffer_time_millis`` or it has accumulated more than
``max_output_buffer_size_records`` records.

``max_output_buffer_size_records`` defaults to 10,000,000 records, which bounds
the buffer and guarantees it is sent once it grows past that size even when no
time limit is configured.

This test enables output buffering on a Delta Lake sink without setting a time
limit, feeds it more than 10,000,000 records, and verifies that records are
written out: the pipeline's completed-record count advances past the default
size cap.
"""

import json

from feldera import Pipeline, PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_WORKERS
from tests import TEST_CLIENT
from tests.utils import DeltaTestLocation, wait_for_condition

from .helper import gen_pipeline_name

# Default value of ``max_output_buffer_size_records``. The buffer must flush
# once it grows past this many records, even without a time limit.
_DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS = 10_000_000

# Number of records to generate. Chosen comfortably above the default size cap
# so that the buffer is forced to flush at least once.
_NUM_RECORDS = 12_000_000


@gen_pipeline_name
def test_output_buffer_flushes_at_default_size_limit(pipeline_name):
    """A buffered Delta sink with no time limit still flushes at 10M records."""

    location = DeltaTestLocation.create(pipeline_name)

    # Use 8 writer threads so the Delta sink keeps up with the large flush.
    delta_config = dict(location.connector_config)
    delta_config.update(threads=8)

    delta_connector = {
        "name": "delta_out",
        "transport": {
            "name": "delta_table_output",
            "config": delta_config,
        },
        # The Delta sink needs an index on the view to write with threads > 1.
        "index": "v_idx",
        # Buffering is enabled, but neither ``max_output_buffer_time_millis``
        # nor ``max_output_buffer_size_records`` is set, so only the default
        # 10M size cap can trigger a flush.
        "enable_output_buffer": True,
    }

    # Incrementing primary keys make every record distinct, so the buffer
    # cannot shrink by consolidating updates to the same key.
    plan_step = {
        "limit": _NUM_RECORDS,
        "fields": {"id": {"range": [0, _NUM_RECORDS]}},
    }
    datagen_connector = {
        "transport": {
            "name": "datagen",
            "config": {
                "workers": 4,
                "plan": [plan_step],
            },
        }
    }

    datagen_json = json.dumps([datagen_connector])
    delta_json = json.dumps([delta_connector])

    # The statement text is kept flush-left: leading whitespace inside the
    # literal would become part of the SQL string.
    sql = f"""
CREATE TABLE t (id BIGINT NOT NULL PRIMARY KEY) WITH (
'connectors' = '{datagen_json}'
);

CREATE MATERIALIZED VIEW v WITH (
'connectors' = '{delta_json}'
) AS SELECT * FROM t;

CREATE INDEX v_idx ON v(id);
""".strip()

    builder = PipelineBuilder(
        TEST_CLIENT,
        name=pipeline_name,
        sql=sql,
        runtime_config=RuntimeConfig(workers=FELDERA_TEST_NUM_WORKERS),
    )
    pipeline: Pipeline = builder.create_or_replace()

    def completed_past_cap() -> bool:
        # A missing or zero counter is treated as zero completed records.
        completed = pipeline.stats().global_metrics.total_completed_records or 0
        return completed >= _DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS

    pipeline.start()

    try:
        # Once the buffer crosses the 10M default size cap it flushes, pushing
        # those records through the Delta sink and advancing the completed
        # count past the cap.
        wait_for_condition(
            "completed-record count advances past the default buffer size cap",
            completed_past_cap,
            timeout_s=600.0,
            poll_interval_s=2.0,
        )
    finally:
        pipeline.stop(force=True)
Loading