Commit a1178aa

[adapters] Auto-tune step size for the number of workers.
Step size is the number of records pushed to the circuit from each connector. Our previous default of 10,000 records was selected before we introduced splitters and accumulators, which break up large outputs across multiple steps. Back then, a large input could easily explode, causing performance and OOM issues. Nowadays, there is no real reason to keep input steps small. A reasonable default is to ingest 10K records per worker thread, which approximates how we split up the work within the circuit.

This commit keeps the old `max_batch_size` setting for backward compatibility. When it is not specified, the new `max_worker_batch_size` setting is used to compute the max batch size as `max_worker_batch_size × num_workers`. The default value is 10,000, meaning that by default a pipeline with 8 workers will ingest 80K records per connector per step.

Why not remove the input step cap altogether and ingest all buffered data at once (after all, it's already kept in memory anyway)?

- The InputUpsert operator is not yet implemented as a splitter and processes the entire input in one step, leading to potentially large output batches (expensive to sort!).
- Very large batches can increase input/output latency, leading to the sawtooth throughput pattern, which users don't like.

The current solution is not ideal. We probably want to cap batch size in bytes, not records. We may also want to cap input size across all connectors attached to a table, not per connector. Those improvements will require more work.

Empirically, this commit improves ingestion speed 2x for pipelines with many delta connectors.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent b7c3878 commit a1178aa
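The auto-tuning rule described in the commit message can be sketched as a small standalone function. This is a hedged illustration, not the actual implementation: `effective_batch_size` and this pared-down `ConnectorConfig` are hypothetical stand-ins for the real types in `feldera_types::config` and `ControllerInner::max_connector_batch_size`.

```rust
// Hypothetical sketch of the batch-size selection rule introduced in
// this commit; names mirror the real code but the types are simplified.

const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

struct ConnectorConfig {
    max_batch_size: Option<u64>,
    max_worker_batch_size: Option<u64>,
}

/// An explicit `max_batch_size` wins; otherwise the per-worker cap
/// (defaulting to 10,000) is scaled by the number of worker threads.
fn effective_batch_size(config: &ConnectorConfig, num_workers: usize) -> usize {
    if let Some(max) = config.max_batch_size {
        return max as usize;
    }
    let per_worker = config
        .max_worker_batch_size
        .unwrap_or(DEFAULT_MAX_WORKER_BATCH_SIZE) as usize;
    // Guard against a zero worker count, as the real code does.
    per_worker * num_workers.max(1)
}

fn main() {
    // All defaults: a pipeline with 8 workers ingests up to 80K records
    // per connector per step.
    let defaults = ConnectorConfig { max_batch_size: None, max_worker_batch_size: None };
    assert_eq!(effective_batch_size(&defaults, 8), 80_000);

    // A legacy explicit cap is honored as-is, regardless of worker count.
    let legacy = ConnectorConfig { max_batch_size: Some(10_000), max_worker_batch_size: None };
    assert_eq!(effective_batch_size(&legacy, 8), 10_000);
}
```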

File tree

8 files changed: +101 −38 lines changed


crates/adapters/src/adhoc/table.rs

Lines changed: 3 additions & 3 deletions

@@ -29,8 +29,7 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
 };
 use feldera_types::config::{
-    ConnectorConfig, FormatConfig, InputEndpointConfig, TransportConfig, default_max_batch_size,
-    default_max_queued_records,
+    ConnectorConfig, FormatConfig, InputEndpointConfig, TransportConfig, default_max_queued_records,
 };
 use feldera_types::program_schema::SqlIdentifier;
 use feldera_types::serde_with_context::serde_config::{
@@ -287,7 +286,8 @@ impl DataSink for AdHocTableSink {
             }),
             index: None,
             output_buffer_config: Default::default(),
-            max_batch_size: default_max_batch_size(),
+            max_batch_size: None,
+            max_worker_batch_size: None,
             max_queued_records: default_max_queued_records(),
             paused: false,
             labels: vec![],

crates/adapters/src/controller.rs

Lines changed: 33 additions & 2 deletions

@@ -159,7 +159,8 @@ pub use feldera_types::config::{
     RuntimeConfig, TransportConfig,
 };
 use feldera_types::config::{
-    FileBackendConfig, FtConfig, FtModel, OutputBufferConfig, StorageBackendConfig, SyncConfig,
+    DEFAULT_MAX_WORKER_BATCH_SIZE, FileBackendConfig, FtConfig, FtModel, OutputBufferConfig,
+    StorageBackendConfig, SyncConfig,
 };
 use feldera_types::constants::{STATE_FILE, STEPS_FILE};
 use feldera_types::format::json::{JsonFlavor, JsonParserConfig, JsonUpdateFormat};
@@ -4786,6 +4787,8 @@ pub struct ControllerInner {
     // The mutex is acquired from async context by actix and
     // from the sync context by the circuit thread.
     transaction_info: Mutex<TransactionInfo>,
+
+    /// Workers local to this host.
     workers: Range<usize>,
 
     /// Current transaction number.
@@ -4946,6 +4949,32 @@ impl ControllerInner {
         ))
     }
 
+    /// Compute max_batch_size for a connector.
+    ///
+    /// `max_batch_size` is a (soft) bound on the number of records ingested in one step from
+    /// the connector.
+    ///
+    /// If the connector config specifies a `max_batch_size`, it is used as is.
+    ///
+    /// Otherwise, `max_batch_size` is computed as the number of workers times `config.max_worker_batch_size` (if specified)
+    /// or `DEFAULT_MAX_WORKER_BATCH_SIZE` otherwise.
+    pub fn max_connector_batch_size(&self, connector_config: &ConnectorConfig) -> usize {
+        if let Some(max_batch_size) = connector_config.max_batch_size {
+            return max_batch_size as usize;
+        };
+
+        let num_local_workers = std::cmp::max(self.workers.len(), 1);
+
+        let max_worker_batch_size =
+            if let Some(max_worker_batch_size) = connector_config.max_worker_batch_size {
+                max_worker_batch_size as usize
+            } else {
+                DEFAULT_MAX_WORKER_BATCH_SIZE as usize
+            };
+
+        max_worker_batch_size * num_local_workers
+    }
+
     fn last_checkpoint(&self) -> LastCheckpoint {
         self.last_checkpoint.lock().unwrap().clone()
     }
@@ -6224,11 +6253,13 @@ impl InputProbe {
         connector_config: &ConnectorConfig,
         controller: Arc<ControllerInner>,
     ) -> Self {
+        let max_batch_size = controller.max_connector_batch_size(connector_config);
+
         Self {
             endpoint_id,
             endpoint_name: endpoint_name.to_owned(),
             controller,
-            max_batch_size: connector_config.max_batch_size as usize,
+            max_batch_size,
             transaction_in_progress: AtomicBool::new(false),
         }
     }

crates/adapters/src/server.rs

Lines changed: 5 additions & 5 deletions

@@ -72,9 +72,7 @@ use feldera_types::runtime_status::{
 use feldera_types::suspend::{SuspendError, SuspendableResponse};
 use feldera_types::time_series::TimeSeries;
 use feldera_types::{
-    checkpoint::CheckpointMetadata,
-    config::{TransportConfig, default_max_batch_size},
-    transport::http::HttpInputConfig,
+    checkpoint::CheckpointMetadata, config::TransportConfig, transport::http::HttpInputConfig,
 };
 use feldera_types::{query::AdhocQueryArgs, transport::http::SERVER_PORT_FILE};
 use futures::StreamExt;
@@ -1997,7 +1995,8 @@ async fn create_http_input_endpoint(
             format: Some(format),
             index: None,
             output_buffer_config: Default::default(),
-            max_batch_size: default_max_batch_size(),
+            max_batch_size: None,
+            max_worker_batch_size: None,
             max_queued_records: HttpInputTransport::default_max_buffered_records(),
             paused: false,
             labels: vec![],
@@ -2177,7 +2176,8 @@ async fn output_endpoint(
             )?),
             index: None,
             output_buffer_config: Default::default(),
-            max_batch_size: default_max_batch_size(),
+            max_batch_size: None,
+            max_worker_batch_size: None,
             max_queued_records: HttpOutputTransport::default_max_buffered_records(),
             paused: false,
             labels: vec![],

crates/adapters/src/transport/clock.rs

Lines changed: 2 additions & 1 deletion

@@ -69,7 +69,8 @@ pub fn now_endpoint_config(config: &PipelineConfig) -> InputEndpointConfig {
         }),
         index: None,
         output_buffer_config: OutputBufferConfig::default(),
-        max_batch_size: 1,
+        max_batch_size: Some(1),
+        max_worker_batch_size: None,
         // This must be >1; otherwise the controller will pause the connector after every input.
         max_queued_records: 2,
         paused: false,

crates/adapters/src/transport/kafka/ft/test.rs

Lines changed: 5 additions & 3 deletions

@@ -27,7 +27,7 @@ use feldera_macros::IsNone;
 use feldera_sqllib::{ByteArray, SqlString, Variant};
 use feldera_types::config::{
     ConnectorConfig, FormatConfig, FtModel, InputEndpointConfig, OutputBufferConfig,
-    TransportConfig, default_max_batch_size, default_max_queued_records,
+    TransportConfig, default_max_queued_records,
 };
 use feldera_types::deserialize_table_record;
 use feldera_types::program_schema::{ColumnType, Field, Relation, SqlIdentifier};
@@ -1396,7 +1396,8 @@ fn test_offset(
             }),
             index: None,
             output_buffer_config: OutputBufferConfig::default(),
-            max_batch_size: default_max_batch_size(),
+            max_batch_size: None,
+            max_worker_batch_size: None,
             max_queued_records: default_max_queued_records(),
             paused: false,
             labels: Vec::new(),
@@ -1835,7 +1836,8 @@ fn test_input_partition(
             }),
             index: None,
             output_buffer_config: OutputBufferConfig::default(),
-            max_batch_size: default_max_batch_size(),
+            max_batch_size: None,
+            max_worker_batch_size: None,
             max_queued_records: default_max_queued_records(),
             paused: false,
             labels: Vec::new(),

crates/feldera-types/src/config.rs

Lines changed: 26 additions & 19 deletions

@@ -43,13 +43,7 @@ pub const fn default_max_queued_records() -> u64 {
     1_000_000
 }
 
-/// Default maximum batch size for connectors, in records.
-///
-/// If you change this then update the comment on
-/// [ConnectorConfig::max_batch_size].
-pub const fn default_max_batch_size() -> u64 {
-    10_000
-}
+pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;
 
 pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
 
@@ -1368,22 +1362,35 @@ pub struct ConnectorConfig {
     #[serde(flatten)]
     pub output_buffer_config: OutputBufferConfig,
 
-    /// Maximum batch size, in records.
+    /// Maximum number of records from this connector to process in a single batch.
+    ///
+    /// When set, this caps how many records are taken from the connector’s input
+    /// buffer and pushed through the circuit at once.
     ///
-    /// This is the maximum number of records to process in one batch through
-    /// the circuit. The time and space cost of processing a batch is
-    /// asymptotically superlinear in the size of the batch, but very small
-    /// batches are less efficient due to constant factors.
+    /// This is typically configured lower than `max_queued_records` to allow the
+    /// connector time to restart and refill its buffer while a batch is being
+    /// processed.
+    ///
+    /// Not all input adapters honor this limit.
+    ///
+    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_batch_size: Option<u64>,
+
+    /// Maximum number of records processed per batch, per worker thread.
     ///
-    /// This should usually be less than `max_queued_records`, to give the
-    /// connector a round-trip time to restart and refill the buffer while
-    /// batches are being processed.
+    /// When `max_batch_size` is not set, this setting is used to cap
+    /// the number of records that can be taken from the connector’s input
+    /// buffer and pushed through the circuit at once. The effective batch size is computed as:
+    /// `max_worker_batch_size × workers`.
     ///
-    /// Some input adapters might not honor this setting.
+    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
+    /// size as the number of worker threads changes to maintain constant amount of
+    /// work per worker per batch.
    ///
-    /// The default is 10,000.
-    #[serde(default = "default_max_batch_size")]
-    pub max_batch_size: u64,
+    /// Defaults to 10,000 records per worker.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_worker_batch_size: Option<u64>,
 
     /// Backpressure threshold.
     ///

crates/pipeline-manager/src/db/types/program.rs

Lines changed: 2 additions & 1 deletion

@@ -888,7 +888,8 @@ mod tests {
             format: None,
             index: None,
             output_buffer_config: Default::default(),
-            max_batch_size: 0,
+            max_batch_size: Some(0),
+            max_worker_batch_size: None,
             max_queued_records: 0,
             paused: false,
             labels: vec![],

docs.feldera.com/docs/connectors/index.mdx

Lines changed: 25 additions & 4 deletions

@@ -116,10 +116,31 @@ The following attributes are common to all connectors:
   circuit pauses execution until the backlog subsides. By default,
   this is 1,000,000.
 
-* <a name="max_batch_size">`max_batch_size`</a> - For an input
-  connector, the approximate maximum number of records that the
-  pipeline will process in a single pipeline step. By default, this
-  is 10,000.
+* <a name="max_batch_size">`max_batch_size`</a> - Maximum number of records from this connector to process in a single batch.
+
+  When set, this caps how many records are taken from the connector’s input
+  buffer and pushed through the circuit at once.
+
+  This is typically configured lower than `max_queued_records` to allow the
+  connector time to restart and refill its buffer while a batch is being
+  processed.
+
+  Not all input adapters honor this limit.
+
+  If this is not set, the batch size is derived from `max_worker_batch_size`.
+
+* <a name="max_worker_batch_size">`max_worker_batch_size`</a> - Maximum number of records processed per batch, per worker thread.
+
+  When `max_batch_size` is not set, this setting is used to cap
+  the number of records that can be taken from the connector’s input
+  buffer and pushed through the circuit at once. The effective batch size is computed as:
+  `max_worker_batch_size × workers`.
+
+  This provides an alternative to `max_batch_size` that automatically adjusts batch
+  size as the number of worker threads changes to maintain constant amount of
+  work per worker per batch.
+
+  Defaults to 10,000 records per worker.
 
 * `index` *(Output connectors only)* The name of an index created by a SQL
   CREATE INDEX statement that defines
