Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
this.function, this.aggregate, newInputs.get(0), this.isLinear);
return this;
}

// equivalent is inherited from base class
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
* This operator does not have an explicit input; the input is coming implicitly from
* the corresponding DBSPDelayOperator. This trick makes all graphs look acyclic.
* This looks like a source operator, but it has a "hidden" input.
* The ToDot visitor will correct this situation when drawing them.
*/
* The ToDot visitor will correct this situation when drawing them. */
@NonCoreIR
public final class DBSPDelayOutputOperator extends DBSPSourceBaseOperator {
public DBSPDelayOutputOperator(CalciteObject node, DBSPType outputType, boolean isMultiset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext;
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
import org.dbsp.sqlCompiler.ir.type.DBSPType;
Expand Down Expand Up @@ -40,6 +41,19 @@ public void accept(CircuitVisitor visitor) {
visitor.pop(this);
}

@Override
public boolean equivalent(DBSPOperator other) {
if (!super.equivalent(other))
return false;
DBSPHopOperator hop = other.as(DBSPHopOperator.class);
if (hop == null)
return false;
return EquivalenceContext.equiv(this.interval, hop.interval) &&
this.timestampIndex == hop.timestampIndex &&
EquivalenceContext.equiv(this.start, hop.start) &&
EquivalenceContext.equiv(this.size, hop.size);
}

@Override
public DBSPOperator withFunction(@Nullable DBSPExpression expression, DBSPType outputType) {
return new DBSPHopOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
import org.dbsp.sqlCompiler.compiler.visitors.inner.EquivalenceContext;
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
import org.dbsp.sqlCompiler.ir.type.DBSPType;
Expand Down Expand Up @@ -52,6 +53,17 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
return this;
}

@Override
public boolean equivalent(DBSPOperator other) {
if (!super.equivalent(other))
return false;
DBSPJoinFilterMap jfm = other.as(DBSPJoinFilterMap.class);
if (jfm == null)
return false;
return EquivalenceContext.equiv(this.filter, jfm.filter) &&
EquivalenceContext.equiv(this.map, jfm.map);
}

@Override
public void accept(CircuitVisitor visitor) {
visitor.push(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
this.function, this.aggregate, newInputs.get(0), this.isLinear);
return this;
}

// equivalent inherited from base class
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPForExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPIfExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPIsNullExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPNoComparatorExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPOpcode;
import org.dbsp.sqlCompiler.ir.expression.DBSPPathExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPQualifyTypeExpression;
Expand Down Expand Up @@ -182,19 +183,28 @@ public VisitDecision preorder(DBSPSortExpression expression) {
this.builder.append(">)| -> Vec<");
expression.elementType.accept(this);
this.builder.append("> {").increase();
this.builder.append("let ec = ");
expression.comparator.accept(this);
this.builder.append(";").newline();
this.builder.append("let comp = move |a: &");
expression.elementType.accept(this);
this.builder.append(", b: &");
expression.elementType.accept(this);
this.builder.append("| { ec.compare(a, b) };");
this.builder.append("let mut v = v.clone();").newline()
// we don't use sort_unstable_by because it is
// non-deterministic
.append("v.sort_by(comp);").newline()
.append("v").newline()
if (!expression.comparator.is(DBSPNoComparatorExpression.class)) {
this.builder.append("let ec = ");
expression.comparator.accept(this);
this.builder.append(";").newline();
this.builder.append("let comp = move |a: &");
expression.elementType.accept(this);
this.builder.append(", b: &");
expression.elementType.accept(this);
this.builder.append("| { ec.compare(a, b) };").newline();
this.builder.append("let mut v = v.clone();").newline()
// we don't use sort_unstable_by because it is
// non-deterministic
.append("v.sort_by(comp);").newline();
} // otherwise the vector doesn't need to be sorted at all
if (expression.limit != null) {
this.builder.append("let mut v = v.clone();").newline();
this.builder.append("v.truncate(");
expression.limit.accept(this);
this.builder.append(");").newline();
}
this.builder.append("v");
this.builder.newline()
.decrease()
.append("}");
return VisitDecision.STOP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeInteger;
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeMillisInterval;
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeTimestamp;
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeUSize;
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeVoid;
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeUser;
Expand Down Expand Up @@ -1944,10 +1945,14 @@ void visitSort(LogicalSort sort) {
if (!this.ancestors.isEmpty()) {
RelNode last = Utilities.last(this.ancestors);
if (last instanceof LogicalAggregate ||
last instanceof LogicalProject) {
last instanceof LogicalProject ||
last instanceof LogicalJoin) {
done = true;
}
}
if (sort.getCollation().getFieldCollations().isEmpty())
// We don't really need to sort; this is just a limit operator
done = true;
if (done) {
// We must drop the index we built.
DBSPDeindexOperator deindex = new DBSPDeindexOperator(node, integral);
Expand Down Expand Up @@ -1987,7 +1992,9 @@ void visitSort(LogicalSort sort) {
folder, null, index, false);
this.circuit.addOperator(agg);

DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator);
if (limit != null)
limit = limit.cast(new DBSPTypeUSize(node, false));
DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator, limit);
DBSPOperator result = new DBSPMapOperator(
node, sorter, this.makeZSet(vecType), agg);
this.assignOperator(sort, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,9 +960,10 @@ public VisitDecision preorder(DBSPSortExpression expression) {
this.push(expression);
DBSPExpression comparator = this.transform(expression.comparator);
DBSPType elementType = this.transform(expression.elementType);
@Nullable DBSPExpression limit = this.transformN(expression.limit);
this.pop(expression);
DBSPExpression result = new DBSPSortExpression(
expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class));
expression.getNode(), elementType, comparator.to(DBSPComparatorExpression.class), limit);
this.map(expression, result);
return VisitDecision.STOP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeVec;
import org.dbsp.util.IIndentStream;

import javax.annotation.Nullable;

/**
* Represents a closure that sorts an IndexedZSet with empty keys and
* a Vector of tuples as a value.
Expand All @@ -45,8 +47,12 @@
public final class DBSPSortExpression extends DBSPExpression {
public final DBSPComparatorExpression comparator;
public final DBSPType elementType;
@Nullable
public final DBSPExpression limit;

public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparatorExpression comparator) {
public DBSPSortExpression(
CalciteObject node, DBSPType elementType,
DBSPComparatorExpression comparator, @Nullable DBSPExpression limit) {
super(node, new DBSPTypeFunction(
// Return type
new DBSPTypeVec(elementType, false),
Expand All @@ -56,6 +62,7 @@ public DBSPSortExpression(CalciteObject node, DBSPType elementType, DBSPComparat
new DBSPTypeVec(elementType, false).ref())));
this.comparator = comparator;
this.elementType = elementType;
this.limit = limit;
}

@Override
Expand All @@ -75,7 +82,8 @@ public boolean sameFields(IDBSPNode other) {
if (o == null)
return false;
return this.comparator == o.comparator &&
this.elementType == o.elementType;
this.elementType == o.elementType &&
this.limit == o.limit;
}

@Override
Expand All @@ -88,14 +96,15 @@ public IIndentStream toString(IIndentStream builder) {
@Override
public DBSPExpression deepCopy() {
return new DBSPSortExpression(this.getNode(), this.elementType,
this.comparator.deepCopy().to(DBSPComparatorExpression.class));
this.comparator.deepCopy().to(DBSPComparatorExpression.class), this.limit);
}

@Override
public boolean equivalent(EquivalenceContext context, DBSPExpression other) {
DBSPSortExpression otherExpression = other.as(DBSPSortExpression.class);
if (otherExpression == null)
return false;
return this.comparator.equivalent(context, otherExpression.comparator);
return this.comparator.equivalent(context, otherExpression.comparator) &&
EquivalenceContext.equiv(this.limit, otherExpression.limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.dbsp.sqlCompiler.compiler.sql.simple;

import org.dbsp.sqlCompiler.compiler.CompilerOptions;
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
import org.dbsp.sqlCompiler.compiler.sql.SqlIoTest;
import org.junit.Test;

/** Regression tests that fail in incremental mode */
public class IncrementalRegressionTests extends SqlIoTest {
@Override
public DBSPCompiler testCompiler() {
CompilerOptions options = this.testOptions(true, true);
return new DBSPCompiler(options);
}

@Test
public void issue2018() {
String sql = """
CREATE TABLE customer (
c_id INT,
c_d_id INT,
c_w_id INT,
c_first VARCHAR(16),
c_middle CHAR(2),
c_last VARCHAR(16),
c_street_1 VARCHAR(20),
c_street_2 VARCHAR(20),
c_city VARCHAR(20),
c_state CHAR(2),
c_zip CHAR(9),
c_phone CHAR(16),
c_since TIMESTAMP,
c_credit CHAR(2),
c_credit_lim DECIMAL(12,2),
c_discount DECIMAL(4,4),
c_balance DECIMAL(12,2),
c_ytd_payment DECIMAL(12,2),
c_payment_cnt INT,
c_delivery_cnt INT,
c_data VARCHAR(500),
PRIMARY KEY (c_w_id, c_d_id, c_id),
FOREIGN KEY (c_w_id, c_d_id) REFERENCES district(d_w_id, d_id)
);

CREATE TABLE transaction_parameters (
txn_id INT PRIMARY KEY,
w_id INT,
d_id INT,
c_id INT,
c_w_id INT,
c_d_id INT,
c_last VARCHAR(20), -- TODO check
h_amount DECIMAL(5,2),
h_date TIMESTAMP,
datetime_ TIMESTAMP
);

-- incremental fails with this query present
CREATE VIEW cust_enum AS
SELECT c.c_first, c.c_middle, c.c_id,
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
c.c_phone, c.c_credit, c.c_credit_lim,
c.c_discount, c.c_balance, c.c_since
FROM customer AS c,
transaction_parameters AS t
WHERE c.c_last = t.c_last
AND c.c_d_id = t.c_d_id
AND c.c_w_id = t.c_w_id
ORDER BY c_first;

CREATE VIEW cust_agg AS
SELECT ARRAY_AGG(c_id ORDER BY c_first) AS cust_array
FROM (SELECT c.c_id, c.c_first
FROM customer AS c,
transaction_parameters AS t
WHERE c.c_last = t.c_last
AND c.c_d_id = t.c_d_id
AND c.c_w_id = t.c_w_id
ORDER BY c_first);

CREATE VIEW cust_med AS
SELECT c.c_first, c.c_middle, c.c_id,
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
c.c_phone, c.c_credit, c.c_credit_lim,
c.c_discount, c.c_balance, c.c_since
FROM customer as c,
cust_agg as a,
transaction_parameters as t
WHERE c.c_id = a.cust_array[(ARRAY_LENGTH(a.cust_array) / 2) + 1];
""";
this.compileRustTestCase(sql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,64 @@
import org.junit.Ignore;
import org.junit.Test;

public class RegresssionTests extends SqlIoTest {
public class RegressionTests extends SqlIoTest {
@Test
public void issue2017() {
String sql = """
CREATE TABLE customer (
c_id INT,
c_d_id INT,
c_w_id INT,
c_first VARCHAR(16),
c_middle CHAR(2),
c_last VARCHAR(16),
c_street_1 VARCHAR(20),
c_street_2 VARCHAR(20),
c_city VARCHAR(20),
c_state CHAR(2),
c_zip CHAR(9),
c_phone CHAR(16),
c_since TIMESTAMP,
c_credit CHAR(2),
c_credit_lim DECIMAL(12,2),
c_discount DECIMAL(4,4),
c_balance DECIMAL(12,2),
c_ytd_payment DECIMAL(12,2),
c_payment_cnt INT,
c_delivery_cnt INT,
c_data VARCHAR(500),
PRIMARY KEY (c_w_id, c_d_id, c_id),
FOREIGN KEY (c_w_id, c_d_id) REFERENCES district(d_w_id, d_id)
);

CREATE TABLE transaction_parameters (
txn_id INT PRIMARY KEY,
w_id INT,
d_id INT,
c_id INT,
c_w_id INT,
c_d_id INT,
c_last VARCHAR(20), -- TODO check
h_amount DECIMAL(5,2),
h_date TIMESTAMP,
datetime_ TIMESTAMP
);

CREATE VIEW cust_max AS
SELECT c.c_first, c.c_middle, c.c_id,
c.c_street_1, c.c_street_2, c.c_city, c.c_state, c.c_zip,
c.c_phone, c.c_credit, c.c_credit_lim,
c.c_discount, c.c_balance, c.c_since
FROM customer AS c,
transaction_parameters AS t
WHERE c.c_last = t.c_last
AND c.c_d_id = t.c_d_id
AND c.c_w_id = t.c_w_id
AND c_first = (select max(c_first) from customer LIMIT 1)
LIMIT 1;""";
this.compileRustTestCase(sql);
}

@Test
public void issue1868() {
String sql = """
Expand Down