Skip to content

Commit dffc5b1

Browse files
committed
[adapters] Don't initialize snapshots until bootstrapping completes.
Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction). This guarantees that: 1. Ad hoc queries observe a consistent snapshot of all views. 2. Connectors configured with `send_snapshot=true` don't receive empty snapshots. Extended the Python output snapshot test with an extra step that fails without this fix. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 555c33c commit dffc5b1

2 files changed

Lines changed: 111 additions & 5 deletions

File tree

crates/adapters/src/controller.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3035,10 +3035,16 @@ impl CircuitThread {
30353035
// processing; otherwise, there is a race for any code that runs a query
30363036
// as soon as input has been processed.
30373037
if transaction_state == TransactionState::None {
3038-
Span::new("update")
3039-
.with_category("Step")
3040-
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
3041-
.in_scope(|| self.update_snapshot());
3038+
// Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction).
3039+
// This guarantees that:
3040+
// 1. Ad hoc queries observe a consistent view of the data.
3041+
// 2. Ad hoc snapshots are up-to-date before the pipeline is marked as running.
3042+
if !self.controller.status.bootstrap_in_progress() {
3043+
Span::new("update")
3044+
.with_category("Step")
3045+
.with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
3046+
.in_scope(|| self.update_snapshot());
3047+
}
30423048

30433049
let bootstrapping = self.circuit.bootstrap_in_progress();
30443050

python/tests/runtime/test_output_snapshot.py

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,12 @@ def _delta_connector(
120120
location: DeltaTestLocation,
121121
*,
122122
send_snapshot: bool,
123+
name: str = "delta_out",
123124
index: str | None = None,
124125
) -> dict:
125126
"""Build a `delta_table_output` connector config."""
126127
connector: dict = {
127-
"name": "delta_out",
128+
"name": name,
128129
"send_snapshot": send_snapshot,
129130
"transport": {
130131
"name": "delta_table_output",
@@ -185,6 +186,58 @@ def _build_sql(locations: list[DeltaTestLocation], send_snapshot: bool) -> str:
185186
return "\n\n".join(parts)
186187

187188

189+
def _build_sql_round3(
    locations: list[DeltaTestLocation],
    delta1_location: DeltaTestLocation,
    delta2_location: DeltaTestLocation,
    delta3_location: DeltaTestLocation,
) -> str:
    """Assemble the SQL program used in round 3 of the snapshot test.

    Every view listed in ``_VIEWS`` keeps its original connector with
    ``send_snapshot=true``. The ``v_plain`` view additionally receives two
    new connectors: ``delta2`` (snapshot enabled) and ``delta3`` (snapshot
    disabled). A new view, ``v_added``, is appended with a single
    ``delta1`` connector that has the snapshot disabled; adding this view
    is what forces bootstrapping.

    Args:
        locations: One Delta location per entry of ``_VIEWS``, paired
            positionally with the views.
        delta1_location: Target of the ``delta1`` connector on ``v_added``.
        delta2_location: Target of the ``delta2`` connector on ``v_plain``.
        delta3_location: Target of the ``delta3`` connector on ``v_plain``.

    Returns:
        The SQL statements joined by blank lines.
    """
    statements = [
        "CREATE TABLE t1(id INT NOT NULL, n BIGINT NOT NULL, s VARCHAR NOT NULL)\n"
        " WITH ('materialized' = 'true');",
    ]

    for view_spec, location in zip(_VIEWS, locations):
        label, view, index, indexes_sql = view_spec

        # The pre-existing connector of each view is kept as-is.
        view_connectors = [
            _delta_connector(location, send_snapshot=True, index=index),
        ]
        if label == "v_plain":
            # Attach the two extra sinks to the already-existing plain view:
            # one that requests the snapshot and one that does not.
            snapshot_sink = _delta_connector(
                delta2_location,
                send_snapshot=True,
                name="delta2",
            )
            delta_only_sink = _delta_connector(
                delta3_location,
                send_snapshot=False,
                name="delta3",
            )
            view_connectors += [snapshot_sink, delta_only_sink]

        connectors_json = json.dumps(view_connectors)
        statements.append(
            f"CREATE MATERIALIZED VIEW {view} WITH (\n"
            f" 'connectors' = '{connectors_json}'\n"
            ") AS SELECT * FROM t1;"
        )
        if indexes_sql:
            statements.append(indexes_sql)

    # Brand-new view with its own connector.
    added_view_connectors = json.dumps(
        [_delta_connector(delta1_location, send_snapshot=False, name="delta1")]
    )
    statements.append(
        "CREATE MATERIALIZED VIEW v_added WITH (\n"
        f" 'connectors' = '{added_view_connectors}'\n"
        ") AS SELECT * FROM t1;"
    )

    return "\n\n".join(statements)
239+
240+
188241
@skip_on_arm64 # https://github.com/delta-io/delta-rs/issues/4413
189242
def test_delta_output_send_snapshot_after_flag_flip():
190243
"""Verify snapshot delivery to delta sinks across a connector
@@ -215,13 +268,23 @@ def test_delta_output_send_snapshot_after_flag_flip():
215268
proves the initial snapshot delivered the full materialized-view
216269
contents — for every combination of indexes and which index the
217270
connector reads through.
271+
272+
Round 3 adds a new view with a new Delta connector (``delta1``) and
273+
adds two Delta connectors to an existing view: ``delta2`` with
274+
``send_snapshot=true`` and ``delta3`` with ``send_snapshot=false``.
275+
Adding the view forces actual bootstrapping. ``delta1`` and
276+
``delta2`` should be populated during startup, while ``delta3``
277+
should remain empty until future deltas arrive.
218278
"""
219279
pipeline_name = unique_pipeline_name(
220280
"test_delta_output_send_snapshot_after_flag_flip"
221281
)
222282
locations = [
223283
DeltaTestLocation.create(f"{pipeline_name}_{label}") for label, *_ in _VIEWS
224284
]
285+
delta1_location = DeltaTestLocation.create(f"{pipeline_name}_delta1")
286+
delta2_location = DeltaTestLocation.create(f"{pipeline_name}_delta2")
287+
delta3_location = DeltaTestLocation.create(f"{pipeline_name}_delta3")
225288
expected = sorted_rows(_ROWS)
226289

227290
# Round 1: send_snapshot=false. Push three rows; they land via the
@@ -272,3 +335,40 @@ def test_delta_output_send_snapshot_after_flag_flip():
272335

273336
for (_, *_), loc in zip(_VIEWS, locations):
274337
assert sorted_rows(loc.read_rows()) == expected
338+
339+
pipeline.checkpoint(wait=True)
340+
pipeline.stop(force=True)
341+
342+
# Round 3: add a new view and new output connectors to an existing view.
343+
# The new view forces bootstrapping. The connector on the new view and
344+
# the existing-view connector with send_snapshot=true are populated during
345+
# startup; the existing-view connector with send_snapshot=false is not.
346+
TEST_CLIENT.patch_pipeline(
347+
name=pipeline_name,
348+
sql=_build_sql_round3(
349+
locations,
350+
delta1_location,
351+
delta2_location,
352+
delta3_location,
353+
),
354+
runtime_config=RuntimeConfig(
355+
workers=FELDERA_TEST_NUM_WORKERS,
356+
storage=Storage(config={"start_from_checkpoint": "latest"}),
357+
).to_dict(),
358+
)
359+
pipeline = Pipeline.get(pipeline_name, TEST_CLIENT)
360+
pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW)
361+
362+
wait_for_condition(
363+
"round 3: delta1 on new bootstrapped view receives rows",
364+
lambda: sorted_rows(delta1_location.read_rows()) == expected,
365+
timeout_s=60.0,
366+
poll_interval_s=1.0,
367+
)
368+
wait_for_condition(
369+
"round 3: delta2 on existing view receives snapshot rows",
370+
lambda: sorted_rows(delta2_location.read_rows()) == expected,
371+
timeout_s=60.0,
372+
poll_interval_s=1.0,
373+
)
374+
assert delta3_location.read_rows() == []

0 commit comments

Comments
 (0)