Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sql-to-dbsp-compiler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Java dependencies.
The code generated by the compiler is Rust. To run it you need a
working installation of Rust (we recommend using rustup to install:
<https://www.rust-lang.org/tools/install>). The compiler uses the
current version of the DBSP library sources from github
current version of the DBSP library sources from Github
(<https://github.com/feldera/feldera>).

If you want to generate images of the query plans you need
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ public class CompilerMain {
}

void usage(JCommander commander) {
/*
// JCommander mistakenly prints this as default value
// if it manages to parse it partially.
this.options.ioOptions.loggingLevel.clear();
*/
// this.options.ioOptions.loggingLevel.clear();
commander.usage();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSinkOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceBaseOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceTableOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewDeclarationOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewOperator;
import org.dbsp.sqlCompiler.circuit.operator.IInputOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* For this we synthesize the following graph:
* |
* {}/{c->1}------------------------
* | map (|x| x -> z} |
* | map (|x| x -> z) |
* {}/{z->1} |
* | - |
* {} {z->-1} {z->1} (constant) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.dbsp.sqlCompiler.compiler.visitors.monotone.PartiallyMonotoneTuple;
import org.dbsp.sqlCompiler.compiler.visitors.outer.CircuitVisitor;
import org.dbsp.sqlCompiler.ir.DBSPParameter;
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPOpcode;
import org.dbsp.sqlCompiler.ir.expression.DBSPVariablePath;
Expand All @@ -35,7 +34,7 @@ public DBSPIntegrateTraceRetainKeysOperator(
data.outputType(), data.isMultiset(), data, control, false);
}

/** Create a operator to retain keys and returns it. May return null if the keys contain no fields. */
/** Create an operator to retain keys and returns it. May return null if the keys contain no fields. */
@Nullable
public static DBSPIntegrateTraceRetainKeysOperator create(
CalciteRelNode node, OutputPort data, IMaybeMonotoneType dataProjection, OutputPort control) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public DBSPIntegrateTraceRetainValuesOperator(
public static DBSPIntegrateTraceRetainValuesOperator create(
CalciteRelNode node, OutputPort data, IMaybeMonotoneType dataProjection, OutputPort control) {
DBSPType controlType = control.outputType();
Utilities.enforce(controlType.is(DBSPTypeTupleBase.class), "Control type is not a tuple: " + controlType);
Utilities.enforce(controlType.is(DBSPTypeTupleBase.class),
"Control type is not a tuple: " + controlType);
DBSPTypeTupleBase controlTuple = controlType.to(DBSPTypeTupleBase.class);
Utilities.enforce(controlTuple.size() == 2);

DBSPVariablePath controlArg = controlType.ref().var();
Utilities.enforce(data.outputType().is(DBSPTypeIndexedZSet.class));
Utilities.enforce(data.outputType().is(DBSPTypeIndexedZSet.class),
"Data is not indexed: " + data.outputType());
DBSPType valueType = data.getOutputIndexedZSetType().elementType;
DBSPVariablePath dataArg = valueType.ref().var();
DBSPParameter param = new DBSPParameter(dataArg.variable, dataArg.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void addOperator(DBSPOperator operator) {
/** Add an output for this nested operator. The port may be null if the corresponding output
* has actually been deleted.
* @param view View corresponding to output.
* @param port Output port corresponding to the view (may be in a differentiator).
* @param port Output port corresponding to the view (could be in a differentiator).
* @return A port of this operator that corresponds
*/
public OutputPort addOutput(ProgramIdentifier view, @Nullable OutputPort port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
import org.dbsp.sqlCompiler.ir.type.DBSPType;

import java.util.List;

/** These operators have *two* outputs: a regular stream output
* and an error output */
public abstract class DBSPOperatorWithError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class DBSPSimpleOperator extends DBSPOperator
/** Type of output produced. */
public final DBSPType outputType;
/** True if the output of the operator is a multiset. Conservative approximation;
* if this is 'false', it is surely false. It if is true, the output may still be a set. */
* if this is 'false', it is surely false. If it is true, the output may still be a set. */
Comment thread
mihaibudiu marked this conversation as resolved.
public final boolean isMultiset;
/** True if the operator contains an integrator */
public final boolean containsIntegrator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
import org.dbsp.sqlCompiler.ir.type.DBSPType;
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeStruct;
import org.dbsp.util.Linq;
import org.dbsp.util.Utilities;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.dbsp.sqlCompiler.ir.DBSPNode;
import org.dbsp.sqlCompiler.ir.expression.DBSPExpression;
import org.dbsp.sqlCompiler.ir.type.DBSPType;
import org.dbsp.sqlCompiler.ir.type.IHasType;
import org.dbsp.util.IJson;
import org.dbsp.util.Utilities;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import org.dbsp.sqlCompiler.circuit.annotation.OperatorHash;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.compiler.backend.rust.multi.CircuitWriter;
import org.dbsp.sqlCompiler.compiler.errors.SourcePosition;
import org.dbsp.sqlCompiler.compiler.errors.SourcePositionRange;
import org.dbsp.sqlCompiler.ir.expression.DBSPHandleErrorExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPStaticExpression;
import org.dbsp.util.HashString;
import org.dbsp.util.IIndentStream;
import org.dbsp.util.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.dbsp.sqlCompiler.circuit.operator.DBSPDistinctOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPJoinBaseOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceTableOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamJoinIndexOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIndexedTopKOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSinkOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceBaseOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceTableOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewBaseOperator;
import org.dbsp.sqlCompiler.circuit.operator.IInputOperator;
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public final class SingleOperatorWriter extends BaseRustCodeGenerator {

/* Example output:
* ... preamble ...
* pub fn create_xxx(circuit: &RootCircuit, hash: Option<&'static str>, sourceMap: &'static SourceMap, catalog: &mut Catalog,
* pub fn create_xxx(circuit: &RootCircuit, hash: Option<&'static str>,
* sourceMap: &'static SourceMap, catalog: &mut Catalog,
* i0: &Stream<RootCircuit, WSet<Tup1<Option<i32>>>>,
* i1: &Stream<RootCircuit, WSet<Tup1<Option<i32>>>>,
* i2: &Stream<RootCircuit, WSet<Tup1<Option<i32>>>>, ) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,34 +449,36 @@ UnimplementedException decorrelateError(CalciteObject node) {
}

void visitCorrelate(LogicalCorrelate correlate) {
// We decorrelate queries using Calcite's optimizer, which doesn't always work.
// In particular, it won't decorrelate queries with unnest.
// Here we check for unnest-type queries. We assume that unnest queries
// have a restricted plan of this form:
// LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{...}])
// LeftSubquery (arbitrary)
// Uncollect
// LogicalProject(COL=[$cor0.ARRAY]) // uncollectInput
// LogicalValues(tuples=[[{ 0 }]])
// or
// LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{...}])
// LeftSubquery
// LogicalProject // rightProject
// Uncollect
// LogicalProject(COL=[$cor0.ARRAY]) // uncollectInput
// LogicalValues(tuples=[[{ 0 }]])
// Instead of projecting and joining again we directly apply flatmap.
// The translation for this is:
// stream.flat_map({
// move |x: &Tuple2<Vec<i32>, Option<i32>>, | -> _ {
// let xA: Vec<i32> = x.0.clone();
// let xB: x.1.clone();
// x.0.clone().into_iter().map({
// move |e: i32, | -> Tuple3<Vec<i32>, Option<i32>, i32> {
// Tuple3::new(xA.clone(), xB.clone(), e)
// }
// })
// });
/*
We decorrelate queries using Calcite's optimizer, which doesn't always work.
In particular, it won't decorrelate queries with unnest.
Here we check for unnest-type queries. We assume that unnest queries
have a restricted plan of this form:
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{...}])
LeftSubquery (arbitrary)
Uncollect
LogicalProject(COL=[$cor0.ARRAY]) // uncollectInput
LogicalValues(tuples=[[{ 0 }]])
or
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{...}])
LeftSubquery
LogicalProject // rightProject
Uncollect
LogicalProject(COL=[$cor0.ARRAY]) // uncollectInput
LogicalValues(tuples=[[{ 0 }]])
Instead of projecting and joining again we directly apply flatmap.
The translation for this is:
stream.flat_map({
move |x: &Tuple2<Vec<i32>, Option<i32>>, | -> _ {
let xA: Vec<i32> = x.0.clone();
let xB: x.1.clone();
x.0.clone().into_iter().map({
move |e: i32, | -> Tuple3<Vec<i32>, Option<i32>, i32> {
Tuple3::new(xA.clone(), xB.clone(), e)
}
})
});
*/
CalciteObject node = CalciteObject.create(correlate);
DBSPTypeTuple type = this.convertType(correlate.getRowType(), false).to(DBSPTypeTuple.class);
/*
Expand All @@ -485,7 +487,6 @@ void visitCorrelate(LogicalCorrelate correlate) {
if (correlate.getJoinType().isOuterJoin())
throw this.decorrelateError(node);
*/

this.visit(correlate.getLeft(), 0, correlate);
DBSPSimpleOperator left = this.getInputAs(correlate.getLeft(), true);
DBSPTypeTuple leftElementType = left.getOutputZSetElementType().to(DBSPTypeTuple.class);
Expand Down Expand Up @@ -2664,7 +2665,7 @@ DBSPSimpleOperator implement(DBSPSimpleOperator unusedInput, DBSPSimpleOperator
}

// This operator is always incremental, so create the non-incremental version
// of it by adding a D and an I around it.
// of it by adding a Differentiator and an Integrator around it.
DBSPDifferentiateOperator diff = new DBSPDifferentiateOperator(this.node, inputIndexed);
this.compiler.getCircuit().addOperator(diff);

Expand Down Expand Up @@ -3091,7 +3092,7 @@ DBSPSimpleOperator implement(DBSPSimpleOperator input, DBSPSimpleOperator lastOp
// Compute the window aggregate

// This operator is always incremental, so create the non-incremental version
// of it by adding a D and an I around it.
// of it by adding a Differentiator and an Integrator around it.
DBSPDifferentiateOperator diff = new DBSPDifferentiateOperator(node, mapIndex.outputPort());
this.compiler.getCircuit().addOperator(diff);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ static CompilationError operandCountError(CalciteObject node, String name, int o
@Nullable
DBSPExpression makeRegex(DBSPStringLiteral lit) {
DBSPTypeUser user = new DBSPTypeUser(CalciteObject.EMPTY, USER, "Regex", true);
// Here we lie about the type: new does not return an Regex, but a Result<Regex, Error>.
// Here we lie about the type: new does not return a Regex, but a Result<Regex, Error>.
// We lie again that ok returns an unchanged type. These two lies cancel out.
DBSPExpression init = user.constructor("new", lit.toStr());
init = init.applyMethod("ok", init.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface CalciteOptimizerStep {
RelNode optimize(RelNode rel, int level);
}

/** Base class for optimizations that use a Hep optimizer */
/** Base class for optimizations that use a HEP optimizer */
public static abstract class HepOptimizerStep implements CalciteOptimizerStep {
/** The program that performs the optimization for the specified optimization level */
abstract HepProgram getProgram(RelNode node, int level);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void match(RelOptRuleCall call) {
Pair<RelNode, @Nullable RexNode> first = extractSourceAndCond(inputs.get(0).stripped().getInput(0).stripped());

// Groups conditions by their source relational node and input position.
// - Key: Pair of (sourceRelNode, inputPosition)
// - Key: A pair of (sourceRelNode, inputPosition)
// - inputPosition is null for mergeable conditions
// - inputPosition contains original index for non-mergeable inputs
// - Value: List of conditions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static RelColumnMetadata relMetaFromJson(JsonNode node, int index, RelDataTypeFa
String json = Utilities.deterministicObjectMapper().writeValueAsString(jsonType);
RelDataType type = RelJsonReader.readType(typeFactory, json);
RelDataTypeField field = new RelDataTypeFieldImpl(name, index, type);
// Do we need lateness, watermark, etc?
// Do we need lateness, watermark, etc.?
boolean interned = node.has("interned");
return new RelColumnMetadata(CalciteObject.EMPTY, field, isPrimaryKey, caseSensitive,
null, null, null, SourcePositionRange.INVALID, interned);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void checkRepresentation(DBSPExpression expression, Representation repr, boolean
@Override
public void startVisit(IDBSPInnerNode node) {
// This node may not be a DBSPClosureExpression.
// It can be a e.g., Fold constructor.
// It can be e.g., Fold constructor.
Logger.INSTANCE.belowLevel(this, 1)
.append("CSE analyzing ")
.appendSupplier(node::toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.dbsp.sqlCompiler.ir.type.IsNumericType;
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeISize;

/** Converts a[index] into a[(isize)(index - 1] */
/** Converts a[index] into a[(isize)(index - 1)] */
public class AdjustSqlIndex extends ExpressionTranslator {
public AdjustSqlIndex(DBSPCompiler compiler) {
super(compiler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainValuesOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPNestedOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperatorWithError;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
import org.dbsp.sqlCompiler.circuit.OutputPort;
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
/** Implements the "now" operator.
* This requires:
* - using the input stream called now
* - rewriting map operators that have calls to "now()' into a join followed by a map
* - rewriting map operators that have calls to "now()" into a join followed by a map
* - rewriting the invocations to the now() function in the map function to references to the input variable */
public class ImplementNow extends Passes {
/** Discovers whether an expression contains a call to the now() function */
Expand Down Expand Up @@ -215,7 +215,7 @@ public VisitDecision preorder(DBSPApplyExpression expression) {

/** Replace map operators that contain now() as an expression with
* map operators that take an extra field and use that instead of the now() call.
* Insert a join prior to such operators (which also requires a MapIndex operator.
* Insert a join prior to such operators (which also requires a MapIndex operator).
* Also inserts a MapIndex operator to index the 'NOW' built-in table.
* Same for filter operators. */
static class RewriteNow extends CircuitCloneVisitor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void postorder(DBSPMapIndexOperator operator) {

DBSPSimpleOperator newJoin = join
.withInputs(Linq.list(leftPort, rightPort), false)
.to(DBSPSimpleOperator.class);;
.to(DBSPSimpleOperator.class);
if (newJoin.outputType.sameType(join.outputType)) {
super.postorder(operator);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void postorder(DBSPMapOperator operator) {
sourceFunction.rightProjections, sourceFunction.ordinalityIndexType, shuffle);
DBSPSimpleOperator result = source.simpleNode()
.withFunction(newFunction, operator.outputType)
.to(DBSPSimpleOperator.class);;
.to(DBSPSimpleOperator.class);
this.map(operator, result);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.dbsp.sqlCompiler.circuit.operator.DBSPAggregateOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPBinaryOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPChainAggregateOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPInputMapWithWaterlineOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainValuesOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPLagOperator;
Expand All @@ -22,6 +23,8 @@ public StrayGC(DBSPCompiler compiler, CircuitGraphs g) {
void check(DBSPBinaryOperator operator) {
// At least one sibling on the left input must contain an integral
var left = operator.left();
if (left.operator.is(DBSPInputMapWithWaterlineOperator.class))
return;
CircuitGraph graph = this.getGraph();
for (Port<DBSPOperator> sibling: graph.getSuccessors(left.operator)) {
DBSPOperator so = sibling.node();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public boolean contains(int i) {
public String toString() {
return this.columns.toString();
}
};
}

final Map<DBSPSourceTableOperator, InternedColumnList> internedInputs;

Expand Down
Loading