Skip to content

Commit 341e2bc

Browse files
Leonid Ryzhykryzhyk
authored andcommitted
[adapters] Improve DeltaLake data format tests.
Make sure that DeltaLake tests exercise all supported DeltaLake types. Created a custom data structure that contains instances of all supported DeltaLake types and use that instead of `TestStruct2` in testing. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent d2cd742 commit 341e2bc

File tree

5 files changed

+443
-61
lines changed

5 files changed

+443
-61
lines changed

crates/adapters/src/format/parquet/test.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ pub fn load_parquet_file<T: for<'de> DeserializeWithContext<'de, SqlSerdeConfig>
3939
.unwrap_or_else(|_| panic!("error opening parquet file {path:?}"))
4040
.into_iter()
4141
.map(|row| {
42-
let row = row.unwrap().to_json_value();
43-
//println!("row: {}", &row);
42+
let row = row.unwrap();
43+
44+
let row = row.to_json_value();
4445

4546
T::deserialize_with_context(row, &SqlSerdeConfig::from(JsonFlavor::ParquetConverter))
4647
.unwrap()

crates/adapters/src/integrated/delta_table/test.rs

Lines changed: 70 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::format::parquet::test::load_parquet_file;
44
use crate::format::relation_to_parquet_schema;
55
use crate::integrated::delta_table::register_storage_handlers;
66
use crate::test::{
7-
file_to_zset, list_files_recursive, test_circuit, wait, DatabricksPeople, MockDeZSet,
8-
MockUpdate, TestStruct2,
7+
file_to_zset, list_files_recursive, test_circuit, wait, DatabricksPeople, DeltaTestStruct,
8+
MockDeZSet, MockUpdate,
99
};
1010
use crate::{Controller, ControllerError, InputFormat};
1111
use anyhow::anyhow;
@@ -25,7 +25,9 @@ use deltalake::operations::create::CreateBuilder;
2525
use deltalake::protocol::SaveMode;
2626
use deltalake::{DeltaOps, DeltaTable, DeltaTableBuilder};
2727
use feldera_types::config::PipelineConfig;
28+
use feldera_types::format::json::JsonFlavor;
2829
use feldera_types::program_schema::{Field, Relation};
30+
use feldera_types::serde_with_context::serde_config::DecimalFormat;
2931
use feldera_types::serde_with_context::serialize::SerializeWithContextWrapper;
3032
use feldera_types::serde_with_context::{
3133
DateFormat, DeserializeWithContext, SerializeWithContext, SqlSerdeConfig, TimeFormat,
@@ -60,6 +62,7 @@ use uuid::Uuid;
6062
fn delta_output_serde_config() -> SqlSerdeConfig {
6163
SqlSerdeConfig::default()
6264
.with_date_format(DateFormat::String("%Y-%m-%d"))
65+
.with_decimal_format(DecimalFormat::String)
6366
// DeltaLake only supports microsecond-based timestamp encoding, so we just
6467
// hardwire that for now. See also `format/parquet/mod.rs`.
6568
.with_timestamp_format(TimestampFormat::MicrosSinceEpoch)
@@ -298,7 +301,7 @@ outputs:
298301
let config: PipelineConfig = serde_yaml::from_str(&config_str).unwrap();
299302

300303
Controller::with_config(
301-
|workers| Ok(test_circuit::<T>(workers, &TestStruct2::schema())),
304+
|workers| Ok(test_circuit::<T>(workers, &DeltaTestStruct::schema())),
302305
&config,
303306
Box::new(move |e| panic!("delta_to_delta pipeline: error: {e}")),
304307
)
@@ -314,7 +317,7 @@ outputs:
314317
/// by reading parquet files directly. I guess the best way to do this is
315318
/// to build an input connector.
316319
fn delta_table_output_test(
317-
data: Vec<TestStruct2>,
320+
data: Vec<DeltaTestStruct>,
318321
table_uri: &str,
319322
object_store_config: &HashMap<String, String>,
320323
verify: bool,
@@ -330,8 +333,11 @@ fn delta_table_output_test(
330333
for v in data.iter() {
331334
let buffer: Vec<u8> = Vec::new();
332335
let mut serializer = serde_json::Serializer::new(buffer);
333-
v.serialize_with_context(&mut serializer, &SqlSerdeConfig::default())
334-
.unwrap();
336+
v.serialize_with_context(
337+
&mut serializer,
338+
&SqlSerdeConfig::from(JsonFlavor::default()),
339+
)
340+
.unwrap();
335341
input_file
336342
.as_file_mut()
337343
.write_all(&serializer.into_inner())
@@ -385,7 +391,12 @@ outputs:
385391
let config: PipelineConfig = serde_yaml::from_str(&config_str).unwrap();
386392

387393
let controller = Controller::with_config(
388-
|workers| Ok(test_circuit::<TestStruct2>(workers, &TestStruct2::schema())),
394+
|workers| {
395+
Ok(test_circuit::<DeltaTestStruct>(
396+
workers,
397+
&DeltaTestStruct::schema(),
398+
))
399+
},
389400
&config,
390401
Box::new(move |e| panic!("delta_table_output_test: error: {e}")),
391402
)
@@ -404,7 +415,7 @@ outputs:
404415

405416
let mut output_records = Vec::with_capacity(data.len());
406417
for parquet_file in parquet_files {
407-
let mut records: Vec<TestStruct2> = load_parquet_file(&parquet_file);
418+
let mut records: Vec<DeltaTestStruct> = load_parquet_file(&parquet_file);
408419
output_records.append(&mut records);
409420
}
410421

@@ -453,7 +464,7 @@ async fn test_follow(
453464
input_table_uri: &str,
454465
output_table_uri: &str,
455466
storage_options: &HashMap<String, String>,
456-
data: Vec<TestStruct2>,
467+
data: Vec<DeltaTestStruct>,
457468
snapshot: bool,
458469
buffer_size: u64,
459470
buffer_timeout_ms: u64,
@@ -504,7 +515,7 @@ async fn test_follow(
504515
let output_table_uri_clone = output_table_uri.to_string();
505516

506517
let pipeline = tokio::task::spawn_blocking(move || {
507-
delta_to_delta_pipeline::<TestStruct2>(
518+
delta_to_delta_pipeline::<DeltaTestStruct>(
508519
&input_table_uri_clone,
509520
&input_config,
510521
&output_table_uri_clone,
@@ -582,12 +593,12 @@ async fn test_follow(
582593
}
583594

584595
/// Generate up to `max_records` _unique_ records.
585-
fn data(max_records: usize) -> impl Strategy<Value = Vec<TestStruct2>> {
586-
vec(TestStruct2::arbitrary(), 0..max_records).prop_map(|vec| {
596+
fn delta_data(max_records: usize) -> impl Strategy<Value = Vec<DeltaTestStruct>> {
597+
vec(DeltaTestStruct::arbitrary(), 0..max_records).prop_map(|vec| {
587598
let mut idx = 0;
588599
vec.into_iter()
589600
.map(|mut x| {
590-
x.field = idx;
601+
x.bigint = idx;
591602
idx += 1;
592603
x
593604
})
@@ -599,9 +610,9 @@ async fn delta_table_follow_file_test_common(snapshot: bool) {
599610
// We cannot use proptest macros in `async` context, so generate
600611
// some random data manually.
601612
let mut runner = TestRunner::default();
602-
let data = data(100_000).new_tree(&mut runner).unwrap().current();
613+
let data = delta_data(20_000).new_tree(&mut runner).unwrap().current();
603614

604-
let relation_schema = TestStruct2::schema();
615+
let relation_schema = DeltaTestStruct::schema();
605616

606617
let input_table_dir = TempDir::new().unwrap();
607618
let input_table_uri = input_table_dir.path().display().to_string();
@@ -639,9 +650,9 @@ async fn delta_table_follow_s3_test_common(snapshot: bool) {
639650
// We cannot use proptest macros in `async` context, so generate
640651
// some random data manually.
641652
let mut runner = TestRunner::default();
642-
let data = data(100_000).new_tree(&mut runner).unwrap().current();
653+
let data = delta_data(20_000).new_tree(&mut runner).unwrap().current();
643654

644-
let relation_schema = TestStruct2::schema();
655+
let relation_schema = DeltaTestStruct::schema();
645656

646657
let input_uuid = uuid::Uuid::new_v4();
647658
let output_uuid = uuid::Uuid::new_v4();
@@ -691,95 +702,97 @@ async fn delta_table_snapshot_and_follow_s3_test() {
691702
}
692703

693704
proptest! {
694-
#![proptest_config(ProptestConfig::with_cases(2))]
705+
#![proptest_config(ProptestConfig::with_cases(1))]
695706

696707
/// ```text
697708
/// input.json --> [pipeline1]--->delta_table-->[pipeline2]-->output.json
698709
/// ```
699710
#[test]
700-
fn delta_table_file_output_proptest(data in data(100_000))
711+
fn delta_table_file_output_proptest(data in delta_data(20_000))
701712
{
702713
let table_dir = TempDir::new().unwrap();
703714
let table_uri = table_dir.path().display().to_string();
715+
716+
// Uncomment to inspect output parquet files produced by the test.
717+
forget(table_dir);
718+
704719
delta_table_output_test(data.clone(), &table_uri, &HashMap::new(), true);
705720

706-
// // Uncomment to inspect output parquet files produced by the test.
707-
// forget(table_dir);
708721

709722
// Read delta table unordered.
710-
let mut json_file = delta_table_snapshot_to_json::<TestStruct2>(
723+
let mut json_file = delta_table_snapshot_to_json::<DeltaTestStruct>(
711724
&table_uri,
712-
&TestStruct2::schema(),
725+
&DeltaTestStruct::schema(),
713726
&HashMap::new());
714727

715728
let expected_zset = OrdZSet::from_tuples((), data.clone().into_iter().map(|x| Tup2(Tup2(x,()),1)).collect());
716-
let zset = file_to_zset::<TestStruct2>(json_file.as_file_mut(), "json", r#"update_format: "insert_delete""#);
729+
let zset = file_to_zset::<DeltaTestStruct>(json_file.as_file_mut(), "json", r#"update_format: "insert_delete""#);
717730
assert_eq!(zset, expected_zset);
718731

719-
// Order delta table by `id` (which should be its natural order).
720-
let mut json_file_ordered_by_id = delta_table_snapshot_to_json::<TestStruct2>(
732+
// Order delta table by `bigint` (which should be its natural order).
733+
let mut json_file_ordered_by_id = delta_table_snapshot_to_json::<DeltaTestStruct>(
721734
&table_uri,
722-
&TestStruct2::schema_with_lateness(),
723-
&HashMap::from([("timestamp_column".to_string(), "id".to_string())]));
735+
&DeltaTestStruct::schema_with_lateness(),
736+
&HashMap::from([("timestamp_column".to_string(), "bigint".to_string())]));
724737

725-
let zset = file_to_zset::<TestStruct2>(json_file_ordered_by_id.as_file_mut(), "json", r#"update_format: "insert_delete""#);
738+
let zset = file_to_zset::<DeltaTestStruct>(json_file_ordered_by_id.as_file_mut(), "json", r#"update_format: "insert_delete""#);
726739
assert_eq!(zset, expected_zset);
727740

728-
// Order delta table by `id`, specify id range.
729-
let mut json_file_ordered_filtered = delta_table_snapshot_to_json::<TestStruct2>(
741+
// Order delta table by `bigint`, specify range.
742+
let mut json_file_ordered_filtered = delta_table_snapshot_to_json::<DeltaTestStruct>(
730743
&table_uri,
731-
&TestStruct2::schema_with_lateness(),
732-
&HashMap::from([("timestamp_column".to_string(), "id".to_string()), ("snapshot_filter".to_string(), "id >= 10000 ".to_string())]));
744+
&DeltaTestStruct::schema_with_lateness(),
745+
&HashMap::from([("timestamp_column".to_string(), "bigint".to_string()), ("snapshot_filter".to_string(), "bigint >= 10000 ".to_string())]));
733746

734747
let expected_filtered_zset = OrdZSet::from_tuples(
735748
(),
736749
data.clone().into_iter()
737-
.filter(|x| x.field >= 10000)
750+
.filter(|x| x.bigint >= 10000)
738751
.map(|x| Tup2(Tup2(x,()),1)).collect()
739752
);
740753

741-
let zset = file_to_zset::<TestStruct2>(json_file_ordered_filtered.as_file_mut(), "json", r#"update_format: "insert_delete""#);
754+
let zset = file_to_zset::<DeltaTestStruct>(json_file_ordered_filtered.as_file_mut(), "json", r#"update_format: "insert_delete""#);
742755
assert_eq!(zset, expected_filtered_zset);
743756

744757
// Order delta table by `timestamp`.
745-
let mut json_file_ordered_by_ts = delta_table_snapshot_to_json::<TestStruct2>(
758+
let mut json_file_ordered_by_ts = delta_table_snapshot_to_json::<DeltaTestStruct>(
746759
&table_uri,
747-
&TestStruct2::schema_with_lateness(),
748-
&HashMap::from([("timestamp_column".to_string(), "ts".to_string())]));
760+
&DeltaTestStruct::schema_with_lateness(),
761+
&HashMap::from([("timestamp_column".to_string(), "timestamp_ntz".to_string())]));
749762

750-
let zset = file_to_zset::<TestStruct2>(json_file_ordered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
763+
let zset = file_to_zset::<DeltaTestStruct>(json_file_ordered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
751764
assert_eq!(zset, expected_zset);
752765

753766
// Order delta table by `timestamp`; specify an empty filter condition
754-
let mut json_file_ordered_by_ts = delta_table_snapshot_to_json::<TestStruct2>(
767+
let mut json_file_ordered_by_ts = delta_table_snapshot_to_json::<DeltaTestStruct>(
755768
&table_uri,
756-
&TestStruct2::schema_with_lateness(),
757-
&HashMap::from([("timestamp_column".to_string(), "ts".to_string()), ("snapshot_filter".to_string(), "ts < timestamp '2005-01-01T00:00:00'".to_string())]));
769+
&DeltaTestStruct::schema_with_lateness(),
770+
&HashMap::from([("timestamp_column".to_string(), "timestamp_ntz".to_string()), ("snapshot_filter".to_string(), "timestamp_ntz < timestamp '2005-01-01T00:00:00'".to_string())]));
758771

759-
let zset = file_to_zset::<TestStruct2>(json_file_ordered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
772+
let zset = file_to_zset::<DeltaTestStruct>(json_file_ordered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
760773
assert_eq!(zset, OrdZSet::empty());
761774

762775
// Filter delta table by id
763-
let mut json_file_filtered_by_id = delta_table_snapshot_to_json::<TestStruct2>(
776+
let mut json_file_filtered_by_id = delta_table_snapshot_to_json::<DeltaTestStruct>(
764777
&table_uri,
765-
&TestStruct2::schema(),
766-
&HashMap::from([("snapshot_filter".to_string(), "id >= 10000 ".to_string())]));
778+
&DeltaTestStruct::schema(),
779+
&HashMap::from([("snapshot_filter".to_string(), "bigint >= 10000 ".to_string())]));
767780

768781
let expected_filtered_zset = OrdZSet::from_tuples(
769782
(),
770783
data.clone().into_iter()
771-
.filter(|x| x.field >= 10000)
784+
.filter(|x| x.bigint >= 10000)
772785
.map(|x| Tup2(Tup2(x,()),1)).collect()
773786
);
774787

775-
let zset = file_to_zset::<TestStruct2>(json_file_filtered_by_id.as_file_mut(), "json", r#"update_format: "insert_delete""#);
788+
let zset = file_to_zset::<DeltaTestStruct>(json_file_filtered_by_id.as_file_mut(), "json", r#"update_format: "insert_delete""#);
776789
assert_eq!(zset, expected_filtered_zset);
777790

778791
// Filter delta table by timestamp.
779-
let mut json_file_filtered_by_ts = delta_table_snapshot_to_json::<TestStruct2>(
792+
let mut json_file_filtered_by_ts = delta_table_snapshot_to_json::<DeltaTestStruct>(
780793
&table_uri,
781-
&TestStruct2::schema(),
782-
&HashMap::from([("snapshot_filter".to_string(), "ts >= '2005-01-01 00:00:00'".to_string())]));
794+
&DeltaTestStruct::schema(),
795+
&HashMap::from([("snapshot_filter".to_string(), "timestamp_ntz >= '2005-01-01 00:00:00'".to_string())]));
783796

784797
let start = NaiveDate::from_ymd_opt(2005, 1, 1)
785798
.unwrap()
@@ -789,11 +802,11 @@ proptest! {
789802
let expected_filtered_zset = OrdZSet::from_tuples(
790803
(),
791804
data.into_iter()
792-
.filter(|x| x.field_2.milliseconds() >= start.and_utc().timestamp_millis())
805+
.filter(|x| x.timestamp_ntz.milliseconds() >= start.and_utc().timestamp_millis())
793806
.map(|x| Tup2(Tup2(x,()),1)).collect()
794807
);
795808

796-
let zset = file_to_zset::<TestStruct2>(json_file_filtered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
809+
let zset = file_to_zset::<DeltaTestStruct>(json_file_filtered_by_ts.as_file_mut(), "json", r#"update_format: "insert_delete""#);
797810
assert_eq!(zset, expected_filtered_zset);
798811

799812

@@ -808,7 +821,7 @@ proptest! {
808821
/// Write to a Delta table in S3.
809822
#[cfg(feature = "delta-s3-test")]
810823
#[test]
811-
fn delta_table_s3_output_proptest(data in data(100_000))
824+
fn delta_table_s3_output_proptest(data in delta_data(20_000))
812825
{
813826
let uuid = uuid::Uuid::new_v4();
814827
let object_store_config = [
@@ -825,13 +838,13 @@ proptest! {
825838
delta_table_output_test(data.clone(), &table_uri, &object_store_config, false);
826839
//delta_table_output_test(data.clone(), &table_uri, &object_store_config, false);
827840

828-
let mut json_file = delta_table_snapshot_to_json::<TestStruct2>(
841+
let mut json_file = delta_table_snapshot_to_json::<DeltaTestStruct>(
829842
&table_uri,
830-
&TestStruct2::schema(),
843+
&DeltaTestStruct::schema(),
831844
&object_store_config);
832845

833846
let expected_zset = OrdZSet::from_tuples((), data.into_iter().map(|x| Tup2(Tup2(x,()),1)).collect());
834-
let zset = file_to_zset::<TestStruct2>(json_file.as_file_mut(), "json", r#"update_format: "insert_delete""#);
847+
let zset = file_to_zset::<DeltaTestStruct>(json_file.as_file_mut(), "json", r#"update_format: "insert_delete""#);
835848
assert_eq!(zset, expected_zset);
836849
}
837850
}

0 commit comments

Comments
 (0)