Skip to content

Commit 86b1a1f

Browse files
mihaibudiuryzhyk
authored andcommitted
[SQL] Support for CREATE INDEX statement on views
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 0c2ca43 commit 86b1a1f

File tree

22 files changed

+443
-58
lines changed

22 files changed

+443
-58
lines changed

benchmark/feldera-sql/benchmarks/nexmark/queries/q22.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
CREATE FUNCTION SPLIT_INDEX(s VARCHAR, sep CHAR, index INT) RETURNS VARCHAR
2-
AS SPLIT(s, CAST(sep AS VARCHAR))[index + 1];
1+
CREATE FUNCTION SPLIT_INDEX(s VARCHAR, sep CHAR, idx INT) RETURNS VARCHAR
2+
AS SPLIT(s, CAST(sep AS VARCHAR))[idx + 1];
33

44
CREATE VIEW Q22 AS
55
SELECT

sql-to-dbsp-compiler/SQL-compiler/src/main/codegen/config.fmpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ data: {
2020
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlLateness"
2121
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlCreateView"
2222
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlDeclareView"
23+
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlCreateIndex"
2324
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlCreateTable"
2425
"org.dbsp.sqlCompiler.compiler.frontend.parser.SqlForeignKey"
2526
]
2627

2728
# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is
2829
# not a reserved keyword, add it to the 'nonReservedKeywords' section.
2930
keywords: [
31+
"INDEX"
3032
"DISCARD"
3133
"IF"
3234
"LATENESS"
@@ -923,6 +925,7 @@ data: {
923925
createStatementParserMethods: [
924926
"SqlCreateView"
925927
"SqlCreateExtendedTable"
928+
"SqlCreateIndex"
926929
"SqlCreateType"
927930
"SqlCreateFunction"
928931
]

sql-to-dbsp-compiler/SQL-compiler/src/main/codegen/includes/ddl.ftl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,23 @@ SqlDeclareView SqlDeclareView() :
314314
}
315315
}
316316

317+
SqlCreateIndex SqlCreateIndex(Span s, boolean replace) :
318+
{
319+
final SqlIdentifier id;
320+
final SqlIdentifier indexed;
321+
List<SqlNode> columns = new ArrayList<SqlNode>();
322+
SqlNodeList columnList;
323+
}
324+
{
325+
<INDEX>
326+
id = CompoundIdentifier() <ON>
327+
indexed = CompoundIdentifier()
328+
columnList = ParenthesizedSimpleIdentifierList()
329+
{
330+
return new SqlCreateIndex(s.end(this), id, indexed, columnList);
331+
}
332+
}
333+
317334
void ColumnDeclaration(List<SqlNode> list) :
318335
{
319336
final SqlIdentifier id;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPSinkOperator.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,36 @@
2929
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
3030
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
3131
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
32+
import org.dbsp.sqlCompiler.ir.type.DBSPType;
3233
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeStruct;
34+
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
3335

3436
import java.util.List;
3537

3638
public final class DBSPSinkOperator extends DBSPViewBaseOperator {
37-
public DBSPSinkOperator(CalciteObject node, ProgramIdentifier viewName, String query,
38-
DBSPTypeStruct originalRowType,
39+
/** Create a SinkOperator. May represent an output view or an index over a view.
40+
* TODO: this should probably be broken in two separate sublcasses for views and indexes.
41+
*
42+
* @param node Calcite node represented.
43+
* @param viewName Name of view or index represented by the sink.
44+
* @param queryOrViewName Query defining the view, or the view name that is indexed.
45+
* @param originalRowType Type of data of the view. This is a struct type for a view,
46+
* and a raw tuple with 2 fields for an index: (key, value), each a struct.
47+
* @param metadata Metadata of view (for an index this is the view's metadata).
48+
* @param input Unique input stream. */
49+
public DBSPSinkOperator(CalciteObject node, ProgramIdentifier viewName, String queryOrViewName,
50+
DBSPType originalRowType,
3951
ViewMetadata metadata,
4052
OutputPort input) {
41-
super(node, "inspect", null, viewName, query,
53+
super(node, "inspect", null, viewName, queryOrViewName,
4254
originalRowType, metadata, input);
4355
}
4456

57+
/** True if this sink corresponds to an INDEX statement */
58+
public boolean isIndex() {
59+
return this.outputType.is(DBSPTypeIndexedZSet.class);
60+
}
61+
4562
@Override
4663
public void accept(CircuitVisitor visitor) {
4764
visitor.push(this);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPViewBaseOperator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,25 @@
55
import org.dbsp.sqlCompiler.compiler.frontend.calciteCompiler.ProgramIdentifier;
66
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
77
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
8-
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeStruct;
8+
import org.dbsp.sqlCompiler.ir.type.DBSPType;
99
import org.dbsp.util.IIndentStream;
1010

1111
import javax.annotation.Nullable;
1212

1313
/** Base class for an operator representing a view declared by the user.
1414
* If the view is an output then it is represented by a Sink operator.
15-
* Otherwise, the view is represented by a DBSPViewOperator. */
15+
* Otherwise, the view is represented by a DBSPViewOperator.
16+
* Also represents Index operators applied to views. */
1617
public abstract class DBSPViewBaseOperator extends DBSPUnaryOperator {
18+
// Called viewName, but it could also be an index name
1719
public final ProgramIdentifier viewName;
1820
public final String query;
19-
public final DBSPTypeStruct originalRowType;
21+
public final DBSPType originalRowType;
2022
public final ViewMetadata metadata;
2123

2224
protected DBSPViewBaseOperator(
2325
CalciteObject node, String operation, @Nullable DBSPExpression function,
24-
ProgramIdentifier viewName, String query, DBSPTypeStruct originalRowType,
26+
ProgramIdentifier viewName, String query, DBSPType originalRowType,
2527
ViewMetadata metadata, OutputPort input) {
2628
super(node, operation, function, input.outputType(), input.isMultiset(), input);
2729
this.metadata = metadata;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPViewOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ public final class DBSPViewOperator
2525
implements IHasColumnsMetadata
2626
{
2727
public DBSPViewOperator(
28-
CalciteObject node, ProgramIdentifier viewName, String query, DBSPTypeStruct originalRowType,
28+
CalciteObject node, ProgramIdentifier viewName, String query, DBSPType originalRowType,
2929
ViewMetadata metadata, OutputPort input) {
3030
super(node, "map", DBSPClosureExpression.id(), viewName, query,
3131
originalRowType, metadata, input);
32-
assert metadata.size() == originalRowType.fields.size();
32+
assert metadata.size() == originalRowType.to(DBSPTypeStruct.class).fields.size();
3333
}
3434

3535
/** True if any column has LATENESS information */

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.dbsp.sqlCompiler.compiler.frontend.parser.SqlFragment;
6060
import org.dbsp.sqlCompiler.compiler.frontend.parser.SqlFragmentIdentifier;
6161
import org.dbsp.sqlCompiler.compiler.frontend.statements.CreateFunctionStatement;
62+
import org.dbsp.sqlCompiler.compiler.frontend.statements.CreateIndexStatement;
6263
import org.dbsp.sqlCompiler.compiler.frontend.statements.CreateTableStatement;
6364
import org.dbsp.sqlCompiler.compiler.frontend.statements.CreateViewStatement;
6465
import org.dbsp.sqlCompiler.compiler.frontend.statements.RelStatement;
@@ -143,6 +144,7 @@ public enum InputSource {
143144
final Map<ProgramIdentifier, Map<ProgramIdentifier, SqlLateness>> viewLateness = new HashMap<>();
144145

145146
final Map<ProgramIdentifier, CreateViewStatement> views = new HashMap<>();
147+
final Map<ProgramIdentifier, CreateIndexStatement> indexes = new HashMap<>();
146148
/** All UDFs from the SQL program. The ones in Rust have no bodies */
147149
public final List<DBSPFunction> functions = new ArrayList<>();
148150

@@ -477,6 +479,44 @@ List<ParsedStatement> runParser() {
477479
return parsed;
478480
}
479481

482+
boolean validateCreateIndex(CreateIndexStatement statement) {
483+
ProgramIdentifier refersTo = statement.refersTo;
484+
if (this.metadata.hasTable(refersTo)) {
485+
this.reportWarning(
486+
new SourcePositionRange(statement.createIndex.name.getParserPosition()),
487+
"Indexed table",
488+
"INDEX " + statement.indexName.singleQuote() + " refers to TABLE " +
489+
statement.refersTo.singleQuote() + "; this has no effect.");
490+
return true;
491+
}
492+
if (!this.metadata.hasView(refersTo)) {
493+
this.reportError(
494+
new SourcePositionRange(statement.createIndex.indexed.getParserPosition()),
495+
"Indexed object not found",
496+
"Object with name " + statement.refersTo.singleQuote() +
497+
" used in CREATE INDEX statement " + statement.indexName.singleQuote() +
498+
" does not exist.");
499+
return false;
500+
}
501+
CreateViewStatement view = Utilities.getExists(this.views, statement.refersTo);
502+
int i = 0;
503+
for (ProgramIdentifier col: statement.columns) {
504+
int index = view.getColumnIndex(col);
505+
if (index < 0) {
506+
SqlNode sqlNode = statement.createIndex.columns.get(i);
507+
this.reportError(
508+
new SourcePositionRange(sqlNode.getParserPosition()),
509+
"Column not found",
510+
"Column " + col.singleQuote() +
511+
" used in CREATE INDEX statement " + statement.indexName.singleQuote() +
512+
" does not exist in view " + view.relationName.singleQuote());
513+
return false;
514+
}
515+
i++;
516+
}
517+
return true;
518+
}
519+
480520
@Nullable DBSPCircuit runAllCompilerStages() {
481521
List<ParsedStatement> parsed = this.runParser();
482522
if (this.hasErrors())
@@ -576,13 +616,19 @@ List<ParsedStatement> runParser() {
576616
if (properties != null)
577617
properties.checkKnownProperties(this::validateViewProperty);
578618
currentViewPosition = cv.createView.getParserPosition();
579-
this.views.put(cv.getName(), cv);
619+
Utilities.putNew(this.views, cv.getName(), cv);
580620
} else if (fe.is(CreateTableStatement.class)) {
581621
CreateTableStatement ct = fe.to(CreateTableStatement.class);
582622
PropertyList properties = ct.getProperties();
583623
if (properties != null)
584624
properties.checkKnownProperties(this::validateTableProperty);
585625
foreignKeys.addAll(ct.foreignKeys);
626+
} else if (fe.is(CreateIndexStatement.class)) {
627+
CreateIndexStatement ct = fe.to(CreateIndexStatement.class);
628+
boolean success = this.validateCreateIndex(ct);
629+
if (!success)
630+
return null;
631+
Utilities.putNew(this.indexes, ct.getName(), ct);
586632
}
587633
this.relToDBSPCompiler.compile(fe);
588634
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/ProgramMetadata.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ public IHasSchema getTableDescription(ProgramIdentifier name) {
4242
return Utilities.getExists(this.inputTables, name);
4343
}
4444

45+
public boolean hasTable(ProgramIdentifier name) {
46+
return this.inputTables.containsKey(name);
47+
}
48+
49+
public boolean hasView(ProgramIdentifier name) {
50+
return this.outputViews.containsKey(name);
51+
}
52+
4553
public IHasSchema getViewDescription(ProgramIdentifier name) {
4654
return Utilities.getExists(this.outputViews, name);
4755
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import org.dbsp.sqlCompiler.ir.statement.DBSPStructWithHelperItem;
108108
import org.dbsp.sqlCompiler.ir.type.DBSPType;
109109
import org.dbsp.sqlCompiler.ir.type.DBSPTypeCode;
110+
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeRawTuple;
110111
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeNull;
111112
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeIndexedZSet;
112113
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeMap;
@@ -632,7 +633,7 @@ public void postorder(DBSPTypeStruct struct) {
632633
}
633634
}
634635

635-
void findNestedStructs(DBSPTypeStruct struct, List<DBSPTypeStruct> result) {
636+
void findNestedStructs(DBSPType struct, List<DBSPTypeStruct> result) {
636637
FindNestedStructs fn = new FindNestedStructs(this.compiler, result);
637638
fn.apply(struct);
638639
}
@@ -642,7 +643,7 @@ void generateStructDeclarations(DBSPTypeStruct struct) {
642643
item.accept(this.innerVisitor);
643644
}
644645

645-
void generateStructHelpers(DBSPTypeStruct type, @Nullable TableMetadata metadata) {
646+
void generateStructHelpers(DBSPType type, @Nullable TableMetadata metadata) {
646647
List<DBSPTypeStruct> nested = new ArrayList<>();
647648
findNestedStructs(type, nested);
648649
for (DBSPTypeStruct s: nested) {
@@ -949,37 +950,54 @@ public VisitDecision preorder(DBSPWaterlineOperator operator) {
949950
@Override
950951
public VisitDecision preorder(DBSPSinkOperator operator) {
951952
this.writeComments(operator);
952-
DBSPTypeStruct type = operator.originalRowType;
953+
DBSPType type = operator.originalRowType;
953954
if (!this.useHandles) {
954955
this.generateStructHelpers(type, null);
955-
this.builder.append("type ")
956-
.append(operator.viewName.name())
957-
.append("_struct = ")
958-
.append(type.sanitizedName)
959-
.append(";")
960-
.newline();
961-
}
962-
if (!this.useHandles) {
963-
IHasSchema description = this.metadata.getViewDescription(operator.viewName);
964-
JsonNode j = description.asJson();
965-
j = this.stripConnectors(j);
966-
DBSPStrLiteral json = new DBSPStrLiteral(j.toString(), false, true);
967-
String registerFunction = switch (operator.metadata.viewKind) {
968-
case MATERIALIZED -> "register_materialized_output_zset";
969-
case LOCAL -> throw new InternalCompilerError("Sink operator for local view " + operator);
970-
case STANDARD -> "register_output_zset";
971-
};
972-
this.builder.append("catalog.")
973-
.append(registerFunction)
974-
.append("::<_, ");
975-
operator.originalRowType.accept(this.innerVisitor);
976-
this.builder.append(">(")
977-
.append(operator.input().getOutputName())
978-
.append(".clone()")
979-
.append(", ");
980-
json.accept(this.innerVisitor);
981-
this.builder.append(");")
982-
.newline();
956+
if (operator.isIndex()) {
957+
DBSPTypeRawTuple raw = operator.originalRowType.to(DBSPTypeRawTuple.class);
958+
assert raw.size() == 2;
959+
960+
this.builder.append("catalog.register_index");
961+
this.builder.append("::<").increase();
962+
operator.getOutputIndexedZSetType().keyType.accept(this.innerVisitor);
963+
this.builder.append(",").newline();
964+
raw.tupFields[0].accept(this.innerVisitor);
965+
this.builder.append(",").newline();
966+
operator.getOutputIndexedZSetType().elementType.accept(this.innerVisitor);
967+
this.builder.append(",").newline();
968+
raw.tupFields[1].accept(this.innerVisitor);
969+
this.builder.decrease().append(">");
970+
this.builder.append("(").newline()
971+
.append(operator.input().getOutputName())
972+
.append(".clone()")
973+
.append(", &SqlIdentifier::from(\"");
974+
this.builder.append(operator.viewName.toString())
975+
.append("\"), &SqlIdentifier::from(\"")
976+
.append(operator.query);
977+
this.builder.append("\"));")
978+
.newline();
979+
} else {
980+
IHasSchema description = this.metadata.getViewDescription(operator.viewName);
981+
JsonNode j = description.asJson();
982+
j = this.stripConnectors(j);
983+
DBSPStrLiteral json = new DBSPStrLiteral(j.toString(), false, true);
984+
String registerFunction = switch (operator.metadata.viewKind) {
985+
case MATERIALIZED -> "register_materialized_output_zset";
986+
case LOCAL -> throw new InternalCompilerError("Sink operator for local view " + operator);
987+
case STANDARD -> "register_output_zset";
988+
};
989+
this.builder.append("catalog.")
990+
.append(registerFunction)
991+
.append("::<_, ");
992+
operator.originalRowType.accept(this.innerVisitor);
993+
this.builder.append(">(")
994+
.append(operator.input().getOutputName())
995+
.append(".clone()")
996+
.append(", ");
997+
json.accept(this.innerVisitor);
998+
this.builder.append(");")
999+
.newline();
1000+
}
9831001
if (this.options.ioOptions.sqlNames) {
9841002
this.builder.append("let ")
9851003
.append(operator.viewName.name())

0 commit comments

Comments
 (0)