[adapters] Avoid merge backpressure in the output buffer#6442

Open
ryzhyk wants to merge 3 commits into main from non_blocking_outpu_buffer

Conversation

@ryzhyk ryzhyk commented Jun 10, 2026

Contributor

Two improvements to avoid merge backpressure in the output buffer. See commit messages for details.

Added a platform test, since (de)serialization of connector config involves the manager.

Describe Manual Test Plan

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

max_output_buffer_size_records defaults to 10M.

This commit reduces the likelihood of the following situation:

* A large transaction (e.g., backfill) produces a large output batch, say
  1B records. This batch consists of `workers` spines, each with potentially dozens
  of batches.
* These batches are pushed into a single spine inside the output buffer. Once
  the number of batches in the spine exceeds 128, backpressure kicks in. We've
  seen pipelines with 32 workers spend >1hr waiting for backpressure.

We address this in two ways. First, we insert all batches into the spine before
waiting for backpressure. This is likely to trigger fewer, larger merges. Second,
we postpone checking for backpressure until additional batches retrieved from the
output queue are added to the buffer. This avoids waiting for the merger when the
buffer is large enough to be sent to the connector.
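
The difference between the two policies can be illustrated with a toy model. This is only a sketch under stated assumptions, not the code in this PR: `ToySpine`, `push_blocking_each`, `push_then_check`, and `flush_threshold` are hypothetical names, the "merger" is modeled as collapsing all batches into one, and only the 128-batch threshold is taken from the description above.

```rust
/// Batch-count limit after which backpressure kicks in (taken from the text above).
const BACKPRESSURE_THRESHOLD: usize = 128;

/// Hypothetical stand-in for a spine: it only stores the record count of each batch.
struct ToySpine {
    batches: Vec<usize>,
}

impl ToySpine {
    fn new() -> Self {
        Self { batches: Vec::new() }
    }

    /// Adds a batch without waiting. Returns true if the spine is over the threshold.
    fn insert_without_blocking(&mut self, records: usize) -> bool {
        self.batches.push(records);
        self.batches.len() > BACKPRESSURE_THRESHOLD
    }

    /// Stand-in for waiting on the merger: collapses everything into one batch.
    fn wait_for_merges(&mut self) {
        let total = self.total_records();
        self.batches.clear();
        self.batches.push(total);
    }

    fn total_records(&self) -> usize {
        self.batches.iter().sum()
    }
}

/// Policy A (assumed old behavior): check backpressure after every insert.
/// Returns the number of times we had to wait.
fn push_blocking_each(spine: &mut ToySpine, incoming: &[usize]) -> usize {
    let mut waits = 0;
    for &records in incoming {
        if spine.insert_without_blocking(records) {
            spine.wait_for_merges();
            waits += 1;
        }
    }
    waits
}

/// Policy B (sketch of the idea above): insert everything first, then wait at
/// most once, and skip the wait if the buffer is already large enough to flush.
fn push_then_check(spine: &mut ToySpine, incoming: &[usize], flush_threshold: usize) -> usize {
    let mut over_threshold = false;
    for &records in incoming {
        over_threshold |= spine.insert_without_blocking(records);
    }
    if spine.total_records() >= flush_threshold {
        return 0; // the buffer is about to be sent to the connector
    }
    if over_threshold {
        spine.wait_for_merges();
        1
    } else {
        0
    }
}
```

In this toy model, 400 incoming batches cause three waits under the first policy and at most one under the second. Real merge cost depends on batch sizes and the merger's scheduling, which the sketch does not capture.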

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk requested a review from blp June 10, 2026 08:44
@ryzhyk ryzhyk added the connectors Issues related to the adapters/connectors crate label Jun 10, 2026
In the past, if the user set max_output_buffer_time_millis, but not
max_output_buffer_size_records, then the output buffer would hold data
for the specified duration regardless of its size. The user was required
to also configure max_output_buffer_size_records to force a large buffer
to be sent immediately.

The downside of not setting max_output_buffer_size_records is that
quickly adding more batches to an already large output buffer could
cause expensive backpressure stalls.

This commit changes the default to 10,000,000 records, meaning that once
the buffer reaches this size it will be sent immediately.

The purpose of the output buffer is to avoid the small file problem for
connectors such as Delta. In this type of use case, waiting for exactly
max_output_buffer_time_millis is not a hard requirement, so the new
default should be harmless. However it is a behavioral change, which
I documented in the changelog.
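
A minimal sketch of the resulting flush decision, assuming the buffer is sent when either limit is reached. `BufferLimits`, `effective_max_records`, and `should_flush` are hypothetical names for illustration and do not reflect the actual connector config types; only the two option names and the 10,000,000 default come from this commit message.

```rust
use std::time::Duration;

/// New default described in this commit message.
const DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS: usize = 10_000_000;

/// Hypothetical model of the two user-facing settings.
struct BufferLimits {
    /// Models `max_output_buffer_time_millis` (None = not set by the user).
    max_time: Option<Duration>,
    /// Models `max_output_buffer_size_records` (None = not set by the user).
    max_records: Option<usize>,
}

impl BufferLimits {
    /// Falls back to the new default when the user did not set a size limit.
    fn effective_max_records(&self) -> usize {
        self.max_records
            .unwrap_or(DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS)
    }

    /// Assumed rule: flush a non-empty buffer when it is too big or too old.
    fn should_flush(&self, buffered_records: usize, age: Duration) -> bool {
        if buffered_records == 0 {
            return false;
        }
        let too_big = buffered_records >= self.effective_max_records();
        let too_old = self.max_time.is_some_and(|limit| age >= limit);
        too_big || too_old
    }
}
```

With only the time limit set, a buffer holding 10,000,000 records is flushed right away in this model instead of waiting for the timer, which is the behavioral change noted in the changelog.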

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the non_blocking_outpu_buffer branch from c186e35 to db5a23f Compare June 10, 2026 08:48
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@ryzhyk ryzhyk changed the title from "[adapters] Avoid merge backpress in the output buffer" to "[adapters] Avoid merge backpressure in the output buffer" Jun 10, 2026

## Unreleased

- The default value of `max_output_buffer_size_records` is now 10,000,000

Contributor

I thought we were moving towards a world where sizes are expressed in bytes

Comment on lines +584 to +596
let batch = Arc::unwrap_or_clone(
    batch
        .as_any()
        .downcast::<SerBatchImpl<
            TypedBatch<T::Key, T::Val, T::R, <T::InnerTrace as DynTrace>::Batch>,
            KD,
            VD,
        >>()
        .unwrap(),
);
self.batch
    .inner_mut()
    .insert_without_blocking(batch.batch.into_inner())

Member

This is almost the same as insert(), so it might be worthwhile to factor out the common code.
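
One way to factor out the shared part is a small helper that recovers the concrete batch from its type-erased handle. The sketch below uses only the standard library; `ConcreteBatch`, `take_concrete`, and `ToyBuffer` are hypothetical stand-ins and not the types in this PR, and the blocking step is modeled as a counter.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical concrete batch type hidden behind a type-erased handle.
#[derive(Clone, Debug, PartialEq)]
struct ConcreteBatch {
    records: Vec<i64>,
}

/// Shared helper: downcast the handle and take ownership of the batch,
/// cloning only if other references to it still exist.
fn take_concrete(batch: Arc<dyn Any + Send + Sync>) -> ConcreteBatch {
    let typed: Arc<ConcreteBatch> = batch
        .downcast::<ConcreteBatch>()
        .expect("batch has an unexpected concrete type");
    Arc::unwrap_or_clone(typed)
}

/// Hypothetical buffer with a blocking and a non-blocking insert path.
struct ToyBuffer {
    records: Vec<i64>,
    blocking_inserts: usize,
}

impl ToyBuffer {
    /// Blocking variant: same conversion, plus a stand-in for the wait.
    fn insert(&mut self, batch: Arc<dyn Any + Send + Sync>) {
        let batch = take_concrete(batch);
        self.records.extend(batch.records);
        self.blocking_inserts += 1;
    }

    /// Non-blocking variant: same conversion, no wait.
    fn insert_without_blocking(&mut self, batch: Arc<dyn Any + Send + Sync>) {
        let batch = take_concrete(batch);
        self.records.extend(batch.records);
    }
}
```

Both entry points then differ only in what happens after the batch has been added.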

Comment on lines +1352 to +1362
fn insert_without_blocking(&mut self, batch: impl Into<Arc<Self::Batch>>) -> bool {
    self.data = Self::merge(
        self,
        batch.into().as_ref(),
        &self.key_filter,
        &self.value_filter,
    )
    .data;

    false
}

Member

I would change insert() to a call to this new function to avoid redundant code.
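
A sketch of that suggestion, assuming the returned `bool` means "the caller should now wait for backpressure" (the snippet above always returns `false`, which would fit a trace that merges eagerly). `ToyTrace`, `EagerTrace`, and `wait_for_backpressure` are hypothetical names, not the trait in this PR.

```rust
/// Hypothetical trace trait: `insert` is defined once in terms of the
/// non-blocking variant, so implementors only write the latter.
trait ToyTrace {
    /// Adds a batch; returns true if the caller should wait for backpressure.
    fn insert_without_blocking(&mut self, batch: Vec<i64>) -> bool;

    /// Stand-in for blocking until background merges catch up.
    fn wait_for_backpressure(&mut self);

    /// Blocking insert, expressed as a thin wrapper.
    fn insert(&mut self, batch: Vec<i64>) {
        if self.insert_without_blocking(batch) {
            self.wait_for_backpressure();
        }
    }
}

/// Hypothetical trace that merges eagerly on insert and so never needs to wait.
struct EagerTrace {
    data: Vec<i64>,
    waits: usize,
}

impl ToyTrace for EagerTrace {
    fn insert_without_blocking(&mut self, mut batch: Vec<i64>) -> bool {
        // Merge immediately: append and restore sorted order.
        self.data.append(&mut batch);
        self.data.sort_unstable();
        false
    }

    fn wait_for_backpressure(&mut self) {
        self.waits += 1;
    }
}
```

With this shape, the merge logic lives in one place and `insert` cannot drift out of sync with it.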

@mythical-fred mythical-fred left a comment

Typo in commit subject: [adaprers] should be [adapters]. Worth a quick git rebase -i fix before merge since Feldera enforces linear history on main.

Otherwise LGTM — clean decomposition, good commit messages explaining the why, solid platform test.

Labels

connectors Issues related to the adapters/connectors crate

5 participants