Skip to content

Commit 8d54782

Browse files
committed
Rebase
1 parent 2925be6 commit 8d54782

File tree

8 files changed

+175
-362
lines changed

8 files changed

+175
-362
lines changed

.gitattributes

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Cargo.lock -diff linguist-generated=true
1+
Cargo.lock merge=binary linguist-generated=true
22
proptest-regressions/**/* -diff linguist-generated=true

Cargo.lock

Lines changed: 154 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benches/ldbc-graphalytics/bfs.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
//! [A.1]: https://arxiv.org/pdf/2011.15028v4.pdf#section.A.1
77
88
use crate::data::{Distance, DistanceSet, Edges, Node, Vertices};
9-
use dbsp::{operator::recursive::RecursiveStreams, time::NestedTimestamp32, Circuit, Stream};
9+
use dbsp::{
10+
operator::{recursive::RecursiveStreams, FilterMap},
11+
time::NestedTimestamp32,
12+
Circuit, Stream,
13+
};
1014

1115
type Distances<P> = Stream<Circuit<P>, DistanceSet>;
1216

@@ -48,10 +52,10 @@ where
4852
.expect("failed to build dfs recursive scope");
4953

5054
// Collect all reachable nodes
51-
let reachable_nodes = distances.map_keys(|&(node, _)| node);
55+
let reachable_nodes = distances.map(|&(node, _)| node);
5256
// Find all unreachable nodes (vertices not included in `distances`) and give them a weight of -1
5357
let unreachable_nodes =
54-
antijoin(&vertices, &reachable_nodes).map_keys(|&node| (node, i64::MAX as Distance));
58+
antijoin(&vertices, &reachable_nodes).map(|&node| (node, i64::MAX as Distance));
5559

5660
distances.plus(&unreachable_nodes)
5761
}

benches/ldbc-graphalytics/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ fn main() {
6161
"using dataset {} with {} vertices and {} edges",
6262
dataset.name, properties.vertices, properties.edges,
6363
);
64-
Runtime::run(1, move |_runtime, _index| {
64+
Runtime::run(1, move || {
6565
print!("building dataflow... ");
6666
io::stdout().flush().unwrap();
6767
let start = Instant::now();

benches/ldbc-graphalytics/pagerank.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::data::{Edges, Node, Rank, RankPairs, RankSet, Ranks, Streamed, Vertic
22
use dbsp::{
33
algebra::HasOne,
44
circuit::operator_traits::Data,
5-
operator::{DelayedFeedback, Generator},
5+
operator::{DelayedFeedback, FilterMap, Generator},
66
trace::{
77
layers::Trie, ord::indexed_zset_batch::OrdIndexedZSetBuilder, Batch, BatchReader, Batcher,
88
Builder, Cursor,
@@ -80,10 +80,10 @@ where
8080
});
8181

8282
// Calculate the teleport, `(1 - 𝑑) ÷ |V|`
83-
let teleport =
84-
total_vertices.map_values::<OrdIndexedZSet<_, _, _>, _>(move |&(), &total_vertices| {
85-
F64::new((1.0 - damping_factor) / total_vertices as f64)
86-
});
83+
let teleport = total_vertices.map_index(move |(&node, &total_vertices)| {
84+
let teleport = F64::new((1.0 - damping_factor) / total_vertices as f64);
85+
(node, teleport)
86+
});
8787

8888
// Count the number of outgoing edges for each node
8989
let outgoing_sans_dangling = edges.apply(|weights| {
@@ -112,7 +112,7 @@ where
112112
vertices.minus(&outgoing_sans_dangling.semijoin_stream_core(&vertices, |&node, _| node));
113113

114114
// Make the edge weights for all dangling nodes, we only need to calculate this once
115-
let dangling_edge_weights = dangling_nodes.map_keys(|&node| (node, F64::new(0.0)));
115+
let dangling_edge_weights = dangling_nodes.map(|&node| (node, F64::new(0.0)));
116116

117117
let reversed = if directed {
118118
edges.apply(|edges| {
@@ -134,14 +134,14 @@ where
134134

135135
// Undirected graphs already have all the edges we need
136136
} else {
137-
edges.map_values(|_, _| ())
137+
edges.map(|(&src, _)| src)
138138
}
139139
.distinct();
140140

141141
// Find vertices without any incoming edges
142142
let zero_incoming = vertices
143143
.minus(&reversed.semijoin::<(), _, _, _>(&vertices, |&node, _| node))
144-
.map_keys::<OrdZSet<_, _>, _>(|&node| ((), (node, F64::new(0.0))));
144+
.map(|&node| ((), (node, F64::new(0.0))));
145145

146146
// Create a stream containing one `((), 0)` pair
147147
let zero = vertices

src/operator/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,11 @@ pub use apply::Apply;
3737
pub use condition::Condition;
3838
pub use delta0::Delta0;
3939
pub use distinct::Distinct;
40-
pub use filter::FilterKeys;
41-
pub use filter_map::FilterMapKeys;
40+
pub use filter_map::{FilterKeys, FilterMap, FilterVals, FlatMap, Map, MapKeys};
4241
pub use generator::{Generator, GeneratorNested};
4342
pub use index::Index;
4443
pub use inspect::Inspect;
4544
pub use join::Join;
46-
pub use map::{MapKeys, MapValues};
4745
pub use neg::UnaryMinus;
4846
pub use plus::{Minus, Plus};
4947
pub use sum::Sum;

0 commit comments

Comments
 (0)