Skip to content

Commit d6d3ef3

Browse files
authored
[SQL] Fix bugs in implementations of LIMIT (#2020)
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent d8dce7a commit d6d3ef3

11 files changed

Lines changed: 229 additions & 23 deletions

File tree

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPAggregateOperator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,6 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
6969
this.function, this.aggregate, newInputs.get(0), this.isLinear);
7070
return this;
7171
}
72+
73+
// equivalent is inherited from base class
7274
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPDelayOutputOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
* This operator does not have an explicit input; the input is coming implicitly from
1717
* the corresponding DBSPDelayOperator. This trick makes all graphs look acyclic.
1818
* This looks like a source operator, but it has a "hidden" input.
19-
* The ToDot visitor will correct this situation when drawing them.
20-
*/
19+
* The ToDot visitor will correct this situation when drawing them. */
2120
@NonCoreIR
2221
public final class DBSPDelayOutputOperator extends DBSPSourceBaseOperator {
2322
public DBSPDelayOutputOperator(CalciteObject node, DBSPType outputType, boolean isMultiset,

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPHopOperator.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
44
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
5+
import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext;
56
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
67
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
78
import org.dbsp.sqlCompiler.ir.type.DBSPType;
@@ -40,6 +41,19 @@ public void accept(CircuitVisitor visitor) {
4041
visitor.pop(this);
4142
}
4243

44+
@Override
45+
public boolean equivalent(DBSPOperator other) {
46+
if (!super.equivalent(other))
47+
return false;
48+
DBSPHopOperator hop = other.as(DBSPHopOperator.class);
49+
if (hop == null)
50+
return false;
51+
return EquivalenceContext.equiv(this.interval, hop.interval) &&
52+
this.timestampIndex == hop.timestampIndex &&
53+
EquivalenceContext.equiv(this.start, hop.start) &&
54+
EquivalenceContext.equiv(this.size, hop.size);
55+
}
56+
4357
@Override
4458
public DBSPOperator withFunction(@Nullable DBSPExpression expression, DBSPType outputType) {
4559
return new DBSPHopOperator(

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPJoinFilterMap.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
44
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
5+
import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext;
56
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
67
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
78
import org.dbsp.sqlCompiler.ir.type.DBSPType;
@@ -52,6 +53,17 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
5253
return this;
5354
}
5455

56+
@Override
57+
public boolean equivalent(DBSPOperator other) {
58+
if (!super.equivalent(other))
59+
return false;
60+
DBSPJoinFilterMap jfm = other.as(DBSPJoinFilterMap.class);
61+
if (jfm == null)
62+
return false;
63+
return EquivalenceContext.equiv(this.filter, jfm.filter) &&
64+
EquivalenceContext.equiv(this.map, jfm.map);
65+
}
66+
5567
@Override
5668
public void accept(CircuitVisitor visitor) {
5769
visitor.push(this);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStreamAggregateOperator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,6 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
6969
this.function, this.aggregate, newInputs.get(0), this.isLinear);
7070
return this;
7171
}
72+
73+
// equivalent is inherited from base class
7274
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.dbsp.sqlCompiler.ir.expression.DBSPForExpression;
5555
import org.dbsp.sqlCompiler.ir.expression.DBSPIfExpression;
5656
import org.dbsp.sqlCompiler.ir.expression.DBSPIsNullExpression;
57+
import org.dbsp.sqlCompiler.ir.expression.DBSPNoComparatorExpression;
5758
import org.dbsp.sqlCompiler.ir.expression.DBSPOpcode;
5859
import org.dbsp.sqlCompiler.ir.expression.DBSPPathExpression;
5960
import org.dbsp.sqlCompiler.ir.expression.DBSPQualifyTypeExpression;
@@ -182,19 +183,28 @@ public VisitDecision preorder(DBSPSortExpression expression) {
182183
this.builder.append(">)| -> Vec<");
183184
expression.elementType.accept(this);
184185
this.builder.append("> {").increase();
185-
this.builder.append("let ec = ");
186-
expression.comparator.accept(this);
187-
this.builder.append(";").newline();
188-
this.builder.append("let comp = move |a: &");
189-
expression.elementType.accept(this);
190-
this.builder.append(", b: &");
191-
expression.elementType.accept(this);
192-
this.builder.append("| { ec.compare(a, b) };");
193-
this.builder.append("let mut v = v.clone();").newline()
194-
// we don't use sort_unstable_by because it is
195-
// non-deterministic
196-
.append("v.sort_by(comp);").newline()
197-
.append("v").newline()
186+
if (!expression.comparator.is(DBSPNoComparatorExpression.class)) {
187+
this.builder.append("let ec = ");
188+
expression.comparator.accept(this);
189+
this.builder.append(";").newline();
190+
this.builder.append("let comp = move |a: &");
191+
expression.elementType.accept(this);
192+
this.builder.append(", b: &");
193+
expression.elementType.accept(this);
194+
this.builder.append("| { ec.compare(a, b) };").newline();
195+
this.builder.append("let mut v = v.clone();").newline()
196+
// we don't use sort_unstable_by because it is
197+
// non-deterministic
198+
.append("v.sort_by(comp);").newline();
199+
} // otherwise the vector doesn't need to be sorted at all
200+
if (expression.limit != null) {
201+
this.builder.append("let mut v = v.clone();").newline();
202+
this.builder.append("v.truncate(");
203+
expression.limit.accept(this);
204+
this.builder.append(");").newline();
205+
}
206+
this.builder.append("v");
207+
this.builder.newline()
198208
.decrease()
199209
.append("}");
200210
return VisitDecision.STOP;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeInteger;
167167
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeMillisInterval;
168168
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeTimestamp;
169+
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeUSize;
169170
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeVoid;
170171
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
171172
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeUser;
@@ -1944,10 +1945,14 @@ void visitSort(LogicalSort sort) {
19441945
if (!this.ancestors.isEmpty()) {
19451946
RelNode last = Utilities.last(this.ancestors);
19461947
if (last instanceof LogicalAggregate ||
1947-
last instanceof LogicalProject) {
1948+
last instanceof LogicalProject ||
1949+
last instanceof LogicalJoin) {
19481950
done = true;
19491951
}
19501952
}
1953+
if (sort.getCollation().getFieldCollations().isEmpty())
1954+
// We don't really need to sort; this is just a limit operator
1955+
done = true;
19511956
if (done) {
19521957
// We must drop the index we built.
19531958
DBSPDeindexOperator deindex = new DBSPDeindexOperator(node, integral);
@@ -1987,7 +1992,9 @@ void visitSort(LogicalSort sort) {
19871992
folder, null, index, false);
19881993
this.circuit.addOperator(agg);
19891994

1990-
DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator);
1995+
if (limit != null)
1996+
limit = limit.cast(new DBSPTypeUSize(node, false));
1997+
DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator, limit);
19911998
DBSPOperator result = new DBSPMapOperator(
19921999
node, sorter, this.makeZSet(vecType), agg);
19932000
this.assignOperator(sort, result);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,9 +960,10 @@ public VisitDecision preorder(DBSPSortExpression expression) {
960960
this.push(expression);
961961
DBSPExpression comparator = this.transform(expression.comparator);
962962
DBSPType elementType = this.transform(expression.elementType);
963+
@Nullable DBSPExpression limit = this.transformN(expression.limit);
963964
this.pop(expression);
964965
DBSPExpression result = new DBSPSortExpression(
965-
expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class));
966+
expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class), limit);
966967
this.map(expression, result);
967968
return VisitDecision.STOP;
968969
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeVec;
3535
import org.dbsp.util.IIndentStream;
3636

37+
import javax.annotation.Nullable;
38+
3739
/**
3840
* Represents a closure that sorts an IndexedZSet with empty keys and
3941
* a Vector of tuples as a value.
@@ -45,8 +47,12 @@
4547
public final class DBSPSortExpression extends DBSPExpression {
4648
public final DBSPComparatorExpression comparator;
4749
public final DBSPType elementType;
50+
@Nullable
51+
public final DBSPExpression limit;
4852

49-
public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparatorExpression comparator) {
53+
public DBSPSortExpression(
54+
CalciteObject node, DBSPType elementType,
55+
DBSPComparatorExpression comparator, @Nullable DBSPExpression limit) {
5056
super(node, new DBSPTypeFunction(
5157
// Return type
5258
new DBSPTypeVec(elementType, false),
@@ -56,6 +62,7 @@ public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparat
5662
new DBSPTypeVec(elementType, false).ref())));
5763
this.comparator = comparator;
5864
this.elementType = elementType;
65+
this.limit = limit;
5966
}
6067

6168
@Override
@@ -75,7 +82,8 @@ public boolean sameFields(IDBSPNode other) {
7582
if (o == null)
7683
return false;
7784
return this.comparator == o.comparator &&
78-
this.elementType == o.elementType;
85+
this.elementType == o.elementType &&
86+
this.limit == o.limit;
7987
}
8088

8189
@Override
@@ -88,14 +96,15 @@ public IIndentStream toString(IIndentStream builder) {
8896
@Override
8997
public DBSPExpression deepCopy() {
9098
return new DBSPSortExpression(this.getNode(), this.elementType,
91-
this.comparator.deepCopy().to(DBSPComparatorExpression.class));
99+
this.comparator.deepCopy().to(DBSPComparatorExpression.class), this.limit);
92100
}
93101

94102
@Override
95103
public boolean equivalent(EquivalenceContext context, DBSPExpression other) {
96104
DBSPSortExpression otherExpression = other.as(DBSPSortExpression.class);
97105
if (otherExpression == null)
98106
return false;
99-
return this.comparator.equivalent(context, otherExpression.comparator);
107+
return this.comparator.equivalent(context, otherExpression.comparator) &&
108+
EquivalenceContext.equiv(this.limit, otherExpression.limit);
100109
}
101110
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package org.dbsp.sqlCompiler.compiler.sql.simple;
2+
3+
import org.dbsp.sqlCompiler.compiler.CompilerOptions;
4+
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
5+
import org.dbsp.sqlCompiler.compiler.sql.SqlIoTest;
6+
import org.junit.Test;
7+
8+
/** Regression tests for bugs that showed up in incremental mode */
9+
public class IncrementalRegressionTests extends SqlIoTest {
10+
@Override
11+
public DBSPCompiler testCompiler() {
12+
CompilerOptions options = this.testOptions(true, true);
13+
return new DBSPCompiler(options);
14+
}
15+
16+
/**
 * Regression test named after issue 2018 (presumably the issue tracker number).
 * The program declares the tables 'customer' and 'transaction_parameters' and
 * three views: 'cust_enum' (a join followed by ORDER BY), 'cust_agg'
 * (ARRAY_AGG with ORDER BY over an ordered subquery) and 'cust_med'
 * (which indexes into the aggregated array).
 * The program is only handed to compileRustTestCase; this test supplies
 * no input data and checks no output data.
 */
@Test
17+
public void issue2018() {
18+
String sql = """
19+
CREATE TABLE customer (
20+
c_id INT,
21+
c_d_id INT,
22+
c_w_id INT,
23+
c_first VARCHAR(16),
24+
c_middle CHAR(2),
25+
c_last VARCHAR(16),
26+
c_street_1 VARCHAR(20),
27+
c_street_2 VARCHAR(20),
28+
c_city VARCHAR(20),
29+
c_state CHAR(2),
30+
c_zip CHAR(9),
31+
c_phone CHAR(16),
32+
c_since TIMESTAMP,
33+
c_credit CHAR(2),
34+
c_credit_lim DECIMAL(12,2),
35+
c_discount DECIMAL(4,4),
36+
c_balance DECIMAL(12,2),
37+
c_ytd_payment DECIMAL(12,2),
38+
c_payment_cnt INT,
39+
c_delivery_cnt INT,
40+
c_data VARCHAR(500),
41+
PRIMARY KEY (c_w_id, c_d_id, c_id),
42+
FOREIGN KEY (c_w_id, c_d_id) REFERENCES district(d_w_id, d_id)
43+
);
44+
45+
CREATE TABLE transaction_parameters (
46+
txn_id INT PRIMARY KEY,
47+
w_id INT,
48+
d_id INT,
49+
c_id INT,
50+
c_w_id INT,
51+
c_d_id INT,
52+
c_last VARCHAR(20), -- TODO check
53+
h_amount DECIMAL(5,2),
54+
h_date TIMESTAMP,
55+
datetime_ TIMESTAMP
56+
);
57+
58+
-- incremental fails with this query present
59+
CREATE VIEW cust_enum AS
60+
SELECT c.c_first, c.c_middle, c.c_id,
61+
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
62+
c.c_phone, c.c_credit, c.c_credit_lim,
63+
c.c_discount, c.c_balance, c.c_since
64+
FROM customer AS c,
65+
transaction_parameters AS t
66+
WHERE c.c_last = t.c_last
67+
AND c.c_d_id = t.c_d_id
68+
AND c.c_w_id = t.c_w_id
69+
ORDER BY c_first;
70+
71+
CREATE VIEW cust_agg AS
72+
SELECT ARRAY_AGG(c_id ORDER BY c_first) AS cust_array
73+
FROM (SELECT c.c_id, c.c_first
74+
FROM customer AS c,
75+
transaction_parameters AS t
76+
WHERE c.c_last = t.c_last
77+
AND c.c_d_id = t.c_d_id
78+
AND c.c_w_id = t.c_w_id
79+
ORDER BY c_first);
80+
81+
CREATE VIEW cust_med AS
82+
SELECT c.c_first, c.c_middle, c.c_id,
83+
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
84+
c.c_phone, c.c_credit, c.c_credit_lim,
85+
c.c_discount, c.c_balance, c.c_since
86+
FROM customer as c,
87+
cust_agg as a,
88+
transaction_parameters as t
89+
WHERE c.c_id = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
90+
""";
91+
this.compileRustTestCase(sql);
92+
}
93+
}

0 commit comments

Comments
 (0)