diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPAggregateOperator.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPAggregateOperator.java index b0b1bcc64ee..a5cd13d5286 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPAggregateOperator.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPAggregateOperator.java @@ -69,4 +69,6 @@ public DBSPOperator withInputs(List newInputs, boolean force) { this.function, this.aggregate, newInputs.get(0), this.isLinear); return this; } + + // equivalent is inherited from base class } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPDelayOutputOperator.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPDelayOutputOperator.java index 0e3d94d354a..e52b5ea7810 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPDelayOutputOperator.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPDelayOutputOperator.java @@ -16,8 +16,7 @@ * This operator does not have an explicit input; the input is coming implicitly from * the corresponding DBSPDelayOperator. This trick makes all graphs look acyclic. * This looks like a source operator, but it has a "hidden" input. - * The ToDot visitor will correct this situation when drawing them. - */ + * The ToDot visitor will correct this situation when drawing them. */ @NonCoreIR public final class DBSPDelayOutputOperator extends DBSPSourceBaseOperator { public DBSPDelayOutputOperator(CalciteObject node, DBSPType outputType, boolean isMultiset, diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPHopOperator.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPHopOperator.java index d074bb2cad7..ec513bb4618 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPHopOperator.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPHopOperator.java @@ -2,6 +2,7 @@ import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject; import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision; +import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext; import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor; import org.dbsp.sqlCompiler.ir.expression.DBSPExpression; import org.dbsp.sqlCompiler.ir.type.DBSPType; @@ -40,6 +41,19 @@ public void accept(CircuitVisitor visitor) { visitor.pop(this); } + @Override + public boolean equivalent(DBSPOperator other) { + if (!super.equivalent(other)) + return false; + DBSPHopOperator hop = other.as(DBSPHopOperator.class); + if (hop == null) + return false; + return EquivalenceContext.equiv(this.interval, hop.interval) && + this.timestampIndex == hop.timestampIndex && + EquivalenceContext.equiv(this.start, hop.start) && + EquivalenceContext.equiv(this.size, hop.size); + } + @Override public DBSPOperator withFunction(@Nullable DBSPExpression expression, DBSPType outputType) { return new DBSPHopOperator( diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPJoinFilterMap.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPJoinFilterMap.java index 6440006ec6b..bf97c95ba62 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPJoinFilterMap.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPJoinFilterMap.java @@ -2,6 +2,7 @@ import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject; import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision; +import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext; import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor; import org.dbsp.sqlCompiler.ir.expression.DBSPExpression; import org.dbsp.sqlCompiler.ir.type.DBSPType; @@ -52,6 +53,17 @@ public DBSPOperator withInputs(List newInputs, boolean force) { return this; } + @Override + public boolean equivalent(DBSPOperator other) { + if (!super.equivalent(other)) + return false; + DBSPJoinFilterMap jfm = other.as(DBSPJoinFilterMap.class); + if (jfm == null) + return false; + return EquivalenceContext.equiv(this.filter, jfm.filter) && + EquivalenceContext.equiv(this.map, jfm.map); + } + @Override public void accept(CircuitVisitor visitor) { visitor.push(this); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStreamAggregateOperator.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStreamAggregateOperator.java index 00541feb380..225f12ed8b5 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStreamAggregateOperator.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPStreamAggregateOperator.java @@ -69,4 +69,6 @@ public DBSPOperator withInputs(List newInputs, boolean force) { this.function, this.aggregate, newInputs.get(0), this.isLinear); return this; } + + // equivalent inherited from base class } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java index 096a64e507e..d3e129a0979 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java @@ -54,6 +54,7 @@ import org.dbsp.sqlCompiler.ir.expression.DBSPForExpression; import org.dbsp.sqlCompiler.ir.expression.DBSPIfExpression; import org.dbsp.sqlCompiler.ir.expression.DBSPIsNullExpression; +import org.dbsp.sqlCompiler.ir.expression.DBSPNoComparatorExpression; import org.dbsp.sqlCompiler.ir.expression.DBSPOpcode; import org.dbsp.sqlCompiler.ir.expression.DBSPPathExpression; import org.dbsp.sqlCompiler.ir.expression.DBSPQualifyTypeExpression; @@ -182,19 +183,28 @@ public VisitDecision preorder(DBSPSortExpression expression) { this.builder.append(">)| -> Vec<"); expression.elementType.accept(this); this.builder.append("> {").increase(); - this.builder.append("let ec = "); - expression.comparator.accept(this); - this.builder.append(";").newline(); - this.builder.append("let comp = move |a: &"); - expression.elementType.accept(this); - this.builder.append(", b: &"); - expression.elementType.accept(this); - this.builder.append("| { ec.compare(a, b) };"); - this.builder.append("let mut v = v.clone();").newline() - // we don't use sort_unstable_by because it is - // non-deterministic - .append("v.sort_by(comp);").newline() - .append("v").newline() + if (!expression.comparator.is(DBSPNoComparatorExpression.class)) { + this.builder.append("let ec = "); + expression.comparator.accept(this); + this.builder.append(";").newline(); + this.builder.append("let comp = move |a: &"); + expression.elementType.accept(this); + this.builder.append(", b: &"); + expression.elementType.accept(this); + this.builder.append("| { ec.compare(a, b) };").newline(); + this.builder.append("let mut v = v.clone();").newline() + // we don't use sort_unstable_by because it is + // non-deterministic + .append("v.sort_by(comp);").newline(); + } // otherwise the vector doesn't need to be sorted at all + if (expression.limit != null) { + this.builder.append("let mut v = v.clone();").newline(); + this.builder.append("v.truncate("); + expression.limit.accept(this); + this.builder.append(");").newline(); + } + this.builder.append("v"); + this.builder.newline() .decrease() .append("}"); return VisitDecision.STOP; diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java index 0da95e41ab5..a58c298edae 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java @@ -166,6 +166,7 @@ import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeInteger; import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeMillisInterval; import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeTimestamp; +import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeUSize; import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeVoid; import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet; import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeUser; @@ -1944,10 +1945,14 @@ void visitSort(LogicalSort sort) { if (!this.ancestors.isEmpty()) { RelNode last = Utilities.last(this.ancestors); if (last instanceof LogicalAggregate || - last instanceof LogicalProject) { + last instanceof LogicalProject || + last instanceof LogicalJoin) { done = true; } } + if (sort.getCollation().getFieldCollations().isEmpty()) + // We don't really need to sort; this is just a limit operator + done = true; if (done) { // We must drop the index we built. DBSPDeindexOperator deindex = new DBSPDeindexOperator(node, integral); @@ -1987,7 +1992,9 @@ void visitSort(LogicalSort sort) { folder, null, index, false); this.circuit.addOperator(agg); - DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator); + if (limit != null) + limit = limit.cast(new DBSPTypeUSize(node, false)); + DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator, limit); DBSPOperator result = new DBSPMapOperator( node, sorter, this.makeZSet(vecType), agg); this.assignOperator(sort, result); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java index 847477724be..19286c69b14 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java @@ -960,9 +960,10 @@ public VisitDecision preorder(DBSPSortExpression expression) { this.push(expression); DBSPExpression comparator = this.transform(expression.comparator); DBSPType elementType = this.transform(expression.elementType); + @Nullable DBSPExpression limit = this.transformN(expression.limit); this.pop(expression); DBSPExpression result = new DBSPSortExpression( - expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class)); + expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class), limit); this.map(expression, result); return VisitDecision.STOP; } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java index bd58ed58122..d39f97e1927 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java @@ -34,6 +34,8 @@ import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeVec; import org.dbsp.util.IIndentStream; +import javax.annotation.Nullable; + /** * Represents a closure that sorts an IndexedZSet with empty keys and * a Vector of tuples as a value. @@ -45,8 +47,12 @@ public final class DBSPSortExpression extends DBSPExpression { public final DBSPComparatorExpression comparator; public final DBSPType elementType; + @Nullable + public final DBSPExpression limit; - public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparatorExpression comparator) { + public DBSPSortExpression( + CalciteObject node, DBSPType elementType, + DBSPComparatorExpression comparator, @Nullable DBSPExpression limit) { super(node, new DBSPTypeFunction( // Return type new DBSPTypeVec(elementType, false), @@ -56,6 +62,7 @@ public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparat new DBSPTypeVec(elementType, false).ref()))); this.comparator = comparator; this.elementType = elementType; + this.limit = limit; } @Override @@ -75,7 +82,8 @@ public boolean sameFields(IDBSPNode other) { if (o == null) return false; return this.comparator == o.comparator && - this.elementType == o.elementType; + this.elementType == o.elementType && + this.limit == o.limit; } @Override @@ -88,7 +96,7 @@ public IIndentStream toString(IIndentStream builder) { @Override public DBSPExpression deepCopy() { return new DBSPSortExpression(this.getNode(), this.elementType, - this.comparator.deepCopy().to(DBSPComparatorExpression.class)); + this.comparator.deepCopy().to(DBSPComparatorExpression.class), this.limit); } @Override @@ -96,6 +104,7 @@ public boolean equivalent(EquivalenceContext context, DBSPExpression other) { DBSPSortExpression otherExpression = other.as(DBSPSortExpression.class); if (otherExpression == null) return false; - return this.comparator.equivalent(context, otherExpression.comparator); + return this.comparator.equivalent(context, otherExpression.comparator) && + EquivalenceContext.equiv(this.limit, otherExpression.limit); } } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java new file mode 100644 index 00000000000..0ff072a43f2 --- /dev/null +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java @@ -0,0 +1,93 @@ +package org.dbsp.sqlCompiler.compiler.sql.simple; + +import org.dbsp.sqlCompiler.compiler.CompilerOptions; +import org.dbsp.sqlCompiler.compiler.DBSPCompiler; +import org.dbsp.sqlCompiler.compiler.sql.SqlIoTest; +import org.junit.Test; + +/** Regression tests that fail in incremental mode */ +public class IncrementalRegressionTests extends SqlIoTest { + @Override + public DBSPCompiler testCompiler() { + CompilerOptions options = this.testOptions(true, true); + return new DBSPCompiler(options); + } + + @Test + public void issue2018() { + String sql = """ + CREATE TABLE customer ( + c_id INT, + c_d_id INT, + c_w_id INT, + c_first VARCHAR(16), + c_middle CHAR(2), + c_last VARCHAR(16), + c_street_1 VARCHAR(20), + c_street_2 VARCHAR(20), + c_city VARCHAR(20), + c_state CHAR(2), + c_zip CHAR(9), + c_phone CHAR(16), + c_since TIMESTAMP, + c_credit CHAR(2), + c_credit_lim DECIMAL(12,2), + c_discount DECIMAL(4,4), + c_balance DECIMAL(12,2), + c_ytd_payment DECIMAL(12,2), + c_payment_cnt INT, + c_delivery_cnt INT, + c_data VARCHAR(500), + PRIMARY KEY (c_w_id, c_d_id, c_id), + FOREIGN KEY (c_w_id, c_d_id) REFERENCES district(d_w_id, d_id) + ); + + CREATE TABLE transaction_parameters ( + txn_id INT PRIMARY KEY, + w_id INT, + d_id INT, + c_id INT, + c_w_id INT, + c_d_id INT, + c_last VARCHAR(20), -- TODO check + h_amount DECIMAL(5,2), + h_date TIMESTAMP, + datetime_ TIMESTAMP + ); + + -- incremental fails with this query present + CREATE VIEW cust_enum AS + SELECT c.c_first, c.c_middle, c.c_id, + c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip, + c.c_phone, c.c_credit, c.c_credit_lim, + c.c_discount, c.c_balance, c.c_since + FROM customer AS c, + transaction_parameters AS t + WHERE c.c_last = t.c_last + AND c.c_d_id = t.c_d_id + AND c.c_w_id = t.c_w_id + ORDER BY c_first; + + CREATE VIEW cust_agg AS + SELECT ARRAY_AGG(c_id ORDER BY c_first) AS cust_array + FROM (SELECT c.c_id, c.c_first + FROM customer AS c, + transaction_parameters AS t + WHERE c.c_last = t.c_last + AND c.c_d_id = t.c_d_id + AND c.c_w_id = t.c_w_id + ORDER BY c_first); + + CREATE VIEW cust_med AS + SELECT c.c_first, c.c_middle, c.c_id, + c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip, + c.c_phone, c.c_credit, c.c_credit_lim, + c.c_discount, c.c_balance, c.c_since + FROM customer as c, + cust_agg as a, + transaction_parameters as t + WHERE c.c_id = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1]; + """; + this.compileRustTestCase(sql); + } +} diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegresssionTests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegressionTests.java similarity index 80% rename from sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegresssionTests.java rename to sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegressionTests.java index bf3fcfa0177..d4251a9ff42 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegresssionTests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/RegressionTests.java @@ -12,7 +12,64 @@ import org.junit.Ignore; import org.junit.Test; -public class RegresssionTests extends SqlIoTest { +public class RegressionTests extends SqlIoTest { + @Test + public void issue2017() { + String sql = """ + CREATE TABLE customer ( + c_id INT, + c_d_id INT, + c_w_id INT, + c_first VARCHAR(16), + c_middle CHAR(2), + c_last VARCHAR(16), + c_street_1 VARCHAR(20), + c_street_2 VARCHAR(20), + c_city VARCHAR(20), + c_state CHAR(2), + c_zip CHAR(9), + c_phone CHAR(16), + c_since TIMESTAMP, + c_credit CHAR(2), + c_credit_lim DECIMAL(12,2), + c_discount DECIMAL(4,4), + c_balance DECIMAL(12,2), + c_ytd_payment DECIMAL(12,2), + c_payment_cnt INT, + c_delivery_cnt INT, + c_data VARCHAR(500), + PRIMARY KEY (c_w_id, c_d_id, c_id), + FOREIGN KEY (c_w_id, c_d_id) REFERENCES district(d_w_id, d_id) + ); + + CREATE TABLE transaction_parameters ( + txn_id INT PRIMARY KEY, + w_id INT, + d_id INT, + c_id INT, + c_w_id INT, + c_d_id INT, + c_last VARCHAR(20), -- TODO check + h_amount DECIMAL(5,2), + h_date TIMESTAMP, + datetime_ TIMESTAMP + ); + + CREATE VIEW cust_max AS + SELECT c.c_first, c.c_middle, c.c_id, + c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip, + c.c_phone, c.c_credit, c.c_credit_lim, + c.c_discount, c.c_balance, c.c_since + FROM customer AS c, + transaction_parameters AS t + WHERE c.c_last = t.c_last + AND c.c_d_id = t.c_d_id + AND c.c_w_id = t.c_w_id + AND c_first = (select max(c_first) from customer LIMIT 1) + LIMIT 1;"""; + this.compileRustTestCase(sql); + } + @Test public void issue1868() { String sql = """