Skip to content

Commit 7bd0ecd

Browse files
committed
[SQL] Use stable sorting for the circuit graph -- this will produce fewer changes when the program is edited
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 4b5272b commit 7bd0ecd

File tree

7 files changed

+127
-89
lines changed

7 files changed

+127
-89
lines changed

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/DBSPCircuit.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void sortOperators(Comparator<DBSPOperator> comparator) {
9595
}
9696

9797
/** Sort the nodes to be compatible with a topological order on the specified graph.
98+
* Preserves the order of inputs and outputs with respect to each other.
9899
* @param graph Topological order to enforce. Mutated by the method. */
99100
public void resort(CircuitGraph graph) {
100101
this.allOperators.clear();

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CircuitGraph.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
import org.dbsp.util.IHasId;
88
import org.dbsp.util.IIndentStream;
99
import org.dbsp.util.ToIndentableString;
10-
import org.dbsp.util.graph.DFSOrder;
1110
import org.dbsp.util.graph.DiGraph;
1211
import org.dbsp.util.graph.Port;
1312
import org.dbsp.util.Utilities;
1413

14+
import java.util.ArrayDeque;
1515
import java.util.ArrayList;
1616
import java.util.HashMap;
1717
import java.util.HashSet;
1818
import java.util.List;
1919
import java.util.Map;
20+
import java.util.Queue;
2021
import java.util.Set;
2122

2223
/* The Graph represents edges source->destination,
@@ -121,9 +122,44 @@ public IIndentStream toString(IIndentStream builder) {
121122
return builder.decrease().append("}");
122123
}
123124

124-
/** Return a topological sort of this graph */
125+
/** Return a stable topological sort of this graph */
125126
public Iterable<DBSPOperator> sort() {
126-
DFSOrder<DBSPOperator> dfs = new DFSOrder<>(this);
127-
return dfs.reversePost();
127+
// Uses Kahn's algorithm for a stable sort
128+
Map<DBSPOperator, Integer> inDegree = new HashMap<>();
129+
for (DBSPOperator op: this.nodes)
130+
Utilities.putNew(inDegree, op, 0);
131+
132+
for (DBSPOperator op : this.edges.keySet()) {
133+
for (Port<DBSPOperator> next: this.edges.get(op)) {
134+
DBSPOperator v = next.node();
135+
inDegree.put(v, inDegree.get(v) + 1);
136+
}
137+
}
138+
139+
// Queue of zero-indegree nodes, in *input order* (the iteration order of this.nodes)
140+
Queue<DBSPOperator> q = new ArrayDeque<>();
141+
for (var n : this.nodes) {
142+
if (inDegree.get(n) == 0)
143+
q.add(n);
144+
}
145+
146+
List<DBSPOperator> result = new ArrayList<>();
147+
while (!q.isEmpty()) {
148+
DBSPOperator u = q.remove();
149+
result.add(u);
150+
151+
for (Port<DBSPOperator> port : this.edges.get(u)) {
152+
DBSPOperator v = port.node();
153+
int d = inDegree.get(v) - 1;
154+
inDegree.put(v, d);
155+
if (d == 0) {
156+
q.add(v);
157+
}
158+
}
159+
}
160+
161+
if (result.size() != this.nodes.size())
162+
throw new IllegalStateException("Graph has a cycle");
163+
return result;
128164
}
129165
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/LinearPostprocessRetainKeys.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ public DBSPCircuit apply(DBSPCircuit circuit) {
9090
}
9191
}
9292

93+
if (toAdd.isEmpty())
94+
return circuit;
95+
9396
for (var p: toAdd)
9497
graph.addEdge(p.left, p.right, 0);
95-
// Note: this may reorder the graph dramatically. We should probably avoid doing this
96-
// altogether if the circuit will not change, but this would change dramatically the output
97-
// of the compiler for many programs, preventing bootstrapping.
9898
circuit.resort(graph);
9999

100100
graphs.apply(circuit);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/util/graph/DFSOrder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ public DFSOrder(DiGraph<Node> graph) {
2323
this.postorder = new ArrayList<>();
2424
this.preorder = new ArrayList<>();
2525
this.marked = new HashSet<>();
26-
for (Node v: graph.getNodes())
26+
for (Node v: graph.getNodes()) {
2727
if (!this.marked.contains(v))
2828
this.dfs(graph, v);
29+
}
2930
}
3031

3132
// run DFS in digraph G from vertex v and compute preorder/postorder

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/OtherTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,22 @@ private DBSPCompiler compileDef() {
119119
String str = circuit.toString();
120120
String expected = """
121121
Circuit circuit {
122-
// DBSPConstantOperator s0
123-
let s0 = constant(zset!());
124-
// DBSPSourceMultisetOperator s1
122+
// DBSPSourceMultisetOperator s0
125123
// CREATE TABLE `t` (`col1` INTEGER NOT NULL, `col2` DOUBLE NOT NULL, `col3` BOOLEAN NOT NULL, `col4` VARCHAR NOT NULL, `col5` INTEGER, `col6` DOUBLE)
126-
let s1 = t();
127-
// DBSPMapOperator s2
128-
let s2 = s1.map((|p0: &Tup6<i32, d, b, s, i32?, d?>|
124+
let s0 = t();
125+
// DBSPMapOperator s1
126+
let s1 = s0.map((|p0: &Tup6<i32, d, b, s, i32?, d?>|
129127
Tup1::new(((*p0).2), )));
130128
// CREATE VIEW `v` AS
131129
// SELECT `t`.`col3`
132130
// FROM `schema`.`t` AS `t`
133-
let s3 = s2;
131+
let s2 = s1;
132+
// DBSPConstantOperator s3
133+
let s3 = constant(zset!());
134134
// CREATE VIEW `error_view` AS
135135
// SELECT `feldera_error_table`.`table_or_view_name`, `feldera_error_table`.`message`, `feldera_error_table`.`metadata`
136136
// FROM `schema`.`feldera_error_table` AS `feldera_error_table`
137-
let s4 = s0;
137+
let s4 = s3;
138138
}
139139
""";
140140
Assert.assertEquals(expected, str);

sql-to-dbsp-compiler/SQL-compiler/src/test/resources/metadataTests-generateDF.json

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,6 @@
7171
},
7272
"mir": {
7373
"s0": {
74-
"operation": "constant",
75-
"inputs": [],
76-
"calcite": {
77-
"partial": 0
78-
},
79-
"positions": [],
80-
"persistent_id": "dabdc0517fb639de8ebd480cb4350e3b2054b584a8c9c42515f268f99294f72c"
81-
}, "s1": {
82-
"operation": "constant",
83-
"inputs": [],
84-
"calcite": {
85-
"partial": 3
86-
},
87-
"positions": [],
88-
"persistent_id": "073120d9c4eea1862d5aae5f99d5753fc61d32b368fde8f3ba9a82dbd532860d"
89-
}, "s2": {
9074
"operation": "source_multiset",
9175
"inputs": [],
9276
"table": "t",
@@ -97,20 +81,20 @@
9781
{"start_line_number":1,"start_column":14,"end_line_number":1,"end_column":14}
9882
],
9983
"persistent_id": "5e6c4775639ef50da58436d192195ae13fa8cd9681af464d09f0927ebbd24bbf"
100-
}, "s3": {
84+
}, "s1": {
10185
"operation": "differentiate",
10286
"inputs": [
103-
{ "node": "s2", "output": 0 }
87+
{ "node": "s0", "output": 0 }
10488
],
10589
"calcite": {
10690
"partial": 3
10791
},
10892
"positions": [],
10993
"persistent_id": "75f6aef62d77fe19e76200d21472b7cb2e51629638054899decb829ba3df3859"
110-
}, "s4": {
94+
}, "s2": {
11195
"operation": "map_index",
11296
"inputs": [
113-
{ "node": "s3", "output": 0 }
97+
{ "node": "s1", "output": 0 }
11498
],
11599
"calcite": {
116100
"seq": [
@@ -122,10 +106,10 @@
122106
},
123107
"positions": [],
124108
"persistent_id": "82bbfee37e33a4b0fedde6bc5e8bb31f17ccce1fd1957ad029e9cba1bc79d2b4"
125-
}, "s5": {
109+
}, "s3": {
126110
"operation": "aggregate_linear_postprocess",
127111
"inputs": [
128-
{ "node": "s4", "output": 0 }
112+
{ "node": "s2", "output": 0 }
129113
],
130114
"calcite": {
131115
"partial": 3
@@ -134,10 +118,30 @@
134118
{"start_line_number":2,"start_column":25,"end_line_number":2,"end_column":33}
135119
],
136120
"persistent_id": "134c6466aaa7694ea35a60b8949ec0a4d70a5bf9bdf014d4fb9426d96aa7dbb7"
121+
}, "s4": {
122+
"operation": "map",
123+
"inputs": [
124+
{ "node": "s3", "output": 0 }
125+
],
126+
"calcite": {
127+
"partial": 3
128+
},
129+
"positions": [],
130+
"persistent_id": "bcc438bd472b190d25cfdac257bc4f4263a0003f9f939a63be4497b268ed837f"
131+
}, "s5": {
132+
"operation": "integrate",
133+
"inputs": [
134+
{ "node": "s4", "output": 0 }
135+
],
136+
"calcite": {
137+
"partial": 3
138+
},
139+
"positions": [],
140+
"persistent_id": "7f3b5c52bed875fc3ade540f9bcb1b1fb96f64fca107f34eb8e56469317c7b82"
137141
}, "s6": {
138142
"operation": "map",
139143
"inputs": [
140-
{ "node": "s5", "output": 0 }
144+
{ "node": "s3", "output": 0 }
141145
],
142146
"calcite": {
143147
"partial": 3
@@ -165,41 +169,29 @@
165169
"positions": [],
166170
"persistent_id": "7bc400efc53be162b85fdaadf583824555a184e370242b950c422e39488bfd4d"
167171
}, "s9": {
168-
"operation": "map",
169-
"inputs": [
170-
{ "node": "s5", "output": 0 }
171-
],
172+
"operation": "constant",
173+
"inputs": [],
172174
"calcite": {
173175
"partial": 3
174176
},
175177
"positions": [],
176-
"persistent_id": "bcc438bd472b190d25cfdac257bc4f4263a0003f9f939a63be4497b268ed837f"
178+
"persistent_id": "bd39cf1159208fe39f804045bb097cac071084865503769ca862515fd35f1289"
177179
}, "s10": {
178-
"operation": "integrate",
179-
"inputs": [
180-
{ "node": "s9", "output": 0 }
181-
],
182-
"calcite": {
183-
"partial": 3
184-
},
185-
"positions": [],
186-
"persistent_id": "7f3b5c52bed875fc3ade540f9bcb1b1fb96f64fca107f34eb8e56469317c7b82"
187-
}, "s11": {
188180
"operation": "sum",
189181
"inputs": [
190-
{ "node": "s1", "output": 0 },
182+
{ "node": "s9", "output": 0 },
191183
{ "node": "s8", "output": 0 },
192-
{ "node": "s10", "output": 0 }
184+
{ "node": "s5", "output": 0 }
193185
],
194186
"calcite": {
195187
"partial": 3
196188
},
197189
"positions": [],
198-
"persistent_id": "040f303a787c124052f91dfb293018d52b0e35aa98fb24e3078bca8c09e7ddb7"
199-
}, "s12": {
190+
"persistent_id": "5a531ea30f94f3cc3a4e724e1e9a72540d2a574be0d63565e5caa388c9f59221"
191+
}, "s11": {
200192
"operation": "inspect",
201193
"inputs": [
202-
{ "node": "s11", "output": 0 }
194+
{ "node": "s10", "output": 0 }
203195
],
204196
"view": "v",
205197
"calcite": {
@@ -209,11 +201,19 @@
209201
{"start_line_number":2,"start_column":1,"end_line_number":2,"end_column":40},
210202
{"start_line_number":2,"start_column":1,"end_line_number":2,"end_column":40}
211203
],
212-
"persistent_id": "c1ffe4f3132e9426e7a70371ee283614d01223417b97444e03f933502c26a854"
204+
"persistent_id": "a5563f0944296ab7968e9015c9d8ced05a8e9955dec0aa1e727f76c2f4d05489"
205+
}, "s12": {
206+
"operation": "constant",
207+
"inputs": [],
208+
"calcite": {
209+
"partial": 0
210+
},
211+
"positions": [],
212+
"persistent_id": "dabdc0517fb639de8ebd480cb4350e3b2054b584a8c9c42515f268f99294f72c"
213213
}, "s13": {
214214
"operation": "inspect",
215215
"inputs": [
216-
{ "node": "s0", "output": 0 }
216+
{ "node": "s12", "output": 0 }
217217
],
218218
"view": "error_view",
219219
"calcite": {

0 commit comments

Comments
 (0)