Skip to content

Commit ebda7af

Browse files
Leonid Ryzhykryzhyk
authored andcommitted
[dbsp] Fix broken add_input_map tests.
As explained in #2669, the use of GC with `add_input_map` is unsound. This caused random errors in three tests, two of them previously disabled. We now fix the tests by disabling the unsound functionality. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent 4759d17 commit ebda7af

2 files changed

Lines changed: 32 additions & 26 deletions

File tree

crates/dbsp/src/operator/dynamic/input.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -499,15 +499,21 @@ impl RootCircuit {
499499
/// key. Upsert/delete commands are routed to the worker in charge of
500500
/// the given key.
501501
///
502+
///
502503
/// # Data retention
503504
///
504-
/// Applying [`Stream::dyn_integrate_trace_retain_keys`],
505-
/// [`Stream::dyn_integrate_trace_retain_values`], and
506-
/// [`Stream::dyn_integrate_trace_with_bound`] methods to the stream has the
507-
/// additional effect of filtering out all values that don't satisfy the
508-
/// retention policy configured by these methods from the stream.
509-
/// Specifically, retention conditions configured at logical time `t`
505+
/// Applying the [`Stream::dyn_integrate_trace_retain_keys`] to the stream has the
506+
/// additional effect of filtering out all updates that don't satisfy the
507+
/// retention policy.
508+
/// In particular, this means that attempts to overwrite, update, or delete
509+
/// a key-value pair whose key doesn't satisfy current retention
510+
/// conditions are ignored, since all these operations involve deleting
511+
/// an existing tuple.
512+
///
513+
/// Retention conditions configured at logical time `t`
510514
/// are applied starting from logical time `t+1`.
515+
///
516+
/// FIXME: see <https://github.com/feldera/feldera/issues/2669>
511517
// TODO: Add a version that takes a custom hash function.
512518
pub fn dyn_add_input_map<K, V, U>(
513519
&self,
@@ -1387,11 +1393,11 @@ mod test {
13871393
indexed_zset! { 1 => {2 => -1}, 2 => {2 => 1}, 3 => {3 => -1, 4 => 1}, 4 => {5 => 1}},
13881394
indexed_zset! { 1 => {6 => 1}, 3 => {4 => -1}, 4 => {5 => -1, 6 => 1}},
13891395
indexed_zset! { 1 => {6 => -1, 100 => 1}},
1390-
indexed_zset! {},
1391-
indexed_zset! { 1 => {91 => 1, 100 => -1}},
1396+
indexed_zset! { 1 => { 100 => -1, 80 => 1 }},
1397+
indexed_zset! { 1 => {91 => 1, 80 => -1}},
13921398
indexed_zset! { 5 => {200 => 1}},
1393-
indexed_zset! {},
1394-
indexed_zset! { 5 => {191 => 1, 200 => -1}},
1399+
indexed_zset! { 5 => { 200 => -1, 91 => 1 }},
1400+
indexed_zset! { 5 => {191 => 1, 91 => -1}},
13951401
]
13961402
}
13971403
fn input_map_updates2() -> Vec<Vec<Tup2<u64, Update<u64, i64>>>> {
@@ -1489,9 +1495,9 @@ mod test {
14891495
indexed_zset! { 1 => {4 => -1} },
14901496
indexed_zset! { 1 => {1 => 1}, 2 => {5=>1} },
14911497
indexed_zset! { 3 => {5 => -1, 15 => 1} },
1492-
indexed_zset! { 2 => {5 => -1, 15 => 1} },
1493-
indexed_zset! {},
1494-
indexed_zset! {2 => {15 => -1, 10 => 1}, 4 => {15 => 1}},
1498+
indexed_zset! { 1 => {1 => -1, 11 => 1 } , 2 => {5 => -1, 15 => 1} },
1499+
indexed_zset! { 1 => {11 => -1}, 2 => { 15 => -1, 4 => 1}, 4 => { 1 => 1}},
1500+
indexed_zset! {2 => {4 => -1, 10 => 1}, 4 => {1 => -1, 4 => 1}},
14951501
indexed_zset! {},
14961502
]
14971503
}
@@ -1502,12 +1508,6 @@ mod test {
15021508
) -> AnyResult<MapHandle<u64, u64, i64>> {
15031509
let (stream, handle) =
15041510
circuit.add_input_map::<u64, u64, i64, _>(|v, u| *v = ((*v as i64) + u) as u64);
1505-
let watermark = stream.waterline(
1506-
|| (0, 0),
1507-
|k, v| (*k, *v),
1508-
|ts1, ts2| (max(ts1.0, ts2.0), max(ts1.1, ts2.1)),
1509-
);
1510-
stream.integrate_trace_retain_values(&watermark, |v, ts| *v >= ts.1.saturating_sub(10));
15111511

15121512
let mut expected_batches = expected_outputs().into_iter();
15131513

@@ -1520,6 +1520,10 @@ mod test {
15201520
Ok(handle)
15211521
}
15221522

1523+
// FIXME: the inputs to these tests are meant to exercise the logic that filters inputs based
1524+
// on lateness, but it does not currently work correctly (see https://github.com/feldera/feldera/issues/2669).
1525+
// We therefore don't use waterlines in tests and check for the standard upsert behavior
1526+
// without filtering.
15231527
#[test]
15241528
fn map_test_st() {
15251529
let (circuit, mut input_handle) =
@@ -1577,15 +1581,17 @@ mod test {
15771581
dbsp.kill().unwrap();
15781582
}
15791583

1584+
// FIXME: the inputs to these tests are meant to exercise the logic that filters inputs based
1585+
// on lateness, but it does not currently work correctly (see https://github.com/feldera/feldera/issues/2669).
1586+
// We therefore don't use waterlines in tests and check for the standard upsert behavior
1587+
// without filtering.
15801588
#[test]
1581-
#[ignore = "non-deterministic failures"]
15821589
fn map_test_mt1() {
15831590
map_test_mt(1, input_map_updates1, output_map_updates1);
15841591
map_test_mt(1, input_map_updates2, output_map_updates2);
15851592
}
15861593

15871594
#[test]
1588-
#[ignore = "non-deterministic failures"]
15891595
fn map_test_mt4() {
15901596
map_test_mt(4, input_map_updates1, output_map_updates1);
15911597
map_test_mt(4, input_map_updates2, output_map_updates2);

crates/dbsp/src/operator/input.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,18 +428,18 @@ impl RootCircuit {
428428
///
429429
/// # Data retention
430430
///
431-
/// Applying [`Stream::integrate_trace_retain_keys`],
432-
/// [`Stream::integrate_trace_retain_values`], and
433-
/// [`Stream::integrate_trace_with_bound`] methods to the stream has the
431+
/// Applying the [`Stream::integrate_trace_retain_keys`] to the stream has the
434432
/// additional effect of filtering out all updates that don't satisfy the
435-
/// retention policy configured by these methods from the stream.
433+
/// retention policy.
436434
/// In particular, this means that attempts to overwrite, update, or delete
437-
/// a key-value pair whose key or value don't satisfy current retention
435+
/// a key-value pair whose key doesn't satisfy current retention
438436
/// conditions are ignored, since all these operations involve deleting
439437
/// an existing tuple.
440438
///
441439
/// Retention conditions configured at logical time `t`
442440
/// are applied starting from logical time `t+1`.
441+
///
442+
/// FIXME: see <https://github.com/feldera/feldera/issues/2669>
443443
// TODO: Add a version that takes a custom hash function.
444444
pub fn add_input_map<K, V, U, PF>(
445445
&self,

0 commit comments

Comments
 (0)