@@ -120,11 +120,12 @@ def _delta_connector(
120120 location : DeltaTestLocation ,
121121 * ,
122122 send_snapshot : bool ,
123+ name : str = "delta_out" ,
123124 index : str | None = None ,
124125) -> dict :
125126 """Build a `delta_table_output` connector config."""
126127 connector : dict = {
127- "name" : "delta_out" ,
128+ "name" : name ,
128129 "send_snapshot" : send_snapshot ,
129130 "transport" : {
130131 "name" : "delta_table_output" ,
@@ -185,6 +186,58 @@ def _build_sql(locations: list[DeltaTestLocation], send_snapshot: bool) -> str:
185186 return "\n \n " .join (parts )
186187
187188
189+ def _build_sql_round3 (
190+ locations : list [DeltaTestLocation ],
191+ delta1_location : DeltaTestLocation ,
192+ delta2_location : DeltaTestLocation ,
193+ delta3_location : DeltaTestLocation ,
194+ ) -> str :
195+ """Render SQL for round 3.
196+
197+ The original connectors remain unchanged with ``send_snapshot=true``.
198+ The new view forces bootstrapping. ``delta2`` and ``delta3`` are added
199+ to an existing view with snapshot enabled and disabled, respectively.
200+ """
201+ parts = [
202+ "CREATE TABLE t1(id INT NOT NULL, n BIGINT NOT NULL, s VARCHAR NOT NULL)\n "
203+ " WITH ('materialized' = 'true');" ,
204+ ]
205+ for (label , view , index , indexes_sql ), loc in zip (_VIEWS , locations ):
206+ connectors = [
207+ _delta_connector (loc , send_snapshot = True , index = index ),
208+ ]
209+ if label == "v_plain" :
210+ connectors .extend (
211+ [
212+ _delta_connector (
213+ delta2_location ,
214+ send_snapshot = True ,
215+ name = "delta2" ,
216+ ),
217+ _delta_connector (
218+ delta3_location ,
219+ send_snapshot = False ,
220+ name = "delta3" ,
221+ ),
222+ ]
223+ )
224+ parts .append (
225+ f"CREATE MATERIALIZED VIEW { view } WITH (\n "
226+ f" 'connectors' = '{ json .dumps (connectors )} '\n "
227+ f") AS SELECT * FROM t1;"
228+ )
229+ if indexes_sql :
230+ parts .append (indexes_sql )
231+
232+ delta1 = _delta_connector (delta1_location , send_snapshot = False , name = "delta1" )
233+ parts .append (
234+ "CREATE MATERIALIZED VIEW v_added WITH (\n "
235+ f" 'connectors' = '{ json .dumps ([delta1 ])} '\n "
236+ ") AS SELECT * FROM t1;"
237+ )
238+ return "\n \n " .join (parts )
239+
240+
188241@skip_on_arm64 # https://github.com/delta-io/delta-rs/issues/4413
189242def test_delta_output_send_snapshot_after_flag_flip ():
190243 """Verify snapshot delivery to delta sinks across a connector
@@ -215,13 +268,23 @@ def test_delta_output_send_snapshot_after_flag_flip():
215268 proves the initial snapshot delivered the full materialized-view
216269 contents — for every combination of indexes and which index the
217270 connector reads through.
271+
272+ Round 3 adds a new view with a new Delta connector (``delta1``) and
273+ adds two Delta connectors to an existing view: ``delta2`` with
274+ ``send_snapshot=true`` and ``delta3`` with ``send_snapshot=false``.
275+ Adding the view forces actual bootstrapping. ``delta1`` and
276+ ``delta2`` should be populated during startup, while ``delta3``
277+ should remain empty until future deltas arrive.
218278 """
219279 pipeline_name = unique_pipeline_name (
220280 "test_delta_output_send_snapshot_after_flag_flip"
221281 )
222282 locations = [
223283 DeltaTestLocation .create (f"{ pipeline_name } _{ label } " ) for label , * _ in _VIEWS
224284 ]
285+ delta1_location = DeltaTestLocation .create (f"{ pipeline_name } _delta1" )
286+ delta2_location = DeltaTestLocation .create (f"{ pipeline_name } _delta2" )
287+ delta3_location = DeltaTestLocation .create (f"{ pipeline_name } _delta3" )
225288 expected = sorted_rows (_ROWS )
226289
227290 # Round 1: send_snapshot=false. Push three rows; they land via the
@@ -272,3 +335,40 @@ def test_delta_output_send_snapshot_after_flag_flip():
272335
273336 for (_ , * _ ), loc in zip (_VIEWS , locations ):
274337 assert sorted_rows (loc .read_rows ()) == expected
338+
339+ pipeline .checkpoint (wait = True )
340+ pipeline .stop (force = True )
341+
342+ # Round 3: add a new view and new output connectors to an existing view.
343+ # The new view forces bootstrapping. The connector on the new view and
344+ # the existing-view connector with send_snapshot=true are populated during
345+ # startup; the existing-view connector with send_snapshot=false is not.
346+ TEST_CLIENT .patch_pipeline (
347+ name = pipeline_name ,
348+ sql = _build_sql_round3 (
349+ locations ,
350+ delta1_location ,
351+ delta2_location ,
352+ delta3_location ,
353+ ),
354+ runtime_config = RuntimeConfig (
355+ workers = FELDERA_TEST_NUM_WORKERS ,
356+ storage = Storage (config = {"start_from_checkpoint" : "latest" }),
357+ ).to_dict (),
358+ )
359+ pipeline = Pipeline .get (pipeline_name , TEST_CLIENT )
360+ pipeline .start (bootstrap_policy = BootstrapPolicy .ALLOW )
361+
362+ wait_for_condition (
363+ "round 3: delta1 on new bootstrapped view receives rows" ,
364+ lambda : sorted_rows (delta1_location .read_rows ()) == expected ,
365+ timeout_s = 60.0 ,
366+ poll_interval_s = 1.0 ,
367+ )
368+ wait_for_condition (
369+ "round 3: delta2 on existing view receives snapshot rows" ,
370+ lambda : sorted_rows (delta2_location .read_rows ()) == expected ,
371+ timeout_s = 60.0 ,
372+ poll_interval_s = 1.0 ,
373+ )
374+ assert delta3_location .read_rows () == []
0 commit comments