Skip to content

Commit 2b994fc

Browse files
Igor Smolyar authored and igorscs committed
[python tests] make bootstrap reject->allow recovery deterministic
After start(reject), the pipeline can be in a valid Stopped state with deployment_error set. Because start/stop are asynchronous, calling start(allow) immediately from that state is racy and can fail.

This test now makes the sequence deterministic:
- wait for Stopped after the expected reject failure
- issue a non-blocking start(allow) and wait until the start is actually observed and deployment_error is cleared
- stop again and wait for Stopped to return to a clean baseline
- continue with the normal start(allow) assertions
1 parent 4bb308d commit 2b994fc

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

python/tests/platform/test_bootstrapping.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
from feldera.enums import BootstrapPolicy, PipelineStatus
1+
import time
2+
from feldera.enums import BootstrapPolicy, PipelineFieldSelector, PipelineStatus
23
from feldera.pipeline_builder import PipelineBuilder
34
from feldera.runtime_config import RuntimeConfig
45
from tests import TEST_CLIENT, enterprise_only
56
from .helper import (
67
gen_pipeline_name,
8+
wait_for_deployment_status,
79
)
810
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
911

@@ -47,6 +49,36 @@ def test_bootstrap_enterprise(pipeline_name):
4749
"""
4850
pipeline.modify(sql=sql)
4951

52+
def wait_for_start_observed_and_error_cleared(timeout_s: float = 30.0) -> None:
    """
    After issuing start(wait=False), wait until backend actually picks up the
    start request and clears stale deployment_error from the previous failed start.

    The pipeline is polled roughly every 0.1 seconds. The function returns as
    soon as ``deployment_status`` is no longer "Stopped" and ``deployment_error``
    carries no message. Each change of the observed
    (status, desired status, error message) triple is printed once.

    :param timeout_s: Maximum number of seconds to keep polling.
    :raises TimeoutError: If the condition is not met within ``timeout_s``;
        the message includes the last observed triple.
    """
    print(
        f"Waiting up to {timeout_s} seconds for start transition to be observed and deployment_error to clear"
    )
    # Use the monotonic clock for the deadline and the elapsed-time report:
    # wall-clock time can jump (NTP, manual adjustment) and would make the
    # timeout fire early or late.
    start = time.monotonic()
    deadline = start + timeout_s
    last = None
    while time.monotonic() < deadline:
        p = pipeline.client.get_pipeline(pipeline.name, PipelineFieldSelector.STATUS)
        status = p.deployment_status
        desired = p.deployment_desired_status
        # deployment_error may be unset; treat that as "no error message".
        error_msg = (p.deployment_error or {}).get("message", "")
        snap = (status, desired, error_msg)
        # Log only on change to keep the output readable while polling.
        if snap != last:
            print(
                f"After {time.monotonic() - start:.1f} seconds: status={status} desired={desired} error={error_msg!r}"
            )
            last = snap
        if status != "Stopped" and error_msg == "":
            return
        time.sleep(0.1)
    raise TimeoutError(
        "Timed out waiting for start transition and deployment_error clearing "
        f"(last={last})"
    )
81+
5082
try:
5183
pipeline.start(bootstrap_policy=BootstrapPolicy.REJECT)
5284
# If we reach here, the pipeline started successfully when it should have failed
@@ -55,6 +87,18 @@ def test_bootstrap_enterprise(pipeline_name):
5587
)
5688
except Exception as e:
5789
print(f"Expected exception caught: {e}")
90+
# Reject triggers async stopping.
91+
# This only guarantees deployment_status is Stopped
92+
wait_for_deployment_status(pipeline_name, "Stopped", 30)
93+
94+
# Kick one non-blocking ALLOW start so backend transitions out of stale
95+
# rejected-stop state and clears deployment_error on provisioning transition.
96+
print("Issuing non-blocking allow start to clear stale startup error state")
97+
pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW, wait=False)
98+
wait_for_start_observed_and_error_cleared(60)
99+
print("Stopping temporary run to return to clean stopped baseline")
100+
pipeline.stop(force=True)
101+
wait_for_deployment_status(pipeline_name, "Stopped", 30)
58102
pass
59103

60104
print("Starting pipeline with bootstrap_policy='allow'")

0 commit comments

Comments
 (0)