Skip to content

Commit 6ade88a

Browse files
snkasIgor Smolyar
andcommitted
python: minor improvements and documentation
- Improvement: use `time.monotonic()` instead of `time.time()` in timeout mechanisms, as this clock does not go backward even if the system clock is changed - Improvement: use `logger` instead of `logging` such that the module name is included in the log line - Documentation: make clear that specifying `None` as timeout means no timeout is enforced - Documentation: arguments of `start_pipeline_as_paused()` and `start_pipeline_as_standby()` - Documentation: clarify that some parts of REST API are asynchronous - Testing: - Reuse `FelderaClient._wait_for_compilation()` (removed one prefixed underscore in the function name to have it accessible in the tests, but still communicate that the function is for internal use) - Use `FelderaClient` in the main platform test helper - Add generic `wait_for_condition()` that can be used by any test as timeout mechanism, rather than re-implementing it every time Co-authored-by: Igor Smolyar <igor.smolyar@feldera.com> Signed-off-by: Igor Smolyar <igor.smolyar@feldera.com> Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent b967a24 commit 6ade88a

17 files changed

+396
-380
lines changed

docs.feldera.com/docs/tutorials/rest_api/index.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,15 @@ curl -i -X POST http://127.0.0.1:8080/v0/pipelines/supply-chain/start
200200

201201
... which will return `HTTP/1.1 202 Accepted` when successful.
202202

203-
Check that it has successfully started using:
203+
The start action is asynchronous (hence its `202` Accepted response).
204+
As such, it will take some time before the final target state has been reached.
205+
Regularly poll the status until the start has completed:
204206

205207
```
206208
curl -s GET http://127.0.0.1:8080/v0/pipelines/supply-chain | jq '.deployment_status'
207209
```
208210

209-
... which will say 'Running` when the pipeline has started:
211+
... which eventually will say `Running` when the pipeline has started.
210212

211213
> Note: Connectors are only initialized when a pipeline starts to use them.
212214
> A pipeline will not start if a connector is unable to connect to its

python/feldera/pipeline.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,13 @@ def wait_for_status(
8484
:param timeout: Maximum time to wait in seconds. If None, waits forever (default: None)
8585
:raises TimeoutError: If the expected status is not reached within the timeout
8686
"""
87-
start_time = time.time()
88-
87+
start_time = time.monotonic()
8988
while True:
9089
current_status = self.status()
9190
if current_status == expected_status:
9291
return
9392

94-
if timeout is not None and time.time() - start_time >= timeout:
93+
if timeout is not None and time.monotonic() - start_time >= timeout:
9594
raise TimeoutError(
9695
f"Pipeline did not reach {expected_status.name} status within {timeout} seconds"
9796
)
@@ -392,7 +391,7 @@ def wait_for_idle(
392391
have been processed).
393392
394393
:param idle_interval_s: Idle interval duration (default is 5.0 seconds).
395-
:param timeout_s: Timeout waiting for idle (default is 600.0 seconds).
394+
:param timeout_s: Timeout waiting for idle (`None` = no timeout is enforced).
396395
:param poll_interval_s: Polling interval, should be set substantially
397396
smaller than the idle interval (default is 0.2 seconds).
398397
:raises ValueError: If idle interval is larger than timeout, poll interval
@@ -512,6 +511,13 @@ def start_paused(
512511
):
513512
"""
514513
Starts the pipeline in the paused state.
514+
515+
:param bootstrap_policy: The bootstrap policy to use.
516+
:param wait: Set True to wait for the pipeline to start. True by default.
517+
:param timeout_s: The maximum time (in seconds) to wait for the
518+
pipeline to start (defaults to `None` = no timeout is enforced).
519+
:param dismiss_error: Set True to dismiss any deployment error before starting;
520+
set False to make it fail in that case. True by default.
515521
"""
516522

517523
return self.client.start_pipeline_as_paused(
@@ -531,6 +537,13 @@ def start_standby(
531537
):
532538
"""
533539
Starts the pipeline in the standby state.
540+
541+
:param bootstrap_policy: The bootstrap policy to use.
542+
:param wait: Set True to wait for the pipeline to start. True by default.
543+
:param timeout_s: The maximum time (in seconds) to wait for the
544+
pipeline to start (defaults to `None` = no timeout is enforced).
545+
:param dismiss_error: Set True to dismiss any deployment error before starting;
546+
set False to make it fail in that case. True by default.
534547
"""
535548

536549
self.client.start_pipeline_as_standby(
@@ -747,7 +760,7 @@ def checkpoint(self, wait: bool = False, timeout_s: Optional[float] = None) -> i
747760
748761
:param wait: If true, will block until the checkpoint completes.
749762
:param timeout_s: The maximum time (in seconds) to wait for the
750-
checkpoint to complete.
763+
checkpoint to complete (defaults to `None` = no timeout is enforced).
751764
752765
:return: The checkpoint sequence number.
753766

python/feldera/rest/_httprequests.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,21 @@ def _wait_for_health_recovery(self, max_wait_seconds: int = 300) -> bool:
7171
Returns:
7272
bool: True if cluster became healthy within timeout, False otherwise
7373
"""
74-
start_time = time.time()
74+
start_time = time.monotonic()
7575
check_interval = 5
7676

7777
logging.info(
7878
f"Waiting for cluster health recovery (max {max_wait_seconds}s)..."
7979
)
8080

81-
while time.time() - start_time < max_wait_seconds:
81+
while time.monotonic() - start_time < max_wait_seconds:
8282
if self._check_cluster_health():
83-
elapsed = time.time() - start_time
83+
elapsed = time.monotonic() - start_time
8484
logging.info(f"Instance health recovered after {elapsed:.1f}s")
8585
return True
8686

8787
time.sleep(check_interval)
88-
elapsed = time.time() - start_time
88+
elapsed = time.monotonic() - start_time
8989
logging.debug(
9090
f"Still waiting for health recovery ({elapsed:.1f}s elapsed)..."
9191
)

python/feldera/rest/feldera_client.py

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from feldera.rest.feldera_config import FelderaConfig
1717
from feldera.rest.pipeline import Pipeline
1818

19+
logger = logging.getLogger(__name__)
20+
1921

2022
def _validate_no_none_keys_in_map(data):
2123
def validate_no_none_keys(d: Dict[Any, Any]) -> None:
@@ -93,12 +95,12 @@ def __init__(
9395
client_version = determine_client_version()
9496
server_config = self.get_config()
9597
if client_version != server_config.version:
96-
logging.warning(
98+
logger.warning(
9799
f"Feldera client is on version {client_version} while server is at "
98100
f"{server_config.version}. There could be incompatibilities."
99101
)
100102
except Exception as e:
101-
logging.error(f"Failed to connect to Feldera API: {e}")
103+
logger.error(f"Failed to connect to Feldera API: {e}")
102104
raise e
103105

104106
@staticmethod
@@ -153,15 +155,38 @@ def pipelines(
153155

154156
return [Pipeline.from_dict(pipeline) for pipeline in resp]
155157

156-
def __wait_for_compilation(self, name: str):
158+
def _wait_for_compilation(
159+
self,
160+
name: str,
161+
expected_program_version: int | None = None,
162+
timeout_s: float | None = None,
163+
poll_interval_s: float = 1.0,
164+
) -> Pipeline:
165+
"""Wait for pipeline compilation -- internal use only."""
157166
wait = ["Pending", "CompilingSql", "SqlCompiled", "CompilingRust"]
158-
167+
start_time = time.monotonic()
159168
while True:
169+
elapsed = time.monotonic() - start_time
170+
if timeout_s is not None and elapsed > timeout_s:
171+
raise TimeoutError(
172+
f"Timed out waiting for pipeline '{name}' to compile "
173+
f"(expected program_version >= {expected_program_version})"
174+
)
175+
160176
p = self.get_pipeline(name, PipelineFieldSelector.STATUS)
161177
status = p.program_status
162178

163179
if status == "Success":
164-
return self.get_pipeline(name, PipelineFieldSelector.ALL)
180+
if expected_program_version is None:
181+
return self.get_pipeline(name, PipelineFieldSelector.ALL)
182+
183+
current_version = p.program_version or 0
184+
if current_version == expected_program_version:
185+
return self.get_pipeline(name, PipelineFieldSelector.ALL)
186+
else:
187+
raise RuntimeError(
188+
f"program version ({current_version}) != expected program version ({expected_program_version})"
189+
)
165190
elif status not in wait:
166191
p = self.get_pipeline(name, PipelineFieldSelector.ALL)
167192

@@ -189,15 +214,20 @@ def __wait_for_compilation(self, name: str):
189214

190215
raise RuntimeError(error_message)
191216

192-
logging.debug("still compiling %s, waiting for 100 more milliseconds", name)
193-
time.sleep(0.1)
217+
logger.debug(
218+
"still compiling %s, waiting for %.1f more seconds",
219+
name,
220+
poll_interval_s,
221+
)
222+
time.sleep(poll_interval_s)
194223

195224
def __wait_for_pipeline_state(
196225
self,
197226
pipeline_name: str,
198227
state: str,
199228
timeout_s: Optional[float] = None,
200229
start: bool = True,
230+
poll_interval_s: float = 0.5,
201231
):
202232
start_time = time.monotonic()
203233

@@ -227,20 +257,22 @@ def __wait_for_pipeline_state(
227257
{resp.deployment_error.get("message", "")}"""
228258
)
229259

230-
logging.debug(
231-
"still starting %s, waiting for 100 more milliseconds", pipeline_name
260+
logger.debug(
261+
"still starting %s, waiting for %.1f more seconds",
262+
pipeline_name,
263+
poll_interval_s,
232264
)
233-
time.sleep(0.1)
265+
time.sleep(poll_interval_s)
234266

235267
def __wait_for_pipeline_state_one_of(
236268
self,
237269
pipeline_name: str,
238270
states: list[str],
239271
timeout_s: float | None = None,
240272
start: bool = True,
273+
poll_interval_s: float = 0.5,
241274
) -> PipelineStatus:
242275
start_time = time.monotonic()
243-
poll_interval_s = 0.1
244276
states = [state.lower() for state in states]
245277

246278
while True:
@@ -268,8 +300,10 @@ def __wait_for_pipeline_state_one_of(
268300
Reason: The pipeline is in a STOPPED state due to the following error:
269301
{resp.deployment_error.get("message", "")}"""
270302
)
271-
logging.debug(
272-
"still starting %s, waiting for 100 more milliseconds", pipeline_name
303+
logger.debug(
304+
"still starting %s, waiting for %.1f more seconds",
305+
pipeline_name,
306+
poll_interval_s,
273307
)
274308
time.sleep(poll_interval_s)
275309

@@ -299,7 +333,7 @@ def create_pipeline(self, pipeline: Pipeline, wait: bool = True) -> Pipeline:
299333
if not wait:
300334
return pipeline
301335

302-
return self.__wait_for_compilation(pipeline.name)
336+
return self._wait_for_compilation(pipeline.name)
303337

304338
def create_or_update_pipeline(
305339
self, pipeline: Pipeline, wait: bool = True
@@ -331,7 +365,7 @@ def create_or_update_pipeline(
331365
if not wait:
332366
return pipeline
333367

334-
return self.__wait_for_compilation(pipeline.name)
368+
return self._wait_for_compilation(pipeline.name)
335369

336370
def patch_pipeline(
337371
self,
@@ -667,7 +701,7 @@ def stop_pipeline(
667701
if status == "Stopped":
668702
return
669703

670-
logging.debug(
704+
logger.debug(
671705
"still stopping %s, waiting for 100 more milliseconds",
672706
pipeline_name,
673707
)
@@ -1021,7 +1055,7 @@ def wait_for_token(
10211055
break
10221056

10231057
elapsed = time.monotonic() - start
1024-
logging.debug(
1058+
logger.debug(
10251059
f"still waiting for inputs represented by {token} to be processed; elapsed: {elapsed}s"
10261060
)
10271061

python/tests/platform/conftest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import pytest
9+
import logging
910

1011

1112
def is_master(config) -> bool:
@@ -15,6 +16,9 @@ def is_master(config) -> bool:
1516

1617
def pytest_configure(config):
1718
"""Configure hook: fetch OIDC token on master node only."""
19+
# Keep SDK debug logs enabled in tests without affecting production defaults.
20+
logging.getLogger("feldera.rest.feldera_client").setLevel(logging.DEBUG)
21+
1822
if is_master(config):
1923
# This runs only on the master node (or in single-node mode)
2024
from feldera.testutils_oidc import setup_token_cache

0 commit comments

Comments
 (0)