Skip to content

Commit e283d90

Browse files
feldera-botigorscs
authored andcommitted
[ci] apply automatic fixes
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
1 parent 3e996db commit e283d90

File tree

1 file changed

+93
-93
lines changed

1 file changed

+93
-93
lines changed

python/feldera/rest/feldera_client.py

Lines changed: 93 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -232,17 +232,17 @@ def __wait_for_pipeline_state(
232232
)
233233
time.sleep(0.1)
234234

235-
def __wait_for_pipeline_state_one_of(
236-
self,
237-
pipeline_name: str,
238-
states: list[str],
239-
timeout_s: float | None = None,
240-
start: bool = True,
241-
ignore_deployment_error: bool = False,
242-
) -> PipelineStatus:
243-
start_time = time.monotonic()
244-
poll_interval_s = 0.1
245-
states = [state.lower() for state in states]
235+
def __wait_for_pipeline_state_one_of(
236+
self,
237+
pipeline_name: str,
238+
states: list[str],
239+
timeout_s: float | None = None,
240+
start: bool = True,
241+
ignore_deployment_error: bool = False,
242+
) -> PipelineStatus:
243+
start_time = time.monotonic()
244+
poll_interval_s = 0.1
245+
states = [state.lower() for state in states]
246246

247247
while True:
248248
if timeout_s is not None:
@@ -258,29 +258,29 @@ def __wait_for_pipeline_state_one_of(
258258

259259
if status.lower() in states:
260260
return PipelineStatus.from_str(status)
261-
elif (
262-
status == "Stopped"
263-
and len(resp.deployment_error or {}) > 0
264-
and resp.deployment_desired_status == "Stopped"
265-
):
266-
if ignore_deployment_error:
267-
logging.debug(
268-
"ignoring stopped deployment error while waiting for %s to start: %s",
269-
pipeline_name,
270-
(resp.deployment_error or {}).get("message", ""),
271-
)
272-
time.sleep(poll_interval_s)
273-
continue
274-
err_msg = "Unable to START the pipeline:\n" if start else ""
275-
raise RuntimeError(
276-
f"""{err_msg}Unable to transition the pipeline to one of the states: {states}.
277-
Reason: The pipeline is in a STOPPED state due to the following error:
278-
{resp.deployment_error.get("message", "")}"""
261+
elif (
262+
status == "Stopped"
263+
and len(resp.deployment_error or {}) > 0
264+
and resp.deployment_desired_status == "Stopped"
265+
):
266+
if ignore_deployment_error:
267+
logging.debug(
268+
"ignoring stopped deployment error while waiting for %s to start: %s",
269+
pipeline_name,
270+
(resp.deployment_error or {}).get("message", ""),
271+
)
272+
time.sleep(poll_interval_s)
273+
continue
274+
err_msg = "Unable to START the pipeline:\n" if start else ""
275+
raise RuntimeError(
276+
f"""{err_msg}Unable to transition the pipeline to one of the states: {states}.
277+
Reason: The pipeline is in a STOPPED state due to the following error:
278+
{resp.deployment_error.get("message", "")}"""
279279
)
280-
logging.debug(
281-
"still starting %s, waiting for 100 more milliseconds", pipeline_name
282-
)
283-
time.sleep(poll_interval_s)
280+
logging.debug(
281+
"still starting %s, waiting for 100 more milliseconds", pipeline_name
282+
)
283+
time.sleep(poll_interval_s)
284284

285285
def create_pipeline(self, pipeline: Pipeline, wait: bool = True) -> Pipeline:
286286
"""
@@ -447,26 +447,26 @@ def activate_pipeline(
447447
pipeline_name, ["running", "AwaitingApproval"], timeout_s
448448
)
449449

450-
def _inner_start_pipeline(
451-
self,
452-
pipeline_name: str,
453-
initial: str = "running",
454-
bootstrap_policy: Optional[BootstrapPolicy] = None,
455-
wait: bool = True,
456-
timeout_s: Optional[float] = None,
457-
ignore_deployment_error: bool = False,
458-
) -> Optional[PipelineStatus]:
450+
def _inner_start_pipeline(
451+
self,
452+
pipeline_name: str,
453+
initial: str = "running",
454+
bootstrap_policy: Optional[BootstrapPolicy] = None,
455+
wait: bool = True,
456+
timeout_s: Optional[float] = None,
457+
ignore_deployment_error: bool = False,
458+
) -> Optional[PipelineStatus]:
459459
"""
460460
461461
:param pipeline_name: The name of the pipeline to start
462462
:param initial: The initial state to start the pipeline in. "running"
463463
by default.
464-
:param wait: Set True to wait for the pipeline to start. True by default
465-
:param timeout_s: The amount of time in seconds to wait for the
466-
pipeline to start.
467-
:param ignore_deployment_error: Set True to ignore deployment errors while waiting for
468-
START transition. If `timeout_s` is not provided, a default timeout is
469-
used to avoid indefinite waiting.
464+
:param wait: Set True to wait for the pipeline to start. True by default
465+
:param timeout_s: The amount of time in seconds to wait for the
466+
pipeline to start.
467+
:param ignore_deployment_error: Set True to ignore deployment errors while waiting for
468+
START transition. If `timeout_s` is not provided, a default timeout is
469+
used to avoid indefinite waiting.
470470
"""
471471

472472
params = {"initial": initial}
@@ -478,54 +478,54 @@ def _inner_start_pipeline(
478478
params=params,
479479
)
480480

481-
if not wait:
482-
return None
483-
484-
effective_timeout_s = timeout_s
485-
if ignore_deployment_error and effective_timeout_s is None:
486-
# ignore_deployment_error can intentionally skip the stopped+error failure guard.
487-
# Bound the wait so callers don't get an unbounded polling loop.
488-
effective_timeout_s = 60.0
489-
logging.warning(
490-
"start_pipeline(ignore_deployment_error=True) called without timeout; "
491-
"defaulting to %.0fs",
492-
effective_timeout_s,
493-
)
494-
495-
return self.__wait_for_pipeline_state_one_of(
496-
pipeline_name,
497-
[initial, "AwaitingApproval"],
498-
effective_timeout_s,
499-
ignore_deployment_error=ignore_deployment_error,
500-
)
501-
502-
def start_pipeline(
503-
self,
504-
pipeline_name: str,
505-
bootstrap_policy: Optional[BootstrapPolicy] = None,
506-
wait: bool = True,
507-
timeout_s: Optional[float] = None,
508-
ignore_deployment_error: bool = False,
509-
) -> Optional[PipelineStatus]:
481+
if not wait:
482+
return None
483+
484+
effective_timeout_s = timeout_s
485+
if ignore_deployment_error and effective_timeout_s is None:
486+
# ignore_deployment_error can intentionally skip the stopped+error failure guard.
487+
# Bound the wait so callers don't get an unbounded polling loop.
488+
effective_timeout_s = 60.0
489+
logging.warning(
490+
"start_pipeline(ignore_deployment_error=True) called without timeout; "
491+
"defaulting to %.0fs",
492+
effective_timeout_s,
493+
)
494+
495+
return self.__wait_for_pipeline_state_one_of(
496+
pipeline_name,
497+
[initial, "AwaitingApproval"],
498+
effective_timeout_s,
499+
ignore_deployment_error=ignore_deployment_error,
500+
)
501+
502+
def start_pipeline(
503+
self,
504+
pipeline_name: str,
505+
bootstrap_policy: Optional[BootstrapPolicy] = None,
506+
wait: bool = True,
507+
timeout_s: Optional[float] = None,
508+
ignore_deployment_error: bool = False,
509+
) -> Optional[PipelineStatus]:
510510
"""
511511
512512
:param pipeline_name: The name of the pipeline to start
513-
:param wait: Set True to wait for the pipeline to start.
514-
True by default
515-
:param timeout_s: The amount of time in seconds to wait for the
516-
pipeline to start.
517-
:param ignore_deployment_error: Set True to ignore deployment errors while waiting
518-
for START transition.
519-
"""
520-
521-
return self._inner_start_pipeline(
522-
pipeline_name,
523-
"running",
524-
bootstrap_policy,
525-
wait,
526-
timeout_s,
527-
ignore_deployment_error,
528-
)
513+
:param wait: Set True to wait for the pipeline to start.
514+
True by default
515+
:param timeout_s: The amount of time in seconds to wait for the
516+
pipeline to start.
517+
:param ignore_deployment_error: Set True to ignore deployment errors while waiting
518+
for START transition.
519+
"""
520+
521+
return self._inner_start_pipeline(
522+
pipeline_name,
523+
"running",
524+
bootstrap_policy,
525+
wait,
526+
timeout_s,
527+
ignore_deployment_error,
528+
)
529529

530530
def start_pipeline_as_paused(
531531
self,

0 commit comments

Comments
 (0)