Skip to content

Commit 1ed7d0f

Browse files
committed
[SQL] Use correct stream to materialize tables with LATENESS
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 4ac9eb6 commit 1ed7d0f

6 files changed

Lines changed: 197 additions & 34 deletions

File tree

crates/adapters/src/adhoc/table.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,6 @@ use `with ('materialized' = 'true')` for tables, or `create materialized view` f
491491
}
492492
let mut w = cursor.weight();
493493

494-
// Skip deleted records.
495494
if w < 0 {
496495
cursor.step_key();
497496
panic!("Unexpected key with negative weight encountered while processing ad-hoc query");

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/ToRustVisitor.java

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.dbsp.sqlCompiler.circuit.operator.DBSPDistinctOperator;
4949
import org.dbsp.sqlCompiler.circuit.operator.DBSPJoinBaseOperator;
5050
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
51+
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceMultisetOperator;
5152
import org.dbsp.sqlCompiler.circuit.operator.DBSPStreamJoinIndexOperator;
5253
import org.dbsp.sqlCompiler.circuit.operator.DBSPIndexedTopKOperator;
5354
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
@@ -89,6 +90,7 @@
8990
import org.dbsp.sqlCompiler.compiler.visitors.outer.DeclareComparators;
9091
import org.dbsp.sqlCompiler.ir.IDBSPInnerNode;
9192
import org.dbsp.sqlCompiler.ir.IDBSPNode;
93+
import org.dbsp.sqlCompiler.ir.IDBSPOuterNode;
9294
import org.dbsp.sqlCompiler.ir.expression.DBSPBinaryExpression;
9395
import org.dbsp.sqlCompiler.ir.expression.DBSPClosureExpression;
9496
import org.dbsp.sqlCompiler.ir.expression.DBSPComparatorExpression;
@@ -112,6 +114,7 @@
112114
import org.dbsp.sqlCompiler.ir.type.derived.DBSPTypeStruct;
113115
import org.dbsp.sqlCompiler.ir.type.user.DBSPTypeZSet;
114116
import org.dbsp.sqlCompiler.ir.type.primitive.DBSPTypeBool;
117+
import org.dbsp.util.Bijection;
115118
import org.dbsp.util.HashString;
116119
import org.dbsp.util.IIndentStream;
117120
import org.dbsp.util.IndentStream;
@@ -508,6 +511,36 @@ public VisitDecision preorder(DBSPDeltaOperator delta) {
508511
return VisitDecision.STOP;
509512
}
510513

514+
void registerTable(DBSPSourceMultisetOperator operator, DBSPOperator materialized) {
515+
// Register a table. If the table is materialized and has lateness,
516+
// the "materialized" operator may actually be the controlled filter following the table.
517+
// In other cases, materialized == operator.
518+
// The "materialized" operator's output stream is used to build the integral that serves the table contents,
519+
// and that integral should not contain the late values.
520+
// Note that this is never a problem for SourceMapOperators with lateness, due to the
521+
// nature of the circuits synthesized for them.
522+
this.builder.newline();
523+
this.generateStructHelpers(operator.originalRowType, operator.metadata);
524+
String registerFunction = operator.metadata.materialized ?
525+
"register_materialized_input_zset" : "register_input_zset";
526+
this.builder.append("catalog.")
527+
.append(registerFunction)
528+
.append("::<_, ");
529+
IHasSchema tableDescription = this.metadata.getTableDescription(operator.tableName);
530+
JsonNode j = tableDescription.asJson(true);
531+
j = this.stripProperties(j);
532+
DBSPStrLiteral json = new DBSPStrLiteral(j.toString(), true);
533+
operator.originalRowType.accept(this.innerVisitor);
534+
this.builder.append(">(")
535+
.append(materialized.getOutput(0).getName(this.preferHash))
536+
.append(".clone(), ")
537+
.append(this.handleName(operator))
538+
.append(", ");
539+
json.accept(this.innerVisitor);
540+
this.innerVisitor.setOperatorContext(null);
541+
this.builder.append(");");
542+
}
543+
511544
@Override
512545
public VisitDecision preorder(DBSPSourceMultisetOperator operator) {
513546
this.computeHash(operator);
@@ -532,26 +565,9 @@ public VisitDecision preorder(DBSPSourceMultisetOperator operator) {
532565
}
533566
this.tagStream(operator);
534567
if (!this.useHandles) {
535-
this.builder.newline();
536-
this.generateStructHelpers(operator.originalRowType, operator.metadata);
537-
String registerFunction = operator.metadata.materialized ?
538-
"register_materialized_input_zset" : "register_input_zset";
539-
this.builder.append("catalog.")
540-
.append(registerFunction)
541-
.append("::<_, ");
542-
IHasSchema tableDescription = this.metadata.getTableDescription(operator.tableName);
543-
JsonNode j = tableDescription.asJson(true);
544-
j = this.stripProperties(j);
545-
DBSPStrLiteral json = new DBSPStrLiteral(j.toString(), true);
546-
operator.originalRowType.accept(this.innerVisitor);
547-
this.builder.append(">(")
548-
.append(operator.getNodeName(this.preferHash))
549-
.append(".clone(), ")
550-
.append(this.handleName(operator))
551-
.append(", ");
552-
json.accept(this.innerVisitor);
553-
this.innerVisitor.setOperatorContext(null);
554-
this.builder.append(");");
568+
if (!this.materialization.has(operator)) {
569+
this.registerTable(operator, operator);
570+
}
555571
}
556572
return VisitDecision.STOP;
557573
}
@@ -798,6 +814,13 @@ public VisitDecision preorder(DBSPControlledKeyFilterOperator operator) {
798814
this.builder.newline()
799815
.append(operator.getOutput(0).getName(this.preferHash))
800816
.append(".set_persistent_id(hash);");
817+
if (!this.useHandles) {
818+
if (this.materialization.hasRight(operator)) {
819+
// This filter follows a materialized table with LATENESS: register the table here, using the filter's output stream
820+
DBSPSourceMultisetOperator table = this.materialization.getLeft(operator);
821+
this.registerTable(table, operator);
822+
}
823+
}
801824
return VisitDecision.STOP;
802825
}
803826

@@ -1542,14 +1565,44 @@ public VisitDecision preorder(DBSPConstantOperator operator) {
15421565
return this.constantLike(operator);
15431566
}
15441567

1568+
// Keeps track of tables that have to be materialized at a different point in the circuit.
1569+
Bijection<DBSPSourceMultisetOperator, DBSPControlledKeyFilterOperator> materialization = new Bijection<>();
1570+
1571+
void discoverLateMaterializations(DBSPCircuit circuit) {
1572+
// Discover input nodes that immediately feed the left input of a controlled_key_filter operator.
1573+
// This pattern is generated by inputs with LATENESS annotations.
1574+
// If the input table is materialized, the materialized stream has
1575+
// to be the output of the controlled_key_filter operator.
1576+
for (DBSPOperator op: circuit.getAllOperators()) {
1577+
// I don't think we need to recurse into nested nodes; the input nodes are all at the outer level
1578+
if (op.is(DBSPControlledKeyFilterOperator.class)) {
1579+
DBSPControlledKeyFilterOperator filter = op.to(DBSPControlledKeyFilterOperator.class);
1580+
DBSPOperator left = filter.left().operator;
1581+
if (!left.is(DBSPSourceMultisetOperator.class)) continue;
1582+
DBSPSourceMultisetOperator source = left.to(DBSPSourceMultisetOperator.class);
1583+
if (source.metadata.materialized)
1584+
this.materialization.map(source, filter);
1585+
}
1586+
}
1587+
}
1588+
1589+
@Override
1590+
public Token startVisit(IDBSPOuterNode node) {
1591+
Token result = super.startVisit(node);
1592+
DBSPCircuit circuit = node.to(DBSPCircuit.class);
1593+
this.discoverLateMaterializations(circuit);
1594+
return result;
1595+
}
1596+
15451597
@Override
15461598
public VisitDecision preorder(DBSPNowOperator operator) {
15471599
return this.constantLike(operator);
15481600
}
15491601

15501602
public static void toRustString(DBSPCompiler compiler, IIndentStream stream,
15511603
DBSPCircuit circuit, ProjectDeclarations projectDeclarations) {
1552-
ToRustVisitor visitor = new ToRustVisitor(compiler, stream, circuit.getMetadata(), projectDeclarations);
1604+
ToRustVisitor visitor = new ToRustVisitor(
1605+
compiler, stream, circuit.getMetadata(), projectDeclarations);
15531606
visitor.apply(circuit);
15541607
}
15551608
}

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/CSE.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
import java.util.ArrayList;
1818
import java.util.HashMap;
19-
import java.util.HashSet;
2019
import java.util.List;
2120
import java.util.Map;
22-
import java.util.Set;
2321

2422
/** Common-subexpression elimination at the level of circuit operators */
2523
public class CSE extends Repeat {
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.dbsp.util;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
/** A Bijection is a one-to-one correspondence between L and R values, stored as a pair of maps, one for each direction */
7+
public class Bijection<L, R> {
8+
Map<L, R> left;
9+
Map<R, L> right;
10+
11+
public Bijection() {
12+
this.left = new HashMap<>();
13+
this.right = new HashMap<>();
14+
}
15+
16+
public void map(L left, R right) {
17+
Utilities.putNew(this.left, left, right);
18+
Utilities.putNew(this.right, right, left);
19+
}
20+
21+
public boolean has(L left) { return this.left.containsKey(left); }
22+
23+
public boolean hasRight(R right) { return this.right.containsKey(right); }
24+
25+
public R getRight(L left) {
26+
return Utilities.getExists(this.left, left);
27+
}
28+
29+
public L getLeft(R right) {
30+
return Utilities.getExists(this.right, right);
31+
}
32+
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/OtherTests.java

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@ pub fn test() {
458458
#[test]
459459
pub fn test() {
460460
use dbsp_adapters::{CircuitCatalog, RecordFormat};
461+
use feldera_types::format::csv::CsvParserConfig;
461462
462463
let (mut circuit, catalog) = circuit(CircuitConfig::with_workers(2))
463464
.expect("Failed to build circuit");
@@ -489,11 +490,16 @@ pub fn test() {
489490
.delta_handle;
490491
491492
// Read the produced output
492-
let out = adult.consolidate();
493-
// Print the produced output
494-
println!("{:?}", out);
495-
}
496-
""";
493+
let reader = adult.concat();
494+
let mut cursor = reader
495+
.cursor(RecordFormat::Csv(CsvParserConfig::default()))
496+
.unwrap();
497+
while cursor.key_valid() {
498+
let mut w = cursor.weight();
499+
println!("{}: {}", cursor.key_to_json().unwrap(), w);
500+
cursor.step_key();
501+
}
502+
}""";
497503
File file = createInputScript(sql);
498504
CompilerMessages message = CompilerMain.execute(
499505
"--handles", "-o", BaseSQLTests.TEST_FILE_PATH, file.getPath());
@@ -505,7 +511,7 @@ pub fn test() {
505511
fr.write(rustHandlesTest);
506512
}
507513
if (!BaseSQLTests.skipRust)
508-
Utilities.compileAndCheckRust(BaseSQLTests.RUST_DIRECTORY, true);
514+
Utilities.compileAndTestRust(BaseSQLTests.RUST_DIRECTORY, true);
509515

510516
// Second test
511517
message = CompilerMain.execute(
@@ -518,7 +524,74 @@ pub fn test() {
518524
fr.write(rustCatalogTest);
519525
}
520526
if (!BaseSQLTests.skipRust)
521-
Utilities.compileAndCheckRust(BaseSQLTests.RUST_DIRECTORY, true);
527+
Utilities.compileAndTestRust(BaseSQLTests.RUST_DIRECTORY, true);
528+
}
529+
530+
@Test
531+
public void issue4895() throws IOException, InterruptedException, SQLException {
532+
String sql = """
533+
create table t (x int lateness 0) with
534+
('materialized' = 'true');
535+
create materialized view V as SELECT * FROM t;""";
536+
String rustCatalogTest = """
537+
#[test]
538+
pub fn test0() {
539+
use dbsp_adapters::{CircuitCatalog, RecordFormat};
540+
use feldera_types::format::json::JsonFlavor;
541+
542+
let mut circuitAndStreams = circuit(CircuitConfig::with_workers(2usize)).unwrap();
543+
let streams: Catalog = circuitAndStreams.1;
544+
let t = &SqlIdentifier::new("t", false);
545+
let input = streams.input_collection_handle(t).unwrap();
546+
let mut writer = input
547+
.handle
548+
.configure_deserializer(RecordFormat::Csv(Default::default()))
549+
.unwrap();
550+
writer.insert(b"1").unwrap();
551+
writer.flush();
552+
writer.insert(b"2").unwrap();
553+
writer.flush();
554+
writer.insert(b"3").unwrap();
555+
writer.flush();
556+
circuitAndStreams.0.transaction().unwrap();
557+
// late value
558+
writer.insert(b"0").unwrap();
559+
writer.flush();
560+
circuitAndStreams.0.transaction().unwrap();
561+
562+
let output = streams
563+
.output_handles(t)
564+
.unwrap()
565+
.integrate_handle
566+
.clone()
567+
.unwrap()
568+
.clone();
569+
let reader = output.concat();
570+
let mut cursor = reader
571+
.cursor(RecordFormat::Json(JsonFlavor::default()))
572+
.unwrap();
573+
let mut rows: u32 = 0;
574+
while (cursor.key_valid()) {
575+
let mut w = cursor.weight();
576+
// println!("{}: {}", cursor.key_to_json().unwrap(), w);
577+
rows = rows + 1;
578+
cursor.step_key();
579+
}
580+
// There should be only 3 rows; the last inserted one should be missing
581+
assert!(3 == rows, "Expected 3 rows, got {}", rows);
582+
}""";
583+
File file = createInputScript(sql);
584+
CompilerMessages message = CompilerMain.execute(
585+
"-i", "-o", BaseSQLTests.TEST_FILE_PATH, file.getPath());
586+
Assert.assertEquals(0, message.exitCode);
587+
Assert.assertTrue(file.exists());
588+
589+
File rust = new File(BaseSQLTests.TEST_FILE_PATH);
590+
try (FileWriter fr = new FileWriter(rust, true)) { // append
591+
fr.write(rustCatalogTest);
592+
}
593+
if (!BaseSQLTests.skipRust)
594+
Utilities.compileAndTestRust(BaseSQLTests.RUST_DIRECTORY, true);
522595
}
523596

524597
@Test

sql-to-dbsp-compiler/using.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,15 @@ We exercise this circuit by inserting data using a CSV format:
388388
#[test]
389389
pub fn test() {
390390
use dbsp_adapters::{CircuitCatalog, RecordFormat};
391+
use feldera_types::format::csv::CsvParserConfig;
391392
392393
let (mut circuit, catalog) = circuit(2)
393394
.expect("Failed to build circuit");
394395
let persons = catalog
395396
.input_collection_handle(&SqlIdentifier::from("PERSON"))
396397
.expect("Failed to get input collection handle");
397398
let mut persons_stream = persons
399+
.handle
398400
.configure_deserializer(RecordFormat::Csv(Default::default()))
399401
.expect("Failed to configure deserializer");
400402
persons_stream
@@ -418,9 +420,15 @@ pub fn test() {
418420
.delta_handle;
419421
420422
// Read the produced output
421-
let out = adult.concat().consolidate();
422-
// Print the produced output
423-
println!("{:?}", out);
423+
let reader = adult.concat();
424+
let mut cursor = reader
425+
.cursor(RecordFormat::Csv(CsvParserConfig::default()))
426+
.unwrap();
427+
while cursor.key_valid() {
428+
let mut w = cursor.weight();
429+
println!("{}: {}", cursor.key_to_json().unwrap(), w);
430+
cursor.step_key();
431+
}
424432
}
425433
```
426434

0 commit comments

Comments
 (0)