Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion crates/dbsp/src/operator/dynamic/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,15 @@ mod test {
zset,
};
use anyhow::Result as AnyResult;
use std::{cmp::max, collections::VecDeque, iter::once, ops::Mul};
use rand::seq::index;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use std::{
cmp::max,
collections::{BTreeSet, HashMap, VecDeque},
iter::once,
ops::Mul,
};

fn input_batches() -> Vec<OrdZSet<u64>> {
vec![
Expand Down Expand Up @@ -2118,6 +2126,144 @@ mod test {
dbsp.kill().unwrap();
}

/// Split `items` into `k` contiguous non-empty segments (`k` in 1..=min(3, n)).
///
/// Returns an empty `Vec` for empty input; otherwise clamps `k` into `1..=n`
/// so every segment is guaranteed non-empty.
fn partition_into_k_contiguous_batches<T: Clone>(
    items: Vec<T>,
    k: usize,
    rng: &mut ChaCha8Rng,
) -> Vec<Vec<T>> {
    let n = items.len();
    debug_assert!(k >= 1);
    if n == 0 {
        return vec![];
    }
    // `n >= 1` here, so `clamp(1, n)` cannot panic.
    let k = k.clamp(1, n);
    if k == 1 {
        return vec![items];
    }
    // Sample `k - 1` distinct cut points from 0..n-1, then shift by 1 so no
    // cut lands at index 0 (which would create an empty first segment).
    let mut split_at: Vec<usize> = index::sample(rng, n - 1, k - 1)
        .into_iter()
        .map(|i| i + 1)
        .collect();
    split_at.sort_unstable();
    let mut out = Vec::with_capacity(k);
    let mut start = 0usize;
    for cut in split_at {
        out.push(items[start..cut].to_vec());
        start = cut;
    }
    out.push(items[start..].to_vec());
    out
}

/// Apply a single map `Update` to the reference model `state`.
///
/// Mirrors the semantics of the circuit's input map: `Insert` upserts,
/// `Delete` removes, and `Update` modifies the value only if the key is
/// already present (it is a no-op for absent keys).
fn apply_map_update(state: &mut HashMap<u64, u64>, key: u64, upd: Update<u64, u64>) {
    match upd {
        Update::Insert(v) => {
            state.insert(key, v);
        }
        Update::Delete => {
            state.remove(&key);
        }
        Update::Update(v) => {
            // Single lookup via `get_mut` instead of `contains_key` + `insert`.
            if let Some(slot) = state.get_mut(&key) {
                *slot = v;
            }
        }
    }
}

/// Compute the indexed Z-set delta that turns `before` into `after`:
/// each removed or replaced old value appears with weight -1, each added
/// or replacing new value with weight +1. Unchanged keys contribute nothing.
fn indexed_zset_state_diff(
    before: &HashMap<u64, u64>,
    after: &HashMap<u64, u64>,
) -> OrdIndexedZSet<u64, u64> {
    // BTreeSet gives a deterministic key order for the tuple list.
    let keys: BTreeSet<u64> = before.keys().chain(after.keys()).copied().collect();
    let mut tuples = Vec::new();
    for key in keys {
        let old = before.get(&key).copied();
        let new = after.get(&key).copied();
        if old == new {
            continue;
        }
        if let Some(v) = old {
            tuples.push(Tup2(Tup2(key, v), -1));
        }
        if let Some(v) = new {
            tuples.push(Tup2(Tup2(key, v), 1));
        }
    }
    OrdIndexedZSet::from_tuples((), tuples)
}

/// Stress-test the implementation of the InputUpsert operator.
///
/// Every iteration generates several updates to the same three keys and feeds them in up to three batches
/// using `stage().flush()`. The accumulated output must match applying updates in order (last update per
/// key wins within the folded semantics of [`InputUpsert`]).
#[test]
fn randomized_input_map_test() {
    let (mut dbsp, (input_handle, output_handle)) = Runtime::init_circuit(1, |circuit| {
        let (stream, handle) = circuit.add_input_map::<u64, u64, u64, _>(|v, u| *v = *u);
        Ok((handle, stream.accumulate_output()))
    })
    .unwrap();

    // Reference model of the map's contents, maintained alongside the circuit.
    let mut state: HashMap<u64, u64> = HashMap::new();
    let mut rng = ChaCha8Rng::seed_from_u64(0x_6D61_705F_7374_6167_u64);

    for _step in 0..50000 {
        // Snapshot the model so the expected delta for this transaction
        // can be computed as `after - before`.
        let before = state.clone();
        let num_updates = rng.gen_range(0..=12);

        // Confine updates to keys 1..=3 to maximize per-key collisions,
        // which is where upsert-folding bugs would surface.
        let updates: Vec<Tup2<u64, Update<u64, u64>>> = (0..num_updates)
            .map(|_| {
                let key = rng.gen_range(1u64..=3);
                let upd = match rng.gen_range(0..3) {
                    0 => Update::Insert(rng.gen_range(0u64..512)),
                    1 => Update::Delete,
                    2 => Update::Update(rng.gen_range(0u64..=512)),
                    _ => unreachable!(),
                };
                Tup2(key, upd)
            })
            .collect();

        // Apply the same updates, in order, to the reference model.
        for Tup2(k, u) in &updates {
            apply_map_update(&mut state, *k, u.clone());
        }

        let num_batches = if num_updates == 0 {
            1usize
        } else {
            rng.gen_range(1..=std::cmp::min(3, num_updates))
        };

        let batches = if num_updates == 0 {
            // Push one empty batch so every step still exercises stage/flush.
            vec![Vec::new()]
        } else {
            partition_into_k_contiguous_batches(updates, num_batches, &mut rng)
        };

        for batch in batches {
            input_handle.stage(once(VecDeque::from(batch))).flush();
        }

        dbsp.transaction().unwrap();

        let expected = indexed_zset_state_diff(&before, &state);
        assert_eq!(
            output_handle.concat().consolidate(),
            expected,
            "accumulated output should equal the net map change for this transaction"
        );
    }

    dbsp.kill().unwrap();
}

fn map_with_waterline_test_circuit(
circuit: &RootCircuit,
) -> (
Expand Down
Loading