Skip to content

Commit cfc486e

Browse files
committed
[SQL] If an operator has multiple RetainKey successors, combine them using MIN. This is currently only used for self-joins, but if it works well, we may enable it for other cases in the future
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent ca3aff4 commit cfc486e

File tree

7 files changed

+238
-25
lines changed

7 files changed

+238
-25
lines changed

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/RankAggregate.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.dbsp.sqlCompiler.ir.type.DBSPType;
2626
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeTupleBase;
2727
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeInteger;
28-
import org.dbsp.util.Utilities;
2928

3029
import java.util.List;
3130

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/LowerCircuitVisitor.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.dbsp.sqlCompiler.circuit.operator.DBSPAggregateOperator;
44
import org.dbsp.sqlCompiler.circuit.operator.DBSPApply2Operator;
5+
import org.dbsp.sqlCompiler.circuit.operator.DBSPApplyNOperator;
56
import org.dbsp.sqlCompiler.circuit.operator.DBSPApplyOperator;
67
import org.dbsp.sqlCompiler.circuit.operator.DBSPFlatMapOperator;
78
import org.dbsp.sqlCompiler.circuit.operator.DBSPJoinFilterMapOperator;
@@ -17,6 +18,7 @@
1718
import org.dbsp.sqlCompiler.circuit.OutputPort;
1819
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
1920
import org.dbsp.sqlCompiler.compiler.errors.InternalCompilerError;
21+
import org.dbsp.sqlCompiler.ir.DBSPParameter;
2022
import org.dbsp.sqlCompiler.ir.aggregate.DBSPFold;
2123
import org.dbsp.sqlCompiler.ir.expression.DBSPApplyExpression;
2224
import org.dbsp.sqlCompiler.ir.expression.DBSPApplyMethodExpression;
@@ -237,7 +239,7 @@ public void postorder(DBSPApplyOperator node) {
237239
// Instrument apply functions to print their parameters
238240
DBSPClosureExpression func = node.getClosureFunction();
239241
DBSPExpression print = new DBSPApplyExpression("println!", DBSPTypeAny.getDefault(),
240-
new DBSPStrLiteral(func.parameters[0].name + "={:?}"), func.parameters[0].asVariable());
242+
new DBSPStrLiteral(node.id + "={:?}"), func.parameters[0].asVariable());
241243
DBSPExpression block = new DBSPBlockExpression(
242244
Linq.list(new DBSPExpressionStatement(print)),
243245
func.body);
@@ -256,7 +258,7 @@ public void postorder(DBSPApply2Operator node) {
256258
// Instrument apply functions to print their parameters
257259
DBSPClosureExpression func = node.getClosureFunction();
258260
DBSPExpression print = new DBSPApplyExpression("println!", DBSPTypeAny.getDefault(),
259-
new DBSPStrLiteral(func.parameters[0].name + "={:?}," + func.parameters[1].name + "={:?}"),
261+
new DBSPStrLiteral(node.id + "=({:?},{:?})"),
260262
func.parameters[0].asVariable(), func.parameters[1].asVariable());
261263
DBSPExpression block = new DBSPBlockExpression(
262264
Linq.list(new DBSPExpressionStatement(print)),
@@ -267,6 +269,35 @@ public void postorder(DBSPApply2Operator node) {
267269
this.map(node, instrumented);
268270
}
269271

272+
@Override
273+
public void postorder(DBSPApplyNOperator node) {
274+
if (this.getDebugLevel() < 1) {
275+
super.postorder(node);
276+
return;
277+
}
278+
// Instrument apply functions to print their parameters
279+
DBSPClosureExpression func = node.getClosureFunction();
280+
DBSPExpression[] arguments = new DBSPExpression[func.parameters.length + 1];
281+
StringBuilder builder = new StringBuilder();
282+
builder.append("=(");
283+
int index = 1;
284+
for (DBSPParameter param: func.parameters) {
285+
arguments[index] = param.asVariable();
286+
builder.append("{:?}, ");
287+
index++;
288+
}
289+
builder.append(")");
290+
291+
arguments[0] = new DBSPStrLiteral(node.id + builder.toString());
292+
DBSPExpression print = new DBSPApplyExpression("println!", DBSPTypeAny.getDefault(), arguments);
293+
DBSPExpression block = new DBSPBlockExpression(
294+
Linq.list(new DBSPExpressionStatement(print)), func.body);
295+
DBSPSimpleOperator instrumented = node.with(
296+
block.closure(func.parameters), func.getResultType(),
297+
Linq.map(node.inputs, this::mapped), false);
298+
this.map(node, instrumented);
299+
}
300+
270301
@Override
271302
public void postorder(DBSPStreamAggregateOperator node) {
272303
if (node.function != null) {

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,14 @@ OutputPort createApply(OutputPort source, @Nullable DBSPSimpleOperator represent
191191
* <p>
192192
* |param0, param1|
193193
* (param0.0 && param1.0,
194-
* if param0.0 && param1.0 {
195-
* function(param0.1, param1.1)
194+
* if *param0.0 && *param1.0 {
195+
* function(*param0.1, *param1.1)
196196
* } else {
197197
* min
198198
* })
199199
* where 'min' is the minimum constant value with the appropriate type.
200200
* Inserts the operator in the circuit. 'param0.0' is true when param0.1 is
201201
* not the minimum legal value - i.e., the waterline has seen some data
202-
* (same on the other side).
203202
*
204203
* @param left Left input operator.
205204
* @param right Right input operator
@@ -225,24 +224,23 @@ OutputPort createApply2(OutputPort left, OutputPort right, DBSPClosureExpression
225224
return result.outputPort();
226225
}
227226

228-
/** Given a function for an apply2 operator, synthesizes an operator that performs
227+
/** Given a function for an applyN operator, synthesizes an operator that performs
229228
* the following computation:
230229
* <p>
231230
* |param0, param1, ..., paramN|
232-
* (param0.0 && param1.0 ... & paramN.0,
233-
* if param0.0 && param1.0 ... && param1.0 {
234-
* function(param0.1, param1.1, ..., param1.1)
231+
* (*param0.0 && *param1.0 ... & *paramN.0,
232+
* if *param0.0 && *param1.0 ... && *param1.0 {
233+
* function(*param0.1, *param1.1, ..., *param1.1)
235234
* } else {
236235
* min
237236
* })
238237
* where 'min' is the minimum constant value with the appropriate type.
239-
* Inserts the operator in the circuit. 'param0.0' is true when param0.1 is
240-
* not the minimum legal value - i.e., the waterline has seen some data
241-
* (same on the other side).
238+
* 'param0.0' is true when param0.1 is not the minimum legal value - i.e.,
239+
* the waterline has seen some data.
242240
*
243241
* @param function Function to apply to the data.
244242
*/
245-
OutputPort createApplyN(List<OutputPort> inputs, DBSPClosureExpression function) {
243+
public static OutputPort createApplyN(DBSPCompiler compiler, List<OutputPort> inputs, DBSPClosureExpression function) {
246244
Utilities.enforce(!inputs.isEmpty());
247245
OutputPort first = inputs.get(0);
248246
CalciteRelNode relNode = first.node().getRelNode();
@@ -253,11 +251,10 @@ OutputPort createApplyN(List<OutputPort> inputs, DBSPClosureExpression function)
253251
DBSPExpression and = ExpressionCompiler.makeBinaryExpressions(relNode, v0.get(0).getType(), DBSPOpcode.AND, v0);
254252
DBSPExpression min = function.getResultType().minimumValue();
255253
DBSPExpression cond = new DBSPTupleExpression(and,
256-
new DBSPIfExpression(relNode, and, function.call(v1), min).reduce(this.compiler));
254+
new DBSPIfExpression(relNode, and, function.call(v1), min).reduce(compiler));
257255
DBSPVariablePath[] params = vars.toArray(new DBSPVariablePath[0]);
258256
DBSPApplyNOperator result = new DBSPApplyNOperator(relNode, cond.closure(params), inputs);
259257
result.addAnnotation(Waterline.INSTANCE, DBSPApplyNOperator.class);
260-
this.addOperator(result);
261258
return result.outputPort();
262259
}
263260

@@ -1897,7 +1894,7 @@ DBSPExpression project(DBSPExpression source,
18971894
}
18981895

18991896
/** Apply MIN pointwise to two expressions */
1900-
DBSPExpression min(DBSPExpression left, DBSPExpression right) {
1897+
static DBSPExpression min(DBSPExpression left, DBSPExpression right) {
19011898
Utilities.enforce(left.getType().sameTypeIgnoringNullability(right.getType()));
19021899
if (left.getType().is(DBSPTypeBaseType.class)) {
19031900
return ExpressionCompiler.makeBinaryExpression(
@@ -1930,7 +1927,7 @@ OutputPort project(OutputPort limit, IMaybeMonotoneType source, IMaybeMonotoneTy
19301927
}
19311928

19321929
/** Given a list of expressions, combine them by applying this.min() to them pairwise */
1933-
private DBSPExpression combineMin(List<DBSPExpression> inputs) {
1930+
public static DBSPExpression combineMin(List<DBSPExpression> inputs) {
19341931
if (inputs.size() == 1)
19351932
return inputs.get(0);
19361933

@@ -1939,12 +1936,12 @@ private DBSPExpression combineMin(List<DBSPExpression> inputs) {
19391936
if (i == inputs.size() - 1) {
19401937
pairs.add(inputs.get(i));
19411938
} else {
1942-
DBSPExpression combine = this.min(inputs.get(i), inputs.get(i + 1));
1939+
DBSPExpression combine = min(inputs.get(i), inputs.get(i + 1));
19431940
pairs.add(combine);
19441941
}
19451942
}
19461943
// Recursive call with ~1/2 the number of elements
1947-
return this.combineMin(pairs);
1944+
return combineMin(pairs);
19481945
}
19491946

19501947
@Nullable
@@ -1984,9 +1981,10 @@ private OutputPort processSumOrDiff(DBSPSimpleOperator expanded) {
19841981

19851982
List<DBSPExpression> inputs = Linq.map(variables, DBSPExpression::deref);
19861983
DBSPVariablePath[] vars = variables.toArray(new DBSPVariablePath[0]);
1987-
DBSPClosureExpression min = this.combineMin(inputs).closure(vars);
1984+
DBSPClosureExpression min = combineMin(inputs).closure(vars);
19881985

1989-
OutputPort apply = this.createApplyN(projected, min);
1986+
OutputPort apply = createApplyN(compiler, projected, min);
1987+
this.addOperator(apply.node());
19901988
this.markBound(expanded.outputPort(), apply);
19911989
return apply;
19921990
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MergeGC.java

Lines changed: 139 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,46 @@
11
package org.dbsp.sqlCompiler.compiler.visitors.outer.monotonicity;
22

3+
import org.dbsp.sqlCompiler.circuit.OutputPort;
4+
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
35
import org.dbsp.sqlCompiler.circuit.operator.DBSPNoopOperator;
46
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
57
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
68
import org.dbsp.sqlCompiler.circuit.operator.IGCOperator;
79
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
810
import org.dbsp.sqlCompiler.compiler.visitors.outer.CSE;
11+
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitCloneVisitor;
912
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitGraphs;
1013
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitWithGraphsVisitor;
14+
import org.dbsp.sqlCompiler.compiler.visitors.outer.Graph;
1115
import org.dbsp.sqlCompiler.compiler.visitors.outer.Passes;
1216
import org.dbsp.sqlCompiler.ir.IDBSPOuterNode;
17+
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
18+
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
19+
import org.dbsp.sqlCompiler.ir.expression.DBSPVariablePath;
20+
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeTuple;
1321
import org.dbsp.util.Linq;
1422
import org.dbsp.util.Logger;
23+
import org.dbsp.util.Utilities;
1524
import org.dbsp.util.graph.Port;
1625

1726
import javax.annotation.Nullable;
27+
import java.util.ArrayList;
1828
import java.util.HashMap;
29+
import java.util.HashSet;
1930
import java.util.List;
2031
import java.util.Map;
32+
import java.util.Set;
2133

2234
/** Look for the following pattern:
2335
* source -> noop -> retain
2436
* -> noop -> retain
2537
* Where the two retain operators are equivalent.
2638
* Replace this with a single chain.
39+
*
40+
* <p>Replace the following pattern
41+
* source -> retainKey1
42+
* -> retainKey2
43+
* With source -> retainKey (min of control inputs)
2744
*/
2845
public class MergeGC extends Passes {
2946
/** This is a modified form of {@link CSE.FindCSE} */
@@ -97,10 +114,130 @@ public void postorder(DBSPSimpleOperator operator) {
97114
}
98115
}
99116

100-
public MergeGC(DBSPCompiler compiler, CircuitGraphs graphs) {
117+
static class FindMultipleRetainKeys extends CircuitWithGraphsVisitor {
118+
List<List<DBSPIntegrateTraceRetainKeysOperator>> toMerge;
119+
Set<DBSPIntegrateTraceRetainKeysOperator> visited;
120+
121+
FindMultipleRetainKeys(DBSPCompiler compiler, CircuitGraphs graphs) {
122+
super(compiler, graphs);
123+
this.toMerge = new ArrayList<>();
124+
this.visited = new HashSet<>();
125+
}
126+
127+
@Override
128+
public void postorder(DBSPIntegrateTraceRetainKeysOperator retain) {
129+
if (this.visited.contains(retain))
130+
return;
131+
OutputPort left = retain.inputs.get(0);
132+
List<DBSPIntegrateTraceRetainKeysOperator> common = new ArrayList<>();
133+
common.add(retain);
134+
this.visited.add(retain);
135+
var successors = this.getGraph().getSuccessors(left.node());
136+
for (var succ: successors) {
137+
var ik = succ.node().as(DBSPIntegrateTraceRetainKeysOperator.class);
138+
if (ik != null && ik != retain) {
139+
common.add(ik);
140+
this.visited.add(ik);
141+
}
142+
}
143+
if (common.size() > 1) {
144+
this.toMerge.add(common);
145+
}
146+
}
147+
}
148+
149+
static class MergeRetain extends CircuitCloneVisitor {
150+
/** Keep a counter and a list; offer a method to decrement counter */
151+
static class ListCounter<T> {
152+
int counter;
153+
public final List<T> list;
154+
155+
ListCounter(List<T> data) {
156+
this.counter = data.size();
157+
this.list = data;
158+
}
159+
160+
boolean decrement() {
161+
this.counter--;
162+
return this.counter == 0;
163+
}
164+
}
165+
166+
Map<DBSPIntegrateTraceRetainKeysOperator, ListCounter<DBSPIntegrateTraceRetainKeysOperator>> toMerge;
167+
final FindMultipleRetainKeys fmk;
168+
169+
DBSPIntegrateTraceRetainKeysOperator merge(List<DBSPIntegrateTraceRetainKeysOperator> operators) {
170+
// The shared operators must all have the same left input.
171+
Utilities.enforce(operators.size() > 1);
172+
DBSPIntegrateTraceRetainKeysOperator first = operators.get(0);
173+
OutputPort left = this.mapped(first.left());
174+
List<OutputPort> rights = Linq.map(operators, o -> this.mapped(o.right()));
175+
176+
List<DBSPVariablePath> variables = new ArrayList<>();
177+
for (var r: rights)
178+
variables.add(r.outputType().to(DBSPTypeTuple.class).getFieldType(1).ref().var());
179+
180+
List<DBSPExpression> dataFields = Linq.map(variables, DBSPExpression::deref);
181+
DBSPVariablePath[] vars = variables.toArray(new DBSPVariablePath[0]);
182+
DBSPClosureExpression min = InsertLimiters.combineMin(dataFields).closure(vars);
183+
OutputPort apply = InsertLimiters.createApplyN(this.compiler, rights, min);
184+
this.addOperator(apply.node());
185+
return new DBSPIntegrateTraceRetainKeysOperator(
186+
first.getRelNode(), first.getClosureFunction(), left, apply);
187+
}
188+
189+
public MergeRetain(DBSPCompiler compiler, FindMultipleRetainKeys fmk) {
190+
super(compiler, false);
191+
this.toMerge = new HashMap<>();
192+
this.fmk = fmk;
193+
}
194+
195+
@Override
196+
public Token startVisit(IDBSPOuterNode circuit) {
197+
for (var l: this.fmk.toMerge) {
198+
ListCounter<DBSPIntegrateTraceRetainKeysOperator> list = new ListCounter<>(l);
199+
for (var e: l) {
200+
Utilities.putNew(this.toMerge, e, list);
201+
}
202+
}
203+
return super.startVisit(circuit);
204+
}
205+
206+
@Override
207+
public void postorder(DBSPIntegrateTraceRetainKeysOperator op) {
208+
if (!this.toMerge.containsKey(op)) {
209+
super.postorder(op);
210+
return;
211+
}
212+
var listCounter = Utilities.getExists(this.toMerge, op);
213+
boolean done = listCounter.decrement();
214+
if (!done) {
215+
// DO NOT PROCESS, it will be deleted
216+
return;
217+
}
218+
219+
// Create the replacement only when the last element in the list of equivalent
220+
// retain-key operators has been processed. This ensures that all their
221+
// inputs have been processed as well.
222+
DBSPIntegrateTraceRetainKeysOperator merge = this.merge(listCounter.list);
223+
this.map(op, merge);
224+
}
225+
}
226+
227+
public MergeGC(DBSPCompiler compiler) {
101228
super("MergeGC", compiler);
102-
FindEquivalentNoops find = new FindEquivalentNoops(compiler, graphs);
229+
// Remove redundant noops
230+
Graph graphs = new Graph(compiler);
231+
this.add(graphs);
232+
FindEquivalentNoops find = new FindEquivalentNoops(compiler, graphs.getGraphs());
103233
this.add(find);
104234
this.add(new CSE.RemoveCSE(compiler, find.canonical));
235+
236+
// Merge retainKey
237+
Graph graphs1 = new Graph(compiler);
238+
this.add(graphs1);
239+
FindMultipleRetainKeys findRetain = new FindMultipleRetainKeys(compiler, graphs1.getGraphs());
240+
this.add(findRetain);
241+
this.add(new MergeRetain(compiler, findRetain));
105242
}
106243
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MonotoneAnalyzer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public String toString() {
163163
if (debug)
164164
ToDot.dump(compiler, "limited.png", details, "png", result);
165165

166-
CircuitTransform merger = new OptimizeWithGraph(this.compiler, g -> new MergeGC(this.compiler, g));
166+
CircuitTransform merger = new MergeGC(this.compiler);
167167
result = merger.apply(result);
168168
graph.apply(result);
169169
CheckRetain check = new CheckRetain(this.compiler, graph.getGraphs());

0 commit comments

Comments
 (0)