|
13 | 13 | from __future__ import annotations |
14 | 14 |
|
15 | 15 | import json |
16 | | -import time |
17 | | -import pytest |
18 | | -import requests |
19 | | -import logging |
20 | | -from typing import Any, Dict, Iterable |
| 16 | +import os |
21 | 17 | from http import HTTPStatus |
| 18 | +from typing import Any, Dict, Iterable |
22 | 19 | from urllib.parse import quote, quote_plus |
23 | 20 |
|
| 21 | +import pytest |
| 22 | +import requests |
| 23 | + |
| 24 | +from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS |
24 | 25 | from feldera.testutils_oidc import get_oidc_test_helper |
25 | 26 | from tests import ( |
26 | | - FELDERA_REQUESTS_VERIFY, |
27 | 27 | API_KEY, |
28 | 28 | BASE_URL, |
| 29 | + FELDERA_REQUESTS_VERIFY, |
29 | 30 | TEST_CLIENT, |
30 | 31 | unique_pipeline_name, |
31 | 32 | ) |
32 | | -from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS |
| 33 | +from tests.utils import wait_for_condition |
33 | 34 |
|
34 | 35 | API_PREFIX = "/v0" |
35 | 36 |
|
@@ -122,18 +123,24 @@ def delete(path: str, **kw) -> requests.Response: |
122 | 123 |
|
123 | 124 |
|
124 | 125 | def create_pipeline(name: str, sql: str): |
125 | | - r = post_json( |
126 | | - api_url("/pipelines"), |
127 | | - { |
128 | | - "name": name, |
129 | | - "program_code": sql, |
130 | | - "runtime_config": { |
131 | | - "workers": FELDERA_TEST_NUM_WORKERS, |
132 | | - "hosts": FELDERA_TEST_NUM_HOSTS, |
133 | | - "logging": "debug", |
134 | | - }, |
| 126 | + payload: Dict[str, Any] = { |
| 127 | + "name": name, |
| 128 | + "program_code": sql, |
| 129 | + "runtime_config": { |
| 130 | + "workers": FELDERA_TEST_NUM_WORKERS, |
| 131 | + "hosts": FELDERA_TEST_NUM_HOSTS, |
| 132 | + "logging": "debug", |
135 | 133 | }, |
136 | | - ) |
| 134 | + } |
| 135 | + # Mirror `PipelineBuilder`: in CI `FELDERA_RUNTIME_VERSION` pins the runtime |
| 136 | + # in theory this isn't needed because all platform tests should NOT run with |
| 137 | + # a runtime version set and a runtime test should NOT use this function and |
| 138 | + # use PipelineBuilder, but it avoids a footgun in case a runtime test were to |
| 139 | + # ever use this helper by accident |
| 140 | + runtime_version = os.environ.get("FELDERA_RUNTIME_VERSION") |
| 141 | + if runtime_version: |
| 142 | + payload["program_config"] = {"runtime_version": runtime_version} |
| 143 | + r = post_json(api_url("/pipelines"), payload) |
137 | 144 | assert r.status_code == HTTPStatus.CREATED, r.text |
138 | 145 | wait_for_program_success(name, 1) |
139 | 146 |
|
@@ -244,59 +251,6 @@ def wait_for_program_success( |
244 | 251 | ) |
245 | 252 |
|
246 | 253 |
|
247 | | -def wait_for_condition( |
248 | | - description: str, |
249 | | - predicate_func, |
250 | | - timeout_s: float | None, |
251 | | - poll_interval_s: float, |
252 | | -) -> None: |
253 | | - """ |
254 | | - Waits until the condition is met by regularly checking if `predicate_func()` returns `True`. |
255 | | -
|
256 | | - :param description: Human-readable description used in timeout/errors. |
257 | | - :param predicate_func: Callable function (taking `this` pipeline as argument) returning `True` when condition is met. |
258 | | - :param timeout_s: Maximum wait time in seconds. `None` means there is no timeout. |
259 | | - :param poll_interval_s: Poll interval in seconds. The timeout is enforced at this granularity. |
260 | | -
|
261 | | - :raises TimeoutError: If the condition is not met within the specified timeout. |
262 | | - """ |
263 | | - # Polling interval should not exceed the timeout |
264 | | - if timeout_s is not None and poll_interval_s > timeout_s: |
265 | | - raise ValueError( |
266 | | - f"poll interval ({poll_interval_s}s) cannot be larger than" |
267 | | - f" timeout ({timeout_s}s)" |
268 | | - ) |
269 | | - |
270 | | - # Waiting constant variables |
271 | | - timestamp_start_s = time.monotonic() |
272 | | - timestamp_deadline_s = timestamp_start_s + timeout_s |
273 | | - if timeout_s is None: |
274 | | - timeout_info = "no timeout is enforced" |
275 | | - else: |
276 | | - timeout_info = f"timeout: {timeout_s:.1}s" |
277 | | - |
278 | | - # Waiting loop: exits either if the predicate function returns a value which evaluates to `True` |
279 | | - # or a timeout (if enforced) -- whichever occurs first. |
280 | | - attempt = 0 |
281 | | - while True: |
282 | | - timestamp_now_s = time.monotonic() |
283 | | - if timestamp_now_s > timestamp_deadline_s: |
284 | | - raise TimeoutError( |
285 | | - f"timeout ({timeout_s:.1}s) waiting for condition '{description}'" |
286 | | - ) |
287 | | - attempt += 1 |
288 | | - if predicate_func(): |
289 | | - logging.debug( |
290 | | - f"condition '{description}' has been met after {timestamp_now_s - timestamp_start_s:.1}s" |
291 | | - ) |
292 | | - return |
293 | | - else: |
294 | | - logging.debug( |
295 | | - f"condition '{description}' is not yet met (attempt: {attempt}); {timestamp_now_s - timestamp_start_s:.1}s have passed ({timeout_info})" |
296 | | - ) |
297 | | - time.sleep(poll_interval_s) |
298 | | - |
299 | | - |
300 | 254 | def extract_object_by_name( |
301 | 255 | collection: Iterable[Dict[str, Any]], name: str |
302 | 256 | ) -> Dict[str, Any]: |
|
0 commit comments