Skip to content

Commit b59158d

Browse files
committed
[SQL] Improved unused field analysis: proper dataflow analysis, and less conservative handling of nullable tuples
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 0614e88 commit b59158d

32 files changed

+1225
-774
lines changed

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@
152152
import org.dbsp.sqlCompiler.compiler.frontend.statements.TableModifyStatement;
153153
import org.dbsp.sqlCompiler.compiler.visitors.inner.Simplify;
154154
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FieldUseMap;
155-
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindUnusedFields;
155+
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindUsedFields;
156156
import org.dbsp.sqlCompiler.ir.DBSPParameter;
157157
import org.dbsp.sqlCompiler.ir.aggregate.DBSPFold;
158158
import org.dbsp.sqlCompiler.ir.aggregate.IAggregate;
@@ -1070,8 +1070,8 @@ DBSPSimpleOperator castOutput(CalciteRelNode node, DBSPSimpleOperator operator,
10701070
DBSPType inputElementType = operator.getOutputZSetElementType();
10711071
if (inputElementType.sameType(outputElementType))
10721072
return operator;
1073-
DBSPExpression function = inputElementType.caster(outputElementType, DBSPCastExpression.CastType.SqlUnsafe)
1074-
.reduce(this.compiler);
1073+
DBSPClosureExpression caster = inputElementType.caster(outputElementType, DBSPCastExpression.CastType.SqlUnsafe);
1074+
DBSPExpression function = caster.reduce(this.compiler);
10751075
DBSPSimpleOperator map = new DBSPMapOperator(
10761076
node, function, TypeCompiler.makeZSet(outputElementType), operator.outputPort());
10771077
this.addOperator(map);
@@ -1557,7 +1557,7 @@ private void visitJoin(LogicalJoin join) {
15571557
DBSPExpression result = makeAnd(pullLeft);
15581558
DBSPClosureExpression clo = result.closure(t);
15591559
leftPulled = new DBSPFilterOperator(node, clo, left.outputPort());
1560-
leftPulledFields = FindUnusedFields.computeUsedFields(clo, this.compiler);
1560+
leftPulledFields = FindUsedFields.computeUsedFields(clo, this.compiler);
15611561
this.addOperator(leftPulled);
15621562
}
15631563
if (!decomposition.rightPredicates.isEmpty()) {
@@ -1568,7 +1568,7 @@ private void visitJoin(LogicalJoin join) {
15681568
DBSPExpression result = makeAnd(pullRight);
15691569
DBSPClosureExpression clo = result.closure(t);
15701570
rightPulled = new DBSPFilterOperator(node, clo, right.outputPort());
1571-
rightPulledFields = FindUnusedFields.computeUsedFields(clo, this.compiler);
1571+
rightPulledFields = FindUsedFields.computeUsedFields(clo, this.compiler);
15721572
this.addOperator(rightPulled);
15731573
}
15741574

@@ -1735,7 +1735,7 @@ private void visitJoin(LogicalJoin join) {
17351735
this.addOperator(joinResult);
17361736
inner = new DBSPFilterOperator(node, postJoinCondition, joinResult.outputPort());
17371737
joinResult = inner;
1738-
conditionFields = FindUnusedFields.computeUsedFields(postJoinCondition, this.compiler);
1738+
conditionFields = FindUsedFields.computeUsedFields(postJoinCondition, this.compiler);
17391739
}
17401740
}
17411741

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/ExpressionCompiler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ public DBSPExpression inputIndex(CalciteObject node, int index) {
241241
DBSPTypeTuple type = this.inputRow.getType().deref().to(DBSPTypeTuple.class);
242242
if (index < type.size()) {
243243
DBSPExpression field = this.inputRow.deref().field(index);
244+
if (field.getType().is(DBSPTypeNull.class)) {
245+
// Optimize away field accesses in ROW members which have null types.
246+
return field.getType().none();
247+
}
244248
return field.applyCloneIfNeeded();
245249
}
246250
if (index - type.size() < this.constants.size())

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/AggregateCompiler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -509,19 +509,21 @@ void processBasic(SqlBasicAggFunction function) {
509509
Linq.list(
510510
dataType.getFieldType(1).withMayBeNull(true),
511511
this.resultType));
512-
DBSPExpression zero = new DBSPRawTupleExpression(
512+
final DBSPExpression zero = new DBSPRawTupleExpression(
513513
accumulatorType.tupFields[0].none(),
514514
accumulatorType.tupFields[1].defaultValue());
515-
DBSPExpression aggregatedValue =
515+
final DBSPExpression aggregatedValue =
516516
new DBSPRawTupleExpression(
517517
ExpressionCompiler.expandTuple(node, tuple.fields[1]),
518518
ExpressionCompiler.expandTuple(node, tuple.fields[0]));
519-
DBSPVariablePath accumulator = accumulatorType.var();
520-
DBSPExpression increment = this.incrementOperation(
519+
final DBSPVariablePath accumulator = accumulatorType.var();
520+
final DBSPExpression increment = this.incrementOperation(
521521
node, opcode, accumulatorType, accumulator, aggregatedValue, this.filterArgument());
522-
DBSPTypeUser semigroup = new DBSPTypeUser(node, SEMIGROUP, semigroupName, false, accumulatorType);
523-
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, accumulator.field(1))
524-
.closure(accumulator);
522+
final DBSPTypeUser semigroup = new DBSPTypeUser(node, SEMIGROUP, semigroupName, false, accumulatorType);
523+
524+
var acc2 = accumulatorType.var();
525+
DBSPClosureExpression postProcessing = ExpressionCompiler.expandTuple(node, acc2.field(1))
526+
.closure(acc2);
525527

526528
if (this.filterArgument >= 0) {
527529
aggregate = new NonLinearAggregate(

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/Simplify.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,8 @@ public void postorder(DBSPFieldExpression expression) {
449449
DBSPExpression result = source.field(expression.fieldNo);
450450
if (source.is(DBSPBaseTupleExpression.class)) {
451451
result = source.to(DBSPBaseTupleExpression.class).get(expression.fieldNo);
452+
if (source.getType().mayBeNull && !result.getType().mayBeNull)
453+
result = result.some();
452454
} if (source.is(DBSPBlockExpression.class)) {
453455
DBSPBlockExpression block = source.to(DBSPBlockExpression.class);
454456
Utilities.enforce(block.lastExpression != null);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CircuitOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ void createOptimizer() {
121121

122122
this.add(new OptimizeWithGraph(compiler, g -> new CloneOperatorsWithFanout(compiler, g)));
123123
this.add(new LinearPostprocessRetainKeys(compiler));
124-
this.add(new IndexedInputs(compiler));
124+
this.add(new ExpandIndexedInputs(compiler));
125125
this.add(new OptimizeWithGraph(compiler, g -> new FilterJoinVisitor(compiler, g)));
126126
this.add(new DeadCode(compiler, true));
127127
this.add(new Simplify(compiler).circuitRewriter(true));
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package org.dbsp.sqlCompiler.compiler.visitors.outer;
2+
3+
import org.dbsp.sqlCompiler.circuit.operator.DBSPDeindexOperator;
4+
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceMultisetOperator;
5+
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceMapOperator;
6+
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
7+
import org.dbsp.sqlCompiler.compiler.InputColumnMetadata;
8+
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindCommonProjections;
9+
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.ReplaceCommonProjections;
10+
import org.dbsp.sqlCompiler.ir.type.DBSPType;
11+
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
12+
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeTuple;
13+
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeZSet;
14+
15+
import javax.annotation.Nullable;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
/** Invokes {@link IndexedInputs} and then optimizes the circuit a bit */
20+
public class ExpandIndexedInputs extends Passes {
21+
public ExpandIndexedInputs(DBSPCompiler compiler) {
22+
super("ExpandIndexedInputs", compiler);
23+
this.add(new IndexedInputs(compiler));
24+
Graph graph = new Graph(compiler);
25+
this.add(graph);
26+
FindCommonProjections fcp = new FindCommonProjections(compiler, graph.getGraphs());
27+
this.add(fcp);
28+
this.add(new ReplaceCommonProjections(compiler, fcp));
29+
}
30+
31+
/**
32+
* Given a source node, return the type of the indexed Z-set that has as keys
33+
* the key fields, and as value the output value. Return null if there are no key fields.
34+
*/
35+
@Nullable
36+
public static DBSPTypeIndexedZSet getIndexedType(DBSPSourceMultisetOperator node) {
37+
List<DBSPType> keyFields = new ArrayList<>();
38+
List<Integer> keyColumnFields = new ArrayList<>();
39+
int i = 0;
40+
for (InputColumnMetadata inputColumnMetadata : node.metadata.getColumns()) {
41+
if (inputColumnMetadata.isPrimaryKey) {
42+
keyColumnFields.add(i);
43+
keyFields.add(inputColumnMetadata.type);
44+
}
45+
i++;
46+
}
47+
if (keyColumnFields.isEmpty()) {
48+
return null;
49+
}
50+
51+
DBSPType keyType = new DBSPTypeTuple(keyFields);
52+
DBSPTypeZSet inputType = node.outputType.to(DBSPTypeZSet.class);
53+
return new DBSPTypeIndexedZSet(node.getNode(), keyType, inputType.elementType);
54+
}
55+
56+
public static List<Integer> getKeyFields(DBSPSourceMultisetOperator node) {
57+
List<Integer> keyColumnFields = new ArrayList<>();
58+
int i = 0;
59+
for (InputColumnMetadata inputColumnMetadata : node.metadata.getColumns()) {
60+
if (inputColumnMetadata.isPrimaryKey) {
61+
keyColumnFields.add(i);
62+
}
63+
i++;
64+
}
65+
return keyColumnFields;
66+
}
67+
68+
/**
69+
* Converts {@link DBSPSourceMultisetOperator}s that have a primary key
70+
* into {@link DBSPSourceMapOperator} followed by a {@link DBSPDeindexOperator}.
71+
*/
72+
static class IndexedInputs extends CircuitCloneVisitor {
73+
public IndexedInputs(DBSPCompiler compiler) {
74+
super(compiler, false);
75+
}
76+
77+
@Override
78+
public void postorder(DBSPSourceMultisetOperator node) {
79+
DBSPTypeIndexedZSet ix = getIndexedType(node);
80+
if (ix == null) {
81+
super.postorder(node);
82+
return;
83+
}
84+
85+
List<Integer> keyColumnFields = getKeyFields(node);
86+
DBSPSourceMapOperator set = new DBSPSourceMapOperator(
87+
node.getRelNode(), node.sourceName, keyColumnFields,
88+
ix, node.originalRowType, node.metadata, node.tableName, node.comment);
89+
this.addOperator(set);
90+
DBSPDeindexOperator deindex = new DBSPDeindexOperator(node.getRelNode(), node.getFunctionNode(), set.outputPort());
91+
this.map(node, deindex);
92+
}
93+
}
94+
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/IndexedInputs.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/OptimizeProjections.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
import org.dbsp.sqlCompiler.compiler.visitors.inner.ResolveReferences;
4747
import org.dbsp.sqlCompiler.compiler.visitors.inner.Substitution;
4848
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FieldUseMap;
49-
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindUnusedFields;
49+
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.FindUsedFields;
50+
import org.dbsp.sqlCompiler.compiler.visitors.unusedFields.ParameterFieldUse;
5051
import org.dbsp.sqlCompiler.ir.DBSPParameter;
5152
import org.dbsp.sqlCompiler.ir.IDBSPDeclaration;
5253
import org.dbsp.sqlCompiler.ir.IDBSPInnerNode;
@@ -423,11 +424,11 @@ public void postorder(DBSPApplyOperator operator) {
423424
* required changes, false if they are unchanged. When the operator requires change,
424425
* it is inserted in the circuit and map is remapped to the new operator. */
425426
boolean processAggregate(DBSPStreamAggregateOperator aggregate, DBSPMapIndexOperator map) {
426-
FindUnusedFields unused = new FindUnusedFields(this.compiler);
427+
FindUsedFields unused = new FindUsedFields(this.compiler);
427428
DBSPClosureExpression function = map.getClosureFunction();
428429
Utilities.enforce(function.parameters.length == 1);
429-
unused.findUnusedFields(function);
430-
FieldUseMap useMap = unused.parameterFieldMap.get(function.parameters[0]);
430+
ParameterFieldUse uses = unused.findUsedFields(function);
431+
FieldUseMap useMap = uses.get(function.parameters[0]);
431432
FieldUseMap valueUse = useMap.field(1);
432433
DBSPTypeIndexedZSet ix = aggregate.getOutputIndexedZSetType();
433434

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.dbsp.sqlCompiler.compiler.visitors.monotone.PartiallyMonotoneTuple;
2424
import org.dbsp.sqlCompiler.compiler.visitors.monotone.ScalarMonotoneType;
2525
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitCloneVisitor;
26-
import org.dbsp.sqlCompiler.compiler.visitors.outer.IndexedInputs;
26+
import org.dbsp.sqlCompiler.compiler.visitors.outer.ExpandIndexedInputs;
2727
import org.dbsp.sqlCompiler.compiler.visitors.outer.expansion.AggregateDeltaExpansion;
2828
import org.dbsp.sqlCompiler.compiler.visitors.outer.expansion.CommonJoinDeltaExpansion;
2929
import org.dbsp.sqlCompiler.compiler.visitors.outer.expansion.DistinctDeltaExpansion;
@@ -1591,7 +1591,7 @@ DBSPOperator processLateness(
15911591
}
15921592
DBSPSourceMultisetOperator multisetInput = operator.as(DBSPSourceMultisetOperator.class);
15931593
final DBSPTypeIndexedZSet indexedOutputType = (multisetInput != null) ?
1594-
IndexedInputs.getIndexedType(multisetInput) : null;
1594+
ExpandIndexedInputs.getIndexedType(multisetInput) : null;
15951595

15961596
List<DBSPExpression> timestamps = new ArrayList<>();
15971597
int index = 0;
@@ -1649,7 +1649,7 @@ DBSPOperator processLateness(
16491649
DBSPInputMapWithWaterlineOperator newSource = null;
16501650

16511651
if (replaceIndexedInput) {
1652-
List<Integer> keyFields = IndexedInputs.getKeyFields(multisetInput);
1652+
List<Integer> keyFields = ExpandIndexedInputs.getKeyFields(multisetInput);
16531653
// Many of these functions take the key as a parameter, although
16541654
// it is a subset of the value fields...
16551655
DBSPVariablePath k = indexedOutputType.keyType.ref().var();
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.dbsp.sqlCompiler.compiler.visitors.unusedFields;
2+
3+
import java.util.Objects;
4+
5+
/**
6+
* A field of the form &param.i0.i1....in
7+
*/
8+
public final class BorrowedField extends IUsedFields {
9+
private final ParameterField field;
10+
11+
public BorrowedField(ParameterField field) {
12+
this.field = field;
13+
}
14+
15+
@Override
16+
public ParameterFieldUse getParameterUse() {
17+
return this.field.getParameterUse();
18+
}
19+
20+
@Override
21+
public String toString() {
22+
return "&" + this.field;
23+
}
24+
25+
public ParameterField field() {
26+
return field;
27+
}
28+
29+
@Override
30+
public boolean equals(Object obj) {
31+
if (obj == this) return true;
32+
if (obj == null || obj.getClass() != this.getClass()) return false;
33+
var that = (BorrowedField) obj;
34+
return Objects.equals(this.field, that.field);
35+
}
36+
37+
@Override
38+
public int hashCode() {
39+
return Objects.hash(field);
40+
}
41+
}

0 commit comments

Comments
 (0)