Skip to content

Commit bb434f9

Browse files
committed
[SQL] Java test support for the API that blocks for compaction
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent fd21237 commit bb434f9

9 files changed

Lines changed: 216 additions & 127 deletions

File tree

crates/dbsp/src/circuit/dbsp_handle.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1771,15 +1771,18 @@ impl DBSPHandle {
17711771
}
17721772

17731773
/// Block until background compaction has fully converged on every worker,
1774-
/// or until `timeout` elapses. This method is designed for use in tests.
1774+
/// or until `timeout` elapses.
17751775
///
17761776
/// Polls [`is_compaction_complete`](Self::is_compaction_complete) with
17771777
/// exponential back-off, starting at 1 ms and doubling each iteration up
17781778
/// to a cap of 1 s.
17791779
///
1780-
/// Returns `Ok(())` when compaction is complete, or
1781-
/// `Err(`[`DbspError::Constructor`]`)` when the timeout expires.
1782-
pub fn wait_for_compaction(&mut self, timeout: std::time::Duration) -> Result<(), DbspError> {
1780+
/// Returns `Ok(())` when compaction is complete, or an error if the
1781+
/// circuit fails or the timeout expires.
1782+
pub fn wait_for_compaction(
1783+
&mut self,
1784+
timeout: std::time::Duration,
1785+
) -> Result<(), anyhow::Error> {
17831786
use std::thread;
17841787
use std::time::Instant;
17851788

@@ -1795,9 +1798,7 @@ impl DBSPHandle {
17951798
}
17961799
let remaining = deadline.saturating_duration_since(Instant::now());
17971800
if remaining.is_zero() {
1798-
return Err(DbspError::Constructor(anyhow::anyhow!(
1799-
"timed out after {timeout:?} waiting for compaction to complete"
1800-
)));
1801+
anyhow::bail!("timed out after {timeout:?} waiting for compaction to complete");
18011802
}
18021803
let sleep = std::time::Duration::from_millis(sleep_ms).min(remaining);
18031804
thread::sleep(sleep);

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/quidem/HrBaseTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ CREATE TABLE Department(
5252
--(30, 'Marketing', ARRAY()),
5353
--(40, 'HR', ARRAY[Employee(200, 20, 'Eric', 8000, 500)])
5454
--;
55+
56+
-- Postgres syntax
57+
--INSERT INTO Depts VALUES
58+
--(10, 'Sales', ARRAY[ROW(100, 10, 'Bill', 10000, 1000)::Employee, ROW(150, 10, 'Sebastian', 7000, null)::Employee]),
59+
--(30, 'Marketing', ARRAY[]::Employee ARRAY),
60+
--(40, 'HR', ARRAY[ROW(200, 20, 'Eric', 8000, 500)::Employee])
61+
--;
5562
""");
5663
}
5764
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegression2Tests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,6 @@ public void rankTests() throws SQLException {
690690
}
691691
}
692692

693-
694693
@Test
695694
public void testSumCase() {
696695
var ccs = this.getCCS("""
@@ -727,6 +726,32 @@ public void testSumCase() {
727726
0 | 0 | 0 | -1""");
728727
}
729728

729+
@Test
730+
public void issue2808() {
731+
// Check the API for blocking until compaction is completed
732+
var ccs = this.getCCS("""
733+
CREATE TABLE T(id INT, bid INT, ts TIMESTAMP, ts0 TIMESTAMP, s INT);
734+
CREATE VIEW V AS
735+
SELECT id, bid,
736+
SUM(
737+
CASE
738+
WHEN ts >= (ts0 - INTERVAL '180' DAY)
739+
AND NOT s IS NULL THEN 1
740+
ELSE 0
741+
END
742+
)
743+
FROM T GROUP BY id, bid;
744+
""");
745+
ccs.stepWeightOne("", """
746+
id | bid | sum
747+
----------------""");
748+
ccs.blockForCompaction();
749+
ccs.stepWeightOne("INSERT INTO T VALUES(1, 1, 0, 0, 0);", """
750+
id | bid | sum
751+
----------------
752+
1 | 1 | 1""");
753+
ccs.blockForCompaction();
754+
}
730755

731756
@Test
732757
public void issue6350a() {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.dbsp.sqlCompiler.compiler.sql.tools;
2+
3+
/** Emit a command to block for compaction */
4+
public class BlockForCompaction implements IStreamCommand {
5+
@Override
6+
public boolean compatible(IStreamCommand other) {
7+
return true;
8+
}
9+
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/tools/CompilerCircuitStream.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ public void step(String script, String expected) {
7676
this.stream.addPair(input, output);
7777
}
7878

79+
/**
80+
* Block the circuit until compaction is finished.
81+
*/
82+
public void blockForCompaction() {
83+
this.stream.addBlockForCompaction();
84+
}
85+
7986
/** Like step, but every record in the output has weight one, and the
8087
* weight column is omitted for the expected output. */
8188
public void stepWeightOne(String script, String expected) {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.dbsp.sqlCompiler.compiler.sql.tools;
2+
3+
/** A command part of a stream of commands that a test supplies to a circuit */
4+
public interface IStreamCommand {
5+
/** True if the two stream commands can be applied to the same circuit */
6+
boolean compatible(IStreamCommand other);
7+
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/tools/InputOutputChange.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
/** A pair of changes, one for inputs and the expected corresponding outputs,
44
* used in a test. */
5-
public class InputOutputChange {
5+
public class InputOutputChange implements IStreamCommand {
66
/** An input value for every input table. */
77
public final Change inputs;
88
/** An expected output value for every output view. */
@@ -25,8 +25,11 @@ public Change getOutputs() {
2525
return this.outputs;
2626
}
2727

28-
public boolean compatible(InputOutputChange change) {
29-
return this.inputs.compatible(change.inputs) &&
30-
this.outputs.compatible(change.outputs);
28+
public boolean compatible(IStreamCommand change) {
29+
if (change instanceof InputOutputChange otherChange) {
30+
return this.inputs.compatible(otherChange.inputs) &&
31+
this.outputs.compatible(otherChange.outputs);
32+
}
33+
return true;
3134
}
3235
}

sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/tools/InputOutputChangeStream.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,35 @@ public class InputOutputChangeStream {
1616
/** If non-empty it may be used to permute changes.
1717
* In this case outputs changes correspond to the views. */
1818
public final List<String> outputTables;
19-
public final List<InputOutputChange> changes;
19+
public final List<IStreamCommand> commands;
2020

2121
public InputOutputChangeStream(List<String> inputTables, List<String> outputTables) {
22-
this.changes = new ArrayList<>();
22+
this.commands = new ArrayList<>();
2323
this.inputTables = inputTables;
2424
this.outputTables = outputTables;
2525
}
2626

2727
public InputOutputChangeStream() {
28-
this.changes = new ArrayList<>();
28+
this.commands = new ArrayList<>();
2929
this.inputTables = new ArrayList<>();
3030
this.outputTables = new ArrayList<>();
3131
}
3232

33+
public InputOutputChangeStream addBlockForCompaction() {
34+
// We don't expect "block" to be the first command
35+
Utilities.enforce(! this.commands.isEmpty());
36+
this.commands.add(new BlockForCompaction());
37+
return this;
38+
}
39+
3340
public InputOutputChangeStream addChange(InputOutputChange change) {
34-
Utilities.enforce(this.changes.isEmpty() || this.changes.get(0).compatible(change),
41+
Utilities.enforce(this.commands.isEmpty() || this.commands.get(0).compatible(change),
3542
() -> "Incompatible change");
3643
Utilities.enforce(this.inputTables.isEmpty() || change.inputs.getSetCount() == this.inputTables.size(),
3744
() -> "Change does not have the same number of input tables as specified");
3845
Utilities.enforce(this.outputTables.isEmpty() || change.outputs.getSetCount() == this.outputTables.size(),
3946
() -> "Change does not have the same number of output tables as specified");
40-
this.changes.add(change);
47+
this.commands.add(change);
4148
return this;
4249
}
4350

0 commit comments

Comments
 (0)