Skip to content

Commit 252011a

Browse files
committed
[SQL] More deterministic code structure
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 0147128 commit 252011a

File tree

78 files changed

+1705
-8717
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+1705
-8717
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ members = [
1616
exclude = [
1717
"sql-to-dbsp-compiler/temp",
1818
"sql-to-dbsp-compiler/experiments",
19+
"sql-to-dbsp-compiler/multi",
1920
"demo/project_demo12-HopsworksTikTokRecSys/tiktok-gen"
2021
]
2122
resolver = "2"

crates/sqllib/src/binary.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ pub fn octet_length_(value: ByteArray) -> i32 {
245245
some_function1!(octet_length, ByteArray, i32);
246246

247247
#[doc(hidden)]
248-
pub fn position__(needle: ByteArray, haystack: ByteArray) -> i32 {
248+
pub fn binary_position__(needle: ByteArray, haystack: ByteArray) -> i32 {
249249
haystack
250250
.data
251251
.windows(needle.data.len())
@@ -254,21 +254,21 @@ pub fn position__(needle: ByteArray, haystack: ByteArray) -> i32 {
254254
.unwrap_or(0) as i32
255255
}
256256

257-
some_function2!(position, ByteArray, ByteArray, i32);
257+
some_function2!(binary_position, ByteArray, ByteArray, i32);
258258

259259
#[doc(hidden)]
260-
pub fn substring2__(source: ByteArray, left: i32) -> ByteArray {
260+
pub fn binary_substring2__(source: ByteArray, left: i32) -> ByteArray {
261261
// SQL indexing starts at 1
262262
let start = if left < 1 { 0 } else { left - 1 };
263263
let data = source.data.into_iter().skip(start as usize).collect();
264264

265265
ByteArray { data }
266266
}
267267

268-
some_function2!(substring2, ByteArray, i32, ByteArray);
268+
some_function2!(binary_substring2, ByteArray, i32, ByteArray);
269269

270270
#[doc(hidden)]
271-
pub fn substring3___(source: ByteArray, left: i32, count: i32) -> ByteArray {
271+
pub fn binary_substring3___(source: ByteArray, left: i32, count: i32) -> ByteArray {
272272
// SQL indexing starts at 1
273273
let start = if left < 1 { 0 } else { left - 1 };
274274
let count = usize::try_from(count).expect("negative substring length not allowed");
@@ -283,18 +283,18 @@ pub fn substring3___(source: ByteArray, left: i32, count: i32) -> ByteArray {
283283
ByteArray { data }
284284
}
285285

286-
some_function3!(substring3, ByteArray, i32, i32, ByteArray);
286+
some_function3!(binary_substring3, ByteArray, i32, i32, ByteArray);
287287

288288
#[doc(hidden)]
289-
pub fn overlay3___(source: ByteArray, replacement: ByteArray, position: i32) -> ByteArray {
289+
pub fn binary_overlay3___(source: ByteArray, replacement: ByteArray, position: i32) -> ByteArray {
290290
let len = replacement.length() as i32;
291-
overlay4____(source, replacement, position, len)
291+
binary_overlay4____(source, replacement, position, len)
292292
}
293293

294-
some_function3!(overlay3, ByteArray, ByteArray, i32, ByteArray);
294+
some_function3!(binary_overlay3, ByteArray, ByteArray, i32, ByteArray);
295295

296296
#[doc(hidden)]
297-
pub fn overlay4____(
297+
pub fn binary_overlay4____(
298298
source: ByteArray,
299299
mut replacement: ByteArray,
300300
position: i32,
@@ -310,15 +310,15 @@ pub fn overlay4____(
310310
} else if position > source.length() as i32 {
311311
source.concat(&replacement)
312312
} else {
313-
let mut result = substring3___(source.clone(), 0, position - 1);
313+
let mut result = binary_substring3___(source.clone(), 0, position - 1);
314314
result.data.append(&mut replacement.data);
315-
let mut substr = substring2__(source, position + remove);
315+
let mut substr = binary_substring2__(source, position + remove);
316316
result.data.append(&mut substr.data);
317317
result
318318
}
319319
}
320320

321-
some_function4!(overlay4, ByteArray, ByteArray, i32, i32, ByteArray);
321+
some_function4!(binary_overlay4, ByteArray, ByteArray, i32, i32, ByteArray);
322322

323323
#[doc(hidden)]
324324
pub fn gunzip_(source: ByteArray) -> SqlString {

crates/sqllib/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,47 @@
22

33
#[doc(hidden)]
44
pub mod aggregates;
5-
#[doc(hidden)]
5+
pub use aggregates::*;
66
pub mod array;
7+
pub use array::*;
78
#[doc(hidden)]
89
pub mod binary;
10+
pub use binary::*;
911
#[doc(hidden)]
1012
pub mod casts;
13+
pub use casts::*;
1114
#[doc(hidden)]
1215
pub mod decimal;
16+
pub use decimal::*;
1317
#[doc(hidden)]
1418
pub mod error;
19+
pub use error::*;
1520
#[doc(hidden)]
1621
pub mod geopoint;
22+
pub use geopoint::*;
1723
#[doc(hidden)]
1824
pub mod interval;
25+
pub use interval::*;
1926
#[doc(hidden)]
2027
pub mod map;
28+
pub use map::*;
2129
#[doc(hidden)]
2230
pub mod operators;
31+
pub use operators::*;
2332
#[doc(hidden)]
2433
pub mod source;
2534
#[doc(hidden)]
2635
pub mod string;
36+
pub use string::*;
2737
#[doc(hidden)]
2838
pub mod timestamp;
39+
pub use timestamp::*;
2940
#[doc(hidden)]
3041
pub mod uuid;
42+
pub use uuid::*;
3143
#[doc(hidden)]
3244
pub mod variant;
45+
pub use variant::*;
3346

3447
pub use array::Array;
3548
pub use binary::ByteArray;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/CompilerMain.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,21 +230,22 @@ CompilerMessages run() throws SQLException {
230230
this.options.ioOptions.verbosity, dotFormat, circuit);
231231
return compiler.messages;
232232
}
233+
MultiCratesWriter multiWriter = null;
233234
try {
234-
if (!compiler.options.ioOptions.crates) {
235+
if (!compiler.options.ioOptions.multiCrates()) {
235236
PrintStream stream = this.getOutputStream();
236237
RustFileWriter writer = new RustFileWriter();
237238
IIndentStream indent = new IndentStream(stream);
238-
writer.setOutputStream(indent);
239+
writer.setOutputBuilder(indent);
239240
writer.add(circuit);
240241
writer.write(compiler);
241242
stream.close();
242243
} else {
243244
if (options.ioOptions.emitHandles)
244245
throw new CompilationError("The option '--crates' cannot be used with '--handles'");
245-
MultiCratesWriter writer = new MultiCratesWriter(options.ioOptions.outputFile);
246-
writer.add(circuit);
247-
writer.write(compiler);
246+
multiWriter = new MultiCratesWriter(options.ioOptions.outputFile, options.ioOptions.crates, true);
247+
multiWriter.add(circuit);
248+
multiWriter.write(compiler);
248249
}
249250
} catch (IOException e) {
250251
compiler.reportError(SourcePositionRange.INVALID,
@@ -256,8 +257,16 @@ CompilerMessages run() throws SQLException {
256257
List<DBSPFunction> extern = Linq.where(compiler.functions, f -> f.body == null);
257258
String outputFile = this.options.ioOptions.outputFile;
258259
if (!outputFile.isEmpty()) {
260+
Path stubs;
259261
String outputPath = new File(outputFile).getAbsolutePath();
260-
Path stubs = Paths.get(outputPath).getParent().resolve(DBSPCompiler.STUBS_FILE_NAME);
262+
if (options.ioOptions.multiCrates()) {
263+
// Generate globals/src/stubs.rs
264+
String globals = multiWriter.getGlobalsName();
265+
stubs = Paths.get(outputPath).resolve(globals).resolve("src").resolve(DBSPCompiler.STUBS_FILE_NAME);
266+
} else {
267+
// Generate stubs.rs in the same directory
268+
stubs = Paths.get(outputPath).getParent().resolve(DBSPCompiler.STUBS_FILE_NAME);
269+
}
261270
PrintStream protosStream = new PrintStream(Files.newOutputStream(stubs));
262271

263272
if (compiler.options.ioOptions.verbosity > 0)
@@ -273,9 +282,7 @@ CompilerMessages run() throws SQLException {
273282
274283
use feldera_sqllib::*;
275284
use crate::*;
276-
277285
""");
278-
279286
for (DBSPFunction function : extern) {
280287
function = this.generateStubBody(function);
281288
String str = ToRustInnerVisitor.toRustString(compiler, function, false);

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/DBSPCircuit.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewOperator;
3232
import org.dbsp.sqlCompiler.compiler.ProgramMetadata;
3333
import org.dbsp.sqlCompiler.compiler.backend.JsonDecoder;
34-
import org.dbsp.sqlCompiler.compiler.errors.CompilationError;
3534
import org.dbsp.sqlCompiler.compiler.frontend.calciteCompiler.ProgramIdentifier;
3635
import org.dbsp.sqlCompiler.compiler.frontend.calciteObject.CalciteObject;
3736
import org.dbsp.sqlCompiler.compiler.visitors.VisitDecision;
@@ -154,26 +153,10 @@ public void setName(String name) {
154153
this.name = name;
155154
}
156155

157-
public void replaceDeclaration(DBSPDeclaration decl) {
158-
DBSPDeclaration existing = Utilities.getExists(this.declarationMap, decl.getName());
159-
this.declarations.remove(existing);
160-
this.declarations.add(decl);
161-
this.declarationMap.put(decl.getName(), decl);
162-
}
163-
164-
@Nullable
165-
public DBSPDeclaration getDeclaration(String name) {
166-
return this.declarationMap.get(name);
167-
}
168-
169156
public void addDeclaration(DBSPDeclaration decl) {
157+
if (this.declarationMap.containsKey(decl.getName()))
158+
return;
170159
this.declarations.add(decl);
171-
if (this.declarationMap.containsKey(decl.getName())) {
172-
DBSPDeclaration prev = Utilities.getExists(this.declarationMap, decl.getName());
173-
throw new CompilationError(
174-
"Declaration " + decl + " has the same name as an existing declaration " + prev,
175-
decl.item.getNode());
176-
}
177160
Utilities.putNew(this.declarationMap, decl.getName(), decl);
178161
}
179162

@@ -275,7 +258,8 @@ public void accept(CircuitVisitor visitor) {
275258
visitor.pop(this);
276259
}
277260

278-
/** Return true if this circuit and other are identical (have the exact same operators and declarations). */
261+
/** Return true if this circuit and other are identical
262+
* (have the exact same operators and declarations). */
279263
public boolean sameCircuit(ICircuit other) {
280264
if (this == other)
281265
return true;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/OutputPort.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dbsp.sqlCompiler.circuit;
22

33
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.dbsp.sqlCompiler.circuit.operator.DBSPNestedOperator;
45
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
56
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
67
import org.dbsp.sqlCompiler.compiler.backend.JsonDecoder;
@@ -50,14 +51,25 @@ public int port() {
5051
return this.outputNumber;
5152
}
5253

53-
public String getNodeName(boolean preferHash) {
54+
public String getName(boolean preferHash) {
55+
if (this.isSimpleNode())
56+
return this.getNodeName(preferHash);
57+
else if (this.node().is(DBSPNestedOperator.class))
58+
return this.node().to(DBSPNestedOperator.class)
59+
.internalOutputs.get(this.outputNumber)
60+
.getName(preferHash);
61+
else
62+
return this.getNodeName(preferHash) + "_" + this.outputNumber;
63+
}
64+
65+
private String getNodeName(boolean preferHash) {
5466
return this.node().getNodeName(preferHash);
5567
}
5668

5769
public String getOutputName() {
5870
if (this.isSimpleNode())
5971
return this.getNodeName(false);
60-
return this.getNodeName(false) + "." + this.outputNumber;
72+
return this.getNodeName(false) + "_" + this.outputNumber;
6173
}
6274

6375
public DBSPTypeZSet getOutputZSetType() { return this.outputType().to(DBSPTypeZSet.class); }
@@ -136,4 +148,8 @@ public void asJson(ToJsonOuterVisitor visitor) {
136148
public String asJson() {
137149
return "{ \"node\": " + this.operator.id + ", \"output\": " + this.outputNumber + " }";
138150
}
151+
152+
public DBSPType streamType(boolean outerCircuit) {
153+
return this.node().outputStreamType(this.outputNumber, outerCircuit);
154+
}
139155
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/annotation/MerkleHash.java

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package org.dbsp.sqlCompiler.circuit.annotation;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
5+
import org.dbsp.util.HashString;
6+
import org.dbsp.util.JsonStream;
7+
import org.dbsp.util.Utilities;
8+
9+
import javax.annotation.Nullable;
10+
import java.util.List;
11+
12+
/** Stores a hash value for each operator */
13+
public class OperatorHash extends Annotation {
14+
public final HashString hash;
15+
public final boolean global;
16+
17+
public OperatorHash(HashString hash, boolean global) {
18+
this.hash = hash;
19+
this.global = global;
20+
}
21+
22+
@Override
23+
public boolean invisible() {
24+
return true;
25+
}
26+
27+
public boolean isGlobal(boolean global) {
28+
return this.global == global;
29+
}
30+
31+
public static OperatorHash fromJson(JsonNode node) {
32+
String hash = Utilities.getStringProperty(node, "hash");
33+
boolean global = Utilities.getBooleanProperty(node, "global");
34+
return new OperatorHash(new HashString(hash), global);
35+
}
36+
37+
@Nullable
38+
public static HashString getHash(DBSPOperator operator, boolean global) {
39+
List<Annotation> name = operator.annotations.get(
40+
t -> t.is(OperatorHash.class) && t.to(OperatorHash.class).isGlobal(global));
41+
if (!name.isEmpty()) {
42+
// there should be only one
43+
return name.get(0).to(OperatorHash.class).hash;
44+
}
45+
return null;
46+
}
47+
48+
@Override
49+
public void asJson(JsonStream stream) {
50+
stream.beginObject().appendClass(this);
51+
stream.label("hash");
52+
stream.append(this.hash.toString());
53+
stream.label("global");
54+
stream.append(this.global);
55+
stream.endObject();
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return (this.global ? "Global " : "") + "hash: " + this.hash;
61+
}
62+
}

0 commit comments

Comments
 (0)