Skip to content

Commit b9a90c2

Browse files
committed
[adapters] Report batch write progress for Delta Lake output connector.
When a Delta Lake output connector writes a large batch, the write can take a long time with no visible progress. Add a `batch_records_written` metric that reports how many records have been written so far in the current batch, giving users live feedback during long writes. The metric is updated incrementally per chunk by each parallel write range and resets to 0 after the batch is committed or on failure. Forward-progress updates use `fetch_add` to atomically increment the shared counter, while rollbacks on retry use `fetch_sub` to subtract exactly the failed attempt's contribution. Batch boundaries and terminal failures use `store(0)` for a clean reset. The `Arc<AtomicU64>` counter is shared directly between the connector and `OutputEndpointMetrics`, so metrics snapshots read live progress without any extra synchronisation. The counter is only wired into metrics for connectors that actually report progress (currently Delta Lake); other integrated connectors receive the Arc but their metrics show `batch_records_written: null` rather than a misleading `0`. Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent 48d51fb commit b9a90c2

File tree

8 files changed

+353
-17
lines changed

8 files changed

+353
-17
lines changed

crates/adapters/benches/delta_encoder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
55
use dbsp_adapters::Encoder;
66
use dbsp_adapters::integrated::delta_table::DeltaTableWriter;
77
use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig};
8-
use std::sync::Weak;
8+
use std::sync::atomic::AtomicU64;
9+
use std::sync::{Arc, Weak};
910
use tempfile::TempDir;
1011

1112
// ---------------------------------------------------------------------------
@@ -30,6 +31,7 @@ fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter {
3031
&key_schema,
3132
&value_schema,
3233
Weak::new(),
34+
Arc::new(AtomicU64::new(0)),
3335
)
3436
.unwrap()
3537
}

crates/adapters/src/controller.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6137,6 +6137,7 @@ impl ControllerInner {
61376137
.validate()
61386138
.map_err(|e| ControllerError::invalid_output_buffer_configuration(endpoint_name, &e))?;
61396139

6140+
let mut batch_records_written = None;
61406141
let encoder = if let Some(mut endpoint) = endpoint {
61416142
endpoint
61426143
.connect(Box::new(
@@ -6181,15 +6182,28 @@ impl ControllerInner {
61816182
)?
61826183
} else {
61836184
// `endpoint` is `None` - instantiate an integrated endpoint.
6185+
let batch_records_written_counter = Arc::new(AtomicU64::new(0));
61846186
let endpoint = create_integrated_output_endpoint(
61856187
endpoint_id,
61866188
endpoint_name,
61876189
&resolved_connector_config,
61886190
&handles.key_schema,
61896191
&handles.value_schema,
61906192
self_weak,
6193+
batch_records_written_counter.clone(),
61916194
)?;
61926195

6196+
// Only expose the progress counter for connectors that actually
6197+
// update it (currently Delta Lake). Other integrated connectors
6198+
// receive the Arc but never increment it, so surfacing it would
6199+
// show a misleading permanent 0.
6200+
if matches!(
6201+
&resolved_connector_config.transport,
6202+
TransportConfig::DeltaTableOutput(_)
6203+
) {
6204+
batch_records_written = Some(batch_records_written_counter);
6205+
}
6206+
61936207
endpoint.into_encoder()
61946208
};
61956209

@@ -6221,6 +6235,7 @@ impl ControllerInner {
62216235
endpoint_name,
62226236
endpoint_config,
62236237
initial_statistics,
6238+
batch_records_written,
62246239
);
62256240

62266241
// Thread to run the output pipeline. We run it inside the DBSP runtime as an aux thread, so

crates/adapters/src/controller/stats.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ impl ControllerStatus {
624624
endpoint_name: &str,
625625
config: &OutputEndpointConfig,
626626
initial_statistics: Option<&CheckpointOutputEndpointMetrics>,
627+
batch_records_written: Option<Arc<AtomicU64>>,
627628
) {
628629
// Initialize the `total_processed_input_records` counter on the new endpoint to `total_processed_records`:
629630
// logically the new endpoint is up to speed with the outputs produced by the pipeline so far and only needs to
@@ -639,6 +640,7 @@ impl ControllerStatus {
639640
config,
640641
total_processed_records,
641642
initial_statistics,
643+
batch_records_written,
642644
),
643645
);
644646
}
@@ -2251,12 +2253,22 @@ pub struct OutputEndpointMetrics {
22512253
/// Extra memory in use beyond that used for queuing records. Not all
22522254
/// output connectors report this.
22532255
pub memory: AtomicU64,
2256+
2257+
/// Number of records written so far while the connector is processing a
2258+
/// batch of updates. Resets to 0 after the batch is committed. This is
2259+
/// a transient metric and is not checkpointed.
2260+
///
2261+
/// `None` when the connector does not support batch-progress reporting.
2262+
/// Wrapped in an `Arc` so that connectors can update the counter directly
2263+
/// without acquiring any lock on the metrics structure.
2264+
pub batch_records_written: Option<Arc<AtomicU64>>,
22542265
}
22552266

22562267
impl OutputEndpointMetrics {
22572268
fn new(
22582269
total_processed_input_records: u64,
22592270
initial_statistics: Option<&CheckpointOutputEndpointMetrics>,
2271+
batch_records_written: Option<Arc<AtomicU64>>,
22602272
) -> Self {
22612273
let initial_statistics = initial_statistics.cloned().unwrap_or_default();
22622274
Self {
@@ -2271,6 +2283,7 @@ impl OutputEndpointMetrics {
22712283
total_processed_input_records: AtomicU64::new(total_processed_input_records),
22722284
total_processed_steps: Atomic::new(0),
22732285
memory: AtomicU64::new(0),
2286+
batch_records_written,
22742287
}
22752288
}
22762289

@@ -2307,6 +2320,10 @@ impl OutputEndpointMetrics {
23072320
.load(Ordering::Relaxed),
23082321
total_processed_steps: self.total_processed_steps.load(Ordering::Relaxed),
23092322
memory: self.memory.load(Ordering::Relaxed),
2323+
batch_records_written: self
2324+
.batch_records_written
2325+
.as_ref()
2326+
.map(|c| c.load(Ordering::Relaxed)),
23102327
}
23112328
}
23122329
}
@@ -2463,11 +2480,16 @@ impl OutputEndpointStatus {
24632480
config: &OutputEndpointConfig,
24642481
total_processed_records: u64,
24652482
initial_statistics: Option<&CheckpointOutputEndpointMetrics>,
2483+
batch_records_written: Option<Arc<AtomicU64>>,
24662484
) -> Self {
24672485
Self {
24682486
endpoint_name: endpoint_name.to_string(),
24692487
config: config.clone(),
2470-
metrics: OutputEndpointMetrics::new(total_processed_records, initial_statistics),
2488+
metrics: OutputEndpointMetrics::new(
2489+
total_processed_records,
2490+
initial_statistics,
2491+
batch_records_written,
2492+
),
24712493
fatal_error: Mutex::new(None),
24722494
encode_errors: Mutex::new(ConnectorErrorList::new()),
24732495
transport_errors: Mutex::new(ConnectorErrorList::new()),

crates/adapters/src/controller/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2261,7 +2261,7 @@ fn test_external_controller_status_serialization() {
22612261
}
22622262
}))
22632263
.unwrap();
2264-
status.add_output(&0, "http_output", &output_config, None);
2264+
status.add_output(&0, "http_output", &output_config, None, None);
22652265

22662266
// Set output metrics
22672267
if let Some(output) = status.output_status().get(&0) {

crates/adapters/src/integrated.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use crate::transport::IntegratedInputEndpoint;
33
use crate::{ControllerError, Encoder, InputConsumer, OutputEndpoint};
44
use feldera_types::config::{ConnectorConfig, TransportConfig};
55
use feldera_types::program_schema::Relation;
6-
use std::sync::Weak;
6+
use std::sync::atomic::AtomicU64;
7+
use std::sync::{Arc, Weak};
78

89
#[cfg(feature = "with-deltalake")]
910
pub mod delta_table;
@@ -43,6 +44,7 @@ pub fn create_integrated_output_endpoint(
4344
key_schema: &Option<Relation>,
4445
schema: &Relation,
4546
controller: Weak<ControllerInner>,
47+
batch_records_written: Arc<AtomicU64>,
4648
) -> Result<Box<dyn IntegratedOutputEndpoint>, ControllerError> {
4749
let ep: Box<dyn IntegratedOutputEndpoint> = match &connector_config.transport {
4850
#[cfg(feature = "with-deltalake")]
@@ -53,6 +55,7 @@ pub fn create_integrated_output_endpoint(
5355
key_schema,
5456
schema,
5557
controller,
58+
batch_records_written,
5659
)?),
5760
TransportConfig::PostgresOutput(config) => Box::new(PostgresOutputEndpoint::new(
5861
endpoint_id,

0 commit comments

Comments
 (0)