Skip to content

Commit 7e00e06

Browse files
committed
print metadata
1 parent b297bb0 commit 7e00e06

File tree

9 files changed

+76
-24
lines changed

9 files changed

+76
-24
lines changed

crates/adapterlib/src/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ pub trait OutputConsumer: Send {
682682
/// The encoder should not generate buffers exceeding this size.
683683
fn max_buffer_size_bytes(&self) -> usize;
684684

685-
fn batch_start(&mut self, step: Step);
685+
fn batch_start(&mut self, step: Step, processed_records: usize);
686686

687687
/// See OutputEndpoint::push_buffer.
688688
fn push_buffer(&mut self, buffer: &[u8], num_records: usize);

crates/adapters/src/controller.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6253,9 +6253,10 @@ impl ControllerInner {
62536253
endpoint_name: &str,
62546254
encoder: &mut dyn Encoder,
62556255
step: Step,
6256+
processed_records: usize,
62566257
controller: &ControllerInner,
62576258
) {
6258-
encoder.consumer().batch_start(step);
6259+
encoder.consumer().batch_start(step, processed_records);
62596260
encoder.encode(batch).unwrap_or_else(|e| {
62606261
controller.encode_error(endpoint_id, endpoint_name, e, Some("encoder_error"))
62616262
});
@@ -6300,6 +6301,9 @@ impl ControllerInner {
63006301
&endpoint_name,
63016302
encoder.as_mut(),
63026303
output_buffer.buffered_step,
6304+
output_buffer
6305+
.buffered_processed_records
6306+
.total_processed_input_records as usize,
63036307
&controller,
63046308
);
63056309

@@ -6346,6 +6350,9 @@ impl ControllerInner {
63466350
&endpoint_name,
63476351
encoder.as_mut(),
63486352
step,
6353+
processed_records
6354+
.map(|r| r.total_processed_input_records as usize)
6355+
.unwrap_or(0),
63496356
&controller,
63506357
);
63516358
}
@@ -7127,7 +7134,7 @@ impl OutputConsumer for OutputProbe {
71277134
self.endpoint.max_buffer_size_bytes()
71287135
}
71297136

7130-
fn batch_start(&mut self, step: Step) {
7137+
fn batch_start(&mut self, step: Step, _processed_records: usize) {
71317138
self.endpoint.batch_start(step).unwrap_or_else(|e| {
71327139
self.controller.output_transport_error(
71337140
self.endpoint_id,

crates/adapters/src/controller/stats.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,16 @@ impl ControllerStatus {
484484
self.global_metrics.state.load(Ordering::Relaxed)
485485
}
486486

487+
/// Collects, for every input endpoint, the result of that endpoint's
/// `metadata_as_of(offset)` lookup.
///
/// The returned map is keyed by the endpoint id rendered with `to_string()`.
/// A value of `None` means the endpoint's own lookup produced no entry for
/// `offset`.
pub fn metadata_as_of(&self, offset: u64) -> BTreeMap<String, Option<WatermarkListEntry>> {
    // The guard returned by `inputs.read()` is held while the map is built.
    self.inputs
        .read()
        .iter()
        .map(|(id, status)| (id.to_string(), status.metadata_as_of(offset)))
        .collect()
}
496+
487497
/// Set the state to `desired`.
488498
///
489499
/// Setting the state to [PipelineState::Terminated] is permanent; the state
@@ -1712,6 +1722,10 @@ impl WatermarkTracker {
17121722
pub fn completed_watermark(&self) -> Option<CompletedWatermark> {
17131723
self.0.lock().unwrap().completed_watermark.clone()
17141724
}
1725+
1726+
/// Looks up the tracked watermark entry for `offset` by delegating to the
/// inner state's `as_of`, holding the inner lock for the duration of the call.
///
/// # Panics
///
/// Panics if acquiring the inner lock returns an error (`lock().unwrap()`).
pub fn metadata_as_of(&self, offset: u64) -> Option<WatermarkListEntry> {
    let inner = self.0.lock().unwrap();
    inner.as_of(offset)
}
17151729
}
17161730

17171731
/// Bound the number of tracked watermarks to avoid unbounded memory growth.
@@ -1860,10 +1874,22 @@ impl WatermarkTrackerInner {
18601874
}
18611875
}
18621876
}
1877+
1878+
/// Returns a copy of the last entry in `watermark_with_metadata_list` whose
/// `global_offset` is at most `offset`.
///
/// Returns `None` when the list is empty or when its first entry already has
/// a `global_offset` greater than `offset`.
///
/// The scan stops at the first entry whose `global_offset` exceeds `offset`,
/// so any later entries are never considered.
/// NOTE(review): this presumes the list is ordered by ascending
/// `global_offset` — confirm against the code that appends to it.
fn as_of(&self, offset: u64) -> Option<WatermarkListEntry> {
    self.watermark_with_metadata_list
        .iter()
        .take_while(|entry| entry.global_offset <= offset)
        .last()
        // Clone only the selected entry rather than every entry visited.
        .cloned()
}
18631889
}
18641890

1865-
#[derive(Debug)]
1866-
struct WatermarkListEntry {
1891+
#[derive(Debug, Clone)]
1892+
pub struct WatermarkListEntry {
18671893
watermark: Watermark,
18681894
global_offset: u64,
18691895
processed_at: Option<DateTime<Utc>>,
@@ -1965,6 +1991,10 @@ impl InputEndpointStatus {
19651991
completed_frontier: self.completed_frontier.completed_watermark(),
19661992
}
19671993
}
1994+
1995+
/// Returns whatever `completed_frontier.metadata_as_of(offset)` yields for
/// this endpoint; the lookup itself is performed entirely by the frontier.
pub fn metadata_as_of(&self, offset: u64) -> Option<WatermarkListEntry> {
    let frontier = &self.completed_frontier;
    frontier.metadata_as_of(offset)
}
19681998
}
19691999

19702000
impl InputEndpointStatus {

crates/adapters/src/format/avro/test.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,7 +1259,7 @@ where
12591259
})
12601260
.collect::<Vec<_>>();
12611261
for (step, zset) in zsets.into_iter().enumerate() {
1262-
encoder.consumer().batch_start(step as u64);
1262+
encoder.consumer().batch_start(step as u64, 0);
12631263
encoder.encode(zset.arc_as_batch_reader()).unwrap();
12641264
encoder.consumer().batch_end();
12651265
}
@@ -1356,7 +1356,7 @@ fn test_raw_avro_output_indexed<K, T>(
13561356
})
13571357
.collect::<Vec<_>>();
13581358
for (step, zset) in zsets.into_iter().enumerate() {
1359-
encoder.consumer().batch_start(step as u64);
1359+
encoder.consumer().batch_start(step as u64, 0);
13601360
encoder.encode(zset.arc_as_batch_reader()).unwrap();
13611361
encoder.consumer().batch_end();
13621362
}
@@ -1448,7 +1448,7 @@ fn test_confluent_avro_output<K, V, KF>(
14481448
})
14491449
.collect::<Vec<_>>();
14501450
for (step, zset) in zsets.into_iter().enumerate() {
1451-
encoder.consumer().batch_start(step as u64);
1451+
encoder.consumer().batch_start(step as u64, 0);
14521452
encoder.encode(zset.arc_as_batch_reader()).unwrap();
14531453
encoder.consumer().batch_end();
14541454
}
@@ -1557,7 +1557,7 @@ fn test_confluent_avro_output_indexed<K, V>(
15571557
.collect::<Vec<_>>();
15581558

15591559
for (step, zset) in zsets.into_iter().enumerate() {
1560-
encoder.consumer().batch_start(step as u64);
1560+
encoder.consumer().batch_start(step as u64, 0);
15611561
encoder.encode(zset.arc_as_batch_reader()).unwrap();
15621562
encoder.consumer().batch_end();
15631563
}
@@ -1644,15 +1644,15 @@ fn test_non_unique_keys() {
16441644
let zset = OrdIndexedZSet::from_tuples((), vec![Tup2(Tup2(k1.clone(), v1.clone()), 2)]);
16451645
let zset = Arc::new(<SerBatchImpl<_, KeyStruct, TestStruct>>::new(zset)) as Arc<dyn SerBatch>;
16461646

1647-
encoder.consumer().batch_start(0);
1647+
encoder.consumer().batch_start(0, 0);
16481648
let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err();
16491649
assert!(err.to_string().contains(r#"is inserted 2 times"#));
16501650
encoder.consumer().batch_end();
16511651

16521652
let zset = OrdIndexedZSet::from_tuples((), vec![Tup2(Tup2(k1.clone(), v1.clone()), -2)]);
16531653
let zset = Arc::new(<SerBatchImpl<_, KeyStruct, TestStruct>>::new(zset)) as Arc<dyn SerBatch>;
16541654

1655-
encoder.consumer().batch_start(0);
1655+
encoder.consumer().batch_start(0, 0);
16561656
let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err();
16571657
assert!(err.to_string().contains(r#"is deleted 2 times"#));
16581658
encoder.consumer().batch_end();
@@ -1666,7 +1666,7 @@ fn test_non_unique_keys() {
16661666
);
16671667
let zset = Arc::new(<SerBatchImpl<_, KeyStruct, TestStruct>>::new(zset)) as Arc<dyn SerBatch>;
16681668

1669-
encoder.consumer().batch_start(0);
1669+
encoder.consumer().batch_start(0, 0);
16701670
let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err();
16711671
println!("{err}");
16721672
assert!(
@@ -1684,7 +1684,7 @@ fn test_non_unique_keys() {
16841684
);
16851685
let zset = Arc::new(<SerBatchImpl<_, KeyStruct, TestStruct>>::new(zset)) as Arc<dyn SerBatch>;
16861686

1687-
encoder.consumer().batch_start(0);
1687+
encoder.consumer().batch_start(0, 0);
16881688
let err = encoder.encode(zset.arc_as_batch_reader()).unwrap_err();
16891689
println!("{err}");
16901690
assert!(
@@ -1788,7 +1788,7 @@ fn run_avro_encoder_spine_snapshot_indexed<K, V>(
17881788
)
17891789
.unwrap();
17901790

1791-
encoder.consumer().batch_start(0);
1791+
encoder.consumer().batch_start(0, 0);
17921792
encoder.encode(snapshot).unwrap();
17931793
encoder.consumer().batch_end();
17941794

crates/adapters/src/format/json/output.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ mod test {
674674
})
675675
.collect::<Vec<_>>();
676676
for (step, zset) in zsets.into_iter().enumerate() {
677-
encoder.consumer().batch_start(step as u64);
677+
encoder.consumer().batch_start(step as u64, 0);
678678
encoder.encode(zset.arc_as_batch_reader()).unwrap();
679679
encoder.consumer().batch_end();
680680
}

crates/adapters/src/format/parquet/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ fn parquet_output() {
156156
);
157157

158158
let zset = Arc::new(SerBatchImpl::<_, TestStruct2, ()>::new(zset)) as Arc<dyn SerBatchReader>;
159-
encoder.consumer().batch_start(0);
159+
encoder.consumer().batch_start(0, 0);
160160
encoder.encode(zset).unwrap();
161161
encoder.consumer().batch_end();
162162

crates/adapters/src/integrated/delta_table/output.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use serde::Serialize;
3535
use serde_arrow::ArrayBuilder;
3636
use serde_arrow::schema::SerdeArrowSchema;
3737
use std::cmp::min;
38+
use std::sync::atomic::AtomicUsize;
39+
use std::sync::atomic::Ordering;
3840
use std::sync::{Arc, Weak};
3941
use tokio::time::{Duration, sleep};
4042
use tracing::{Instrument, info, info_span, warn};
@@ -62,6 +64,7 @@ struct DeltaTableWriterInner {
6264
key_schema: Option<Relation>,
6365
value_schema: Relation,
6466
controller: Weak<ControllerInner>,
67+
processed_records: AtomicUsize,
6568
}
6669

6770
pub struct DeltaTableWriter {
@@ -141,6 +144,7 @@ impl DeltaTableWriter {
141144
key_schema: key_schema.clone(),
142145
value_schema: value_schema.clone(),
143146
controller,
147+
processed_records: AtomicUsize::new(0),
144148
});
145149
// Create or open the delta table.
146150
// Panic safety: block_on() panics if called from a tokio async context.
@@ -525,7 +529,15 @@ async fn stream_encode_and_write(
525529

526530
while cursor.key_valid() {
527531
let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor)
528-
.map_err(WriteError::Deterministic)?;
532+
.map_err(|e| {
533+
if let Some(controller) = inner.controller.upgrade() {
534+
let metadata = controller
535+
.status
536+
.metadata_as_of(inner.processed_records.load(Ordering::Relaxed) as u64);
537+
println!("metadata: {:?}", metadata);
538+
};
539+
WriteError::Deterministic(e)
540+
})?;
529541

530542
if let Some(op) = op {
531543
cursor.rewind_vals();
@@ -634,9 +646,12 @@ impl OutputConsumer for DeltaTableWriter {
634646
usize::MAX
635647
}
636648

637-
fn batch_start(&mut self, _step: Step) {
649+
fn batch_start(&mut self, _step: Step, processed_records: usize) {
638650
self.pending_actions.clear();
639651
self.num_rows = 0;
652+
self.inner
653+
.processed_records
654+
.store(processed_records, Ordering::Relaxed);
640655
}
641656

642657
fn push_buffer(&mut self, _buffer: &[u8], _num_records: usize) {
@@ -1038,7 +1053,7 @@ mod parallel {
10381053
}
10391054

10401055
fn encode_batch(endpoint: &mut DeltaTableWriter, batch: &Arc<dyn SerBatch>) {
1041-
endpoint.consumer().batch_start(0);
1056+
endpoint.consumer().batch_start(0, 0);
10421057
endpoint
10431058
.encode(batch.clone().arc_as_batch_reader())
10441059
.unwrap();
@@ -1376,7 +1391,7 @@ mod parallel {
13761391
// Make directory read-only to trigger write failure.
13771392
std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap();
13781393

1379-
endpoint.consumer().batch_start(0);
1394+
endpoint.consumer().batch_start(0, 0);
13801395
let result = endpoint.encode(batch.arc_as_batch_reader());
13811396

13821397
// Restore permissions before asserting (so TempDir cleanup succeeds).
@@ -1415,7 +1430,7 @@ mod parallel {
14151430
// Make directory read-only to trigger write failure.
14161431
std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap();
14171432

1418-
endpoint.consumer().batch_start(0);
1433+
endpoint.consumer().batch_start(0, 0);
14191434
let result = endpoint.encode(batch.arc_as_batch_reader());
14201435

14211436
// Restore permissions.

crates/adapters/src/integrated/postgres/output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ impl OutputConsumer for PostgresOutputEndpoint {
692692
self.config.max_buffer_size_bytes
693693
}
694694

695-
fn batch_start(&mut self, _step: Step) {
695+
fn batch_start(&mut self, _step: Step, _processed_records: usize) {
696696
self.txn_start = std::time::Instant::now();
697697

698698
match self.broadcast_and_collect(BroadcastCommand::BatchStart) {
@@ -1272,7 +1272,7 @@ mod tests {
12721272
}
12731273

12741274
fn encode_batch(endpoint: &mut PostgresOutputEndpoint, batch: &Arc<dyn SerBatch>) {
1275-
endpoint.consumer().batch_start(0);
1275+
endpoint.consumer().batch_start(0, 0);
12761276
endpoint
12771277
.encode(batch.clone().arc_as_batch_reader())
12781278
.unwrap();

crates/adapters/src/test/mock_output_consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl OutputConsumer for MockOutputConsumer {
6767
self.max_buffer_size_bytes
6868
}
6969

70-
fn batch_start(&mut self, _step: Step) {}
70+
fn batch_start(&mut self, _step: Step, _processed_records: usize) {}
7171
fn push_buffer(&mut self, buffer: &[u8], _num_records: usize) {
7272
self.data
7373
.lock()

0 commit comments

Comments
 (0)