@@ -4,8 +4,8 @@ use crate::format::parquet::test::load_parquet_file;
44use crate :: format:: relation_to_parquet_schema;
55use crate :: integrated:: delta_table:: register_storage_handlers;
66use crate :: test:: {
7- file_to_zset, list_files_recursive, test_circuit, wait, DatabricksPeople , MockDeZSet ,
8- MockUpdate , TestStruct2 ,
7+ file_to_zset, list_files_recursive, test_circuit, wait, DatabricksPeople , DeltaTestStruct ,
8+ MockDeZSet , MockUpdate ,
99} ;
1010use crate :: { Controller , ControllerError , InputFormat } ;
1111use anyhow:: anyhow;
@@ -25,7 +25,9 @@ use deltalake::operations::create::CreateBuilder;
2525use deltalake:: protocol:: SaveMode ;
2626use deltalake:: { DeltaOps , DeltaTable , DeltaTableBuilder } ;
2727use feldera_types:: config:: PipelineConfig ;
28+ use feldera_types:: format:: json:: JsonFlavor ;
2829use feldera_types:: program_schema:: { Field , Relation } ;
30+ use feldera_types:: serde_with_context:: serde_config:: DecimalFormat ;
2931use feldera_types:: serde_with_context:: serialize:: SerializeWithContextWrapper ;
3032use feldera_types:: serde_with_context:: {
3133 DateFormat , DeserializeWithContext , SerializeWithContext , SqlSerdeConfig , TimeFormat ,
@@ -60,6 +62,7 @@ use uuid::Uuid;
6062fn delta_output_serde_config ( ) -> SqlSerdeConfig {
6163 SqlSerdeConfig :: default ( )
6264 . with_date_format ( DateFormat :: String ( "%Y-%m-%d" ) )
65+ . with_decimal_format ( DecimalFormat :: String )
6366 // DeltaLake only supports microsecond-based timestamp encoding, so we just
6467 // hardwire that for now. See also `format/parquet/mod.rs`.
6568 . with_timestamp_format ( TimestampFormat :: MicrosSinceEpoch )
@@ -298,7 +301,7 @@ outputs:
298301 let config: PipelineConfig = serde_yaml:: from_str ( & config_str) . unwrap ( ) ;
299302
300303 Controller :: with_config (
301- |workers| Ok ( test_circuit :: < T > ( workers, & TestStruct2 :: schema ( ) ) ) ,
304+ |workers| Ok ( test_circuit :: < T > ( workers, & DeltaTestStruct :: schema ( ) ) ) ,
302305 & config,
303306 Box :: new ( move |e| panic ! ( "delta_to_delta pipeline: error: {e}" ) ) ,
304307 )
@@ -314,7 +317,7 @@ outputs:
314317/// by reading parquet files directly. I guess the best way to do this is
315318/// to build an input connector.
316319fn delta_table_output_test (
317- data : Vec < TestStruct2 > ,
320+ data : Vec < DeltaTestStruct > ,
318321 table_uri : & str ,
319322 object_store_config : & HashMap < String , String > ,
320323 verify : bool ,
@@ -330,8 +333,11 @@ fn delta_table_output_test(
330333 for v in data. iter ( ) {
331334 let buffer: Vec < u8 > = Vec :: new ( ) ;
332335 let mut serializer = serde_json:: Serializer :: new ( buffer) ;
333- v. serialize_with_context ( & mut serializer, & SqlSerdeConfig :: default ( ) )
334- . unwrap ( ) ;
336+ v. serialize_with_context (
337+ & mut serializer,
338+ & SqlSerdeConfig :: from ( JsonFlavor :: default ( ) ) ,
339+ )
340+ . unwrap ( ) ;
335341 input_file
336342 . as_file_mut ( )
337343 . write_all ( & serializer. into_inner ( ) )
@@ -385,7 +391,12 @@ outputs:
385391 let config: PipelineConfig = serde_yaml:: from_str ( & config_str) . unwrap ( ) ;
386392
387393 let controller = Controller :: with_config (
388- |workers| Ok ( test_circuit :: < TestStruct2 > ( workers, & TestStruct2 :: schema ( ) ) ) ,
394+ |workers| {
395+ Ok ( test_circuit :: < DeltaTestStruct > (
396+ workers,
397+ & DeltaTestStruct :: schema ( ) ,
398+ ) )
399+ } ,
389400 & config,
390401 Box :: new ( move |e| panic ! ( "delta_table_output_test: error: {e}" ) ) ,
391402 )
@@ -404,7 +415,7 @@ outputs:
404415
405416 let mut output_records = Vec :: with_capacity ( data. len ( ) ) ;
406417 for parquet_file in parquet_files {
407- let mut records: Vec < TestStruct2 > = load_parquet_file ( & parquet_file) ;
418+ let mut records: Vec < DeltaTestStruct > = load_parquet_file ( & parquet_file) ;
408419 output_records. append ( & mut records) ;
409420 }
410421
@@ -453,7 +464,7 @@ async fn test_follow(
453464 input_table_uri : & str ,
454465 output_table_uri : & str ,
455466 storage_options : & HashMap < String , String > ,
456- data : Vec < TestStruct2 > ,
467+ data : Vec < DeltaTestStruct > ,
457468 snapshot : bool ,
458469 buffer_size : u64 ,
459470 buffer_timeout_ms : u64 ,
@@ -504,7 +515,7 @@ async fn test_follow(
504515 let output_table_uri_clone = output_table_uri. to_string ( ) ;
505516
506517 let pipeline = tokio:: task:: spawn_blocking ( move || {
507- delta_to_delta_pipeline :: < TestStruct2 > (
518+ delta_to_delta_pipeline :: < DeltaTestStruct > (
508519 & input_table_uri_clone,
509520 & input_config,
510521 & output_table_uri_clone,
@@ -582,12 +593,12 @@ async fn test_follow(
582593}
583594
584595/// Generate fewer than `max_records` _unique_ records (the `0..max_records` length range excludes its upper bound).
585- fn data ( max_records : usize ) -> impl Strategy < Value = Vec < TestStruct2 > > {
586- vec ( TestStruct2 :: arbitrary ( ) , 0 ..max_records) . prop_map ( |vec| {
596+ fn delta_data ( max_records : usize ) -> impl Strategy < Value = Vec < DeltaTestStruct > > {
597+ vec ( DeltaTestStruct :: arbitrary ( ) , 0 ..max_records) . prop_map ( |vec| {
587598 let mut idx = 0 ;
588599 vec. into_iter ( )
589600 . map ( |mut x| {
590- x. field = idx;
601+ x. bigint = idx;
591602 idx += 1 ;
592603 x
593604 } )
@@ -599,9 +610,9 @@ async fn delta_table_follow_file_test_common(snapshot: bool) {
599610 // We cannot use proptest macros in `async` context, so generate
600611 // some random data manually.
601612 let mut runner = TestRunner :: default ( ) ;
602- let data = data ( 100_000 ) . new_tree ( & mut runner) . unwrap ( ) . current ( ) ;
613+ let data = delta_data ( 20_000 ) . new_tree ( & mut runner) . unwrap ( ) . current ( ) ;
603614
604- let relation_schema = TestStruct2 :: schema ( ) ;
615+ let relation_schema = DeltaTestStruct :: schema ( ) ;
605616
606617 let input_table_dir = TempDir :: new ( ) . unwrap ( ) ;
607618 let input_table_uri = input_table_dir. path ( ) . display ( ) . to_string ( ) ;
@@ -639,9 +650,9 @@ async fn delta_table_follow_s3_test_common(snapshot: bool) {
639650 // We cannot use proptest macros in `async` context, so generate
640651 // some random data manually.
641652 let mut runner = TestRunner :: default ( ) ;
642- let data = data ( 100_000 ) . new_tree ( & mut runner) . unwrap ( ) . current ( ) ;
653+ let data = delta_data ( 20_000 ) . new_tree ( & mut runner) . unwrap ( ) . current ( ) ;
643654
644- let relation_schema = TestStruct2 :: schema ( ) ;
655+ let relation_schema = DeltaTestStruct :: schema ( ) ;
645656
646657 let input_uuid = uuid:: Uuid :: new_v4 ( ) ;
647658 let output_uuid = uuid:: Uuid :: new_v4 ( ) ;
@@ -691,95 +702,97 @@ async fn delta_table_snapshot_and_follow_s3_test() {
691702}
692703
693704proptest ! {
694- #![ proptest_config( ProptestConfig :: with_cases( 2 ) ) ]
705+ #![ proptest_config( ProptestConfig :: with_cases( 1 ) ) ]
695706
696707 /// ```text
697708 /// input.json --> [pipeline1]--->delta_table-->[pipeline2]-->output.json
698709 /// ```
699710 #[ test]
700- fn delta_table_file_output_proptest( data in data ( 100_000 ) )
711+ fn delta_table_file_output_proptest( data in delta_data ( 20_000 ) )
701712 {
702713 let table_dir = TempDir :: new( ) . unwrap( ) ;
703714 let table_uri = table_dir. path( ) . display( ) . to_string( ) ;
715+
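716+ // FIXME(review): `forget` below is live (presumably a debugging leftover), so the temp dir is likely never cleaned up; comment it out unless inspecting the output parquet files produced by the test.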
717+ forget( table_dir) ;
718+
704719 delta_table_output_test( data. clone( ) , & table_uri, & HashMap :: new( ) , true ) ;
705720
706- // // Uncomment to inspect output parquet files produced by the test.
707- // forget(table_dir);
708721
709722 // Read delta table unordered.
710- let mut json_file = delta_table_snapshot_to_json:: <TestStruct2 >(
723+ let mut json_file = delta_table_snapshot_to_json:: <DeltaTestStruct >(
711724 & table_uri,
712- & TestStruct2 :: schema( ) ,
725+ & DeltaTestStruct :: schema( ) ,
713726 & HashMap :: new( ) ) ;
714727
715728 let expected_zset = OrdZSet :: from_tuples( ( ) , data. clone( ) . into_iter( ) . map( |x| Tup2 ( Tup2 ( x, ( ) ) , 1 ) ) . collect( ) ) ;
716- let zset = file_to_zset:: <TestStruct2 >( json_file. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
729+ let zset = file_to_zset:: <DeltaTestStruct >( json_file. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
717730 assert_eq!( zset, expected_zset) ;
718731
719- // Order delta table by `id ` (which should be its natural order).
720- let mut json_file_ordered_by_id = delta_table_snapshot_to_json:: <TestStruct2 >(
732+ // Order delta table by `bigint ` (which should be its natural order).
733+ let mut json_file_ordered_by_id = delta_table_snapshot_to_json:: <DeltaTestStruct >(
721734 & table_uri,
722- & TestStruct2 :: schema_with_lateness( ) ,
723- & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "id " . to_string( ) ) ] ) ) ;
735+ & DeltaTestStruct :: schema_with_lateness( ) ,
736+ & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "bigint " . to_string( ) ) ] ) ) ;
724737
725- let zset = file_to_zset:: <TestStruct2 >( json_file_ordered_by_id. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
738+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_ordered_by_id. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
726739 assert_eq!( zset, expected_zset) ;
727740
728- // Order delta table by `id `, specify id range.
729- let mut json_file_ordered_filtered = delta_table_snapshot_to_json:: <TestStruct2 >(
741+ // Order delta table by `bigint `, specify range.
742+ let mut json_file_ordered_filtered = delta_table_snapshot_to_json:: <DeltaTestStruct >(
730743 & table_uri,
731- & TestStruct2 :: schema_with_lateness( ) ,
732- & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "id " . to_string( ) ) , ( "snapshot_filter" . to_string( ) , "id >= 10000 " . to_string( ) ) ] ) ) ;
744+ & DeltaTestStruct :: schema_with_lateness( ) ,
745+ & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "bigint " . to_string( ) ) , ( "snapshot_filter" . to_string( ) , "bigint >= 10000 " . to_string( ) ) ] ) ) ;
733746
734747 let expected_filtered_zset = OrdZSet :: from_tuples(
735748 ( ) ,
736749 data. clone( ) . into_iter( )
737- . filter( |x| x. field >= 10000 )
750+ . filter( |x| x. bigint >= 10000 )
738751 . map( |x| Tup2 ( Tup2 ( x, ( ) ) , 1 ) ) . collect( )
739752 ) ;
740753
741- let zset = file_to_zset:: <TestStruct2 >( json_file_ordered_filtered. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
754+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_ordered_filtered. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
742755 assert_eq!( zset, expected_filtered_zset) ;
743756
744757 // Order delta table by `timestamp_ntz`.
745- let mut json_file_ordered_by_ts = delta_table_snapshot_to_json:: <TestStruct2 >(
758+ let mut json_file_ordered_by_ts = delta_table_snapshot_to_json:: <DeltaTestStruct >(
746759 & table_uri,
747- & TestStruct2 :: schema_with_lateness( ) ,
748- & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "ts " . to_string( ) ) ] ) ) ;
760+ & DeltaTestStruct :: schema_with_lateness( ) ,
761+ & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "timestamp_ntz " . to_string( ) ) ] ) ) ;
749762
750- let zset = file_to_zset:: <TestStruct2 >( json_file_ordered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
763+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_ordered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
751764 assert_eq!( zset, expected_zset) ;
752765
753766 // Order delta table by `timestamp_ntz`; specify a filter condition that is expected to match no records.
754- let mut json_file_ordered_by_ts = delta_table_snapshot_to_json:: <TestStruct2 >(
767+ let mut json_file_ordered_by_ts = delta_table_snapshot_to_json:: <DeltaTestStruct >(
755768 & table_uri,
756- & TestStruct2 :: schema_with_lateness( ) ,
757- & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "ts " . to_string( ) ) , ( "snapshot_filter" . to_string( ) , "ts < timestamp '2005-01-01T00:00:00'" . to_string( ) ) ] ) ) ;
769+ & DeltaTestStruct :: schema_with_lateness( ) ,
770+ & HashMap :: from( [ ( "timestamp_column" . to_string( ) , "timestamp_ntz " . to_string( ) ) , ( "snapshot_filter" . to_string( ) , "timestamp_ntz < timestamp '2005-01-01T00:00:00'" . to_string( ) ) ] ) ) ;
758771
759- let zset = file_to_zset:: <TestStruct2 >( json_file_ordered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
772+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_ordered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
760773 assert_eq!( zset, OrdZSet :: empty( ) ) ;
761774
762775 // Filter delta table by `bigint`.
763- let mut json_file_filtered_by_id = delta_table_snapshot_to_json:: <TestStruct2 >(
776+ let mut json_file_filtered_by_id = delta_table_snapshot_to_json:: <DeltaTestStruct >(
764777 & table_uri,
765- & TestStruct2 :: schema( ) ,
766- & HashMap :: from( [ ( "snapshot_filter" . to_string( ) , "id >= 10000 " . to_string( ) ) ] ) ) ;
778+ & DeltaTestStruct :: schema( ) ,
779+ & HashMap :: from( [ ( "snapshot_filter" . to_string( ) , "bigint >= 10000 " . to_string( ) ) ] ) ) ;
767780
768781 let expected_filtered_zset = OrdZSet :: from_tuples(
769782 ( ) ,
770783 data. clone( ) . into_iter( )
771- . filter( |x| x. field >= 10000 )
784+ . filter( |x| x. bigint >= 10000 )
772785 . map( |x| Tup2 ( Tup2 ( x, ( ) ) , 1 ) ) . collect( )
773786 ) ;
774787
775- let zset = file_to_zset:: <TestStruct2 >( json_file_filtered_by_id. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
788+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_filtered_by_id. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
776789 assert_eq!( zset, expected_filtered_zset) ;
777790
778791 // Filter delta table by timestamp.
779- let mut json_file_filtered_by_ts = delta_table_snapshot_to_json:: <TestStruct2 >(
792+ let mut json_file_filtered_by_ts = delta_table_snapshot_to_json:: <DeltaTestStruct >(
780793 & table_uri,
781- & TestStruct2 :: schema( ) ,
782- & HashMap :: from( [ ( "snapshot_filter" . to_string( ) , "ts >= '2005-01-01 00:00:00'" . to_string( ) ) ] ) ) ;
794+ & DeltaTestStruct :: schema( ) ,
795+ & HashMap :: from( [ ( "snapshot_filter" . to_string( ) , "timestamp_ntz >= '2005-01-01 00:00:00'" . to_string( ) ) ] ) ) ;
783796
784797 let start = NaiveDate :: from_ymd_opt( 2005 , 1 , 1 )
785798 . unwrap( )
@@ -789,11 +802,11 @@ proptest! {
789802 let expected_filtered_zset = OrdZSet :: from_tuples(
790803 ( ) ,
791804 data. into_iter( )
792- . filter( |x| x. field_2 . milliseconds( ) >= start. and_utc( ) . timestamp_millis( ) )
805+ . filter( |x| x. timestamp_ntz . milliseconds( ) >= start. and_utc( ) . timestamp_millis( ) )
793806 . map( |x| Tup2 ( Tup2 ( x, ( ) ) , 1 ) ) . collect( )
794807 ) ;
795808
796- let zset = file_to_zset:: <TestStruct2 >( json_file_filtered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
809+ let zset = file_to_zset:: <DeltaTestStruct >( json_file_filtered_by_ts. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
797810 assert_eq!( zset, expected_filtered_zset) ;
798811
799812
@@ -808,7 +821,7 @@ proptest! {
808821 /// Write to a Delta table in S3.
809822 #[ cfg( feature = "delta-s3-test" ) ]
810823 #[ test]
811- fn delta_table_s3_output_proptest( data in data ( 100_000 ) )
824+ fn delta_table_s3_output_proptest( data in delta_data ( 20_000 ) )
812825 {
813826 let uuid = uuid:: Uuid :: new_v4( ) ;
814827 let object_store_config = [
@@ -825,13 +838,13 @@ proptest! {
825838 delta_table_output_test( data. clone( ) , & table_uri, & object_store_config, false ) ;
826839 //delta_table_output_test(data.clone(), &table_uri, &object_store_config, false);
827840
828- let mut json_file = delta_table_snapshot_to_json:: <TestStruct2 >(
841+ let mut json_file = delta_table_snapshot_to_json:: <DeltaTestStruct >(
829842 & table_uri,
830- & TestStruct2 :: schema( ) ,
843+ & DeltaTestStruct :: schema( ) ,
831844 & object_store_config) ;
832845
833846 let expected_zset = OrdZSet :: from_tuples( ( ) , data. into_iter( ) . map( |x| Tup2 ( Tup2 ( x, ( ) ) , 1 ) ) . collect( ) ) ;
834- let zset = file_to_zset:: <TestStruct2 >( json_file. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
847+ let zset = file_to_zset:: <DeltaTestStruct >( json_file. as_file_mut( ) , "json" , r#"update_format: "insert_delete""# ) ;
835848 assert_eq!( zset, expected_zset) ;
836849 }
837850}
0 commit comments