@@ -1378,7 +1378,8 @@ mod test {
13781378 dynamic:: { DowncastTrait , DynData , Erase } ,
13791379 indexed_zset,
13801380 operator:: {
1381- IndexedZSetHandle , MapHandle , SetHandle , Update , ZSetHandle , input:: InputHandle ,
1381+ IndexedZSetHandle , MapHandle , SetHandle , StagedBuffers , Update , ZSetHandle ,
1382+ input:: InputHandle ,
13821383 } ,
13831384 trace:: { BatchReaderFactories , Builder , Cursor } ,
13841385 typed_batch:: {
@@ -1389,7 +1390,7 @@ mod test {
13891390 zset,
13901391 } ;
13911392 use anyhow:: Result as AnyResult ;
1392- use std:: { cmp:: max, iter:: once, ops:: Mul } ;
1393+ use std:: { cmp:: max, collections :: VecDeque , iter:: once, ops:: Mul } ;
13931394
13941395 fn input_batches ( ) -> Vec < OrdZSet < u64 > > {
13951396 vec ! [
@@ -2031,6 +2032,92 @@ mod test {
20312032 map_test_mt ( 4 , input_map_updates2, output_map_updates2) ;
20322033 }
20332034
/// Regression test for an `InputUpsert` bug.
///
/// When the operator received two update vectors within the same step, the first deleting a
/// key and the second inserting that same key again, the insert and the delete were reordered
/// and the resulting output was incorrect.
///
/// Every batch below goes through `stage(..).flush()` rather than `append`, so that the
/// delete vector and the reinsert vector are not merged into a single vector.
#[test]
fn map_reinsert_within_step_accumulate_output() {
    let (mut dbsp, (map_input, accumulated)) = Runtime::init_circuit(1, |root| {
        let (updates, map_handle) = root
            .add_input_map::<u64, u64, i64, _>(|value, delta| {
                *value = ((*value as i64) + delta) as u64
            });
        Ok((map_handle, updates.accumulate_output()))
    })
    .unwrap();

    // Populate the map with keys 1, 2 and 3.
    map_input
        .stage(vec![VecDeque::from([
            Tup2(1, Update::Insert(10)),
            Tup2(2, Update::Insert(20)),
            Tup2(3, Update::Insert(30)),
        ])])
        .flush();
    dbsp.transaction().unwrap();
    let seeded = accumulated.concat().consolidate();
    assert_eq!(
        seeded,
        indexed_zset! { 1 => { 10 => 1 }, 2 => { 20 => 1 }, 3 => { 30 => 1 } }
    );

    // Step 1: one staged vector deletes keys 1..=3, and a second staged vector reinserts them
    // with their previous values and adds key 4.
    map_input
        .stage(vec![VecDeque::from([
            Tup2(1, Update::Delete),
            Tup2(2, Update::Delete),
            Tup2(3, Update::Delete),
        ])])
        .flush();
    map_input
        .stage(vec![VecDeque::from([
            Tup2(1, Update::Insert(10)),
            Tup2(2, Update::Insert(20)),
            Tup2(3, Update::Insert(30)),
            Tup2(4, Update::Insert(40)),
        ])])
        .flush();
    dbsp.transaction().unwrap();
    // Only the newly added key is expected in the accumulated output.
    let after_first_round = accumulated.concat().consolidate();
    assert_eq!(after_first_round, indexed_zset! { 4 => { 40 => 1 } });

    // Step 2: same pattern over keys 1..=4, with key 5 as the extra insert.
    map_input
        .stage(vec![VecDeque::from([
            Tup2(1, Update::Delete),
            Tup2(2, Update::Delete),
            Tup2(3, Update::Delete),
            Tup2(4, Update::Delete),
        ])])
        .flush();
    map_input
        .stage(vec![VecDeque::from([
            Tup2(1, Update::Insert(10)),
            Tup2(2, Update::Insert(20)),
            Tup2(3, Update::Insert(30)),
            Tup2(4, Update::Insert(40)),
            Tup2(5, Update::Insert(50)),
        ])])
        .flush();
    dbsp.transaction().unwrap();
    let after_second_round = accumulated.concat().consolidate();
    assert_eq!(after_second_round, indexed_zset! { 5 => { 50 => 1 } });

    dbsp.kill().unwrap();
}

2120+
20342121 fn map_with_waterline_test_circuit (
20352122 circuit : & RootCircuit ,
20362123 ) -> (
0 commit comments