Skip to content

Commit a2ec1ed

Browse files
committed
[SQL] Share indexes between joins when possible
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent fc8244c commit a2ec1ed

26 files changed

+945
-44
lines changed

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/ExpressionCompiler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ public DBSPExpression visitCall(RexCall call) {
932932
DBSPType type = this.typeCompiler.convertType(node.getPositionRange(), call.getType(), false);
933933
// If type is NULL we can skip the call altogether...
934934
if (type.is(DBSPTypeNull.class))
935-
return DBSPNullLiteral.INSTANCE;
935+
return new DBSPNullLiteral();
936936
Utilities.enforce(!type.is(DBSPTypeStruct.class));
937937

938938
final RexCall finalCall = call;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/AggregateCompiler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,9 @@ void processBasic(SqlBasicAggFunction function) {
520520
DBSPExpression increment = this.incrementOperation(
521521
node, opcode, accumulatorType, accumulator, aggregatedValue, this.filterArgument());
522522
DBSPTypeUser semigroup = new DBSPTypeUser(node, SEMIGROUP, semigroupName, false, accumulatorType);
523-
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, accumulator.field(1))
524-
.closure(accumulator);
523+
var acc2 = accumulatorType.var();
524+
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, acc2.field(1))
525+
.closure(acc2);
525526

526527
if (this.filterArgument >= 0) {
527528
aggregate = new NonLinearAggregate(

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/EquivalenceContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.dbsp.sqlCompiler.ir.IDBSPDeclaration;
55
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
66

7+
import javax.annotation.CheckReturnValue;
78
import javax.annotation.Nullable;
89
import java.util.List;
910

@@ -31,10 +32,12 @@ public EquivalenceContext() {
3132
this.leftToRight = new Substitution<>();
3233
}
3334

35+
@CheckReturnValue
3436
public static boolean equiv(@Nullable DBSPExpression left, @Nullable DBSPExpression right) {
3537
return new EquivalenceContext().equivalent(left, right);
3638
}
3739

40+
@CheckReturnValue
3841
public static boolean equiv(@Nullable DBSPAggregateList left, @Nullable DBSPAggregateList right) {
3942
if (left == null)
4043
return right == null;
@@ -43,6 +46,7 @@ public static boolean equiv(@Nullable DBSPAggregateList left, @Nullable DBSPAggr
4346
return left.equivalent(right);
4447
}
4548

49+
@CheckReturnValue
4650
public boolean equivalent(@Nullable DBSPExpression left, @Nullable DBSPExpression right) {
4751
if (left == null)
4852
return right == null;
@@ -51,6 +55,7 @@ public boolean equivalent(@Nullable DBSPExpression left, @Nullable DBSPExpressio
5155
return left.equivalent(this, right);
5256
}
5357

58+
@CheckReturnValue
5459
public boolean equivalent(@Nullable DBSPExpression[] left, @Nullable DBSPExpression[] right) {
5560
if (left == null)
5661
return right == null;
@@ -64,6 +69,7 @@ public boolean equivalent(@Nullable DBSPExpression[] left, @Nullable DBSPExpress
6469
return true;
6570
}
6671

72+
@CheckReturnValue
6773
public <T extends DBSPExpression> boolean equivalent(@Nullable List<T> left, @Nullable List<T> right) {
6874
if (left == null)
6975
return right == null;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ public VisitDecision preorder(DBSPKeywordLiteral expression) {
648648
public VisitDecision preorder(DBSPNullLiteral expression) {
649649
this.push(expression);
650650
this.pop(expression);
651-
DBSPExpression result = DBSPNullLiteral.INSTANCE;
651+
DBSPExpression result = new DBSPNullLiteral();
652652
this.map(expression, result);
653653
return VisitDecision.STOP;
654654
}
@@ -657,7 +657,7 @@ public VisitDecision preorder(DBSPNullLiteral expression) {
657657
public VisitDecision preorder(DBSPVoidLiteral expression) {
658658
this.push(expression);
659659
this.pop(expression);
660-
DBSPExpression result = DBSPVoidLiteral.INSTANCE;
660+
DBSPExpression result = new DBSPVoidLiteral();
661661
this.map(expression, result);
662662
return VisitDecision.STOP;
663663
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/Simplify.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,8 @@ public void postorder(DBSPFieldExpression expression) {
449449
DBSPExpression result = source.field(expression.fieldNo);
450450
if (source.is(DBSPBaseTupleExpression.class)) {
451451
result = source.to(DBSPBaseTupleExpression.class).get(expression.fieldNo);
452+
if (source.getType().mayBeNull && !result.getType().mayBeNull)
453+
result = result.some();
452454
} if (source.is(DBSPBlockExpression.class)) {
453455
DBSPBlockExpression block = source.to(DBSPBlockExpression.class);
454456
Utilities.enforce(block.lastExpression != null);
@@ -518,7 +520,7 @@ public void postorder(DBSPIfExpression expression) {
518520
} else {
519521
result = negative;
520522
if (result == null)
521-
result = DBSPVoidLiteral.INSTANCE;
523+
result = new DBSPVoidLiteral();
522524
}
523525
}
524526
} else if (negative != null &&

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CircuitOptimizer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@ void createOptimizer() {
108108
this.add(new RemoveIAfterD(compiler));
109109
this.add(new DeadCode(compiler, true));
110110
this.add(new Simplify(compiler).circuitRewriter(true));
111-
this.add(new RemoveFilters(compiler));
111+
this.add(new RemoveConstantFilters(compiler));
112112
this.add(new OptimizeWithGraph(compiler, g -> new OptimizeProjectionVisitor(compiler, g)));
113113
this.add(new OptimizeWithGraph(compiler,
114-
g -> new OptimizeProjections(compiler, true, g, operatorsAnalyzed)));
114+
g -> new OptimizeProjections(compiler, false, g, operatorsAnalyzed)));
115+
this.add(new ShareIndexes(compiler));
115116
// Combining Joins with subsequent filters can improve the precision of the monotonicity analysis
116117
this.add(new OptimizeWithGraph(compiler, g -> new FilterJoinVisitor(compiler, g)));
117118
this.add(new MonotoneAnalyzer(compiler));

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/OptimizeProjectionVisitor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@
3939
* - constant followed by projection
4040
* - flatmap followed by projection
4141
* - join followed by projection
42-
* - join followed by mapindex projection
43-
* - indexjoin followed by mapindex projection
44-
* - indexjoin followed by map projection
42+
* - join followed by mapIndex projection
43+
* - indexJoin followed by mapIndex projection
44+
* - indexJoin followed by map projection
4545
* Projections are map operations that have a function with a very simple
4646
* structure. The function is analyzed using the 'Projection' visitor. */
4747
public class OptimizeProjectionVisitor extends CircuitCloneWithGraphsVisitor {

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/OptimizeProjections.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676
import java.util.List;
7777
import java.util.Objects;
7878

79-
/** Optimizes projections (Map or MapIndex) following various other operators. */
79+
/** Optimizes projections (Map or MapIndex) following various other operators.
80+
* TODO: unify this with OptimizeProjectionsVisitor */
8081
public class OptimizeProjections extends CircuitCloneWithGraphsVisitor {
8182
/** If true only optimize projections after joins */
8283
final boolean onlyProjections;
@@ -105,7 +106,7 @@ boolean canMergeSource(OutputPort source, int size) {
105106
@Override
106107
public void postorder(DBSPMapIndexOperator operator) {
107108
int inputFanout = this.getGraph().getFanout(operator.input().node());
108-
if (inputFanout != 1 || this.done(operator)) {
109+
if (this.done(operator) || (this.onlyProjections && inputFanout > 1)) {
109110
super.postorder(operator);
110111
return;
111112
}
@@ -163,7 +164,14 @@ public void postorder(DBSPMapIndexOperator operator) {
163164
.copyAnnotations(operator).copyAnnotations(source.simpleNode());
164165
this.map(operator, result);
165166
return;
166-
} else if ((source.node().is(DBSPIntegrateOperator.class)) ||
167+
}
168+
169+
if (inputFanout != 1) {
170+
super.postorder(operator);
171+
return;
172+
}
173+
174+
if ((source.node().is(DBSPIntegrateOperator.class)) ||
167175
source.node().is(DBSPDifferentiateOperator.class) ||
168176
source.node().is(DBSPDelayOperator.class) ||
169177
source.node().is(DBSPNegateOperator.class) ||
@@ -540,7 +548,6 @@ public void postorder(DBSPMapOperator operator) {
540548
}
541549
} else if ((source.node().is(DBSPStreamJoinOperator.class)
542550
|| source.node().is(DBSPLeftJoinOperator.class)
543-
|| source.node().is(DBSPAsofJoinOperator.class)
544551
|| source.node().is(DBSPJoinOperator.class)
545552
|| source.node().is(DBSPStarJoinOperator.class)) &&
546553
inputFanout == 1) {
@@ -555,6 +562,19 @@ public void postorder(DBSPMapOperator operator) {
555562
this.map(operator, result);
556563
return;
557564
}
565+
} else if (source.node().is(DBSPAsofJoinOperator.class) && inputFanout == 1) {
566+
Logger.INSTANCE.belowLevel(this, 2)
567+
.appendSupplier(() -> source.simpleNode().operation + " -> Map")
568+
.newline();
569+
Projection projection = new Projection(this.compiler());
570+
projection.apply(operator.getFunction());
571+
if (projection.isProjection) {
572+
// Only combine with pure projections
573+
DBSPSimpleOperator result = OptimizeProjectionVisitor.mapAfterJoin(
574+
this.compiler(), source.simpleNode(), operator);
575+
this.map(operator, result);
576+
return;
577+
}
558578
} else if ((source.node().is(DBSPJoinIndexOperator.class)
559579
|| source.node().is(DBSPStreamJoinIndexOperator.class)
560580
|| source.node().is(DBSPLeftJoinIndexOperator.class)

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/RemoveFilters.java renamed to sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/RemoveConstantFilters.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212

1313
import javax.annotation.Nullable;
1414

15-
/** Removes filters constant predicates. */
16-
public class RemoveFilters extends CircuitCloneVisitor {
17-
public RemoveFilters(DBSPCompiler compiler) {
15+
/** Removes filters with constant predicates. */
16+
public class RemoveConstantFilters extends CircuitCloneVisitor {
17+
public RemoveConstantFilters(DBSPCompiler compiler) {
1818
super(compiler, false);
1919
}
2020

0 commit comments

Comments
 (0)