Skip to content

Commit 14c84f4

Browse files
committed
python: keep runtime tests off the platform-test helper
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 0c540f0 commit 14c84f4

3 files changed

Lines changed: 83 additions & 85 deletions

File tree

python/tests/platform/helper.py

Lines changed: 25 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,24 @@
1313
from __future__ import annotations
1414

1515
import json
16-
import time
17-
import pytest
18-
import requests
19-
import logging
20-
from typing import Any, Dict, Iterable
16+
import os
2117
from http import HTTPStatus
18+
from typing import Any, Dict, Iterable
2219
from urllib.parse import quote, quote_plus
2320

21+
import pytest
22+
import requests
23+
24+
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS
2425
from feldera.testutils_oidc import get_oidc_test_helper
2526
from tests import (
26-
FELDERA_REQUESTS_VERIFY,
2727
API_KEY,
2828
BASE_URL,
29+
FELDERA_REQUESTS_VERIFY,
2930
TEST_CLIENT,
3031
unique_pipeline_name,
3132
)
32-
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
33+
from tests.utils import wait_for_condition
3334

3435
API_PREFIX = "/v0"
3536

@@ -122,18 +123,24 @@ def delete(path: str, **kw) -> requests.Response:
122123

123124

124125
def create_pipeline(name: str, sql: str):
125-
r = post_json(
126-
api_url("/pipelines"),
127-
{
128-
"name": name,
129-
"program_code": sql,
130-
"runtime_config": {
131-
"workers": FELDERA_TEST_NUM_WORKERS,
132-
"hosts": FELDERA_TEST_NUM_HOSTS,
133-
"logging": "debug",
134-
},
126+
payload: Dict[str, Any] = {
127+
"name": name,
128+
"program_code": sql,
129+
"runtime_config": {
130+
"workers": FELDERA_TEST_NUM_WORKERS,
131+
"hosts": FELDERA_TEST_NUM_HOSTS,
132+
"logging": "debug",
135133
},
136-
)
134+
}
135+
# Mirror `PipelineBuilder`: in CI `FELDERA_RUNTIME_VERSION` pins the runtime
136+
# in theory this isn't needed because all platform tests should NOT run with
137+
# a runtime version set and a runtime test should NOT use this function and
138+
# use PipelineBuilder, but it avoids a footgun in case a runtime test were to
139+
# ever use this helper by accident
140+
runtime_version = os.environ.get("FELDERA_RUNTIME_VERSION")
141+
if runtime_version:
142+
payload["program_config"] = {"runtime_version": runtime_version}
143+
r = post_json(api_url("/pipelines"), payload)
137144
assert r.status_code == HTTPStatus.CREATED, r.text
138145
wait_for_program_success(name, 1)
139146

@@ -244,59 +251,6 @@ def wait_for_program_success(
244251
)
245252

246253

247-
def wait_for_condition(
248-
description: str,
249-
predicate_func,
250-
timeout_s: float | None,
251-
poll_interval_s: float,
252-
) -> None:
253-
"""
254-
Waits until the condition is met by regularly checking if `predicate_func()` returns `True`.
255-
256-
:param description: Human-readable description used in timeout/errors.
257-
:param predicate_func: Callable function (taking `this` pipeline as argument) returning `True` when condition is met.
258-
:param timeout_s: Maximum wait time in seconds. `None` means there is no timeout.
259-
:param poll_interval_s: Poll interval in seconds. The timeout is enforced at this granularity.
260-
261-
:raises TimeoutError: If the condition is not met within the specified timeout.
262-
"""
263-
# Polling interval should not exceed the timeout
264-
if timeout_s is not None and poll_interval_s > timeout_s:
265-
raise ValueError(
266-
f"poll interval ({poll_interval_s}s) cannot be larger than"
267-
f" timeout ({timeout_s}s)"
268-
)
269-
270-
# Waiting constant variables
271-
timestamp_start_s = time.monotonic()
272-
timestamp_deadline_s = timestamp_start_s + timeout_s
273-
if timeout_s is None:
274-
timeout_info = "no timeout is enforced"
275-
else:
276-
timeout_info = f"timeout: {timeout_s:.1}s"
277-
278-
# Waiting loop: exits either if the predicate function returns a value which evaluates to `True`
279-
# or a timeout (if enforced) -- whichever occurs first.
280-
attempt = 0
281-
while True:
282-
timestamp_now_s = time.monotonic()
283-
if timestamp_now_s > timestamp_deadline_s:
284-
raise TimeoutError(
285-
f"timeout ({timeout_s:.1}s) waiting for condition '{description}'"
286-
)
287-
attempt += 1
288-
if predicate_func():
289-
logging.debug(
290-
f"condition '{description}' has been met after {timestamp_now_s - timestamp_start_s:.1}s"
291-
)
292-
return
293-
else:
294-
logging.debug(
295-
f"condition '{description}' is not yet met (attempt: {attempt}); {timestamp_now_s - timestamp_start_s:.1}s have passed ({timeout_info})"
296-
)
297-
time.sleep(poll_interval_s)
298-
299-
300254
def extract_object_by_name(
301255
collection: Iterable[Dict[str, Any]], name: str
302256
) -> Dict[str, Any]:

python/tests/runtime/test_output_snapshot.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,8 @@
55
from feldera.pipeline import Pipeline
66
from feldera.runtime_config import RuntimeConfig, Storage
77
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, skip_on_arm64
8-
from tests import TEST_CLIENT
9-
from tests.platform.helper import (
10-
create_pipeline,
11-
gen_pipeline_name,
12-
start_pipeline,
13-
wait_for_condition,
14-
)
15-
from tests.utils import DeltaTestLocation
8+
from tests import TEST_CLIENT, unique_pipeline_name
9+
from tests.utils import DeltaTestLocation, wait_for_condition
1610

1711

1812
def sorted_rows(rows: list[dict]) -> list[dict]:
@@ -46,15 +40,20 @@ def collect_output_chunks(stream, expected_rows: int) -> tuple[list[dict], list[
4640
return chunks, rows
4741

4842

49-
@gen_pipeline_name
50-
def test_egress_send_snapshot(pipeline_name):
43+
def test_egress_send_snapshot():
44+
pipeline_name = unique_pipeline_name("test_egress_send_snapshot")
5145
sql = """
5246
CREATE TABLE t1(id INT) WITH ('materialized' = 'true');
5347
CREATE MATERIALIZED VIEW v1 AS SELECT * FROM t1;
5448
""".strip()
5549

56-
create_pipeline(pipeline_name, sql)
57-
start_pipeline(pipeline_name)
50+
pipeline = PipelineBuilder(
51+
TEST_CLIENT,
52+
name=pipeline_name,
53+
sql=sql,
54+
runtime_config=RuntimeConfig(workers=FELDERA_TEST_NUM_WORKERS),
55+
).create_or_replace()
56+
pipeline.start()
5857

5958
TEST_CLIENT.push_to_pipeline(
6059
pipeline_name,
@@ -187,8 +186,7 @@ def _build_sql(locations: list[DeltaTestLocation], send_snapshot: bool) -> str:
187186

188187

189188
@skip_on_arm64 # https://github.com/delta-io/delta-rs/issues/4413
190-
@gen_pipeline_name
191-
def test_delta_output_send_snapshot_after_flag_flip(pipeline_name):
189+
def test_delta_output_send_snapshot_after_flag_flip():
192190
"""Verify snapshot delivery to delta sinks across a connector
193191
modification (`send_snapshot: false` → `send_snapshot: true`).
194192
@@ -218,6 +216,9 @@ def test_delta_output_send_snapshot_after_flag_flip(pipeline_name):
218216
contents — for every combination of indexes and which index the
219217
connector reads through.
220218
"""
219+
pipeline_name = unique_pipeline_name(
220+
"test_delta_output_send_snapshot_after_flag_flip"
221+
)
221222
locations = [
222223
DeltaTestLocation.create(f"{pipeline_name}_{label}") for label, *_ in _VIEWS
223224
]

python/tests/utils.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import logging
12
import os
23
import pathlib
34
import shutil
45
import tempfile
6+
import time
57
import uuid
68
from dataclasses import dataclass
79
from urllib.parse import urlparse
@@ -248,3 +250,44 @@ def cleanup(self) -> None:
248250
if self.local_dir is not None:
249251
shutil.rmtree(self.local_dir, ignore_errors=True)
250252
self.local_dir = None
253+
254+
255+
def wait_for_condition(
256+
description: str,
257+
predicate_func,
258+
timeout_s: float | None,
259+
poll_interval_s: float,
260+
) -> None:
261+
"""Poll ``predicate_func`` until it returns truthy or the timeout elapses.
262+
263+
:param description: Human-readable description used in timeout/errors.
264+
:param predicate_func: Callable returning ``True`` when condition is met.
265+
:param timeout_s: Maximum wait time in seconds. ``None`` means wait forever.
266+
:param poll_interval_s: Poll interval in seconds.
267+
268+
:raises TimeoutError: If the condition is not met within ``timeout_s``.
269+
"""
270+
if timeout_s is not None and poll_interval_s > timeout_s:
271+
raise ValueError(
272+
f"poll interval ({poll_interval_s}s) cannot be larger than"
273+
f" timeout ({timeout_s}s)"
274+
)
275+
276+
timestamp_start_s = time.monotonic()
277+
timestamp_deadline_s = (
278+
timestamp_start_s + timeout_s if timeout_s is not None else float("inf")
279+
)
280+
attempt = 0
281+
while True:
282+
if time.monotonic() > timestamp_deadline_s:
283+
raise TimeoutError(
284+
f"timeout ({timeout_s:.1f}s) waiting for condition '{description}'"
285+
)
286+
attempt += 1
287+
if predicate_func():
288+
logging.debug(
289+
f"condition '{description}' met after"
290+
f" {time.monotonic() - timestamp_start_s:.1f}s"
291+
)
292+
return
293+
time.sleep(poll_interval_s)

0 commit comments

Comments
 (0)