Skip to content

Commit 3cf0b77

Browse files
committed
[dbsp] Fix bootstrapping of tables with a PK.
This fixes an issue when bootstrapping a table with a PK when there is a downstream operator attached to it that creates an integral of the same table. We ended up with two integrals that can both be used to replay the same stream, one with and one without an accumulator. We used to replay from the last registered replay source, which meant that if the second integral was added in the modified version of the program, it was empty and replay failed, despite the fact that the input integral could be used for replay. To make things worse, we report this error as the input table not being materialized, which is simply wrong. This commit adds a simle workaround that uses the first registered replay source (by refusing to register new replay sources for the same stream) and a regression test for this. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 32d7d5b commit 3cf0b77

2 files changed

Lines changed: 101 additions & 21 deletions

File tree

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream<C, B>(
14971497
// We currently only support using operators in the top-level circuit
14981498
// as replay sources.
14991499
if TypeId::of::<()>() == TypeId::of::<C::Time>() {
1500-
circuit.cache_insert(
1501-
ReplaySource::new(stream.stream_id()),
1502-
Box::new(replay_stream.clone()),
1503-
);
1500+
// If a replay source already exists, don't overwrite it. This normally shouldn't
1501+
// happen as we should not have more than one integral for each stream. One situation
1502+
// where this does happen today is for input streams that have an integral without
1503+
// an accumulator as part of input_upsert, and another integral with an accumulator
1504+
// created by a downstream join or aggregate. In this case, we want to use the former
1505+
// for replay, as the latter may have been added in the new version of the program
1506+
// and may be empty, while the former can have state (conversely, if the input integral
1507+
// is empty, the downstream integral is guaranteed to be empty too).
1508+
if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) {
1509+
circuit.cache_insert(
1510+
ReplaySource::new(stream.stream_id()),
1511+
Box::new(replay_stream.clone()),
1512+
);
1513+
}
15041514
}
15051515
}
15061516

crates/dbsp/src/circuit/replay_tests.rs

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use feldera_types::config::StorageConfig;
33
use crate::{
44
CmpFunc, DBData, OrdZSet, OutputHandle, RootCircuit, Runtime, Stream, ZSetHandle, ZWeight,
55
circuit::dbsp_handle::CircuitStorageConfig,
6-
default_hash,
6+
default_hash, indexed_zset,
77
operator::{
88
Max, Min,
99
time_series::{RelOffset, RelRange},
@@ -186,6 +186,22 @@ fn test_replay<I1, I2, I3, O1, O2, O3>(
186186
);
187187
}
188188

189+
fn circuit_config(path: &PathBuf) -> CircuitConfig {
190+
CircuitConfig::with_workers(NUM_WORKERS)
191+
.with_splitter_chunk_size_records(2)
192+
.with_mode(Mode::Persistent)
193+
.with_storage(Some(
194+
CircuitStorageConfig::for_config(
195+
StorageConfig {
196+
path: path.to_string_lossy().into_owned(),
197+
cache: Default::default(),
198+
},
199+
Default::default(),
200+
)
201+
.unwrap(),
202+
))
203+
}
204+
189205
fn test_replay_with_step_size<I1, I2, I3, O1, O2, O3>(
190206
circuit_constructor1: CircuitFn<I1, I2, O1, O2>,
191207
circuit_constructor2: CircuitFn<I2, I3, O2, O3>,
@@ -210,22 +226,6 @@ fn test_replay_with_step_size<I1, I2, I3, O1, O2, O3>(
210226
let path = tempfile::tempdir().unwrap().keep();
211227
println!("Running replay_test in {}", path.display());
212228

213-
fn circuit_config(path: &PathBuf) -> CircuitConfig {
214-
CircuitConfig::with_workers(NUM_WORKERS)
215-
.with_splitter_chunk_size_records(2)
216-
.with_mode(Mode::Persistent)
217-
.with_storage(Some(
218-
CircuitStorageConfig::for_config(
219-
StorageConfig {
220-
path: path.to_string_lossy().into_owned(),
221-
cache: Default::default(),
222-
},
223-
Default::default(),
224-
)
225-
.unwrap(),
226-
))
227-
}
228-
229229
// Create both reference circuits, feed I1 and I2 to circuit1; feed I2 and I3 to circuit2.
230230
let mut reference_output1 = Vec::new();
231231
let mut reference_output2 = Vec::new();
@@ -1690,3 +1690,73 @@ fn test_rolling_circuit() {
16901690
std::iter::repeat_n((), 20).collect(),
16911691
);
16921692
}
1693+
1694+
// Regression test:
1695+
//
1696+
// Pipeline 1:
1697+
// ---> input_map
1698+
//
1699+
// Pipeline 2:
1700+
// ---> input_map ---> aggregate --> output
1701+
//
1702+
// The second pipeline should replay the input from the input_map operator.
1703+
// A bug prevented this from happening, because the integral built by the
1704+
// aggregate operator was used to replay instead.
1705+
1706+
#[test]
1707+
fn regression1() {
1708+
init_test_logger();
1709+
1710+
let path = tempfile::tempdir().unwrap().keep();
1711+
1712+
let (mut circuit1, input_handle1) =
1713+
Runtime::init_circuit(circuit_config(&path), move |circuit| {
1714+
let (input_stream, input_handle) = circuit
1715+
.add_input_map_persistent::<u64, u64, u64, _>(Some("input_map"), |v, u| *v = *u);
1716+
input_stream.set_persistent_id(Some("input_map"));
1717+
Ok(input_handle)
1718+
})
1719+
.unwrap();
1720+
1721+
input_handle1.push(0, crate::operator::Update::Insert(0));
1722+
1723+
circuit1.transaction().unwrap();
1724+
1725+
// Checkpoint.
1726+
let checkpoint = circuit1.checkpoint().run().unwrap();
1727+
circuit1.kill().unwrap();
1728+
1729+
// Restart the second circuit from the checkpoint.
1730+
let mut circuit_config = circuit_config(&path);
1731+
circuit_config.storage.as_mut().unwrap().init_checkpoint = Some(checkpoint.uuid);
1732+
1733+
let (mut circuit2, (_input_handle2, output_handle2)) =
1734+
Runtime::init_circuit(circuit_config, move |circuit| {
1735+
let (input_stream, input_handle) = circuit
1736+
.add_input_map_persistent::<u64, u64, u64, _>(Some("input_map"), |v, u| *v = *u);
1737+
input_stream.set_persistent_id(Some("input_map"));
1738+
1739+
//let input_stream = input_stream.apply_owned(|x| x).set_persistent_id(Some("input_map2"));
1740+
1741+
let aggregate = input_stream
1742+
.aggregate_persistent(Some("aggregate1"), Max)
1743+
.set_persistent_id(Some("aggregate1"));
1744+
1745+
let output_handle = aggregate
1746+
.accumulate_trace()
1747+
.apply(|trace| trace.ro_snapshot().consolidate())
1748+
.output_persistent(Some("output"));
1749+
Ok((input_handle, output_handle))
1750+
})
1751+
.unwrap();
1752+
1753+
while circuit2.bootstrap_in_progress() {
1754+
circuit2.transaction().unwrap();
1755+
}
1756+
println!("Replay finished");
1757+
1758+
//let output = output_handle2.take_from_all().concat().consolidate();
1759+
let actual_output = &output_handle2.concat().consolidate();
1760+
1761+
assert_eq!(actual_output, &indexed_zset!(0 => {0 => 1}));
1762+
}

0 commit comments

Comments
 (0)