@@ -182,7 +182,7 @@ fn debezium_avro_schema(value_schema: &str, value_type_name: &str) -> AvroSchema
182182 ],
183183 "connect.version": 1,
184184 "connect.name": "test_namespace.Envelope"
185- }"# . replace ( "VALUE_SCHEMA" , & value_schema) . replace ( "VALUE_TYPE" , value_type_name) ;
185+ }"# . replace ( "VALUE_SCHEMA" , value_schema) . replace ( "VALUE_TYPE" , value_type_name) ;
186186
187187 println ! ( "Debezium Avro schema: {schema_str}" ) ;
188188
@@ -202,11 +202,11 @@ where
202202 // 5-byte header
203203 let mut buffer = vec ! [ 0 ; 5 ] ;
204204 let refs = HashMap :: new ( ) ;
205- let serializer = AvroSchemaSerializer :: new ( & schema, & refs, false ) ;
205+ let serializer = AvroSchemaSerializer :: new ( schema, & refs, false ) ;
206206 let val = x
207207 . serialize_with_context ( serializer, & avro_ser_config ( ) )
208208 . unwrap ( ) ;
209- let mut avro_record = to_avro_datum ( & schema, val) . unwrap ( ) ;
209+ let mut avro_record = to_avro_datum ( schema, val) . unwrap ( ) ;
210210 buffer. append ( & mut avro_record) ;
211211 buffer
212212}
@@ -301,8 +301,7 @@ where
301301
302302 let expected_output = data
303303 . iter ( )
304- . map ( |x| vec ! [ MockUpdate :: Delete ( x. clone( ) ) , MockUpdate :: Insert ( x. clone( ) ) ] )
305- . flatten ( )
304+ . flat_map ( |x| vec ! [ MockUpdate :: Delete ( x. clone( ) ) , MockUpdate :: Insert ( x. clone( ) ) ] )
306305 . collect :: < Vec < _ > > ( ) ;
307306
308307 TestCase {
@@ -341,7 +340,7 @@ fn test_raw_avro_parser() {
341340 let test_case = gen_raw_parser_test (
342341 & TestStruct2 :: data ( ) ,
343342 & TestStruct2 :: relation_schema ( ) ,
344- & TestStruct2 :: avro_schema ( ) ,
343+ TestStruct2 :: avro_schema ( ) ,
345344 ) ;
346345
347346 run_parser_test ( vec ! [ test_case] )
@@ -352,7 +351,7 @@ fn test_debezium_avro_parser() {
352351 let test_case = gen_debezium_parser_test (
353352 & TestStruct2 :: data ( ) ,
354353 & TestStruct2 :: relation_schema ( ) ,
355- & TestStruct2 :: avro_schema ( ) ,
354+ TestStruct2 :: avro_schema ( ) ,
356355 "TestStruct2" ,
357356 ) ;
358357
@@ -393,7 +392,7 @@ fn test_extra_columns() {
393392 ]
394393 }"# ;
395394
396- let schema = AvroSchema :: parse_str ( & schema_str) . unwrap ( ) ;
395+ let schema = AvroSchema :: parse_str ( schema_str) . unwrap ( ) ;
397396 let vals = TestStruct2 :: data ( ) ;
398397 let input_batches = vals
399398 . iter ( )
@@ -460,8 +459,8 @@ fn test_non_null_to_nullable() {
460459 ]
461460 }"# ;
462461
463- let schema = AvroSchema :: parse_str ( & schema_str) . unwrap ( ) ;
464- let vals = vec ! [ TestStruct2 {
462+ let schema = AvroSchema :: parse_str ( schema_str) . unwrap ( ) ;
463+ let vals = [ TestStruct2 {
465464 field : 1 ,
466465 field_0 : Some ( "test" . to_string ( ) ) ,
467466 ..Default :: default ( )
@@ -526,7 +525,7 @@ fn test_ms_time() {
526525 ]
527526 }"# ;
528527
529- let schema = AvroSchema :: parse_str ( & schema_str) . unwrap ( ) ;
528+ let schema = AvroSchema :: parse_str ( schema_str) . unwrap ( ) ;
530529 let vals = TestStruct2 :: data ( ) ;
531530 let input_batches = vals
532531 . iter ( )
@@ -558,15 +557,15 @@ proptest! {
558557 #[ test]
559558 fn proptest_raw_avro_parser( data in proptest:: collection:: vec( any:: <TestStruct2 >( ) , 0 ..=10000 ) )
560559 {
561- let test_case = gen_raw_parser_test( & data, & TestStruct2 :: relation_schema( ) , & TestStruct2 :: avro_schema( ) ) ;
560+ let test_case = gen_raw_parser_test( & data, & TestStruct2 :: relation_schema( ) , TestStruct2 :: avro_schema( ) ) ;
562561
563562 run_parser_test( vec![ test_case] )
564563 }
565564
566565 #[ test]
567566 fn proptest_debezium_avro_parser( data in proptest:: collection:: vec( any:: <TestStruct2 >( ) , 0 ..=10000 ) )
568567 {
569- let test_case = gen_debezium_parser_test( & data, & TestStruct2 :: relation_schema( ) , & TestStruct2 :: avro_schema( ) , "TestStruct2" ) ;
568+ let test_case = gen_debezium_parser_test( & data, & TestStruct2 :: relation_schema( ) , TestStruct2 :: avro_schema( ) , "TestStruct2" ) ;
570569
571570 run_parser_test( vec![ test_case] )
572571 }
@@ -672,14 +671,13 @@ fn test_confluent_avro_output<K, V, KF>(
672671 let ( expected_inserts, expected_deletes) : ( Vec < _ > , Vec < _ > ) = batches
673672 . concat ( )
674673 . into_iter ( )
675- . map ( |Tup2 ( v, w) | {
674+ . flat_map ( |Tup2 ( v, w) | {
676675 if w > 0 {
677676 repeat ( Tup2 ( v. clone ( ) , 1 ) ) . take ( w as usize )
678677 } else {
679678 repeat ( Tup2 ( v. clone ( ) , -1 ) ) . take ( -w as usize )
680679 }
681680 } )
682- . flatten ( )
683681 . partition ( |Tup2 ( _, w) | * w > 0 ) ;
684682 let expected_deletes = expected_deletes
685683 . into_iter ( )
0 commit comments