Skip to content

Commit 47b7efc

Browse files
committed
requested changes
1 parent adf3e2c commit 47b7efc

File tree

8 files changed

+326
-126
lines changed

8 files changed

+326
-126
lines changed

crates/adapters/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,7 @@ required-features = ["with-avro"]
246246
name = "postgres_output"
247247
harness = false
248248
required-features = ["bench-mode"]
249+
250+
[[bench]]
251+
name = "delta_encoder"
252+
harness = false
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
mod bench_common;
2+
3+
use bench_common::{BenchKeyStruct, BenchTestStruct, build_indexed_batch, generate_test_data};
4+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
5+
use dbsp_adapters::Encoder;
6+
use dbsp_adapters::integrated::delta_table::DeltaTableWriter;
7+
use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig};
8+
use std::sync::Weak;
9+
use tempfile::TempDir;
10+
11+
// ---------------------------------------------------------------------------
12+
// Delta-specific helpers
13+
// ---------------------------------------------------------------------------
14+
15+
fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter {
16+
let config = DeltaTableWriterConfig {
17+
uri: table_uri.to_string(),
18+
mode: DeltaTableWriteMode::Truncate,
19+
max_retries: Some(0),
20+
threads: Some(threads),
21+
object_store_config: Default::default(),
22+
};
23+
let key_schema = Some(BenchKeyStruct::relation_schema());
24+
let mut value_schema = BenchTestStruct::relation_schema();
25+
value_schema.materialized = true;
26+
DeltaTableWriter::new(
27+
Default::default(),
28+
"bench_endpoint",
29+
&config,
30+
&key_schema,
31+
&value_schema,
32+
Weak::new(),
33+
)
34+
.unwrap()
35+
}
36+
37+
// ---------------------------------------------------------------------------
38+
// Benchmarks
39+
// ---------------------------------------------------------------------------
40+
41+
/// Benchmark parallel Delta table encoding with 100k records across 1/2/4/8 workers.
42+
fn bench_indexed_encode(c: &mut Criterion) {
43+
let num_records = 100_000;
44+
let data = generate_test_data(num_records);
45+
let batch = build_indexed_batch(&data);
46+
47+
let mut group = c.benchmark_group("delta_indexed_encode");
48+
group.throughput(criterion::Throughput::Elements(num_records as u64));
49+
50+
for workers in [1, 2, 4, 8] {
51+
group.bench_with_input(
52+
BenchmarkId::new("workers", workers),
53+
&workers,
54+
|b, &workers| {
55+
// Each iteration needs a fresh directory since the writer
56+
// creates real Parquet files.
57+
b.iter_with_setup(
58+
|| {
59+
let table_dir = TempDir::new().unwrap();
60+
let table_uri = table_dir.path().display().to_string();
61+
let writer = create_indexed_writer(workers, &table_uri);
62+
(writer, table_dir)
63+
},
64+
|(mut writer, _table_dir)| {
65+
writer.consumer().batch_start(0);
66+
writer.encode(batch.clone().arc_as_batch_reader()).unwrap();
67+
writer.consumer().batch_end();
68+
},
69+
);
70+
},
71+
);
72+
}
73+
74+
group.finish();
75+
}
76+
77+
criterion_group!(benches, bench_indexed_encode);
78+
criterion_main!(benches);

crates/adapters/src/integrated.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use feldera_types::program_schema::Relation;
66
use std::sync::Weak;
77

88
#[cfg(feature = "with-deltalake")]
9-
mod delta_table;
9+
pub mod delta_table;
1010
mod postgres;
1111

1212
use crate::integrated::postgres::PostgresInputEndpoint;

0 commit comments

Comments
 (0)