Skip to content

Commit 3e996db

Browse files
Igor Smolyarigorscs
authored andcommitted
[python] move stale start-error polling from test into client start()
Handle the start(reject) + diff -> start(allow) flow in the feldera python client instead of test helpers. Pipeline.start() in feldera_client now supports ignore_deployment_error for expected stale deployment errors from a previous failed start, and the wait is bounded with a default timeout to avoid infinite polling. This change removes additional start/stop manipulations in start(reject) exception in the test to clear the deployment error. The additional start/stop manipulations affected later start(await_approval) that started the pipeline to running state because the pipeline was cleared.
1 parent 4f4d96a commit 3e996db

3 files changed

Lines changed: 110 additions & 104 deletions

File tree

python/feldera/pipeline.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ def start(
475475
bootstrap_policy: Optional[BootstrapPolicy] = None,
476476
wait: bool = True,
477477
timeout_s: Optional[float] = None,
478+
ignore_deployment_error: bool = False,
478479
):
479480
"""
480481
.. _start:
@@ -489,12 +490,18 @@ def start(
489490
:param timeout_s: The maximum time (in seconds) to wait for the
490491
pipeline to start.
491492
:param wait: Set True to wait for the pipeline to start. True by default
493+
:param ignore_deployment_error: Set True to ignore deployment errors while waiting
494+
for START transition. False by default.
492495
493496
:raises RuntimeError: If the pipeline is not in STOPPED state.
494497
"""
495498

496499
self.client.start_pipeline(
497-
self.name, bootstrap_policy=bootstrap_policy, wait=wait, timeout_s=timeout_s
500+
self.name,
501+
bootstrap_policy=bootstrap_policy,
502+
wait=wait,
503+
timeout_s=timeout_s,
504+
ignore_deployment_error=ignore_deployment_error,
498505
)
499506

500507
def start_paused(

python/feldera/rest/feldera_client.py

Lines changed: 93 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -232,15 +232,17 @@ def __wait_for_pipeline_state(
232232
)
233233
time.sleep(0.1)
234234

235-
def __wait_for_pipeline_state_one_of(
236-
self,
237-
pipeline_name: str,
238-
states: list[str],
239-
timeout_s: float | None = None,
240-
start: bool = True,
241-
) -> PipelineStatus:
242-
start_time = time.monotonic()
243-
states = [state.lower() for state in states]
235+
def __wait_for_pipeline_state_one_of(
236+
self,
237+
pipeline_name: str,
238+
states: list[str],
239+
timeout_s: float | None = None,
240+
start: bool = True,
241+
ignore_deployment_error: bool = False,
242+
) -> PipelineStatus:
243+
start_time = time.monotonic()
244+
poll_interval_s = 0.1
245+
states = [state.lower() for state in states]
244246

245247
while True:
246248
if timeout_s is not None:
@@ -256,21 +258,29 @@ def __wait_for_pipeline_state_one_of(
256258

257259
if status.lower() in states:
258260
return PipelineStatus.from_str(status)
259-
elif (
260-
status == "Stopped"
261-
and len(resp.deployment_error or {}) > 0
262-
and resp.deployment_desired_status == "Stopped"
263-
):
264-
err_msg = "Unable to START the pipeline:\n" if start else ""
265-
raise RuntimeError(
266-
f"""{err_msg}Unable to transition the pipeline to one of the states: {states}.
267-
Reason: The pipeline is in a STOPPED state due to the following error:
268-
{resp.deployment_error.get("message", "")}"""
261+
elif (
262+
status == "Stopped"
263+
and len(resp.deployment_error or {}) > 0
264+
and resp.deployment_desired_status == "Stopped"
265+
):
266+
if ignore_deployment_error:
267+
logging.debug(
268+
"ignoring stopped deployment error while waiting for %s to start: %s",
269+
pipeline_name,
270+
(resp.deployment_error or {}).get("message", ""),
271+
)
272+
time.sleep(poll_interval_s)
273+
continue
274+
err_msg = "Unable to START the pipeline:\n" if start else ""
275+
raise RuntimeError(
276+
f"""{err_msg}Unable to transition the pipeline to one of the states: {states}.
277+
Reason: The pipeline is in a STOPPED state due to the following error:
278+
{resp.deployment_error.get("message", "")}"""
269279
)
270-
logging.debug(
271-
"still starting %s, waiting for 100 more milliseconds", pipeline_name
272-
)
273-
time.sleep(0.1)
280+
logging.debug(
281+
"still starting %s, waiting for 100 more milliseconds", pipeline_name
282+
)
283+
time.sleep(poll_interval_s)
274284

275285
def create_pipeline(self, pipeline: Pipeline, wait: bool = True) -> Pipeline:
276286
"""
@@ -437,22 +447,26 @@ def activate_pipeline(
437447
pipeline_name, ["running", "AwaitingApproval"], timeout_s
438448
)
439449

440-
def _inner_start_pipeline(
441-
self,
442-
pipeline_name: str,
443-
initial: str = "running",
444-
bootstrap_policy: Optional[BootstrapPolicy] = None,
445-
wait: bool = True,
446-
timeout_s: Optional[float] = None,
447-
) -> Optional[PipelineStatus]:
450+
def _inner_start_pipeline(
451+
self,
452+
pipeline_name: str,
453+
initial: str = "running",
454+
bootstrap_policy: Optional[BootstrapPolicy] = None,
455+
wait: bool = True,
456+
timeout_s: Optional[float] = None,
457+
ignore_deployment_error: bool = False,
458+
) -> Optional[PipelineStatus]:
448459
"""
449460
450461
:param pipeline_name: The name of the pipeline to start
451462
:param initial: The initial state to start the pipeline in. "running"
452463
by default.
453-
:param wait: Set True to wait for the pipeline to start. True by default
454-
:param timeout_s: The amount of time in seconds to wait for the
455-
pipeline to start.
464+
:param wait: Set True to wait for the pipeline to start. True by default
465+
:param timeout_s: The amount of time in seconds to wait for the
466+
pipeline to start.
467+
:param ignore_deployment_error: Set True to ignore deployment errors while waiting for
468+
START transition. If `timeout_s` is not provided, a default timeout is
469+
used to avoid indefinite waiting.
456470
"""
457471

458472
params = {"initial": initial}
@@ -464,32 +478,54 @@ def _inner_start_pipeline(
464478
params=params,
465479
)
466480

467-
if not wait:
468-
return None
469-
470-
return self.__wait_for_pipeline_state_one_of(
471-
pipeline_name, [initial, "AwaitingApproval"], timeout_s
472-
)
473-
474-
def start_pipeline(
475-
self,
476-
pipeline_name: str,
477-
bootstrap_policy: Optional[BootstrapPolicy] = None,
478-
wait: bool = True,
479-
timeout_s: Optional[float] = None,
480-
) -> Optional[PipelineStatus]:
481+
if not wait:
482+
return None
483+
484+
effective_timeout_s = timeout_s
485+
if ignore_deployment_error and effective_timeout_s is None:
486+
# ignore_deployment_error can intentionally skip the stopped+error failure guard.
487+
# Bound the wait so callers don't get an unbounded polling loop.
488+
effective_timeout_s = 60.0
489+
logging.warning(
490+
"start_pipeline(ignore_deployment_error=True) called without timeout; "
491+
"defaulting to %.0fs",
492+
effective_timeout_s,
493+
)
494+
495+
return self.__wait_for_pipeline_state_one_of(
496+
pipeline_name,
497+
[initial, "AwaitingApproval"],
498+
effective_timeout_s,
499+
ignore_deployment_error=ignore_deployment_error,
500+
)
501+
502+
def start_pipeline(
503+
self,
504+
pipeline_name: str,
505+
bootstrap_policy: Optional[BootstrapPolicy] = None,
506+
wait: bool = True,
507+
timeout_s: Optional[float] = None,
508+
ignore_deployment_error: bool = False,
509+
) -> Optional[PipelineStatus]:
481510
"""
482511
483512
:param pipeline_name: The name of the pipeline to start
484-
:param wait: Set True to wait for the pipeline to start.
485-
True by default
486-
:param timeout_s: The amount of time in seconds to wait for the
487-
pipeline to start.
488-
"""
489-
490-
return self._inner_start_pipeline(
491-
pipeline_name, "running", bootstrap_policy, wait, timeout_s
492-
)
513+
:param wait: Set True to wait for the pipeline to start.
514+
True by default
515+
:param timeout_s: The amount of time in seconds to wait for the
516+
pipeline to start.
517+
:param ignore_deployment_error: Set True to ignore deployment errors while waiting
518+
for START transition.
519+
"""
520+
521+
return self._inner_start_pipeline(
522+
pipeline_name,
523+
"running",
524+
bootstrap_policy,
525+
wait,
526+
timeout_s,
527+
ignore_deployment_error,
528+
)
493529

494530
def start_pipeline_as_paused(
495531
self,

python/tests/platform/test_bootstrapping.py

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import time
2-
from feldera.enums import BootstrapPolicy, PipelineFieldSelector, PipelineStatus
1+
from feldera.enums import BootstrapPolicy, PipelineStatus
32
from feldera.pipeline_builder import PipelineBuilder
43
from feldera.runtime_config import RuntimeConfig
54
from tests import TEST_CLIENT, enterprise_only
@@ -49,38 +48,6 @@ def test_bootstrap_enterprise(pipeline_name):
4948
"""
5049
pipeline.modify(sql=sql)
5150

52-
def wait_for_start_observed_and_error_cleared(timeout_s: float = 30.0):
53-
"""
54-
After issuing start(wait=False), wait until backend actually picks up the
55-
start request and clears stale deployment_error from the previous failed start.
56-
"""
57-
print(
58-
f"Waiting up to {timeout_s} seconds for start transition to be observed and deployment_error to clear"
59-
)
60-
start = time.time()
61-
deadline = start + timeout_s
62-
last = None
63-
while time.time() < deadline:
64-
p = pipeline.client.get_pipeline(
65-
pipeline.name, PipelineFieldSelector.STATUS
66-
)
67-
status = p.deployment_status
68-
desired = p.deployment_desired_status
69-
error_msg = (p.deployment_error or {}).get("message", "")
70-
snap = (status, desired, error_msg)
71-
if snap != last:
72-
print(
73-
f"After {time.time() - start:.1f} seconds: status={status} desired={desired} error={error_msg!r}"
74-
)
75-
last = snap
76-
if status != "Stopped" and error_msg == "":
77-
return
78-
time.sleep(0.1)
79-
raise TimeoutError(
80-
"Timed out waiting for start transition and deployment_error clearing "
81-
f"(last={last})"
82-
)
83-
8451
try:
8552
pipeline.start(bootstrap_policy=BootstrapPolicy.REJECT)
8653
# If we reach here, the pipeline started successfully when it should have failed
@@ -92,19 +59,15 @@ def wait_for_start_observed_and_error_cleared(timeout_s: float = 30.0):
9259
# Reject triggers async stopping.
9360
# This only guarantees deployment_status is Stopped
9461
wait_for_deployment_status(pipeline_name, "Stopped", 30)
95-
96-
# Kick one non-blocking ALLOW start so backend transitions out of stale
97-
# rejected-stop state and clears deployment_error on provisioning transition.
98-
print("Issuing non-blocking allow start to clear stale startup error state")
99-
pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW, wait=False)
100-
wait_for_start_observed_and_error_cleared(60)
101-
print("Stopping temporary run to return to clean stopped baseline")
102-
pipeline.stop(force=True)
103-
wait_for_deployment_status(pipeline_name, "Stopped", 30)
10462
pass
10563

106-
print("Starting pipeline with bootstrap_policy='allow'")
107-
pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW)
64+
print(
65+
"Starting pipeline with bootstrap_policy='allow' and ignoring expected deployment error from previous reject start"
66+
)
67+
pipeline.start(
68+
bootstrap_policy=BootstrapPolicy.ALLOW,
69+
ignore_deployment_error=True,
70+
)
10871
assert pipeline.status() == PipelineStatus.RUNNING
10972

11073
pipeline.execute("INSERT INTO t1 VALUES (4), (5), (6);")
@@ -116,10 +79,10 @@ def wait_for_start_observed_and_error_cleared(timeout_s: float = 30.0):
11679
assert result == [{"c": 6}]
11780

11881
pipeline.stop(force=True)
82+
wait_for_deployment_status(pipeline_name, "Stopped", 30)
11983

12084
# Since we didn't make a checkpoint during the previous run, the pipeline should be in the AWAITINGAPPROVAL state.
12185
print("Starting pipeline with bootstrap_policy='await_approval'")
122-
12386
pipeline.start(bootstrap_policy=BootstrapPolicy.AWAIT_APPROVAL)
12487
assert pipeline.status() == PipelineStatus.AWAITINGAPPROVAL
12588

0 commit comments

Comments
 (0)