Skip to content

Commit 7b350da

Browse files
committed
[SQL] Share indexes between joins when possible
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent fc8244c commit 7b350da

32 files changed

+1182
-144
lines changed

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPChainOperator.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteRelNode;
1111
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
1212
import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext;
13+
import org.dbsp.sqlCompiler.compiler.visitors.inner.Expensive;
1314
import org.dbsp.sqlCompiler.compiler.visitors.inner.InnerVisitor;
1415
import org.dbsp.sqlCompiler.compiler.visitors.inner.Simplify;
1516
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
@@ -29,6 +30,7 @@
2930
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeZSet;
3031
import org.dbsp.util.IndentStreamBuilder;
3132
import org.dbsp.util.Linq;
33+
import org.dbsp.util.Maybe;
3234
import org.dbsp.util.Utilities;
3335

3436
import javax.annotation.Nullable;
@@ -243,6 +245,90 @@ public String toString() {
243245
builder.decrease().newline();
244246
return builder.toString();
245247
}
248+
249+
/** Compose pairs of maps that can be efficiently composed, taking into advantage
250+
* the fact that function composition is associative. */
251+
public DBSPChainOperator.ComputationChain shrinkMaps(DBSPCompiler compiler) {
252+
List<DBSPChainOperator.Computation> result = new ArrayList<>();
253+
for (DBSPChainOperator.Computation comp: this.computations()) {
254+
if (result.isEmpty() || comp.kind() == DBSPChainOperator.ComputationKind.Filter) {
255+
result.add(comp);
256+
} else {
257+
DBSPChainOperator.Computation last = Utilities.removeLast(result);
258+
if (last.kind() == DBSPChainOperator.ComputationKind.Filter) {
259+
result.add(last);
260+
result.add(comp);
261+
continue;
262+
}
263+
264+
boolean expensive = Expensive.isExpensive(compiler, last.closure());
265+
if (expensive) {
266+
result.add(last);
267+
} else {
268+
DBSPClosureExpression composed;
269+
if (last.kind() == DBSPChainOperator.ComputationKind.Map) {
270+
composed = comp.closure().applyAfter(compiler, last.closure(), Maybe.MAYBE);
271+
} else {
272+
DBSPClosureExpression lastFunction = last.closure();
273+
DBSPExpression argument = new DBSPRawTupleExpression(
274+
lastFunction.body.field(0).borrow(),
275+
lastFunction.body.field(1).borrow());
276+
DBSPExpression apply = comp.closure().call(argument);
277+
composed = apply.reduce(compiler)
278+
.closure(lastFunction.parameters);
279+
}
280+
comp = new DBSPChainOperator.Computation(comp.kind(), composed);
281+
}
282+
result.add(comp);
283+
}
284+
}
285+
286+
if (result.size() == this.size())
287+
return this;
288+
return new DBSPChainOperator.ComputationChain(this.inputType(), result);
289+
}
290+
291+
/** Convert Map(m1) -> Filter(f) -> Map(m2) into Filter(f \circ m1) -> Map(m2 \circ m1) if m1 is simple */
292+
public ComputationChain shrinkMapFilterMap(DBSPCompiler compiler) {
293+
List<DBSPChainOperator.Computation> result = new ArrayList<>();
294+
if (this.size() < 3) {
295+
return this;
296+
}
297+
298+
// Find a sequence Map -> Filter -> Map/MapIndex
299+
int startIndex = -1;
300+
for (int i = 0; i < this.size() - 2; i++) {
301+
if (this.computations().get(i).kind() == DBSPChainOperator.ComputationKind.Map &&
302+
this.computations().get(i+1).kind() == DBSPChainOperator.ComputationKind.Filter &&
303+
this.computations().get(i+2).kind() != DBSPChainOperator.ComputationKind.Filter) {
304+
DBSPClosureExpression map = this.computations().get(i).closure();
305+
if (this.computations().get(i+1).closure().shouldInlineComposition(compiler, map) &&
306+
this.computations().get(i+2).closure().shouldInlineComposition(compiler, map)) {
307+
startIndex = i;
308+
break;
309+
}
310+
}
311+
result.add(this.computations().get(i));
312+
}
313+
314+
if (startIndex < 0)
315+
return this;
316+
317+
DBSPChainOperator.Computation first = this.computations().get(startIndex);
318+
DBSPChainOperator.Computation filter = this.computations().get(startIndex + 1);
319+
DBSPChainOperator.Computation third = this.computations().get(startIndex + 2);
320+
321+
DBSPClosureExpression filterMap = filter.closure().applyAfter(compiler, first.closure(), Maybe.MAYBE);
322+
DBSPClosureExpression mapMap = third.closure().applyAfter(compiler, first.closure(), Maybe.MAYBE);
323+
result.add(new DBSPChainOperator.Computation(DBSPChainOperator.ComputationKind.Filter, filterMap));
324+
result.add(new DBSPChainOperator.Computation(third.kind(), mapMap));
325+
// Keep the subsequent unchanged
326+
for (int i = startIndex + 3; i < this.size(); i++) {
327+
result.add(this.computations().get(i));
328+
}
329+
330+
return new DBSPChainOperator.ComputationChain(this.inputType(), result);
331+
}
246332
}
247333

248334
@Override

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2326,6 +2326,8 @@ public VisitDecision preorder(DBSPFieldExpression expression) {
23262326
this.builder.append(expression.fieldNo);
23272327
if (fieldTypeIsNullable && !expression.getType().hasCopy() && !avoidRef) {
23282328
this.builder.append(".as_ref()");
2329+
} else if (fieldTypeIsNullable) {
2330+
this.builder.append(".clone()");
23292331
}
23302332
this.builder.append(")").decrease();
23312333
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/ExpressionCompiler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ public DBSPExpression visitCall(RexCall call) {
932932
DBSPType type = this.typeCompiler.convertType(node.getPositionRange(), call.getType(), false);
933933
// If type is NULL we can skip the call altogether...
934934
if (type.is(DBSPTypeNull.class))
935-
return DBSPNullLiteral.INSTANCE;
935+
return new DBSPNullLiteral();
936936
Utilities.enforce(!type.is(DBSPTypeStruct.class));
937937

938938
final RexCall finalCall = call;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/AggregateCompiler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,9 @@ void processBasic(SqlBasicAggFunction function) {
520520
DBSPExpression increment = this.incrementOperation(
521521
node, opcode, accumulatorType, accumulator, aggregatedValue, this.filterArgument());
522522
DBSPTypeUser semigroup = new DBSPTypeUser(node, SEMIGROUP, semigroupName, false, accumulatorType);
523-
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, accumulator.field(1))
524-
.closure(accumulator);
523+
var acc2 = accumulatorType.var();
524+
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, acc2.field(1))
525+
.closure(acc2);
525526

526527
if (this.filterArgument >= 0) {
527528
aggregate = new NonLinearAggregate(

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/EquivalenceContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.dbsp.sqlCompiler.ir.IDBSPDeclaration;
55
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
66

7+
import javax.annotation.CheckReturnValue;
78
import javax.annotation.Nullable;
89
import java.util.List;
910

@@ -31,10 +32,12 @@ public EquivalenceContext() {
3132
this.leftToRight = new Substitution<>();
3233
}
3334

35+
@CheckReturnValue
3436
public static boolean equiv(@Nullable DBSPExpression left, @Nullable DBSPExpression right) {
3537
return new EquivalenceContext().equivalent(left, right);
3638
}
3739

40+
@CheckReturnValue
3841
public static boolean equiv(@Nullable DBSPAggregateList left, @Nullable DBSPAggregateList right) {
3942
if (left == null)
4043
return right == null;
@@ -43,6 +46,7 @@ public static boolean equiv(@Nullable DBSPAggregateList left, @Nullable DBSPAggr
4346
return left.equivalent(right);
4447
}
4548

49+
@CheckReturnValue
4650
public boolean equivalent(@Nullable DBSPExpression left, @Nullable DBSPExpression right) {
4751
if (left == null)
4852
return right == null;
@@ -51,6 +55,7 @@ public boolean equivalent(@Nullable DBSPExpression left, @Nullable DBSPExpressio
5155
return left.equivalent(this, right);
5256
}
5357

58+
@CheckReturnValue
5459
public boolean equivalent(@Nullable DBSPExpression[] left, @Nullable DBSPExpression[] right) {
5560
if (left == null)
5661
return right == null;
@@ -64,6 +69,7 @@ public boolean equivalent(@Nullable DBSPExpression[] left, @Nullable DBSPExpress
6469
return true;
6570
}
6671

72+
@CheckReturnValue
6773
public <T extends DBSPExpression> boolean equivalent(@Nullable List<T> left, @Nullable List<T> right) {
6874
if (left == null)
6975
return right == null;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/Expensive.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.dbsp.sqlCompiler.ir.IDBSPInnerNode;
77
import org.dbsp.sqlCompiler.ir.expression.DBSPApplyExpression;
88
import org.dbsp.sqlCompiler.ir.expression.DBSPApplyMethodExpression;
9+
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
910
import org.dbsp.sqlCompiler.ir.type.DBSPType;
1011

1112
/** Visitor which detects whether an expression contains "expensive" subexpressions.
@@ -64,4 +65,10 @@ public VisitDecision preorder(DBSPApplyMethodExpression unused) {
6465
this.expensive = true;
6566
return VisitDecision.STOP;
6667
}
68+
69+
public static boolean isExpensive(DBSPCompiler compiler, DBSPExpression expression) {
70+
Expensive expensive = new Expensive(compiler);
71+
expensive.apply(expression);
72+
return expensive.isExpensive();
73+
}
6774
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ public VisitDecision preorder(DBSPKeywordLiteral expression) {
648648
public VisitDecision preorder(DBSPNullLiteral expression) {
649649
this.push(expression);
650650
this.pop(expression);
651-
DBSPExpression result = DBSPNullLiteral.INSTANCE;
651+
DBSPExpression result = new DBSPNullLiteral();
652652
this.map(expression, result);
653653
return VisitDecision.STOP;
654654
}
@@ -657,7 +657,7 @@ public VisitDecision preorder(DBSPNullLiteral expression) {
657657
public VisitDecision preorder(DBSPVoidLiteral expression) {
658658
this.push(expression);
659659
this.pop(expression);
660-
DBSPExpression result = DBSPVoidLiteral.INSTANCE;
660+
DBSPExpression result = new DBSPVoidLiteral();
661661
this.map(expression, result);
662662
return VisitDecision.STOP;
663663
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/Simplify.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,8 @@ public void postorder(DBSPFieldExpression expression) {
449449
DBSPExpression result = source.field(expression.fieldNo);
450450
if (source.is(DBSPBaseTupleExpression.class)) {
451451
result = source.to(DBSPBaseTupleExpression.class).get(expression.fieldNo);
452+
if (source.getType().mayBeNull && !result.getType().mayBeNull)
453+
result = result.some();
452454
} if (source.is(DBSPBlockExpression.class)) {
453455
DBSPBlockExpression block = source.to(DBSPBlockExpression.class);
454456
Utilities.enforce(block.lastExpression != null);
@@ -518,7 +520,7 @@ public void postorder(DBSPIfExpression expression) {
518520
} else {
519521
result = negative;
520522
if (result == null)
521-
result = DBSPVoidLiteral.INSTANCE;
523+
result = new DBSPVoidLiteral();
522524
}
523525
}
524526
} else if (negative != null &&

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CircuitOptimizer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@ void createOptimizer() {
108108
this.add(new RemoveIAfterD(compiler));
109109
this.add(new DeadCode(compiler, true));
110110
this.add(new Simplify(compiler).circuitRewriter(true));
111-
this.add(new RemoveFilters(compiler));
111+
this.add(new RemoveConstantFilters(compiler));
112112
this.add(new OptimizeWithGraph(compiler, g -> new OptimizeProjectionVisitor(compiler, g)));
113113
this.add(new OptimizeWithGraph(compiler,
114-
g -> new OptimizeProjections(compiler, true, g, operatorsAnalyzed)));
114+
g -> new OptimizeProjections(compiler, false, g, operatorsAnalyzed)));
115+
this.add(new ShareIndexes(compiler));
115116
// Combining Joins with subsequent filters can improve the precision of the monotonicity analysis
116117
this.add(new OptimizeWithGraph(compiler, g -> new FilterJoinVisitor(compiler, g)));
117118
this.add(new MonotoneAnalyzer(compiler));

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CloneOperatorsWithFanout.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ boolean shouldCloneInput(DBSPUnaryOperator operator) {
3232
input.is(DBSPFilterOperator.class)))
3333
return false;
3434
DBSPClosureExpression function = input.to(DBSPSimpleOperator.class).getClosureFunction();
35-
Expensive expensive = new Expensive(this.compiler);
36-
expensive.apply(function);
37-
return !expensive.isExpensive();
35+
return Expensive.isExpensive(this.compiler, function);
3836
}
3937

4038
void cloneInput(DBSPUnaryOperator operator) {

0 commit comments

Comments
 (0)