@@ -499,15 +499,21 @@ impl RootCircuit {
499499 /// key. Upsert/delete commands are routed to the worker in charge of
500500 /// the given key.
501501 ///
502+ ///
502503 /// # Data retention
503504 ///
504- /// Applying [`Stream::dyn_integrate_trace_retain_keys`],
505- /// [`Stream::dyn_integrate_trace_retain_values`], and
506- /// [`Stream::dyn_integrate_trace_with_bound`] methods to the stream has the
507- /// additional effect of filtering out all values that don't satisfy the
508- /// retention policy configured by these methods from the stream.
509- /// Specifically, retention conditions configured at logical time `t`
505+ /// Applying the [`Stream::dyn_integrate_trace_retain_keys`] to the stream has the
506+ /// additional effect of filtering out all updates that don't satisfy the
507+ /// retention policy.
508+ /// In particular, this means that attempts to overwrite, update, or delete
509+ /// a key-value pair whose key doesn't satisfy current retention
510+ /// conditions are ignored, since all these operations involve deleting
511+ /// an existing tuple.
512+ ///
513+ /// Retention conditions configured at logical time `t`
510514 /// are applied starting from logical time `t+1`.
515+ ///
516+ /// FIXME: see <https://github.com/feldera/feldera/issues/2669>
511517 // TODO: Add a version that takes a custom hash function.
512518 pub fn dyn_add_input_map < K , V , U > (
513519 & self ,
@@ -1387,11 +1393,11 @@ mod test {
13871393 indexed_zset! { 1 => { 2 => -1 } , 2 => { 2 => 1 } , 3 => { 3 => -1 , 4 => 1 } , 4 => { 5 => 1 } } ,
13881394 indexed_zset! { 1 => { 6 => 1 } , 3 => { 4 => -1 } , 4 => { 5 => -1 , 6 => 1 } } ,
13891395 indexed_zset! { 1 => { 6 => -1 , 100 => 1 } } ,
1390- indexed_zset! { } ,
1391- indexed_zset! { 1 => { 91 => 1 , 100 => -1 } } ,
1396+ indexed_zset! { 1 => { 100 => - 1 , 80 => 1 } } ,
1397+ indexed_zset! { 1 => { 91 => 1 , 80 => -1 } } ,
13921398 indexed_zset! { 5 => { 200 => 1 } } ,
1393- indexed_zset! { } ,
1394- indexed_zset! { 5 => { 191 => 1 , 200 => -1 } } ,
1399+ indexed_zset! { 5 => { 200 => - 1 , 91 => 1 } } ,
1400+ indexed_zset! { 5 => { 191 => 1 , 91 => -1 } } ,
13951401 ]
13961402 }
13971403 fn input_map_updates2 ( ) -> Vec < Vec < Tup2 < u64 , Update < u64 , i64 > > > > {
@@ -1489,9 +1495,9 @@ mod test {
14891495 indexed_zset! { 1 => { 4 => -1 } } ,
14901496 indexed_zset! { 1 => { 1 => 1 } , 2 => { 5 =>1 } } ,
14911497 indexed_zset! { 3 => { 5 => -1 , 15 => 1 } } ,
1492- indexed_zset! { 2 => { 5 => -1 , 15 => 1 } } ,
1493- indexed_zset! { } ,
1494- indexed_zset! { 2 => { 15 => -1 , 10 => 1 } , 4 => { 15 => 1 } } ,
1498+ indexed_zset! { 1 => { 1 => - 1 , 11 => 1 } , 2 => { 5 => -1 , 15 => 1 } } ,
1499+ indexed_zset! { 1 => { 11 => - 1 } , 2 => { 15 => - 1 , 4 => 1 } , 4 => { 1 => 1 } } ,
1500+ indexed_zset! { 2 => { 4 => -1 , 10 => 1 } , 4 => { 1 => - 1 , 4 => 1 } } ,
14951501 indexed_zset! { } ,
14961502 ]
14971503 }
@@ -1502,12 +1508,6 @@ mod test {
15021508 ) -> AnyResult < MapHandle < u64 , u64 , i64 > > {
15031509 let ( stream, handle) =
15041510 circuit. add_input_map :: < u64 , u64 , i64 , _ > ( |v, u| * v = ( ( * v as i64 ) + u) as u64 ) ;
1505- let watermark = stream. waterline (
1506- || ( 0 , 0 ) ,
1507- |k, v| ( * k, * v) ,
1508- |ts1, ts2| ( max ( ts1. 0 , ts2. 0 ) , max ( ts1. 1 , ts2. 1 ) ) ,
1509- ) ;
1510- stream. integrate_trace_retain_values ( & watermark, |v, ts| * v >= ts. 1 . saturating_sub ( 10 ) ) ;
15111511
15121512 let mut expected_batches = expected_outputs ( ) . into_iter ( ) ;
15131513
@@ -1520,6 +1520,10 @@ mod test {
15201520 Ok ( handle)
15211521 }
15221522
1523+ // FIXME: the inputs to these tests are meant to exercise the logic that filters inputs based
1524+ // on lateness, but it does not currently work correctly (see https://github.com/feldera/feldera/issues/2669).
1525+ // We therefore don't use waterlines in tests and check for the standard upsert behavior
1526+ // without filtering.
15231527 #[ test]
15241528 fn map_test_st ( ) {
15251529 let ( circuit, mut input_handle) =
@@ -1577,15 +1581,17 @@ mod test {
15771581 dbsp. kill ( ) . unwrap ( ) ;
15781582 }
15791583
1584+ // FIXME: the inputs to these tests are meant to exercise the logic that filters inputs based
1585+ // on lateness, but it does not currently work correctly (see https://github.com/feldera/feldera/issues/2669).
1586+ // We therefore don't use waterlines in tests and check for the standard upsert behavior
1587+ // without filtering.
15801588 #[ test]
1581- #[ ignore = "non-deterministic failures" ]
15821589 fn map_test_mt1 ( ) {
15831590 map_test_mt ( 1 , input_map_updates1, output_map_updates1) ;
15841591 map_test_mt ( 1 , input_map_updates2, output_map_updates2) ;
15851592 }
15861593
15871594 #[ test]
1588- #[ ignore = "non-deterministic failures" ]
15891595 fn map_test_mt4 ( ) {
15901596 map_test_mt ( 4 , input_map_updates1, output_map_updates1) ;
15911597 map_test_mt ( 4 , input_map_updates2, output_map_updates2) ;
0 commit comments