Skip to content

Commit f9384a7

Browse files
committed
[dbsp] Fix a bug in InputUpsert.
InputUpsert worked incorrectly when updates arrived in multiple batches within a step. In particular when the same key occurs as a delete in the first batch, and an insert in the second batch, we'd apply them in the wrong order. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 824bb15 commit f9384a7

2 files changed

Lines changed: 92 additions & 3 deletions

File tree

crates/dbsp/src/operator/dynamic/input.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,7 +1378,8 @@ mod test {
13781378
dynamic::{DowncastTrait, DynData, Erase},
13791379
indexed_zset,
13801380
operator::{
1381-
IndexedZSetHandle, MapHandle, SetHandle, Update, ZSetHandle, input::InputHandle,
1381+
IndexedZSetHandle, MapHandle, SetHandle, StagedBuffers, Update, ZSetHandle,
1382+
input::InputHandle,
13821383
},
13831384
trace::{BatchReaderFactories, Builder, Cursor},
13841385
typed_batch::{
@@ -1389,7 +1390,7 @@ mod test {
13891390
zset,
13901391
};
13911392
use anyhow::Result as AnyResult;
1392-
use std::{cmp::max, iter::once, ops::Mul};
1393+
use std::{cmp::max, collections::VecDeque, iter::once, ops::Mul};
13931394

13941395
fn input_batches() -> Vec<OrdZSet<u64>> {
13951396
vec![
@@ -2031,6 +2032,92 @@ mod test {
20312032
map_test_mt(4, input_map_updates2, output_map_updates2);
20322033
}
20332034

2035+
/// There was a bug in InputUpsert where if within the same step the operator received two
2036+
/// vectors with updates, where the first vector contained a key that was deleted and the second
2037+
/// vector contained the same key that was inserted, the output would be incorrect because we
2038+
/// reordered the insert and the delete.
2039+
#[test]
2040+
fn map_reinsert_within_step_accumulate_output() {
2041+
let (mut dbsp, (input_handle, output_handle)) = Runtime::init_circuit(1, |circuit| {
2042+
let (stream, handle) =
2043+
circuit.add_input_map::<u64, u64, i64, _>(|v, u| *v = ((*v as i64) + u) as u64);
2044+
Ok((handle, stream.accumulate_output()))
2045+
})
2046+
.unwrap();
2047+
2048+
// Seed the map with a few keys.
2049+
let initial_batch = vec![
2050+
Tup2(1, Update::Insert(10)),
2051+
Tup2(2, Update::Insert(20)),
2052+
Tup2(3, Update::Insert(30)),
2053+
];
2054+
// Use stage instead of append to make sure the updates don't get merged in a single vector.
2055+
input_handle
2056+
.stage(vec![VecDeque::from(initial_batch)])
2057+
.flush();
2058+
dbsp.transaction().unwrap();
2059+
assert_eq!(
2060+
output_handle.concat().consolidate(),
2061+
indexed_zset! { 1 => {10 => 1}, 2 => {20 => 1}, 3 => {30 => 1} }
2062+
);
2063+
2064+
// Step 1:
2065+
// - first batch deletes existing keys
2066+
// - second batch reinserts them and adds one extra key
2067+
let delete_batch_1 = vec![
2068+
Tup2(1, Update::Delete),
2069+
Tup2(2, Update::Delete),
2070+
Tup2(3, Update::Delete),
2071+
];
2072+
input_handle
2073+
.stage(vec![VecDeque::from(delete_batch_1)])
2074+
.flush();
2075+
let reinsert_batch_1 = vec![
2076+
Tup2(1, Update::Insert(10)),
2077+
Tup2(2, Update::Insert(20)),
2078+
Tup2(3, Update::Insert(30)),
2079+
Tup2(4, Update::Insert(40)),
2080+
];
2081+
input_handle
2082+
.stage(vec![VecDeque::from(reinsert_batch_1)])
2083+
.flush();
2084+
dbsp.transaction().unwrap();
2085+
assert_eq!(
2086+
output_handle.concat().consolidate(),
2087+
indexed_zset! { 4 => {40 => 1} }
2088+
);
2089+
2090+
// Step 2: repeat with one more additional key.
2091+
let delete_batch_2 = vec![
2092+
Tup2(1, Update::Delete),
2093+
Tup2(2, Update::Delete),
2094+
Tup2(3, Update::Delete),
2095+
Tup2(4, Update::Delete),
2096+
];
2097+
input_handle
2098+
.stage(vec![VecDeque::from(delete_batch_2)])
2099+
.flush();
2100+
let reinsert_batch_2 = vec![
2101+
Tup2(1, Update::Insert(10)),
2102+
Tup2(2, Update::Insert(20)),
2103+
Tup2(3, Update::Insert(30)),
2104+
Tup2(4, Update::Insert(40)),
2105+
Tup2(5, Update::Insert(50)),
2106+
];
2107+
input_handle
2108+
.stage(vec![VecDeque::from(reinsert_batch_2)])
2109+
.flush();
2110+
dbsp.transaction().unwrap();
2111+
assert_eq!(
2112+
output_handle.concat().consolidate(),
2113+
indexed_zset! {
2114+
5 => {50 => 1}
2115+
}
2116+
);
2117+
2118+
dbsp.kill().unwrap();
2119+
}
2120+
20342121
fn map_with_waterline_test_circuit(
20352122
circuit: &RootCircuit,
20362123
) -> (

crates/dbsp/src/operator/dynamic/input_upsert.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,9 @@ where
590590
.iter()
591591
.map(|(updates, index)| updates.index(*index))
592592
.enumerate()
593-
.min_by(|(_a_index, a), (_b_index, b)| a.cmp(b))
593+
// Find the first update with the smallest key (compare keys, not updates, so that we apply updates in order).
594+
// min_by is guaranteed to return the first among equal keys.
595+
.min_by(|(_a_index, a), (_b_index, b)| a.fst().cmp(b.fst()))
594596
.unwrap();
595597
updates[index].1 += 1;
596598
if updates[index].1 >= updates[index].0.len() {

0 commit comments

Comments
 (0)