Skip to content

Commit 0cbde5e

Browse files
committed
[SQL] Support for ORDER BY with OFFSET
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent c110dcd commit 0cbde5e

File tree

11 files changed

+484
-51
lines changed

11 files changed

+484
-51
lines changed

docs.feldera.com/docs/sql/grammar.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,9 @@ query
283283
| query INTERSECT [ ALL | DISTINCT ] query
284284
}
285285
[ ORDER BY orderItem [, orderItem ]* ]
286-
[ LIMIT { count | ALL } ]
286+
[ LIMIT [ start, ] { count | ALL } ]
287+
[ OFFSET start [ { ROW | ROWS } ] ]
288+
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
287289
288290
289291
withItem

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -344,12 +344,6 @@ public VisitDecision preorder(DBSPSortExpression expression) {
344344
// we don't use sort_unstable_by because it is
345345
// non-deterministic
346346
.append("v.sort_by(comp);").newline();
347-
if (expression.limit != null) {
348-
this.builder.append("let mut v = (**array).clone();").newline();
349-
this.builder.append("v.truncate(");
350-
expression.limit.accept(this);
351-
this.builder.append(");").newline();
352-
}
353347
this.builder.append("v.into()");
354348
this.builder.newline()
355349
.decrease()

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceMultisetOperator;
9797
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceTableOperator;
9898
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamAntiJoinOperator;
99+
import org.dbsp.sqlCompiler.circuit.operator.DBSPSubtractOperator;
99100
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewDeclarationOperator;
100101
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamAggregateOperator;
101102
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamDistinctOperator;
@@ -2743,19 +2744,23 @@ void visitSort(LogicalSort sort) {
27432744
IntermediateRel node = CalciteObject.create(sort);
27442745
RelNode input = sort.getInput();
27452746
DBSPSimpleOperator opInput = this.getOperator(input);
2746-
if (this.options.languageOptions.ignoreOrderBy && sort.fetch == null) {
2747+
if (this.options.languageOptions.ignoreOrderBy && sort.fetch == null && sort.offset == null) {
27472748
this.warnNoSort(node);
27482749
Utilities.putNew(this.nodeOperator, sort, opInput);
27492750
return;
27502751
}
2751-
if (sort.offset != null)
2752-
throw new UnimplementedException("OFFSET in SORT not yet implemented", 172, node);
27532752

27542753
DBSPExpression limit = null;
27552754
if (sort.fetch != null) {
27562755
ExpressionCompiler expressionCompiler = new ExpressionCompiler(sort, null, this.compiler);
2757-
// We expect the limit to be a constant
27582756
limit = expressionCompiler.compile(sort.fetch);
2757+
limit = limit.cast(limit.getNode(), DBSPTypeUSize.create(limit.getType().mayBeNull), false);
2758+
}
2759+
DBSPExpression offset = null;
2760+
if (sort.offset != null) {
2761+
ExpressionCompiler expressionCompiler = new ExpressionCompiler(sort, null, this.compiler);
2762+
offset = expressionCompiler.compile(sort.offset);
2763+
offset = offset.cast(offset.getNode(), DBSPTypeUSize.create(offset.getType().mayBeNull), false);
27592764
}
27602765

27612766
DBSPType inputRowType = this.convertType(node.getPositionRange(), input.getRowType(), false);
@@ -2773,12 +2778,19 @@ void visitSort(LogicalSort sort) {
27732778
// Generate comparison function for sorting the vector
27742779
DBSPComparatorExpression comparator = generateComparator(
27752780
node, sort.getCollation().getFieldCollations(), inputRowType, false);
2776-
if (sort.fetch != null) {
2777-
// TopK operator.
2781+
DBSPEqualityComparatorExpression eq = new DBSPEqualityComparatorExpression(node, comparator);
2782+
2783+
DBSPExpression total = null;
2784+
if (offset != null && limit != null) {
2785+
total = ExpressionCompiler.makeBinaryExpression(
2786+
node, limit.getType(), DBSPOpcode.ADD, limit, offset);
2787+
}
2788+
2789+
if (limit != null || offset != null) {
2790+
// We build one or two TopK operators: one for total and one for offset
27782791
// Since TopK is always incremental we have to wrap it into a D-I pair
27792792
DBSPDifferentiateOperator diff = new DBSPDifferentiateOperator(node, index.outputPort());
27802793
this.addOperator(diff);
2781-
DBSPEqualityComparatorExpression eq = new DBSPEqualityComparatorExpression(node, comparator);
27822794

27832795
// Output producer is (index, row) -> row
27842796
DBSPVariablePath left = DBSPTypeInteger.getType(node, INT64, false).var();
@@ -2787,21 +2799,49 @@ void visitSort(LogicalSort sort) {
27872799
DBSPTupleExpression tuple = new DBSPTupleExpression(flattened, false);
27882800
DBSPClosureExpression outputProducer = tuple.closure(left, right);
27892801

2790-
limit = limit.cast(limit.getNode(), DBSPTypeUSize.create(limit.getType().mayBeNull), false);
2791-
DBSPIndexedTopKOperator topK = new DBSPIndexedTopKOperator(
2792-
node, DBSPIndexedTopKOperator.TopKNumbering.ROW_NUMBER,
2793-
comparator, limit, eq, outputProducer, diff.outputPort());
2794-
this.addOperator(topK);
2795-
DBSPIntegrateOperator integral = new DBSPIntegrateOperator(node, topK.outputPort());
2796-
this.addOperator(integral);
2802+
// TopK operator to compute the first offset rows, which will be dropped
2803+
DBSPSimpleOperator offsetOperator = null;
2804+
if (offset != null) {
2805+
offsetOperator = new DBSPIndexedTopKOperator(
2806+
node, DBSPIndexedTopKOperator.TopKNumbering.ROW_NUMBER,
2807+
comparator, offset, eq, outputProducer, diff.outputPort());
2808+
this.addOperator(offsetOperator);
2809+
offsetOperator = new DBSPIntegrateOperator(node, offsetOperator.outputPort());
2810+
this.addOperator(offsetOperator);
2811+
2812+
if (limit != null)
2813+
limit = total;
2814+
}
2815+
2816+
DBSPSimpleOperator limitOperator = null;
2817+
if (limit != null) {
2818+
limitOperator = new DBSPIndexedTopKOperator(
2819+
node, DBSPIndexedTopKOperator.TopKNumbering.ROW_NUMBER,
2820+
comparator, limit, eq, outputProducer, diff.outputPort());
2821+
this.addOperator(limitOperator);
2822+
limitOperator = new DBSPIntegrateOperator(node, limitOperator.outputPort());
2823+
this.addOperator(limitOperator);
2824+
}
2825+
2826+
if (offsetOperator != null) {
2827+
if (limitOperator != null) {
2828+
limitOperator = new DBSPSubtractOperator(node, limitOperator.outputPort(), offsetOperator.outputPort());
2829+
} else {
2830+
limitOperator = new DBSPSubtractOperator(node, index.outputPort(), offsetOperator.outputPort());
2831+
}
2832+
this.addOperator(limitOperator);
2833+
}
2834+
27972835
// If we ignore ORDER BY this is the result.
27982836
boolean done = this.options.languageOptions.ignoreOrderBy;
2799-
// We can also ignore the order by for some ancestors
2837+
// We can also ignore the order by for ancestor nodes that do not care about it.
2838+
// Perhaps the optimizer should have handled this case.
28002839
if (!this.ancestors.isEmpty()) {
28012840
RelNode last = Utilities.last(this.ancestors);
28022841
if (last instanceof LogicalAggregate ||
28032842
last instanceof LogicalProject ||
2804-
last instanceof LogicalJoin) {
2843+
last instanceof LogicalJoin ||
2844+
last instanceof LogicalAsofJoin) {
28052845
done = true;
28062846
}
28072847
}
@@ -2812,14 +2852,15 @@ void visitSort(LogicalSort sort) {
28122852
this.warnNoSort(node);
28132853
if (done) {
28142854
// We must drop the index we built.
2815-
DBSPDeindexOperator deindex = new DBSPDeindexOperator(node.getFinal(), node, integral.outputPort());
2855+
DBSPDeindexOperator deindex = new DBSPDeindexOperator(node.getFinal(), node, limitOperator.outputPort());
28162856
this.assignOperator(sort, deindex);
28172857
return;
28182858
}
28192859
// Otherwise we have to sort again in a vector!
28202860
// Fall through, continuing from the output of the limit/offset operators.
2821-
index = integral;
2861+
index = limitOperator;
28222862
}
2863+
28232864
// Global sort. Implemented by aggregate in a single Vec<> which is then sorted.
28242865
// Apply an aggregation function that just creates a vector.
28252866
DBSPTypeArray arrayType = new DBSPTypeArray(inputRowType, false);
@@ -2843,9 +2884,7 @@ void visitSort(LogicalSort sort) {
28432884
folder, null, index.outputPort());
28442885
this.addOperator(agg);
28452886

2846-
if (limit != null)
2847-
limit = limit.cast(limit.getNode(), DBSPTypeUSize.INSTANCE, false);
2848-
DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator, limit);
2887+
DBSPSortExpression sorter = new DBSPSortExpression(node, inputRowType, comparator);
28492888
DBSPSimpleOperator result = new DBSPMapOperator(
28502889
node.getFinal(), sorter, TypeCompiler.makeZSet(arrayType), agg.outputPort());
28512890
this.assignOperator(sort, result);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/ExpressionTranslator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,8 @@ public void postorder(DBSPSomeExpression node) {
404404
@Override
405405
public void postorder(DBSPSortExpression node) {
406406
DBSPExpression comparator = this.getE(node.comparator);
407-
DBSPExpression limit = this.getEN(node.limit);
408407
this.map(node, new DBSPSortExpression(node.getNode(), node.elementType,
409-
comparator.to(DBSPComparatorExpression.class), limit));
408+
comparator.to(DBSPComparatorExpression.class)));
410409
}
411410

412411
@Override

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/inner/InnerRewriteVisitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,9 +1189,9 @@ public VisitDecision preorder(DBSPSortExpression expression) {
11891189
this.push(expression);
11901190
DBSPExpression comparator = this.transform(expression.comparator);
11911191
DBSPType elementType = this.transform(expression.elementType);
1192-
@Nullable DBSPExpression limit = this.transformN(expression.limit);
11931192
this.pop(expression);
1194-
DBSPExpression result = new DBSPSortExpression(expression.getNode(), elementType, comparator, limit);
1193+
DBSPExpression result = new DBSPSortExpression(
1194+
expression.getNode(), elementType, comparator);
11951195
this.map(expression, result);
11961196
return VisitDecision.STOP;
11971197
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/ir/expression/DBSPSortExpression.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,9 @@ public final class DBSPSortExpression extends DBSPExpression {
5151
// Usually a DBSPComparatorExpression, but can be a PathExpression too.
5252
public final DBSPExpression comparator;
5353
public final DBSPType elementType;
54-
@Nullable
55-
public final DBSPExpression limit;
5654

5755
public DBSPSortExpression(
58-
CalciteObject node, DBSPType elementType,
59-
DBSPExpression comparator, @Nullable DBSPExpression limit) {
56+
CalciteObject node, DBSPType elementType, DBSPExpression comparator) {
6057
super(node, new DBSPTypeFunction(
6158
// Return type
6259
new DBSPTypeArray(elementType, false),
@@ -66,7 +63,6 @@ public DBSPSortExpression(
6663
new DBSPTypeArray(elementType, false).ref())));
6764
this.comparator = comparator;
6865
this.elementType = elementType;
69-
this.limit = limit;
7066
}
7167

7268
@Override
@@ -76,10 +72,6 @@ public void accept(InnerVisitor visitor) {
7672
visitor.push(this);
7773
visitor.property("comparator");
7874
this.comparator.accept(visitor);
79-
if (this.limit != null) {
80-
visitor.property("limit");
81-
this.limit.accept(visitor);
82-
}
8375
visitor.property("elementType");
8476
this.elementType.accept(visitor);
8577
visitor.pop(this);
@@ -93,8 +85,7 @@ public boolean sameFields(IDBSPInnerNode other) {
9385
if (o == null)
9486
return false;
9587
return this.comparator == o.comparator &&
96-
this.elementType == o.elementType &&
97-
this.limit == o.limit;
88+
this.elementType == o.elementType;
9889
}
9990

10091
@Override
@@ -107,26 +98,22 @@ public IIndentStream toString(IIndentStream builder) {
10798
@Override
10899
public DBSPExpression deepCopy() {
109100
return new DBSPSortExpression(this.getNode(), this.elementType,
110-
this.comparator.deepCopy().to(DBSPComparatorExpression.class), this.limit);
101+
this.comparator.deepCopy().to(DBSPComparatorExpression.class));
111102
}
112103

113104
@Override
114105
public boolean equivalent(EquivalenceContext context, DBSPExpression other) {
115106
DBSPSortExpression otherExpression = other.as(DBSPSortExpression.class);
116107
if (otherExpression == null)
117108
return false;
118-
return this.comparator.equivalent(context, otherExpression.comparator) &&
119-
EquivalenceContext.equiv(this.limit, otherExpression.limit);
109+
return this.comparator.equivalent(context, otherExpression.comparator);
120110
}
121111

122112

123113
@SuppressWarnings("unused")
124114
public static DBSPSortExpression fromJson(JsonNode node, JsonDecoder decoder) {
125115
DBSPExpression comparator = fromJsonInner(node, "comparator", decoder, DBSPExpression.class);
126116
DBSPType elementType = fromJsonInner(node, "elementType", decoder, DBSPType.class);
127-
DBSPExpression limit = null;
128-
if (node.has("limit"))
129-
limit = fromJsonInner(node, "limit", decoder, DBSPExpression.class);
130-
return new DBSPSortExpression(CalciteObject.EMPTY, elementType, comparator, limit);
117+
return new DBSPSortExpression(CalciteObject.EMPTY, elementType, comparator);
131118
}
132119
}

0 commit comments

Comments
 (0)