[adapters] Avoid merge backpressure in the output buffer#6442
Open
ryzhyk wants to merge 3 commits into
This commit reduces the likelihood of the following situation:

* A large transaction (e.g., backfill) produces a large output batch, say 1B records. This batch consists of `workers` spines, each with potentially dozens of batches.
* These batches are pushed into a single spine inside the output buffer. Once the number of batches in the spine exceeds 128, backpressure kicks in. We've seen pipelines with 32 workers spend >1hr waiting for backpressure.

We address this in two ways. First we insert all batches into the spine before waiting for backpressure. This is likely to trigger fewer larger merges. Second we postpone checking for backpressure until additional batches retrieved from the output queue are added to the buffer. This avoids waiting for the merger when the buffer is large enough to be sent to the connector.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
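The first change can be illustrated with a toy model. The sketch below is not Feldera's spine: `ToySpine`, `wait_for_backpressure`, the `stalls` counter, and the "merge the two smallest batches" policy are all hypothetical stand-ins, and the per-insert waiting in `insert_blocking_each` is only an assumption about the earlier behaviour. It tracks batch sizes only and counts how often the caller has to wait for merges under each strategy.

```rust
/// Toy stand-in for the output buffer's spine: it tracks batch sizes only.
/// All names are hypothetical; this is not Feldera's `Spine`.
struct ToySpine {
    batches: Vec<u64>,
    /// Batch-count limit above which backpressure applies (must be >= 1).
    max_batches: usize,
    /// Number of times a caller had to wait for merges.
    stalls: usize,
}

impl ToySpine {
    fn new(max_batches: usize) -> Self {
        assert!(max_batches >= 1);
        Self { batches: Vec::new(), max_batches, stalls: 0 }
    }

    /// Adds a batch and reports whether the spine is now over its limit.
    fn insert_without_blocking(&mut self, records: u64) -> bool {
        self.batches.push(records);
        self.batches.len() > self.max_batches
    }

    /// Simulates waiting for the merger: repeatedly merges the two
    /// smallest batches until the spine is back under its limit.
    fn wait_for_backpressure(&mut self) {
        if self.batches.len() <= self.max_batches {
            return;
        }
        self.stalls += 1;
        while self.batches.len() > self.max_batches {
            // Sort descending so the two smallest batches are at the end.
            self.batches.sort_unstable_by(|a, b| b.cmp(a));
            let a = self.batches.pop().unwrap();
            let b = self.batches.pop().unwrap();
            self.batches.push(a + b);
        }
    }

    fn total_records(&self) -> u64 {
        self.batches.iter().sum()
    }
}

/// Assumed earlier behaviour: wait for backpressure after every insert.
fn insert_blocking_each(spine: &mut ToySpine, incoming: &[u64]) {
    for &records in incoming {
        if spine.insert_without_blocking(records) {
            spine.wait_for_backpressure();
        }
    }
}

/// Behaviour described in the commit: insert everything, then wait once.
fn insert_all_then_wait(spine: &mut ToySpine, incoming: &[u64]) {
    let mut over_limit = false;
    for &records in incoming {
        over_limit |= spine.insert_without_blocking(records);
    }
    if over_limit {
        spine.wait_for_backpressure();
    }
}
```

In this model, pushing 768 equally sized batches into a spine limited to 128 batches stalls on every insert past the limit with the first strategy, and once with the second. The toy does not measure merge cost, so it says nothing about how much time a real pipeline would save.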
In the past, if the user set max_output_buffer_time_millis, but not max_output_buffer_size_records, then the output buffer would hold data for the specified duration regardless of its size. The user was required to also configure max_output_buffer_size_records to force a large buffer to be sent immediately.

The downside of not setting max_output_buffer_size_records is that quickly adding more batches to an already large output buffer could cause expensive backpressure stalls. This commit changes the default to 10,000,000 records, meaning that once the buffer reaches this size it will be sent immediately.

The purpose of the output buffer is to avoid the small file problem for connectors such as Delta. In this type of use case, waiting for exactly max_output_buffer_time_millis is not a hard requirement, so the new default should be harmless. However, it is a behavioral change, which I documented in the changelog.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
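A minimal sketch of the flush rule this commit message describes, assuming the buffer is sent as soon as either limit is reached. The struct, the constructor, and `should_flush` are hypothetical names for illustration. Only the two setting names and the 10,000,000 default come from the commit message.

```rust
use std::time::Duration;

/// New default stated in the commit message.
const DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS: u64 = 10_000_000;

/// Hypothetical mirror of the two output buffer settings.
struct OutputBufferConfig {
    max_output_buffer_time_millis: u64,
    max_output_buffer_size_records: u64,
}

impl OutputBufferConfig {
    /// The user sets only the time limit; the size limit falls back
    /// to the new default instead of being unbounded.
    fn with_time_only(millis: u64) -> Self {
        Self {
            max_output_buffer_time_millis: millis,
            max_output_buffer_size_records: DEFAULT_MAX_OUTPUT_BUFFER_SIZE_RECORDS,
        }
    }
}

/// Assumed rule: flush when the buffer is large enough OR old enough.
fn should_flush(cfg: &OutputBufferConfig, buffered_records: u64, age: Duration) -> bool {
    buffered_records >= cfg.max_output_buffer_size_records
        || age >= Duration::from_millis(cfg.max_output_buffer_time_millis)
}
```

With a time-only configuration of 60 seconds, a buffer holding 10,000,000 records is flushed after one second in this sketch, whereas under the old default it would have waited for the full interval.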
c186e35 to
db5a23f
Compare
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
mihaibudiu
reviewed
Jun 10, 2026
## Unreleased

- The default value of `max_output_buffer_size_records` is now 10,000,000
Contributor
I thought we were moving towards a world where sizes are expressed in bytes
blp
approved these changes
Jun 10, 2026
Comment on lines +584 to +596

    let batch = Arc::unwrap_or_clone(
        batch
            .as_any()
            .downcast::<SerBatchImpl<
                TypedBatch<T::Key, T::Val, T::R, <T::InnerTrace as DynTrace>::Batch>,
                KD,
                VD,
            >>()
            .unwrap(),
    );
    self.batch
        .inner_mut()
        .insert_without_blocking(batch.batch.into_inner())
Member
This is almost the same as insert(), so it might be worthwhile to factor out the common code.
Comment on lines +1352 to +1362

    fn insert_without_blocking(&mut self, batch: impl Into<Arc<Self::Batch>>) -> bool {
        self.data = Self::merge(
            self,
            batch.into().as_ref(),
            &self.key_filter,
            &self.value_filter,
        )
        .data;

        false
    }
Member
I would change insert() to a call to this new function to avoid redundant code.
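One way to read this suggestion, sketched with hypothetical types. `ToyBatch`, `ToyTrace`, and `wait_for_merges` are invented for illustration and do not mirror the real trait. The sketch also assumes that the `bool` returned by `insert_without_blocking` means "the caller should now wait for backpressure", which the diff excerpt does not confirm.

```rust
use std::sync::Arc;

/// Hypothetical batch type, standing in for `Self::Batch`.
struct ToyBatch(Vec<i64>);

/// Hypothetical trace that counts batches and enforces a batch limit.
struct ToyTrace {
    data: Vec<i64>,
    batches: usize,
    max_batches: usize,
}

impl ToyTrace {
    fn new(max_batches: usize) -> Self {
        Self { data: Vec::new(), batches: 0, max_batches }
    }

    /// Adds the batch and returns `true` when the caller should wait
    /// for merges (assumed meaning of the return value).
    fn insert_without_blocking(&mut self, batch: impl Into<Arc<ToyBatch>>) -> bool {
        let batch: Arc<ToyBatch> = batch.into();
        self.data.extend_from_slice(&batch.0);
        self.batches += 1;
        self.batches > self.max_batches
    }

    /// The blocking variant reuses the non-blocking one, so the
    /// insertion logic lives in a single place.
    fn insert(&mut self, batch: impl Into<Arc<ToyBatch>>) {
        if self.insert_without_blocking(batch) {
            self.wait_for_merges();
        }
    }

    /// Stand-in for blocking until merges finish: pretend everything
    /// has been merged into a single batch.
    fn wait_for_merges(&mut self) {
        self.batches = 1;
    }
}
```

For an implementation like the one in the diff, where `insert_without_blocking` always returns `false`, an `insert()` written this way would never wait.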
mythical-fred
left a comment
Typo in commit subject: `[adaprers]` → `[adapters]`. Worth a quick `git rebase -i` fix before merge since Feldera enforces linear history on main.
Otherwise LGTM — clean decomposition, good commit messages explaining the why, solid platform test.
Two improvements to avoid merge backpressure in the output buffer. See commit messages for details.
Added a platform test, since (de)serialization of connector config involves the manager.
Describe Manual Test Plan
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes
max_output_buffer_size_records defaults to 10M.