Skip to content

Commit 3179ddb

Browse files
committed
[adapters] Report batch write progress for Delta Lake output connector.
When a Delta Lake output connector writes a large batch, the write can take a long time with no visible progress. Add a `batch_records_written` metric that reports how many records have been written so far in the current batch, giving users live feedback during long writes. The metric is updated incrementally per chunk by each parallel write range and resets to 0 after the batch is committed or on failure. Forward-progress updates use `fetch_add` to atomically increment the shared counter, while rollbacks on retry use `fetch_sub` to subtract exactly the failed attempt's contribution. Batch boundaries and terminal failures use `store(0)` for a clean reset. The counter is owned by the encoder (e.g., `DeltaTableWriter`) and registered with the controller's `OutputEndpointMetrics` via `register_batch_progress_counter()` during construction. To make this possible, `add_output()` is now called before encoder creation so the metrics slot exists when the connector is built. Connectors that don't register a counter show `batch_records_written: null`. Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent 49913a8 commit 3179ddb

File tree

7 files changed

+424
-73
lines changed

7 files changed

+424
-73
lines changed

crates/adapters/src/controller.rs

Lines changed: 83 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,6 +1834,24 @@ impl Controller {
18341834
ValueType::Gauge,
18351835
|m| &m.memory,
18361836
);
1837+
1838+
// batch_records_written is Option<Arc<AtomicU64>> (only present for
1839+
// connectors that support it), so it can't use write_output_metric.
1840+
metrics.values(
1841+
"output_connector_batch_records_written",
1842+
"Number of records written so far in the current output batch. Non-zero while a batch write is in progress. Resets to 0 after the batch is committed.",
1843+
ValueType::Gauge,
1844+
|w| {
1845+
for output in status.output_status().values() {
1846+
if let Some(counter) = &output.metrics.batch_records_written {
1847+
w.write_value(
1848+
&labels.with("endpoint", &output.endpoint_name),
1849+
counter.as_ref(),
1850+
);
1851+
}
1852+
}
1853+
},
1854+
);
18371855
}
18381856

18391857
/// Execute a SQL query over materialized tables and views;
@@ -6137,61 +6155,75 @@ impl ControllerInner {
61376155
.validate()
61386156
.map_err(|e| ControllerError::invalid_output_buffer_configuration(endpoint_name, &e))?;
61396157

6140-
let encoder = if let Some(mut endpoint) = endpoint {
6141-
endpoint
6142-
.connect(Box::new(
6143-
move |fatal: bool, e: AnyError, error_tag: Option<&str>| {
6144-
if let Some(controller) = self_weak.upgrade() {
6145-
controller.output_transport_error(
6146-
endpoint_id,
6147-
&endpoint_name_str,
6148-
fatal,
6149-
e,
6150-
error_tag,
6151-
)
6152-
}
6153-
},
6154-
))
6155-
.map_err(|e| ControllerError::output_transport_error(endpoint_name, true, e))?;
6158+
// Initialize endpoint stats early so that connectors can register
6159+
// batch-progress counters (or other metrics) during construction.
6160+
self.status.add_output(
6161+
&endpoint_id,
6162+
endpoint_name,
6163+
endpoint_config,
6164+
initial_statistics,
6165+
);
61566166

6157-
// Create probe.
6158-
let probe = Box::new(OutputProbe::new(
6159-
endpoint_id,
6160-
endpoint_name,
6161-
endpoint,
6162-
self.clone(),
6163-
));
6167+
let encoder = (|| -> Result<Box<dyn Encoder>, ControllerError> {
6168+
if let Some(mut endpoint) = endpoint {
6169+
endpoint
6170+
.connect(Box::new(
6171+
move |fatal: bool, e: AnyError, error_tag: Option<&str>| {
6172+
if let Some(controller) = self_weak.upgrade() {
6173+
controller.output_transport_error(
6174+
endpoint_id,
6175+
&endpoint_name_str,
6176+
fatal,
6177+
e,
6178+
error_tag,
6179+
)
6180+
}
6181+
},
6182+
))
6183+
.map_err(|e| ControllerError::output_transport_error(endpoint_name, true, e))?;
61646184

6165-
// Create encoder.
6166-
let format_config = resolved_connector_config
6167-
.format
6168-
.as_ref()
6169-
.ok_or_else(|| ControllerError::output_format_not_specified(endpoint_name))?
6170-
.clone();
6185+
// Create probe.
6186+
let probe = Box::new(OutputProbe::new(
6187+
endpoint_id,
6188+
endpoint_name,
6189+
endpoint,
6190+
self.clone(),
6191+
));
61716192

6172-
let format = get_output_format(&format_config.name).ok_or_else(|| {
6173-
ControllerError::unknown_output_format(endpoint_name, &format_config.name)
6174-
})?;
6175-
format.new_encoder(
6176-
endpoint_name,
6177-
&resolved_connector_config,
6178-
&handles.key_schema,
6179-
&handles.value_schema,
6180-
probe,
6181-
)?
6182-
} else {
6183-
// `endpoint` is `None` - instantiate an integrated endpoint.
6184-
let endpoint = create_integrated_output_endpoint(
6185-
endpoint_id,
6186-
endpoint_name,
6187-
&resolved_connector_config,
6188-
&handles.key_schema,
6189-
&handles.value_schema,
6190-
self_weak,
6191-
)?;
6193+
// Create encoder.
6194+
let format_config = resolved_connector_config
6195+
.format
6196+
.as_ref()
6197+
.ok_or_else(|| ControllerError::output_format_not_specified(endpoint_name))?
6198+
.clone();
61926199

6193-
endpoint.into_encoder()
6194-
};
6200+
let format = get_output_format(&format_config.name).ok_or_else(|| {
6201+
ControllerError::unknown_output_format(endpoint_name, &format_config.name)
6202+
})?;
6203+
Ok(format.new_encoder(
6204+
endpoint_name,
6205+
&resolved_connector_config,
6206+
&handles.key_schema,
6207+
&handles.value_schema,
6208+
probe,
6209+
)?)
6210+
} else {
6211+
// `endpoint` is `None` - instantiate an integrated endpoint.
6212+
let endpoint = create_integrated_output_endpoint(
6213+
endpoint_id,
6214+
endpoint_name,
6215+
&resolved_connector_config,
6216+
&handles.key_schema,
6217+
&handles.value_schema,
6218+
self_weak,
6219+
)?;
6220+
6221+
Ok(endpoint.into_encoder())
6222+
}
6223+
})()
6224+
.inspect_err(|_e| {
6225+
self.status.remove_output(&endpoint_id);
6226+
})?;
61956227

61966228
let parker = Parker::new();
61976229
let endpoint_descr = OutputEndpointDescr::new(
@@ -6215,14 +6247,6 @@ impl ControllerInner {
62156247
.output_buffer_config
62166248
.clone();
62176249

6218-
// Initialize endpoint stats.
6219-
self.status.add_output(
6220-
&endpoint_id,
6221-
endpoint_name,
6222-
endpoint_config,
6223-
initial_statistics,
6224-
);
6225-
62266250
// Thread to run the output pipeline. We run it inside the DBSP runtime as an aux thread, so
62276251
// that it can use the storage backend to maintain the output buffer.
62286252
self.runtime

crates/adapters/src/controller/stats.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,21 @@ impl ControllerStatus {
723723
self.outputs.read_recursive()
724724
}
725725

726+
/// Register a batch-progress counter for an output endpoint.
727+
///
728+
/// Called by integrated connectors (e.g. Delta Lake) that create their own
729+
/// `Arc<AtomicU64>` progress counter. The counter is stored in the
730+
/// endpoint's metrics so that snapshots include it.
731+
pub fn register_batch_progress_counter(
732+
&self,
733+
endpoint_id: &EndpointId,
734+
counter: Arc<AtomicU64>,
735+
) {
736+
if let Some(status) = self.outputs.write().get_mut(endpoint_id) {
737+
status.metrics.batch_records_written = Some(counter);
738+
}
739+
}
740+
726741
/// Number of records buffered by the endpoint or 0 if the endpoint
727742
/// doesn't exist (the latter is possible if the endpoint is being
728743
/// destroyed).
@@ -2251,6 +2266,15 @@ pub struct OutputEndpointMetrics {
22512266
/// Extra memory in use beyond that used for queuing records. Not all
22522267
/// output connectors report this.
22532268
pub memory: AtomicU64,
2269+
2270+
/// Number of records written so far while the connector is processing a
2271+
/// batch of updates. Resets to 0 after the batch is committed. This is
2272+
/// a transient metric and is not checkpointed.
2273+
///
2274+
/// `None` when the connector does not support batch-progress reporting.
2275+
/// Wrapped in an `Arc` so that connectors can update the counter directly
2276+
/// without acquiring any lock on the metrics structure.
2277+
pub batch_records_written: Option<Arc<AtomicU64>>,
22542278
}
22552279

22562280
impl OutputEndpointMetrics {
@@ -2271,6 +2295,7 @@ impl OutputEndpointMetrics {
22712295
total_processed_input_records: AtomicU64::new(total_processed_input_records),
22722296
total_processed_steps: Atomic::new(0),
22732297
memory: AtomicU64::new(0),
2298+
batch_records_written: None,
22742299
}
22752300
}
22762301

@@ -2307,6 +2332,10 @@ impl OutputEndpointMetrics {
23072332
.load(Ordering::Relaxed),
23082333
total_processed_steps: self.total_processed_steps.load(Ordering::Relaxed),
23092334
memory: self.memory.load(Ordering::Relaxed),
2335+
batch_records_written: self
2336+
.batch_records_written
2337+
.as_ref()
2338+
.map(|c| c.load(Ordering::Relaxed)),
23102339
}
23112340
}
23122341
}

crates/adapters/src/integrated.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ where
3838
pub fn create_integrated_output_endpoint(
3939
endpoint_id: EndpointId,
4040
endpoint_name: &str,
41-
4241
connector_config: &ConnectorConfig,
4342
key_schema: &Option<Relation>,
4443
schema: &Relation,

0 commit comments

Comments
 (0)