Skip to content

Commit 6c71c44

Browse files
committed
[SQL] Linear implementation for STDDEV
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent 6b1c1bd commit 6c71c44

File tree

8 files changed

+263
-204
lines changed

8 files changed

+263
-204
lines changed

python/tests/runtime_aggtest/run.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ if [ "${RUNTIME_AGGTEST_JOBS:-1}" -le 1 ]; then
2828
for t in "${TESTS[@]}"; do run_one "$t"; done
2929
else
3030
echo "Running tests in parallel: ${RUNTIME_AGGTEST_JOBS} jobs"
31-
printf '%s\n' "${TESTS[@]}" | xargs -P "${RUNTIME_AGGTEST_JOBS}" -I{} bash -e -c 'echo "Running: {}"; uv run --locked "$PYTHONPATH/tests/runtime_aggtest/{}"'
31+
printf '%s\n' "${TESTS[@]}" | xargs -P "${RUNTIME_AGGTEST_JOBS}" -I{} bash -e -c 'echo "Running: {}"; uv run --locked "$PYTHONPATH/tests/runtime_aggtest/{}"' || echo "{} failed"
3232
fi

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/circuit/DBSPCircuit.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,8 @@ public void sortOperators(Comparator<DBSPOperator> comparator) {
9494
this.allOperators.sort(comparator);
9595
}
9696

97-
/** Sort the nodes to be compatible with a topological order
98-
* on the specified graph.
99-
* @param graph Topological order to enforce.
100-
* Mutated by the method. */
97+
/** Sort the nodes to be compatible with a topological order on the specified graph.
98+
* @param graph Topological order to enforce. Mutated by the method. */
10199
public void resort(CircuitGraph graph) {
102100
this.allOperators.clear();
103101
DBSPOperator previous = null;

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/dot/ToDotNodesVisitor.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
import org.dbsp.sqlCompiler.circuit.operator.DBSPSourceBaseOperator;
1818
import org.dbsp.sqlCompiler.circuit.operator.DBSPViewBaseOperator;
1919
import org.dbsp.sqlCompiler.circuit.operator.DBSPWaterlineOperator;
20-
import org.dbsp.sqlCompiler.circuit.operator.IJoin;
20+
import org.dbsp.sqlCompiler.circuit.operator.IGCOperator;
21+
import org.dbsp.sqlCompiler.circuit.operator.IStateful;
2122
import org.dbsp.sqlCompiler.compiler.DBSPCompiler;
2223
import org.dbsp.sqlCompiler.compiler.backend.rust.ToRustInnerVisitor;
2324
import org.dbsp.sqlCompiler.compiler.errors.SourcePositionRange;
@@ -235,32 +236,20 @@ String getColor(DBSPSimpleOperator operator) {
235236
if (operator.id > lastCircuit)
236237
return " style=filled fillcolor=green";
237238
*/
238-
if (operator.is(IJoin.class)) {
239-
// There are more of these every day
240-
return " style=filled fillcolor=orangered";
241-
}
242-
if (operator.operation.contains("retain")) {
239+
if (operator.is(IGCOperator.class)) {
243240
return " style=filled fillcolor=pink";
244241
}
245-
return switch (operator.operation) {
246-
case "waterline" -> " style=filled fillcolor=lightgreen";
247-
case "controlled_filter" -> " style=filled fillcolor=cyan";
248-
case "apply", "apply2", "apply_n" -> " style=filled fillcolor=yellow";
249-
case "partitioned_rolling_aggregate_with_waterline", "window" -> " style=filled fillcolor=pink";
250-
// stateful operators
251-
case "distinct", "stream_distinct",
252-
// all aggregates require an upsert, which is stateful, even the ones that are linear
253-
"aggregate", "partitioned_rolling_aggregate",
254-
"stream_aggregate", "chain_aggregate", "linear_aggregate",
255-
// these are not derived from JoinBase
256-
"antijoin", "stream_antijoin",
257-
// delays contain state, but not that much
258-
"accumulate_delay_trace", // "transaction_delay", "differentiate",
259-
// group operators
260-
"topK", "lag_custom_order", "upsert",
261-
"integrate" -> " style=filled fillcolor=orangered";
262-
default -> "";
242+
switch (operator.operation) {
243+
case "waterline": return " style=filled fillcolor=lightgreen";
244+
case "controlled_filter": return " style=filled fillcolor=cyan";
245+
case "apply", "apply2", "apply_n": return " style=filled fillcolor=yellow";
246+
default: break;
263247
};
248+
if (operator.is(IStateful.class)) {
249+
// There are more of these every day
250+
return " style=filled fillcolor=orangered";
251+
}
252+
return "";
264253
}
265254

266255
String shorten(String operation) {

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/AggregateCompiler.java

Lines changed: 201 additions & 160 deletions
Large diffs are not rendered by default.

sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/LinearPostprocessRetainKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ public DBSPCircuit apply(DBSPCircuit circuit) {
9292

9393
for (var p: toAdd)
9494
graph.addEdge(p.left, p.right, 0);
95+
// Note: this may reorder the graph dramatically. We should probably avoid doing this
96+
// altogether if the circuit will not change, but this would change dramatically the output
97+
// of the compiler for many programs, preventing bootstrapping.
9598
circuit.resort(graph);
9699

97100
graphs.apply(circuit);

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/MetadataTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.sql.SQLException;
4646
import java.sql.Statement;
4747
import java.util.Arrays;
48+
import java.util.Iterator;
4849
import java.util.List;
4950
import java.util.stream.Collectors;
5051

@@ -1472,9 +1473,19 @@ public void unusedTable() throws IOException, SQLException {
14721473
ObjectMapper mapper = new ObjectMapper();
14731474
JsonNode jsonNode = mapper.readTree(json);
14741475
// table t is not used in the calcite plan, so the lineage is an empty "AND" node.
1475-
Assert.assertEquals("source_multiset",
1476-
jsonNode.get("mir").get("s1").get("operation").asText());
1477-
JsonNode node = jsonNode.get("mir").get("s1").get("calcite").get("and");
1476+
// Locate table t:
1477+
JsonNode t = null;
1478+
for (Iterator<JsonNode> it = jsonNode.get("mir").elements(); it.hasNext(); ) {
1479+
var el = it.next();
1480+
if (el.get("operation").asText().equalsIgnoreCase("source_multiset") &&
1481+
el.get("table").asText().equalsIgnoreCase("t")) {
1482+
t = el;
1483+
break;
1484+
}
1485+
}
1486+
1487+
Assert.assertNotNull(t);
1488+
JsonNode node = t.get("calcite").get("and");
14781489
Assert.assertTrue(node.isArray());
14791490
Assert.assertTrue(node.isEmpty());
14801491
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -919,12 +919,12 @@ SELECT id, ARG_MAX (value, ts)
919919
SELECT id FROM test_agg;""");
920920
}
921921

922-
Set<String> collectHashes(CompilerCircuit cc) {
923-
HashSet<String> result = new HashSet<>();
922+
Map<String, String> collectHashes(CompilerCircuit cc) {
923+
HashMap<String, String> result = new HashMap<>();
924924
CircuitVisitor vis = new CircuitVisitor(cc.compiler) {
925925
@Override
926-
public void postorder(DBSPOperator ignored) {
927-
result.add(ignored.getNodeName(true));
926+
public void postorder(DBSPOperator operator) {
927+
result.put(operator.getNodeName(true), operator.getNodeName(false));
928928
}
929929
};
930930
cc.getCircuit().accept(vis);
@@ -941,9 +941,9 @@ public void testHashes() {
941941
CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM t1;
942942
CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(*) AS c FROM t1;""");
943943
// Check that all hashes from cc1 appear unchanged in cc1
944-
Set<String> hash0 = this.collectHashes(cc0);
945-
Set<String> hash1 = this.collectHashes(cc1);
946-
Assert.assertTrue(hash1.containsAll(hash0));
944+
Map<String, String> hash0 = this.collectHashes(cc0);
945+
Map<String, String> hash1 = this.collectHashes(cc1);
946+
Assert.assertTrue(hash1.keySet().containsAll(hash0.keySet()));
947947
Assert.assertEquals(hash0.size() + 1, hash1.size());
948948
}
949949

@@ -1358,11 +1358,18 @@ and exists (
13581358
limit 20;
13591359
""";
13601360

1361-
var cc0 = this.getCC(common);
1362-
var cc1 = this.getCC(common + extra);
1363-
Set<String> hash0 = this.collectHashes(cc0);
1364-
Set<String> hash1 = this.collectHashes(cc1);
1365-
Assert.assertTrue(hash1.containsAll(hash0));
1361+
var cc0 = this.getCCS(common);
1362+
Map<String, String> hash0 = this.collectHashes(cc0);
1363+
Set<String> hash0keys = new HashSet<>(hash0.keySet());
1364+
1365+
var cc1 = this.getCCS(common + extra);
1366+
Map<String, String> hash1 = this.collectHashes(cc1);
1367+
1368+
hash0keys.removeAll(hash1.keySet());
1369+
for (String k : hash0keys)
1370+
// Nothing should be printed
1371+
System.out.println(hash0.get(k));
1372+
Assert.assertTrue(hash1.keySet().containsAll(hash0.keySet()));
13661373
}
13671374

13681375
@Test

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,4 +554,14 @@ public void testBetween() {
554554
false
555555
(1 row)""");
556556
}
557+
558+
@Test
559+
public void testStdDevPop() {
560+
this.qs("""
561+
WITH T(x) as (VALUES(CAST(NULL AS DECIMAL(5, 2)))) SELECT STDDEV_POP(x) FROM T;
562+
r
563+
---
564+
NULL
565+
(1 row)""");
566+
}
557567
}

0 commit comments

Comments
 (0)