Skip to content

Commit 862d32b

Browse files
lstwnryzhyk
authored and committed
Add tutorial 11 example to demonstrate termination of recursive computation with cycles

The example demonstrates how to compute the transitive closure using recursive queries with the `Min` aggregation operator to ensure termination in the presence of cycles. This addresses the feedback on the PR.

Signed-off-by: Leo Stewen <lstwn@mailbox.org>
1 parent 99f00cb commit 862d32b

File tree

4 files changed

+250
-2
lines changed

4 files changed

+250
-2
lines changed

crates/dbsp/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ path = "examples/tutorial/tutorial9.rs"
187187
name = "tutorial10"
188188
path = "examples/tutorial/tutorial10.rs"
189189

190+
[[example]]
191+
name = "tutorial11"
192+
path = "examples/tutorial/tutorial11.rs"
193+
190194
[[example]]
191195
name = "coord"
192196
path = "examples/dist/coord.rs"

crates/dbsp/examples/tutorial/tutorial10.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ fn main() -> Result<()> {
6262
)?;
6363

6464
let mut expected_outputs = ([
65-
// We expect the full transitive closure in the first clock cycle.
65+
// We expect the full transitive closure in the first step.
6666
zset! {
6767
Tup4(0, 1, 1, 1) => 1,
6868
Tup4(0, 2, 2, 2) => 1,
@@ -75,7 +75,7 @@ fn main() -> Result<()> {
7575
Tup4(2, 4, 4, 2) => 1,
7676
Tup4(3, 4, 2, 1) => 1,
7777
},
78-
// These paths are removed in the second clock cycle.
78+
// These paths are removed in the second step.
7979
zset! {
8080
Tup4(0, 2, 2, 2) => -1,
8181
Tup4(0, 3, 4, 3) => -1,

crates/dbsp/examples/tutorial/tutorial11.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use anyhow::Result;
2+
use dbsp::{
3+
indexed_zset,
4+
operator::{Generator, Min},
5+
utils::{Tup2, Tup3, Tup4},
6+
zset_set, Circuit, NestedCircuit, OrdIndexedZSet, RootCircuit, Stream,
7+
};
8+
9+
type Accumulator =
10+
Stream<NestedCircuit, OrdIndexedZSet<Tup2<usize, usize>, Tup4<usize, usize, usize, usize>>>;
11+
12+
fn main() -> Result<()> {
13+
const STEPS: usize = 2;
14+
15+
let (circuit_handle, output_handle) = RootCircuit::build(move |root_circuit| {
16+
let mut edges_data = ([
17+
// The first step adds a graph of five nodes (connected by four edges), just like before:
18+
// |0| -1-> |1| -1-> |2| -2-> |3| -2-> |4|
19+
zset_set! { Tup3(0_usize, 1_usize, 1_usize), Tup3(1, 2, 1), Tup3(2, 3, 2), Tup3(3, 4, 2) },
20+
// The second step introduces a cycle. Due to the code changes below,
21+
// the query does terminate though. In total, the graph now looks
22+
// like this:
23+
// |0| -1-> |1| -1-> |2| -2-> |3| -2-> |4|
24+
// ^ |
25+
// | |
26+
// ------------------3------------------
27+
zset_set! { Tup3(4, 0, 3)}
28+
] as [_; STEPS])
29+
.into_iter();
30+
31+
let edges = root_circuit.add_source(Generator::new(move || edges_data.next().unwrap()));
32+
33+
// Create a base stream with all paths of length 1.
34+
let len_1 = edges
35+
.map_index(|Tup3(from, to, weight)| (Tup2(*from, *to), Tup4(*from, *to, *weight, 1)));
36+
37+
let closure = root_circuit.recursive(|child_circuit, len_n_minus_1: Accumulator| {
38+
// Import the `edges` and `len_1` stream from the parent circuit.
39+
let edges = edges.delta0(child_circuit);
40+
let len_1 = len_1.delta0(child_circuit);
41+
42+
// Perform an iterative step (n-1 to n) through joining the
43+
// paths of length n-1 with the edges.
44+
let len_n = len_n_minus_1
45+
.map_index(
46+
|(Tup2(_start, _end), Tup4(start, end, cum_weight, hopcnt))| {
47+
(*end, Tup4(*start, *end, *cum_weight, *hopcnt))
48+
},
49+
)
50+
.join_index(
51+
&edges.map_index(|Tup3(from, to, weight)| (*from, Tup3(*from, *to, *weight))),
52+
|_end_from, Tup4(start, _end, cum_weight, hopcnt), Tup3(_from, to, weight)| {
53+
Some((
54+
Tup2(*start, *to),
55+
Tup4(*start, *to, cum_weight + weight, hopcnt + 1),
56+
))
57+
},
58+
)
59+
.plus(&len_1)
60+
.aggregate(Min);
61+
62+
Ok(len_n)
63+
})?;
64+
65+
let mut expected_outputs = ([
66+
// The transitive closure in the first step remains the same as in
67+
// `tutorial10.rs`.
68+
indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>:
69+
Tup2(0, 1) => { Tup4(0, 1, 1, 1) => 1 },
70+
Tup2(0, 2) => { Tup4(0, 2, 2, 2) => 1 },
71+
Tup2(0, 3) => { Tup4(0, 3, 4, 3) => 1 },
72+
Tup2(0, 4) => { Tup4(0, 4, 6, 4) => 1 },
73+
Tup2(1, 2) => { Tup4(1, 2, 1, 1) => 1 },
74+
Tup2(1, 3) => { Tup4(1, 3, 3, 2) => 1 },
75+
Tup2(1, 4) => { Tup4(1, 4, 5, 3) => 1 },
76+
Tup2(2, 3) => { Tup4(2, 3, 2, 1) => 1 },
77+
Tup2(2, 4) => { Tup4(2, 4, 4, 2) => 1 },
78+
Tup2(3, 4) => { Tup4(3, 4, 2, 1) => 1 },
79+
},
80+
// The second step's introduction of a cycle yields these new paths.
81+
indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>:
82+
Tup2(0, 0) => { Tup4(0, 0, 9, 5) => 1 },
83+
Tup2(1, 0) => { Tup4(1, 0, 8, 4) => 1 },
84+
Tup2(1, 1) => { Tup4(1, 1, 9, 5) => 1 },
85+
Tup2(2, 0) => { Tup4(2, 0, 7, 3) => 1 },
86+
Tup2(2, 1) => { Tup4(2, 1, 8, 4) => 1 },
87+
Tup2(2, 2) => { Tup4(2, 2, 9, 5) => 1 },
88+
Tup2(3, 0) => { Tup4(3, 0, 5, 2) => 1 },
89+
Tup2(3, 1) => { Tup4(3, 1, 6, 3) => 1 },
90+
Tup2(3, 2) => { Tup4(3, 2, 7, 4) => 1 },
91+
Tup2(3, 3) => { Tup4(3, 3, 9, 5) => 1 },
92+
Tup2(4, 0) => { Tup4(4, 0, 3, 1) => 1 },
93+
Tup2(4, 1) => { Tup4(4, 1, 4, 2) => 1 },
94+
Tup2(4, 2) => { Tup4(4, 2, 5, 3) => 1 },
95+
Tup2(4, 3) => { Tup4(4, 3, 7, 4) => 1 },
96+
Tup2(4, 4) => { Tup4(4, 4, 9, 5) => 1 },
97+
},
98+
] as [_; STEPS])
99+
.into_iter();
100+
101+
closure.inspect(move |output| {
102+
assert_eq!(*output, expected_outputs.next().unwrap());
103+
});
104+
105+
Ok(closure.output())
106+
})?;
107+
108+
for i in 0..STEPS {
109+
let iteration = i + 1;
110+
println!("Iteration {} starts...", iteration);
111+
circuit_handle.step()?;
112+
output_handle.consolidate().iter().for_each(
113+
|(Tup2(_start, _end), Tup4(start, end, cum_weight, hopcnt), z_weight)| {
114+
println!(
115+
"{start} -> {end} (cum weight: {cum_weight}, hops: {hopcnt}) => {z_weight}"
116+
);
117+
},
118+
);
119+
println!("Iteration {} finished.", iteration);
120+
}
121+
122+
Ok(())
123+
}

crates/dbsp/src/tutorial.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2122,6 +2122,127 @@
21222122
//! pairs of reachable nodes. If you want to see this in action,
21232123
//! we invite you to play around with `tutorial10.rs`.
21242124
//!
2125+
//! To fix this issue, we have to change the code to stop iterating once the
2126+
//! shortest path for each pair of nodes has been discovered. One approach to
2127+
//! achieve this is to group by each pair and use the
2128+
//! [`Min`](`crate::operator::dynamic::aggregate::Min`) aggregation operator
2129+
//! to only retain the shortest path for each pair.
2130+
//! Aggregation requires indexing the stream, so there are more code changes
2131+
//! required than shown here. You can find the full code in `tutorial11.rs` but
2132+
//! the important changes take place within the child circuit:
2133+
//!
2134+
//! ```
2135+
//! # use anyhow::Result;
2136+
//! # use dbsp::{
2137+
//! # indexed_zset,
2138+
//! # operator::{Generator, Min},
2139+
//! # utils::{Tup2, Tup3, Tup4},
2140+
//! # zset_set, Circuit, NestedCircuit, OrdIndexedZSet, RootCircuit, Stream,
2141+
//! # };
2142+
//! #
2143+
//! type Accumulator =
2144+
//! Stream<NestedCircuit, OrdIndexedZSet<Tup2<usize, usize>, Tup4<usize, usize, usize, usize>>>;
2145+
//! #
2146+
//! # fn main() -> Result<()> {
2147+
//! # const STEPS: usize = 2;
2148+
//! #
2149+
//! # let (circuit_handle, output_handle) = RootCircuit::build(move |root_circuit| {
2150+
//! # let mut edges_data = ([
2151+
//! # zset_set! { Tup3(0_usize, 1_usize, 1_usize), Tup3(1, 2, 1), Tup3(2, 3, 2), Tup3(3, 4, 2) },
2152+
//! # zset_set! { Tup3(4, 0, 3)}
2153+
//! # ] as [_; STEPS])
2154+
//! # .into_iter();
2155+
//! #
2156+
//! # let edges = root_circuit.add_source(Generator::new(move || edges_data.next().unwrap()));
2157+
//! #
2158+
//! # let len_1 = edges
2159+
//! # .map_index(|Tup3(from, to, weight)| (Tup2(*from, *to), Tup4(*from, *to, *weight, 1)));
2160+
//!
2161+
//! let closure = root_circuit.recursive(|child_circuit, len_n_minus_1: Accumulator| {
2162+
//! // Import the `edges` and `len_1` stream from the parent circuit.
2163+
//! let edges = edges.delta0(child_circuit);
2164+
//! let len_1 = len_1.delta0(child_circuit);
2165+
//!
2166+
//! // Perform an iterative step (n-1 to n) through joining the
2167+
//! // paths of length n-1 with the edges.
2168+
//! let len_n = len_n_minus_1
2169+
//! .map_index(
2170+
//! |(Tup2(_start, _end), Tup4(start, end, cum_weight, hopcnt))| {
2171+
//! (*end, Tup4(*start, *end, *cum_weight, *hopcnt))
2172+
//! },
2173+
//! )
2174+
//! // We now use `join_index` instead of `join` to index the stream on node pairs.
2175+
//! .join_index(
2176+
//! &edges.map_index(|Tup3(from, to, weight)| (*from, Tup3(*from, *to, *weight))),
2177+
//! |_end_from, Tup4(start, _end, cum_weight, hopcnt), Tup3(_from, to, weight)| {
2178+
//! Some((
2179+
//! Tup2(*start, *to),
2180+
//! Tup4(*start, *to, cum_weight + weight, hopcnt + 1),
2181+
//! ))
2182+
//! },
2183+
//! )
2184+
//! .plus(&len_1)
2185+
//! // Here, we use the `aggregate` operator to only keep the shortest path.
2186+
//! .aggregate(Min);
2187+
//!
2188+
//! Ok(len_n)
2189+
//! })?;
2190+
//!
2191+
//! # let mut expected_outputs = ([
2192+
//! # indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>:
2193+
//! # Tup2(0, 1) => { Tup4(0, 1, 1, 1) => 1 },
2194+
//! # Tup2(0, 2) => { Tup4(0, 2, 2, 2) => 1 },
2195+
//! # Tup2(0, 3) => { Tup4(0, 3, 4, 3) => 1 },
2196+
//! # Tup2(0, 4) => { Tup4(0, 4, 6, 4) => 1 },
2197+
//! # Tup2(1, 2) => { Tup4(1, 2, 1, 1) => 1 },
2198+
//! # Tup2(1, 3) => { Tup4(1, 3, 3, 2) => 1 },
2199+
//! # Tup2(1, 4) => { Tup4(1, 4, 5, 3) => 1 },
2200+
//! # Tup2(2, 3) => { Tup4(2, 3, 2, 1) => 1 },
2201+
//! # Tup2(2, 4) => { Tup4(2, 4, 4, 2) => 1 },
2202+
//! # Tup2(3, 4) => { Tup4(3, 4, 2, 1) => 1 },
2203+
//! # },
2204+
//! # indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>:
2205+
//! # Tup2(0, 0) => { Tup4(0, 0, 9, 5) => 1 },
2206+
//! # Tup2(1, 0) => { Tup4(1, 0, 8, 4) => 1 },
2207+
//! # Tup2(1, 1) => { Tup4(1, 1, 9, 5) => 1 },
2208+
//! # Tup2(2, 0) => { Tup4(2, 0, 7, 3) => 1 },
2209+
//! # Tup2(2, 1) => { Tup4(2, 1, 8, 4) => 1 },
2210+
//! # Tup2(2, 2) => { Tup4(2, 2, 9, 5) => 1 },
2211+
//! # Tup2(3, 0) => { Tup4(3, 0, 5, 2) => 1 },
2212+
//! # Tup2(3, 1) => { Tup4(3, 1, 6, 3) => 1 },
2213+
//! # Tup2(3, 2) => { Tup4(3, 2, 7, 4) => 1 },
2214+
//! # Tup2(3, 3) => { Tup4(3, 3, 9, 5) => 1 },
2215+
//! # Tup2(4, 0) => { Tup4(4, 0, 3, 1) => 1 },
2216+
//! # Tup2(4, 1) => { Tup4(4, 1, 4, 2) => 1 },
2217+
//! # Tup2(4, 2) => { Tup4(4, 2, 5, 3) => 1 },
2218+
//! # Tup2(4, 3) => { Tup4(4, 3, 7, 4) => 1 },
2219+
//! # Tup2(4, 4) => { Tup4(4, 4, 9, 5) => 1 },
2220+
//! # },
2221+
//! # ] as [_; STEPS])
2222+
//! # .into_iter();
2223+
//! #
2224+
//! # closure.inspect(move |output| {
2225+
//! # assert_eq!(*output, expected_outputs.next().unwrap());
2226+
//! # });
2227+
//! #
2228+
//! # Ok(closure.output())
2229+
//! # })?;
2230+
//! #
2231+
//! # for i in 0..STEPS {
2232+
//! # circuit_handle.step()?;
2233+
//! # }
2234+
//! #
2235+
//! # Ok(())
2236+
//! # }
2237+
//! ```
2238+
//!
2239+
//! Keep in mind that this is just one way to fix the issue and that in general
2240+
//! recursive queries with aggregates are not guaranteed to converge to the
2241+
//! optimum of the aggregation function (here, the minimum function),
2242+
//! even though there exists a finite solution.
2243+
//! If the problem at hand is not _monotonic_, the computed fixed-point may
2244+
//! not be the optimal solution to the aggregation function.
2245+
//!
21252246
//! # Next steps
21262247
//!
21272248
//! We've shown how input, computation, and output work in DBSP. That's all

0 commit comments

Comments
 (0)