@@ -232,15 +232,17 @@ def __wait_for_pipeline_state(
232232 )
233233 time .sleep (0.1 )
234234
235- def __wait_for_pipeline_state_one_of (
236- self ,
237- pipeline_name : str ,
238- states : list [str ],
239- timeout_s : float | None = None ,
240- start : bool = True ,
241- ) -> PipelineStatus :
242- start_time = time .monotonic ()
243- states = [state .lower () for state in states ]
235+ def __wait_for_pipeline_state_one_of (
236+ self ,
237+ pipeline_name : str ,
238+ states : list [str ],
239+ timeout_s : float | None = None ,
240+ start : bool = True ,
241+ ignore_deployment_error : bool = False ,
242+ ) -> PipelineStatus :
243+ start_time = time .monotonic ()
244+ poll_interval_s = 0.1
245+ states = [state .lower () for state in states ]
244246
245247 while True :
246248 if timeout_s is not None :
@@ -256,21 +258,29 @@ def __wait_for_pipeline_state_one_of(
256258
257259 if status .lower () in states :
258260 return PipelineStatus .from_str (status )
259- elif (
260- status == "Stopped"
261- and len (resp .deployment_error or {}) > 0
262- and resp .deployment_desired_status == "Stopped"
263- ):
264- err_msg = "Unable to START the pipeline:\n " if start else ""
265- raise RuntimeError (
266- f"""{ err_msg } Unable to transition the pipeline to one of the states: { states } .
267- Reason: The pipeline is in a STOPPED state due to the following error:
268- { resp .deployment_error .get ("message" , "" )} """
261+ elif (
262+ status == "Stopped"
263+ and len (resp .deployment_error or {}) > 0
264+ and resp .deployment_desired_status == "Stopped"
265+ ):
266+ if ignore_deployment_error :
267+ logging .debug (
268+ "ignoring stopped deployment error while waiting for %s to start: %s" ,
269+ pipeline_name ,
270+ (resp .deployment_error or {}).get ("message" , "" ),
271+ )
272+ time .sleep (poll_interval_s )
273+ continue
274+ err_msg = "Unable to START the pipeline:\n " if start else ""
275+ raise RuntimeError (
276+ f"""{ err_msg } Unable to transition the pipeline to one of the states: { states } .
277+ Reason: The pipeline is in a STOPPED state due to the following error:
278+ { resp .deployment_error .get ("message" , "" )} """
269279 )
270- logging .debug (
271- "still starting %s, waiting for 100 more milliseconds" , pipeline_name
272- )
273- time .sleep (0.1 )
280+ logging .debug (
281+ "still starting %s, waiting for 100 more milliseconds" , pipeline_name
282+ )
283+ time .sleep (poll_interval_s )
274284
275285 def create_pipeline (self , pipeline : Pipeline , wait : bool = True ) -> Pipeline :
276286 """
@@ -437,22 +447,26 @@ def activate_pipeline(
437447 pipeline_name , ["running" , "AwaitingApproval" ], timeout_s
438448 )
439449
440- def _inner_start_pipeline (
441- self ,
442- pipeline_name : str ,
443- initial : str = "running" ,
444- bootstrap_policy : Optional [BootstrapPolicy ] = None ,
445- wait : bool = True ,
446- timeout_s : Optional [float ] = None ,
447- ) -> Optional [PipelineStatus ]:
450+ def _inner_start_pipeline (
451+ self ,
452+ pipeline_name : str ,
453+ initial : str = "running" ,
454+ bootstrap_policy : Optional [BootstrapPolicy ] = None ,
455+ wait : bool = True ,
456+ timeout_s : Optional [float ] = None ,
457+ ignore_deployment_error : bool = False ,
458+ ) -> Optional [PipelineStatus ]:
448459 """
449460
450461 :param pipeline_name: The name of the pipeline to start
451462 :param initial: The initial state to start the pipeline in. "running"
452463 by default.
453- :param wait: Set True to wait for the pipeline to start. True by default
454- :param timeout_s: The amount of time in seconds to wait for the
455- pipeline to start.
464+ :param wait: Set True to wait for the pipeline to start. True by default
465+ :param timeout_s: The amount of time in seconds to wait for the
466+ pipeline to start.
467+ :param ignore_deployment_error: Set True to ignore deployment errors while waiting for
468+ START transition. If `timeout_s` is not provided, a default timeout is
469+ used to avoid indefinite waiting.
456470 """
457471
458472 params = {"initial" : initial }
@@ -464,32 +478,54 @@ def _inner_start_pipeline(
464478 params = params ,
465479 )
466480
467- if not wait :
468- return None
469-
470- return self .__wait_for_pipeline_state_one_of (
471- pipeline_name , [initial , "AwaitingApproval" ], timeout_s
472- )
473-
474- def start_pipeline (
475- self ,
476- pipeline_name : str ,
477- bootstrap_policy : Optional [BootstrapPolicy ] = None ,
478- wait : bool = True ,
479- timeout_s : Optional [float ] = None ,
480- ) -> Optional [PipelineStatus ]:
481+ if not wait :
482+ return None
483+
484+ effective_timeout_s = timeout_s
485+ if ignore_deployment_error and effective_timeout_s is None :
486+ # ignore_deployment_error can intentionally skip the stopped+error failure guard.
487+ # Bound the wait so callers don't get an unbounded polling loop.
488+ effective_timeout_s = 60.0
489+ logging .warning (
490+ "start_pipeline(ignore_deployment_error=True) called without timeout; "
491+ "defaulting to %.0fs" ,
492+ effective_timeout_s ,
493+ )
494+
495+ return self .__wait_for_pipeline_state_one_of (
496+ pipeline_name ,
497+ [initial , "AwaitingApproval" ],
498+ effective_timeout_s ,
499+ ignore_deployment_error = ignore_deployment_error ,
500+ )
501+
502+ def start_pipeline (
503+ self ,
504+ pipeline_name : str ,
505+ bootstrap_policy : Optional [BootstrapPolicy ] = None ,
506+ wait : bool = True ,
507+ timeout_s : Optional [float ] = None ,
508+ ignore_deployment_error : bool = False ,
509+ ) -> Optional [PipelineStatus ]:
481510 """
482511
483512 :param pipeline_name: The name of the pipeline to start
484- :param wait: Set True to wait for the pipeline to start.
485- True by default
486- :param timeout_s: The amount of time in seconds to wait for the
487- pipeline to start.
488- """
489-
490- return self ._inner_start_pipeline (
491- pipeline_name , "running" , bootstrap_policy , wait , timeout_s
492- )
513+ :param wait: Set True to wait for the pipeline to start.
514+ True by default
515+ :param timeout_s: The amount of time in seconds to wait for the
516+ pipeline to start.
517+ :param ignore_deployment_error: Set True to ignore deployment errors while waiting
518+ for START transition.
519+ """
520+
521+ return self ._inner_start_pipeline (
522+ pipeline_name ,
523+ "running" ,
524+ bootstrap_policy ,
525+ wait ,
526+ timeout_s ,
527+ ignore_deployment_error ,
528+ )
493529
494530 def start_pipeline_as_paused (
495531 self ,
0 commit comments