Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ dev = [
"sphinx-rtd-theme==2.0.0",
"sphinx==7.3.7",
"simplejson==3.20.1",
"confluent-kafka>=2.2.0"
"confluent-kafka>=2.2.0",
"deltalake>=0.18"
]

[tool.pytest.ini_options]
Expand Down
254 changes: 78 additions & 176 deletions python/tests/platform/test_delta_output_restart.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
"""
Delta Lake output connector: restart semantics.

Exercises the interaction between `delta_table_output` `truncate` mode and
pipeline suspend/resume. Covers three scenarios:

1. Clean suspend/resume preserves data written before the checkpoint.
2. Modifying the connector config between suspend and resume re-truncates
the table (the modified connector is treated as new).
3. Modifying the upstream view's schema between suspend and resume also
re-truncates the table.

The tests require enterprise features (checkpoint / suspend) and a local
filesystem shared with the Feldera server. Delta table state is inferred
by replaying `_delta_log/*.json` to find the currently-active `add` actions
and summing the `numRecords` field from each action's `stats`. We do not
scan parquet files directly because `SaveMode::Overwrite` leaves orphaned
parquet files on disk that a naive recursive scan would double-count, and
we want the test to rely only on what the delta log reports.
Exercises the interaction between `delta_table_output` `truncate` mode
and pipeline suspend/resume. Three scenarios:

* `test_clean_resume_preserves_table` — restarting an unchanged pipeline
keeps the data written before the checkpoint.
* `test_modified_connector_re_truncates_on_resume` — changing the
connector config on resume re-truncates the table (the modified
connector is treated as a new incarnation).
* `test_modified_view_re_truncates_on_resume` — changing the view's
schema on resume forces a rebuild from scratch.

The delta table backend toggles automatically via `DeltaTestLocation`:

* Local runs use a `file://` URI under `/tmp`.
* CI runs use the in-cluster MinIO endpoint, so the pipeline pod and the
test runner reach the table over S3.
"""

import json
import shutil
import tempfile
from pathlib import Path
from typing import List

import pytest

from feldera import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
Expand All @@ -34,72 +32,37 @@
)
from tests import TEST_CLIENT, enterprise_only

pytest.importorskip("deltalake")

# ─── delta log reader ──────────────────────────────────────────────────


def _active_add_actions(table_dir: Path) -> List[dict]:
"""Replay the delta log and return the `add` actions currently in effect."""
log_dir = table_dir / "_delta_log"
if not log_dir.exists():
return []
active: dict[str, dict] = {}
for entry in sorted(log_dir.glob("*.json")):
for line in entry.read_text().splitlines():
if not line.strip():
continue
obj = json.loads(line)
if "add" in obj:
active[obj["add"]["path"]] = obj["add"]
elif "remove" in obj:
active.pop(obj["remove"]["path"], None)
return list(active.values())

from tests.utils import DeltaTestLocation # noqa: E402

def delta_row_count(table_dir: Path) -> int:
    """Total row count of the table, as reported by the delta log.

    Sums the ``numRecords`` statistic across every currently-active
    ``add`` action. Raises ``RuntimeError`` when an action carries no
    ``stats`` payload, since the total would silently be wrong otherwise.
    """
    counts = []
    for add in _active_add_actions(table_dir):
        payload = add.get("stats")
        if not payload:
            raise RuntimeError(f"delta add action missing 'stats' field: {add}")
        counts.append(json.loads(payload)["numRecords"])
    return sum(counts)

# ─── helpers ───────────────────────────────────────────────────────────

# ─── pipeline helpers ──────────────────────────────────────────────────


def _connector_config(uri: str, extra: dict | None = None) -> str:
config = {"uri": uri, "mode": "truncate"}
def _connector_config(loc: DeltaTestLocation, extra: dict | None = None) -> str:
config = dict(loc.connector_config)
if extra:
config.update(extra)
return json.dumps(
[
{
"transport": {
"name": "delta_table_output",
"config": config,
}
}
]
)
return json.dumps([{"transport": {"name": "delta_table_output", "config": config}}])


def _sql(
loc: DeltaTestLocation,
*,
uri: str,
extra_connector_options: dict | None = None,
extra_view_column: bool = False,
) -> str:
v_body = "SELECT id, 'v1' AS tag FROM t"
if extra_view_column:
v_body = "SELECT id, 'v2' AS tag, id * 10 AS extra FROM t"
v_body = (
"SELECT id, 'v2' AS tag, id * 10 AS extra FROM t"
if extra_view_column
else "SELECT id, 'v1' AS tag FROM t"
)
return (
"CREATE TABLE t (id INT) WITH ('materialized' = 'true');\n"
"CREATE MATERIALIZED VIEW v WITH ("
"'connectors' = '"
+ _connector_config(uri, extra_connector_options)
+ _connector_config(loc, extra_connector_options)
+ "') AS "
+ v_body
+ ";"
Expand All @@ -119,143 +82,82 @@ def _build_pipeline(name: str, sql: str):
).create_or_replace()


def _seed_50_rows_and_suspend(name: str, loc: DeltaTestLocation):
    """Bring up a fresh pipeline, feed ids 0..49, checkpoint, then suspend.

    Returns the pipeline object so callers can resume this exact
    incarnation (or deliberately replace it with a modified one).
    """
    p = _build_pipeline(name, _sql(loc))
    p.start()
    seed_rows = [{"id": i} for i in range(50)]
    p.input_json("t", seed_rows, wait=True)
    p.checkpoint(wait=True)
    p.stop(force=False)
    return p


# ─── tests ─────────────────────────────────────────────────────────────


@enterprise_only
def test_truncate_preserved_across_clean_suspend_resume():
"""Unchanged pipeline: suspend+resume keeps data in the delta table."""
def test_clean_resume_preserves_table():
"""Restarting an unchanged pipeline keeps the delta table contents."""
name = unique_pipeline_name("delta_restart_clean")
table_dir = Path(tempfile.mkdtemp(prefix="feldera_delta_test_"))
uri = f"file://{table_dir}"

loc = DeltaTestLocation.create(name)
try:
pipeline = _build_pipeline(name, _sql(uri=uri))
pipeline.start()
pipeline.input_json("t", [{"id": i} for i in range(50)], wait=True)
assert delta_row_count(table_dir) == 50
pipeline = _seed_50_rows_and_suspend(name, loc)

pipeline.checkpoint(wait=True)
pipeline.stop(force=False)

# Resume with the SAME config — no truncation
# Resume the SAME pipeline object — connector identity is preserved.
pipeline.start()
assert delta_row_count(table_dir) == 50, (
"clean restart should preserve delta table contents"
)
assert loc.row_count() == 50

pipeline.input_json("t", [{"id": i} for i in range(50, 80)], wait=True)
assert delta_row_count(table_dir) == 80
assert loc.row_count() == 80

pipeline.stop(force=True)
pipeline.clear_storage()
finally:
try:
pipeline.stop(force=True)
pipeline.clear_storage()
except Exception:
pass
shutil.rmtree(table_dir, ignore_errors=True)
loc.cleanup()


@enterprise_only
def test_truncate_reapplied_when_connector_modified():
"""Changing the connector config between suspend and resume must re-truncate."""
name = unique_pipeline_name("delta_restart_conn_changed")
table_dir = Path(tempfile.mkdtemp(prefix="feldera_delta_test_"))
uri = f"file://{table_dir}"

def test_modified_connector_re_truncates_on_resume():
"""Changing the connector config on resume makes it a new incarnation that re-truncates."""
name = unique_pipeline_name("delta_restart_conn_modified")
loc = DeltaTestLocation.create(name)
try:
pipeline = _build_pipeline(name, _sql(uri=uri))
pipeline.start()
pipeline.input_json("t", [{"id": i} for i in range(50)], wait=True)
assert delta_row_count(table_dir) == 50
_seed_50_rows_and_suspend(name, loc)

pipeline.checkpoint(wait=True)
pipeline.stop(force=False)

# Resume with a MODIFIED connector config
# (extra `checkpoint_interval` field).
# Resume with a MODIFIED connector config (extra `checkpoint_interval` field).
pipeline = _build_pipeline(
name,
_sql(uri=uri, extra_connector_options={"checkpoint_interval": 60}),
_sql(loc, extra_connector_options={"checkpoint_interval": 60}),
)
pipeline.start()
assert loc.row_count() == 0 # re-truncated

# The new connector incarnation must truncate the table and must not
# inherit the stale 50-row snapshot that would survive without the
# output_statistics filter. No new data has been fed since resume, so
# the post-restart count is exactly zero.
post_resume = delta_row_count(table_dir)
assert post_resume == 0, (
f"modified connector should truncate on resume with no re-emission; "
f"expected 0 rows, got {post_resume}"
)
assert (table_dir / "_delta_log").exists()

# Feed new data and confirm the new incarnation accepts it with an
# exact row-count delta — proving the connector is live, not stuck.
pipeline.input_json("t", [{"id": i} for i in range(50, 80)], wait=True)
assert delta_row_count(table_dir) == 30
assert loc.row_count() == 30

pipeline.stop(force=True)
pipeline.clear_storage()
finally:
try:
pipeline.stop(force=True)
pipeline.clear_storage()
except Exception:
pass
shutil.rmtree(table_dir, ignore_errors=True)
loc.cleanup()


@enterprise_only
def test_truncate_reapplied_when_view_modified():
"""Changing the view's schema between suspend and resume must re-truncate."""
name = unique_pipeline_name("delta_restart_view_changed")
table_dir = Path(tempfile.mkdtemp(prefix="feldera_delta_test_"))
uri = f"file://{table_dir}"

def test_modified_view_re_truncates_on_resume():
"""Changing the view's schema on resume forces a rebuild from scratch."""
name = unique_pipeline_name("delta_restart_view_modified")
loc = DeltaTestLocation.create(name)
try:
pipeline = _build_pipeline(name, _sql(uri=uri))
pipeline.start()
pipeline.input_json("t", [{"id": i} for i in range(50)], wait=True)
assert delta_row_count(table_dir) == 50

pipeline.checkpoint(wait=True)
pipeline.stop(force=False)
_seed_50_rows_and_suspend(name, loc)

# Resume with a MODIFIED view (adds a column), connector config unchanged.
pipeline = _build_pipeline(name, _sql(uri=uri, extra_view_column=True))
# Resume with a MODIFIED view (adds an `extra` column).
pipeline = _build_pipeline(name, _sql(loc, extra_view_column=True))
pipeline.start()
# A schema change forces the connector to rebuild the table from
# scratch; the old 50-row snapshot must not survive. With no new
# upstream input, the post-resume row count is exactly zero.
post_resume = delta_row_count(table_dir)
assert post_resume == 0, (
f"schema change should reset row count; expected 0, got {post_resume}"
)
assert loc.row_count() == 0 # rebuilt empty

# Confirm the `extra` column is present in the most recent `metaData`
# action of the delta log.
log_dir = table_dir / "_delta_log"
latest_meta = None
for entry in sorted(log_dir.glob("*.json")):
for line in entry.read_text().splitlines():
if not line.strip():
continue
obj = json.loads(line)
if "metaData" in obj:
latest_meta = obj["metaData"]
assert latest_meta is not None, "no metaData action found in delta log"
schema = json.loads(latest_meta["schemaString"])
columns = [f["name"] for f in schema.get("fields", [])]
assert "extra" in columns, (
f"delta log schema after view change missing 'extra' column: {columns}"
)

# Feed new data and verify it lands in the rebuilt table with the new
# schema — exact row count, no leftover or duplicated rows.
pipeline.input_json("t", [{"id": i} for i in range(50, 80)], wait=True)
assert delta_row_count(table_dir) == 30
assert loc.row_count() == 30

pipeline.stop(force=True)
pipeline.clear_storage()
finally:
try:
pipeline.stop(force=True)
pipeline.clear_storage()
except Exception:
pass
shutil.rmtree(table_dir, ignore_errors=True)
loc.cleanup()
Loading
Loading