|
2122 | 2122 | //! pairs of reachable nodes. If you want to see this in action, |
2123 | 2123 | //! we invite you to play around with `tutorial10.rs`. |
2124 | 2124 | //! |
| 2125 | +//! To fix this issue, we have to change the code to stop iterating once the |
| 2126 | +//! shortest path for each pair of nodes has been discovered. One approach to |
| 2127 | +//! achieve this is to group by each pair and use the |
| 2128 | +//! [`Min`](`crate::operator::dynamic::aggregate::Min`) aggregation operator |
| 2129 | +//! to only retain the shortest path for each pair. |
| 2130 | +//! Aggregation requires the stream to be indexed, so more code changes are |
| 2131 | +//! required than shown here. You can find the full code in `tutorial11.rs`, but |
| 2132 | +//! the important changes take place within the child circuit: |
| 2133 | +//! |
| 2134 | +//! ``` |
| 2135 | +//! # use anyhow::Result; |
| 2136 | +//! # use dbsp::{ |
| 2137 | +//! # indexed_zset, |
| 2138 | +//! # operator::{Generator, Min}, |
| 2139 | +//! # utils::{Tup2, Tup3, Tup4}, |
| 2140 | +//! # zset_set, Circuit, NestedCircuit, OrdIndexedZSet, RootCircuit, Stream, |
| 2141 | +//! # }; |
| 2142 | +//! # |
| 2143 | +//! type Accumulator = |
| 2144 | +//! Stream<NestedCircuit, OrdIndexedZSet<Tup2<usize, usize>, Tup4<usize, usize, usize, usize>>>; |
| 2145 | +//! # |
| 2146 | +//! # fn main() -> Result<()> { |
| 2147 | +//! # const STEPS: usize = 2; |
| 2148 | +//! # |
| 2149 | +//! # let (circuit_handle, output_handle) = RootCircuit::build(move |root_circuit| { |
| 2150 | +//! # let mut edges_data = ([ |
| 2151 | +//! # zset_set! { Tup3(0_usize, 1_usize, 1_usize), Tup3(1, 2, 1), Tup3(2, 3, 2), Tup3(3, 4, 2) }, |
| 2152 | +//! # zset_set! { Tup3(4, 0, 3)} |
| 2153 | +//! # ] as [_; STEPS]) |
| 2154 | +//! # .into_iter(); |
| 2155 | +//! # |
| 2156 | +//! # let edges = root_circuit.add_source(Generator::new(move || edges_data.next().unwrap())); |
| 2157 | +//! # |
| 2158 | +//! # let len_1 = edges |
| 2159 | +//! # .map_index(|Tup3(from, to, weight)| (Tup2(*from, *to), Tup4(*from, *to, *weight, 1))); |
| 2160 | +//! |
| 2161 | +//! let closure = root_circuit.recursive(|child_circuit, len_n_minus_1: Accumulator| { |
| 2162 | +//! // Import the `edges` and `len_1` streams from the parent circuit. |
| 2163 | +//! let edges = edges.delta0(child_circuit); |
| 2164 | +//! let len_1 = len_1.delta0(child_circuit); |
| 2165 | +//! |
| 2166 | +//! // Perform an iterative step (n-1 to n) by joining the |
| 2167 | +//! // paths of length n-1 with the edges. |
| 2168 | +//! let len_n = len_n_minus_1 |
| 2169 | +//! .map_index( |
| 2170 | +//! |(Tup2(_start, _end), Tup4(start, end, cum_weight, hopcnt))| { |
| 2171 | +//! (*end, Tup4(*start, *end, *cum_weight, *hopcnt)) |
| 2172 | +//! }, |
| 2173 | +//! ) |
| 2174 | +//! // We now use `join_index` instead of `join` to index the stream on node pairs. |
| 2175 | +//! .join_index( |
| 2176 | +//! &edges.map_index(|Tup3(from, to, weight)| (*from, Tup3(*from, *to, *weight))), |
| 2177 | +//! |_end_from, Tup4(start, _end, cum_weight, hopcnt), Tup3(_from, to, weight)| { |
| 2178 | +//! Some(( |
| 2179 | +//! Tup2(*start, *to), |
| 2180 | +//! Tup4(*start, *to, cum_weight + weight, hopcnt + 1), |
| 2181 | +//! )) |
| 2182 | +//! }, |
| 2183 | +//! ) |
| 2184 | +//! .plus(&len_1) |
| 2185 | +//! // Here, we use the `aggregate` operator to only keep the shortest path. |
| 2186 | +//! .aggregate(Min); |
| 2187 | +//! |
| 2188 | +//! Ok(len_n) |
| 2189 | +//! })?; |
| 2190 | +//! |
| 2191 | +//! # let mut expected_outputs = ([ |
| 2192 | +//! # indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>: |
| 2193 | +//! # Tup2(0, 1) => { Tup4(0, 1, 1, 1) => 1 }, |
| 2194 | +//! # Tup2(0, 2) => { Tup4(0, 2, 2, 2) => 1 }, |
| 2195 | +//! # Tup2(0, 3) => { Tup4(0, 3, 4, 3) => 1 }, |
| 2196 | +//! # Tup2(0, 4) => { Tup4(0, 4, 6, 4) => 1 }, |
| 2197 | +//! # Tup2(1, 2) => { Tup4(1, 2, 1, 1) => 1 }, |
| 2198 | +//! # Tup2(1, 3) => { Tup4(1, 3, 3, 2) => 1 }, |
| 2199 | +//! # Tup2(1, 4) => { Tup4(1, 4, 5, 3) => 1 }, |
| 2200 | +//! # Tup2(2, 3) => { Tup4(2, 3, 2, 1) => 1 }, |
| 2201 | +//! # Tup2(2, 4) => { Tup4(2, 4, 4, 2) => 1 }, |
| 2202 | +//! # Tup2(3, 4) => { Tup4(3, 4, 2, 1) => 1 }, |
| 2203 | +//! # }, |
| 2204 | +//! # indexed_zset! { Tup2<usize, usize> => Tup4<usize, usize, usize, usize>: |
| 2205 | +//! # Tup2(0, 0) => { Tup4(0, 0, 9, 5) => 1 }, |
| 2206 | +//! # Tup2(1, 0) => { Tup4(1, 0, 8, 4) => 1 }, |
| 2207 | +//! # Tup2(1, 1) => { Tup4(1, 1, 9, 5) => 1 }, |
| 2208 | +//! # Tup2(2, 0) => { Tup4(2, 0, 7, 3) => 1 }, |
| 2209 | +//! # Tup2(2, 1) => { Tup4(2, 1, 8, 4) => 1 }, |
| 2210 | +//! # Tup2(2, 2) => { Tup4(2, 2, 9, 5) => 1 }, |
| 2211 | +//! # Tup2(3, 0) => { Tup4(3, 0, 5, 2) => 1 }, |
| 2212 | +//! # Tup2(3, 1) => { Tup4(3, 1, 6, 3) => 1 }, |
| 2213 | +//! # Tup2(3, 2) => { Tup4(3, 2, 7, 4) => 1 }, |
| 2214 | +//! # Tup2(3, 3) => { Tup4(3, 3, 9, 5) => 1 }, |
| 2215 | +//! # Tup2(4, 0) => { Tup4(4, 0, 3, 1) => 1 }, |
| 2216 | +//! # Tup2(4, 1) => { Tup4(4, 1, 4, 2) => 1 }, |
| 2217 | +//! # Tup2(4, 2) => { Tup4(4, 2, 5, 3) => 1 }, |
| 2218 | +//! # Tup2(4, 3) => { Tup4(4, 3, 7, 4) => 1 }, |
| 2219 | +//! # Tup2(4, 4) => { Tup4(4, 4, 9, 5) => 1 }, |
| 2220 | +//! # }, |
| 2221 | +//! # ] as [_; STEPS]) |
| 2222 | +//! # .into_iter(); |
| 2223 | +//! # |
| 2224 | +//! # closure.inspect(move |output| { |
| 2225 | +//! # assert_eq!(*output, expected_outputs.next().unwrap()); |
| 2226 | +//! # }); |
| 2227 | +//! # |
| 2228 | +//! # Ok(closure.output()) |
| 2229 | +//! # })?; |
| 2230 | +//! # |
| 2231 | +//! # for i in 0..STEPS { |
| 2232 | +//! # circuit_handle.step()?; |
| 2233 | +//! # } |
| 2234 | +//! # |
| 2235 | +//! # Ok(()) |
| 2236 | +//! # } |
| 2237 | +//! ``` |
| 2238 | +//! |
| 2239 | +//! Keep in mind that this is just one way to fix the issue. In general, |
| 2240 | +//! recursive queries with aggregates are not guaranteed to converge to the |
| 2241 | +//! optimum of the aggregation function (here, the minimum function), |
| 2242 | +//! even if a finite solution exists. |
| 2243 | +//! If the problem at hand is not _monotonic_, the computed fixed point may |
| 2244 | +//! not be the optimal solution with respect to the aggregation function. |
| 2245 | +//! |
2125 | 2246 | //! # Next steps |
2126 | 2247 | //! |
2127 | 2248 | //! We've shown how input, computation, and output work in DBSP. That's all |
|
0 commit comments