Skip to content

Commit 6e5a4b3

Browse files
authored
[SQL] Improve key analysis for equijoins (#2232)
* [SQL] Improve key analysis for equijoins * [SQL] Change the implementation of NOW Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent f87efc4 commit 6e5a4b3

12 files changed

Lines changed: 212 additions & 142 deletions

File tree

benchmark/feldera-sql/benchmarks/nexmark/queries/q12.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ CREATE VIEW Q12 AS
22
SELECT
33
B.bidder,
44
count(*) as bid_count,
5-
TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
6-
TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime
7-
FROM (SELECT *, NOW() as p_time FROM bid) B
8-
GROUP BY B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);
5+
TUMBLE_START(B.date_time, INTERVAL '10' SECOND) as starttime,
6+
TUMBLE_END(B.date_time, INTERVAL '10' SECOND) as endtime
7+
FROM bid B
8+
GROUP BY B.bidder, TUMBLE(B.date_time, INTERVAL '10' SECOND);

docs/sql/datetime.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -310,17 +310,18 @@ CREATE TABLE NOW(now TIMESTAMP NOT NULL LATENESS INTERVAL 0 SECONDS);
310310
```
311311

312312
All invocations of the `NOW()` function within the program
313-
will produce the value that currently exists in this table.
314-
315-
This table does not currently get populated automatically.
316-
Instead, the user is responsible for supplying the data to this table.
317-
The user has to maintain the invariant that this table always contains a
318-
single value. Moreover, deleting a value and inserting a new one
319-
requires the newly inserted value to be larger than the original
320-
value. The user can periodically update the contents of the table with
321-
the current physical timestamp. Alternatively, they can use this table to
322-
evaluate queries over historical data by writing a series of increasing
323-
past timestamps to it.
313+
will produce the last value inserted in this table.
314+
315+
This table does not currently get populated automatically. Instead,
316+
the user is responsible for supplying the data to this table. In
317+
every step of the circuit the user has to insert a new value in this
318+
table, which should be larger than the previous value.
319+
320+
:::warning
321+
322+
Programs that use `NOW()` can be very inefficient. For example, a
323+
program such as `SELECT * FROM T WHERE T.x > NOW()` has to scan the
324+
entire table T at every step. Use this function judiciously.

:::
324325

325326
| Operation | Description | Example |
326327
|---------------|---------------------|--------------------------------|

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/ToDotVisitor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,14 @@ String getColor(DBSPOperator operator) {
204204
case "waterline" -> " style=filled fillcolor=lightgreen";
205205
case "controlled_filter" -> " style=filled fillcolor=cyan";
206206
case "apply", "apply2" -> " style=filled fillcolor=yellow";
207-
case "integrate_trace_retain_keys", "integrate_trace_retain_values" -> " style=filled fillcolor=pink";
207+
case "integrate_trace_retain_keys",
208+
"partitioned_rolling_aggregate_with_waterline",
209+
"integrate_trace_retain_values" -> " style=filled fillcolor=pink";
208210
// stateful operators
209211
case "distinct",
210212
// all aggregates require an upsert, which is stateful, even the ones that are linear
211213
"aggregate", "partitioned_rolling_aggregate", "aggregate_linear",
212-
"stream_aggregate", "stream_aggregate_linear", "partitioned_rolling_aggregate_with_waterline",
214+
"stream_aggregate", "stream_aggregate_linear",
213215
"partitioned_tree_aggregate",
214216
// some joins require integrators
215217
"join", "join_flatmap",

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/JoinConditionAnalyzer.java

Lines changed: 86 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
package org.dbsp.sqlCompiler.compiler.frontend;
2525

2626
import org.apache.calcite.rex.*;
27+
import org.apache.calcite.sql.SqlKind;
2728
import org.dbsp.sqlCompiler.compiler.errors.InternalCompilerError;
2829
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
2930
import org.dbsp.sqlCompiler.ir.type.DBSPType;
@@ -35,15 +36,12 @@
3536
import java.util.List;
3637
import java.util.Objects;
3738

38-
public class JoinConditionAnalyzer extends RexVisitorImpl<Void> implements IWritesLogs {
39+
public class JoinConditionAnalyzer implements IWritesLogs {
3940
private final int leftTableColumnCount;
40-
private final ConditionDecomposition result;
4141
private final TypeCompiler typeCompiler;
4242

4343
public JoinConditionAnalyzer(CalciteObject object, int leftTableColumnCount, TypeCompiler typeCompiler) {
44-
super(true);
4544
this.leftTableColumnCount = leftTableColumnCount;
46-
this.result = new ConditionDecomposition(object);
4745
this.typeCompiler = typeCompiler;
4846
}
4947

@@ -99,19 +97,80 @@ void validate() {
9997
throw new InternalCompilerError("Unexpected empty join condition", this.object);
10098
}
10199

102-
/**
103-
* Part of the join condition that is not an equality test.
104-
* @return Null if the entire condition is an equality test.
105-
*/
100+
/** Part of the join condition that is not an equality test.
101+
* @return Null if the entire condition is an equality test. */
106102
@Nullable
107103
public RexNode getLeftOver() {
108104
this.validate();
109105
return this.leftOver;
110106
}
111-
}
112107

113-
public boolean completed() {
114-
return this.result.leftOver != null;
108+
void analyzeAnd(RexCall call) {
109+
List<RexNode> operands = call.getOperands();
110+
List<RexNode> unprocessed = new ArrayList<>();
111+
for (int i = 0; i < operands.size(); i++) {
112+
RexNode operand = call.operands.get(i);
113+
if (!(operand instanceof RexCall opCall)) {
114+
unprocessed.add(operand);
115+
continue;
116+
}
117+
118+
if (opCall.op.kind != SqlKind.EQUALS &&
119+
opCall.op.kind != SqlKind.IS_NOT_DISTINCT_FROM) {
120+
unprocessed.add(operand);
121+
continue;
122+
}
123+
124+
boolean eq = this.analyzeEquals(opCall);
125+
if (!eq) {
126+
unprocessed.add(opCall);
127+
}
128+
}
129+
130+
if (!unprocessed.isEmpty()) {
131+
if (unprocessed.size() == 1) {
132+
this.setLeftOver(unprocessed.get(0));
133+
} else {
134+
call = call.clone(call.type, unprocessed);
135+
this.setLeftOver(call);
136+
}
137+
}
138+
}
139+
140+
/** Analyze an equality comparison. Returns 'true' if the comparison can be used as an equijoin key. */
141+
public boolean analyzeEquals(RexCall call) {
142+
assert call.operands.size() == 2: "Expected 2 operands for equality checking";
143+
RexNode left = call.operands.get(0);
144+
RexNode right = call.operands.get(1);
145+
@Nullable
146+
Boolean leftIsLeft = JoinConditionAnalyzer.this.isLeftTableColumnReference(left);
147+
@Nullable
148+
Boolean rightIsLeft = JoinConditionAnalyzer.this.isLeftTableColumnReference(right);
149+
if (leftIsLeft == null || rightIsLeft == null) {
150+
return false;
151+
}
152+
if (leftIsLeft == rightIsLeft) {
153+
// Both columns refer to the same table.
154+
return false;
155+
}
156+
DBSPType leftType = JoinConditionAnalyzer.this.typeCompiler.convertType(
157+
left.getType(), true);
158+
DBSPType rightType = JoinConditionAnalyzer.this.typeCompiler.convertType(
159+
right.getType(), true);
160+
if (call.op.kind == SqlKind.IS_NOT_DISTINCT_FROM) {
161+
// IS NOT DISTINCT FROM is treated as an equijoin key only if at least one operand is not nullable
162+
if (leftType.mayBeNull && rightType.mayBeNull) {
163+
return false;
164+
}
165+
}
166+
DBSPType commonType = ExpressionCompiler.reduceType(leftType, rightType).setMayBeNull(false);
167+
if (leftIsLeft) {
168+
this.addEquality(left, right, commonType);
169+
} else {
170+
this.addEquality(right, left, commonType);
171+
}
172+
return true;
173+
}
115174
}
116175

117176
@Nullable
@@ -134,79 +193,26 @@ public Boolean isLeftTableColumnReference(RexNode node) {
134193
return ref.getIndex() < this.leftTableColumnCount;
135194
}
136195

137-
@Override
138-
public Void visitInputRef(RexInputRef ref) {
139-
this.result.setLeftOver(ref);
140-
return null;
141-
}
142-
143-
@Override
144-
public Void visitLiteral(RexLiteral lit) {
145-
this.result.setLeftOver(lit);
146-
return null;
147-
}
148-
149-
@Override
150-
public Void visitCall(RexCall call) {
151-
switch (call.op.kind) {
152-
case AND:
153-
List<RexNode> operands = call.getOperands();
154-
for (int i = 0; i < operands.size(); i++) {
155-
call.operands.get(i).accept(this);
156-
if (this.completed()) {
157-
if (i == operands.size() - 1) {
158-
// Just one left
159-
this.result.setLeftOver(operands.get(i));
160-
return null;
161-
}
162-
List<RexNode> remaining = new ArrayList<>();
163-
for (int j = i; j < operands.size(); j++)
164-
remaining.add(call.operands.get(j));
165-
call = call.clone(call.type, remaining);
166-
this.result.setLeftOver(call);
167-
return null;
168-
}
169-
}
170-
return null;
171-
case EQUALS:
172-
assert call.operands.size() == 2: "Expected 2 operands for equality checking";
173-
RexNode left = call.operands.get(0);
174-
RexNode right = call.operands.get(1);
175-
@Nullable
176-
Boolean leftIsLeft = this.isLeftTableColumnReference(left);
177-
@Nullable
178-
Boolean rightIsLeft = this.isLeftTableColumnReference(right);
179-
if (leftIsLeft == null || rightIsLeft == null) {
180-
this.result.setLeftOver(call);
181-
return null;
182-
}
183-
if (leftIsLeft == rightIsLeft) {
184-
// Both columns refer to the same table.
185-
this.result.setLeftOver(call);
186-
return null;
187-
}
188-
DBSPType leftType = this.typeCompiler.convertType(left.getType(), true);
189-
DBSPType rightType = this.typeCompiler.convertType(right.getType(), true);
190-
DBSPType commonType = ExpressionCompiler.reduceType(leftType, rightType).setMayBeNull(false);
191-
if (leftIsLeft) {
192-
this.result.addEquality(left, right, commonType);
193-
} else {
194-
this.result.addEquality(right, left, commonType);
195-
}
196-
return null;
197-
default:
198-
// We are done: we don't know how to handle this condition.
199-
this.result.setLeftOver(call);
200-
return null;
201-
}
202-
}
203-
204196
JoinConditionAnalyzer.ConditionDecomposition analyze(RexNode expression) {
205197
Logger.INSTANCE.belowLevel(this, 1)
206198
.append("Analyzing ")
207199
.append(expression.toString())
208200
.newline();
209-
expression.accept(this);
210-
return this.result;
201+
final ConditionDecomposition result = new ConditionDecomposition(CalciteObject.create(expression));
202+
if (! (expression instanceof RexCall call)) {
203+
result.setLeftOver(expression);
204+
return result;
205+
}
206+
if (call.op.kind == SqlKind.AND) {
207+
result.analyzeAnd(call);
208+
} else if (call.op.kind == SqlKind.EQUALS || call.op.kind == SqlKind.IS_NOT_DISTINCT_FROM) {
209+
boolean success = result.analyzeEquals(call);
210+
if (!success) {
211+
result.setLeftOver(call);
212+
}
213+
} else {
214+
result.setLeftOver(call);
215+
}
216+
return result;
211217
}
212218
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/monotone/MonotoneTransferFunctions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,8 +590,10 @@ public void postorder(DBSPApplyExpression expression) {
590590
name.startsWith("sign_") ||
591591
name.startsWith("numeric_inc") ||
592592
name.startsWith("extract_year_") ||
593+
name.startsWith("extract_millennium_") ||
594+
name.startsWith("extract_century_") ||
593595
name.startsWith("extract_epoch_") ||
594-
name.startsWith("extract_hour_Time") ||
596+
name.startsWith("extract_hour_Time_") ||
595597
name.startsWith("dateadd_") ||
596598
name.equals("hop_start_timestamp") ||
597599
name.startsWith("to_bound_") ||

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/ImplementNow.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dbsp.sqlCompiler.compiler.visitors.outer;
22

33
import org.dbsp.sqlCompiler.circuit.DBSPCircuit;
4+
import org.dbsp.sqlCompiler.circuit.operator.DBSPDifferentiateOperator;
45
import org.dbsp.sqlCompiler.circuit.operator.DBSPFilterOperator;
56
import org.dbsp.sqlCompiler.circuit.operator.DBSPMapIndexOperator;
67
import org.dbsp.sqlCompiler.circuit.operator.DBSPMapOperator;
@@ -22,6 +23,7 @@
2223
import org.dbsp.sqlCompiler.ir.IDBSPDeclaration;
2324
import org.dbsp.sqlCompiler.ir.IDBSPOuterNode;
2425
import org.dbsp.sqlCompiler.ir.annotation.AlwaysMonotone;
26+
import org.dbsp.sqlCompiler.ir.annotation.NoInc;
2527
import org.dbsp.sqlCompiler.ir.expression.DBSPApplyExpression;
2628
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
2729
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
@@ -187,7 +189,7 @@ public RewriteNow(IErrorReporter reporter, ICompilerComponent compiler) {
187189
this.compiler = compiler;
188190
}
189191

190-
DBSPStreamJoinOperator createJoin(DBSPUnaryOperator operator) {
192+
DBSPOperator createJoin(DBSPUnaryOperator operator) {
191193
// Index the input
192194
DBSPType inputType = operator.input().getOutputZSetElementType();
193195
DBSPVariablePath var = inputType.ref().var();
@@ -210,8 +212,12 @@ DBSPStreamJoinOperator createJoin(DBSPUnaryOperator operator) {
210212
Linq.map(fields, DBSPExpression::getType)));
211213
DBSPExpression joinFunction = new DBSPTupleExpression(fields, false)
212214
.closure(key.asParameter(), left.asParameter(), right.asParameter());
215+
assert nowIndexed != null;
216+
DBSPDifferentiateOperator dNow = new DBSPDifferentiateOperator(operator.getNode(), nowIndexed);
217+
dNow.annotations.add(new NoInc());
218+
this.addOperator(dNow);
213219
DBSPStreamJoinOperator join = new DBSPStreamJoinOperator(operator.getNode(), joinType,
214-
joinFunction, operator.isMultiset, index, Objects.requireNonNull(this.nowIndexed));
220+
joinFunction, operator.isMultiset, index, dNow);
215221
this.addOperator(join);
216222
return join;
217223
}
@@ -222,7 +228,7 @@ public void postorder(DBSPMapOperator operator) {
222228
DBSPExpression function = operator.getFunction();
223229
cn.apply(function);
224230
if (cn.found()) {
225-
DBSPStreamJoinOperator join = this.createJoin(operator);
231+
DBSPOperator join = this.createJoin(operator);
226232
RewriteNowExpression rn = new RewriteNowExpression(this.errorReporter);
227233
function = rn.apply(function).to(DBSPExpression.class);
228234
DBSPOperator result = new DBSPMapOperator(operator.getNode(), function, operator.getOutputZSetType(), join);
@@ -238,7 +244,7 @@ public void postorder(DBSPFilterOperator operator) {
238244
DBSPExpression function = operator.getFunction();
239245
cn.apply(function);
240246
if (cn.found()) {
241-
DBSPStreamJoinOperator join = this.createJoin(operator);
247+
DBSPOperator join = this.createJoin(operator);
242248
RewriteNowExpression rn = new RewriteNowExpression(this.errorReporter);
243249
function = rn.apply(function).to(DBSPExpression.class);
244250
DBSPOperator filter = new DBSPFilterOperator(operator.getNode(), function, join);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/OptimizeIncrementalVisitor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.dbsp.sqlCompiler.circuit.operator.*;
2727
import org.dbsp.sqlCompiler.compiler.IErrorReporter;
28+
import org.dbsp.sqlCompiler.ir.annotation.NoInc;
2829
import org.dbsp.util.Linq;
2930

3031
import java.util.List;
@@ -38,6 +39,11 @@ public OptimizeIncrementalVisitor(IErrorReporter reporter) {
3839

3940
@Override
4041
public void postorder(DBSPDifferentiateOperator operator) {
42+
if (operator.hasAnnotation(p -> p.is(NoInc.class))) {
43+
this.linear(operator);
44+
return;
45+
}
46+
4147
DBSPOperator source = this.mapped(operator.input());
4248
if (source.is(DBSPIntegrateOperator.class)) {
4349
DBSPIntegrateOperator integral = source.to(DBSPIntegrateOperator.class);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/annotation/AlwaysMonotone.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,4 @@
22

33
/** This annotation indicates that a DBSPOperator has an output that is always monotone.
44
* This is intended to mark the part of the circuit that derives from the NOW table. */
5-
public class AlwaysMonotone extends Annotation {
6-
@Override
7-
public String toString() {
8-
return "AlwaysMonotone";
9-
}
10-
}
5+
public class AlwaysMonotone extends Annotation { }

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/annotation/Annotation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,9 @@
33
import org.dbsp.util.ICastable;
44

55
/** Base class for annotations that can be attached to various IR nodes. */
6-
public abstract class Annotation implements ICastable { }
6+
public abstract class Annotation implements ICastable {
7+
@Override
8+
public String toString() {
9+
return this.getClass().getSimpleName();
10+
}
11+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.dbsp.sqlCompiler.ir.annotation;
2+
3+
/** Annotation set on a differentiation operator indicating
4+
* that it should be preserved by the incremental optimization step. */
5+
public class NoInc extends Annotation { }

0 commit comments

Comments
 (0)