Skip to content

Commit af439ff

Browse files
committed
[SQL] Fix multi-crate CI tests
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 984419c commit af439ff

23 files changed

Lines changed: 187 additions & 92 deletions

crates/sqllib/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl fmt::Display for SourcePositionRange {
6969

7070
/// Maps "keys" to source line position information
7171
#[doc(hidden)]
72-
#[derive(Default)]
72+
#[derive(Debug, Default)]
7373
pub struct SourceMap {
7474
map: BTreeMap<(&'static str, u32), SourcePosition>,
7575
}

sql-to-dbsp-compiler/SQL-compiler/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
<artifactId>maven-surefire-plugin</artifactId>
7474
<version>3.1.2</version>
7575
<configuration>
76+
<runOrder>alphabetical</runOrder>
7677
<argLine>-ea</argLine>
7778
<properties>
7879
<property>

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/operator/DBSPInputMapWithWaterlineOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public DBSPType outputType(int outputNo) {
9898
case 0 -> this.outputType;
9999
case 1 -> this.errorType;
100100
case 2 -> new DBSPTypeTypedBox(this.lub.getResultType(), false);
101-
default -> throw new InternalCompilerError("Unpexected output " + outputNo);
101+
default -> throw new InternalCompilerError("Unexpected output " + outputNo);
102102
};
103103
}
104104

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public abstract class BaseRustCodeGenerator implements ICodeGenerator {
2525
boolean generateUdfInclude = true;
2626
boolean generateMalloc = true;
2727
boolean generateTuples = true;
28-
boolean declareSourceMap = false;
2928

3029
protected BaseRustCodeGenerator() {
3130
this.id = crdId++;
@@ -39,11 +38,6 @@ public BaseRustCodeGenerator withGenerateTuples(boolean generate) {
3938
return this;
4039
}
4140

42-
public BaseRustCodeGenerator withDeclareSourceMap(boolean declare) {
43-
this.declareSourceMap = declare;
44-
return this;
45-
}
46-
4741
public BaseRustCodeGenerator withUdf(boolean udf) {
4842
this.generateUdfInclude = udf;
4943
return this;
@@ -100,7 +94,8 @@ public void addDependency(String crate) {
10094
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
10195
#[allow(non_upper_case_globals)]
10296
#[export_name = "malloc_conf"]
103-
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\\0";""";
97+
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\\0";
98+
""";
10499

105100
public static final String STANDARD_PREAMBLE = """
106101
use dbsp::{

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/RustFileWriter.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ public class RustFileWriter extends RustWriter {
2020
StructuresUsed used = new StructuresUsed();
2121
boolean findUsed = true;
2222
boolean slt = false;
23-
boolean test = false;
2423
final LateMaterializations materializations;
2524

2625
public RustFileWriter(LateMaterializations materializations) {
@@ -42,12 +41,6 @@ String rustPreamble() {
4241
/** Special support for running the SLT tests */
4342
public RustFileWriter forSlt() {
4443
this.slt = true;
45-
this.test = true;
46-
return this;
47-
}
48-
49-
public RustFileWriter withTest(boolean test) {
50-
this.test = test;
5144
return this;
5245
}
5346

@@ -119,17 +112,13 @@ public void write(DBSPCompiler compiler) {
119112
this.outputBuilder.append(BaseRustCodeGenerator.ALLOC_PREAMBLE);
120113
if (this.generateUdfInclude)
121114
this.generateUdfInclude();
122-
if (this.test)
115+
if (compiler.options.ioOptions.testing)
123116
this.builder().append("""
124117
#[cfg(test)]
125118
use readers::*;""").newline();
126119

127120
for (String dep : this.dependencies)
128-
this.builder().append("use ").append(dep).append("::*;");
129-
130-
if (this.declareSourceMap) {
131-
SourcePositionResource.generateDeclaration(this.outputBuilder);
132-
}
121+
this.builder().append("use ").append(dep).append("::*;").newline();
133122

134123
ToRustInnerVisitor innerVisitor = new ToRustInnerVisitor(compiler, this.builder(), null, false);
135124
ProjectDeclarations declarationsDone = new ProjectDeclarations();

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/SourcePositionResource.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,26 @@ HashAndIndex getKey(@Nullable DBSPOperator operator, DBSPHandleErrorExpression e
5757
return new HashAndIndex(name, expression.index);
5858
}
5959

60-
public static final String STATIC_NAME = "SOURCE_MAP_STATIC";
60+
static final String STATIC_NAME_PREFIX = "SOURCE_MAP_STATIC";
6161

6262
public boolean isEmpty() {
6363
return this.keyToPosition.isEmpty();
6464
}
6565

66-
public static void generateDeclaration(IIndentStream builder) {
66+
private static String staticName(String name) {
67+
return STATIC_NAME_PREFIX + "_" + name;
68+
}
69+
70+
public static void generateDeclaration(IIndentStream builder, String name) {
6771
builder.append("pub static ")
68-
.append(STATIC_NAME)
72+
.append(staticName(name))
6973
.append(": ")
7074
.append("OnceLock<SourceMap> = OnceLock::new();")
7175
.newline();
7276
}
7377

74-
public void generateInitializer(IIndentStream builder) {
75-
builder.append(STATIC_NAME)
78+
public void generateInitializer(IIndentStream builder, String name) {
79+
builder.append(staticName(name))
7680
.append(".get_or_init(|| {")
7781
.increase()
7882
.append("let mut m = SourceMap::new();")
@@ -93,11 +97,11 @@ public void generateInitializer(IIndentStream builder) {
9397
.newline();
9498
}
9599

96-
public static void generateReference(IIndentStream builder, String sourceMapName) {
100+
public static void generateReference(IIndentStream builder, String sourceMapName, String staticName) {
97101
builder.append("let ")
98102
.append(sourceMapName)
99103
.append(" = ")
100-
.append(STATIC_NAME)
104+
.append(staticName(staticName))
101105
.append(".get().unwrap();")
102106
.newline();
103107
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustInnerVisitor.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,17 @@ else if (Float.isInfinite(value)) {
732732
return VisitDecision.STOP;
733733
}
734734

735+
@Override
736+
public VisitDecision preorder(DBSPWindowBoundExpression bound) {
737+
String beforeAfter = bound.isPreceding ? "Before" : "After";
738+
this.builder.append("RelOffset::")
739+
.append(beforeAfter)
740+
.append("(");
741+
bound.representation.accept(this);
742+
this.builder.append(")");
743+
return VisitDecision.STOP;
744+
}
745+
735746
@Override
736747
public VisitDecision preorder(DBSPComment comment) {
737748
if (comment.comment.contains("\n")) {
@@ -2012,7 +2023,9 @@ public VisitDecision preorder(DBSPFunction function) {
20122023
NeedsSourceMap finder = new NeedsSourceMap(this.compiler);
20132024
finder.apply(function.body);
20142025
if (finder.found) {
2015-
SourcePositionResource.generateReference(this.builder, CircuitWriter.SOURCE_MAP_VARIABLE_NAME);
2026+
SourcePositionResource.generateReference(
2027+
this.builder, CircuitWriter.SOURCE_MAP_VARIABLE_NAME,
2028+
Objects.requireNonNull(this.circuitContext).name);
20162029
}
20172030
function.body.accept(this);
20182031
this.builder.decrease()

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ public ToRustVisitor withPreferHash(boolean preferHash) {
156156
return this;
157157
}
158158

159+
String getCircuitName() {
160+
return this.getCircuit().name;
161+
}
162+
159163
void processNode(IDBSPNode node) {
160164
DBSPOperator op = node.as(DBSPOperator.class);
161165
if (op != null)
@@ -307,9 +311,10 @@ public VisitDecision preorder(DBSPCircuit circuit) {
307311
.getCircuitVisitor(true);
308312
collector.apply(circuit);
309313
if (!this.sourcePositionResource.isEmpty()) {
310-
SourcePositionResource.generateDeclaration(this.builder);
311-
this.sourcePositionResource.generateInitializer(this.builder);
312-
SourcePositionResource.generateReference(this.builder, CircuitWriter.SOURCE_MAP_VARIABLE_NAME);
314+
SourcePositionResource.generateDeclaration(this.builder, this.getCircuitName());
315+
this.sourcePositionResource.generateInitializer(this.builder, this.getCircuitName());
316+
SourcePositionResource.generateReference(
317+
this.builder, CircuitWriter.SOURCE_MAP_VARIABLE_NAME, this.getCircuitName());
313318
}
314319

315320
registerPreprocessors(this.compiler, this.builder);
@@ -1441,21 +1446,12 @@ public VisitDecision preorder(DBSPLagOperator operator) {
14411446

14421447
void emitWindowBounds(DBSPWindowBoundExpression lower, DBSPWindowBoundExpression upper) {
14431448
this.builder.append("RelRange::new(").increase();
1444-
this.emitWindowBound(lower);
1449+
lower.accept(this.innerVisitor);
14451450
this.builder.append(",").newline();
1446-
this.emitWindowBound(upper);
1451+
upper.accept(this.innerVisitor);
14471452
this.builder.newline().decrease().append(")");
14481453
}
14491454

1450-
void emitWindowBound(DBSPWindowBoundExpression bound) {
1451-
String beforeAfter = bound.isPreceding ? "Before" : "After";
1452-
this.builder.append("RelOffset::")
1453-
.append(beforeAfter)
1454-
.append("(");
1455-
bound.representation.accept(this.innerVisitor);
1456-
this.builder.append(")");
1457-
}
1458-
14591455
boolean inOuterCircuit() {
14601456
return this.getParent().is(DBSPCircuit.class);
14611457
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/multi/CircuitWriter.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.dbsp.sqlCompiler.circuit.DBSPCircuit;
44
import org.dbsp.sqlCompiler.circuit.annotation.OperatorHash;
55
import org.dbsp.sqlCompiler.circuit.operator.DBSPControlledKeyFilterOperator;
6+
import org.dbsp.sqlCompiler.circuit.operator.DBSPInputMapWithWaterlineOperator;
67
import org.dbsp.sqlCompiler.circuit.operator.DBSPNestedOperator;
78
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
89
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
@@ -60,6 +61,9 @@ private void processChild(DBSPOperator node, boolean useHandles) {
6061
.append(",");
6162
}
6263
}
64+
if (useHandles && node.is(DBSPInputMapWithWaterlineOperator.class)) {
65+
this.builder().append("handle_").append(name);
66+
}
6367
this.builder().append(")");
6468
}
6569
this.builder().append(" = ");
@@ -82,14 +86,14 @@ private void processChild(DBSPOperator node, boolean useHandles) {
8286
name = input.getName(false);
8387
this.builder().append("&")
8488
.append(name)
85-
.append(",");
89+
.append(".clone(), ");
8690
}
8791
if (node.is(DBSPControlledKeyFilterOperator.class)) {
8892
DBSPControlledKeyFilterOperator filter = node.to(DBSPControlledKeyFilterOperator.class);
8993
if (this.materializations.hasRight(filter)) {
9094
DBSPSourceMultisetOperator source = this.materializations.getLeft(filter);
9195
String sourceName = source.getNodeName(false);
92-
this.builder().append("handle_").append(sourceName);
96+
this.builder().append("handle_").append(sourceName).append(".clone()");
9397
}
9498
}
9599
this.builder().append(");").newline();
@@ -118,6 +122,7 @@ void writeCircuit(DBSPCompiler compiler, DBSPCircuit circuit) {
118122
signature.decrease().append(")");
119123
}
120124

125+
SourcePositionResource.generateDeclaration(this.builder(), circuit.name);
121126
this.builder().append("pub fn ")
122127
.append(circuit.getName());
123128

@@ -148,8 +153,9 @@ void writeCircuit(DBSPCompiler compiler, DBSPCircuit circuit) {
148153
CircuitVisitor collector = new CollectSourcePositions(compiler, sourcePositionResource)
149154
.getCircuitVisitor(true);
150155
collector.apply(circuit);
151-
sourcePositionResource.generateInitializer(this.builder());
152-
SourcePositionResource.generateReference(this.builder(), CircuitWriter.SOURCE_MAP_VARIABLE_NAME);
156+
sourcePositionResource.generateInitializer(this.builder(), circuit.name);
157+
SourcePositionResource.generateReference(
158+
this.builder(), CircuitWriter.SOURCE_MAP_VARIABLE_NAME, circuit.name);
153159

154160
// Process sources first
155161
for (DBSPOperator node : circuit.getAllOperators())
@@ -166,11 +172,11 @@ void writeCircuit(DBSPCompiler compiler, DBSPCircuit circuit) {
166172
this.builder().append("Ok((");
167173
for (IInputOperator operator: circuit.sourceOperators.values()) {
168174
String name = operator.asOperator().getNodeName(false);
169-
this.builder().append("handle_").append(name).append(", ");
175+
this.builder().append("handle_").append(name).append(".clone(), ");
170176
}
171177
for (DBSPSinkOperator operator: circuit.sinkOperators.values()) {
172178
String name = operator.getNodeName(false);
173-
this.builder().append(name).append(", ");
179+
this.builder().append(name).append(".clone(), ");
174180
}
175181
this.builder().append("))");
176182
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/multi/CrateGenerator.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public final class CrateGenerator {
2525
public final String crateName;
2626
public final boolean enterprise;
2727
public final boolean lib;
28+
public final boolean testing;
2829

2930
/** Cargo file name */
3031
public static final String CARGO = "Cargo.toml";
@@ -38,14 +39,15 @@ public final class CrateGenerator {
3839
final ICodeGenerator codeGenerator;
3940

4041
public CrateGenerator(File baseDirectory, String directory, String crateName, ICodeGenerator codeGenerator,
41-
boolean enterprise, boolean lib) {
42+
boolean enterprise, boolean lib, boolean testing) {
4243
this.crateName = crateName;
4344
this.directory = directory;
4445
this.baseDirectory = baseDirectory;
4546
this.dependencies = new HashSet<>();
4647
this.codeGenerator = codeGenerator;
4748
this.enterprise = enterprise;
4849
this.lib = lib;
50+
this.testing = testing;
4951
}
5052

5153
@Override
@@ -55,14 +57,16 @@ public boolean equals(Object o) {
5557
CrateGenerator that = (CrateGenerator) o;
5658
return this.baseDirectory.equals(that.baseDirectory) &&
5759
this.crateName.equals(that.crateName) &&
58-
this.directory.equals(that.directory);
60+
this.directory.equals(that.directory) &&
61+
this.testing == that.testing;
5962
}
6063

6164
@Override
6265
public int hashCode() {
6366
int result = this.baseDirectory.hashCode();
6467
result = 31 * result + this.crateName.hashCode();
6568
result = 31 * result + this.directory.hashCode();
69+
result = 31 * result + Boolean.hashCode(this.testing);
6670
return result;
6771
}
6872

@@ -85,10 +89,16 @@ void generateCargo(PrintStream stream) {
8589
stream.println("""
8690
version = "0.1.0"
8791
edition = "2021"
88-
publish = false
89-
92+
publish = false""");
93+
if (this.testing) {
94+
stream.println("""
9095
[dev-dependencies]
91-
uuid = { version = "1.6.1" }""");
96+
# Only used in tests
97+
readers = { workspace = true }
98+
uuid = { workspace = true }
99+
metrics = { workspace = true }
100+
metrics-util = { workspace = true }""");
101+
}
92102
stream.println();
93103
if (this.lib) {
94104
stream.println("[lib]");

0 commit comments

Comments
 (0)