@@ -300,7 +300,7 @@ def foreach_chunk(
300300 handler .start ()
301301
302302 def wait_for_completion (
303- self , force_stop : bool = False , timeout_s : Optional [ float ] = None
303+ self , force_stop : bool = False , timeout_s : float | None = None
304304 ):
305305 """
306306 Block until the pipeline has completed processing all input records.
@@ -376,7 +376,7 @@ def is_complete(self) -> bool:
376376 def wait_for_idle (
377377 self ,
378378 idle_interval_s : float = 5.0 ,
379- timeout_s : float = 600.0 ,
379+ timeout_s : float | None = None ,
380380 poll_interval_s : float = 0.2 ,
381381 ):
382382 """
@@ -396,12 +396,12 @@ def wait_for_idle(
396396 :raises RuntimeError: If the metrics are missing or the timeout was
397397 reached.
398398 """
399- if idle_interval_s > timeout_s :
399+ if timeout_s is not None and idle_interval_s > timeout_s :
400400 raise ValueError (
401401 f"idle interval ({ idle_interval_s } s) cannot be larger than"
402402 f" timeout ({ timeout_s } s)"
403403 )
404- if poll_interval_s > timeout_s :
404+ if timeout_s is not None and poll_interval_s > timeout_s :
405405 raise ValueError (
406406 f"poll interval ({ poll_interval_s } s) cannot be larger than"
407407 f" timeout ({ timeout_s } s)"
@@ -447,7 +447,7 @@ def wait_for_idle(
447447 return
448448
449449 # Timeout
450- if now_s - start_time_s >= timeout_s :
450+ if timeout_s is not None and now_s - start_time_s >= timeout_s :
451451 raise RuntimeError (f"waiting for idle reached timeout ({ timeout_s } s)" )
452452 time .sleep (poll_interval_s )
453453
@@ -696,7 +696,7 @@ def get(name: str, client: FelderaClient) -> "Pipeline":
696696 err .message = f"Pipeline with name { name } not found"
697697 raise err
698698
699- def checkpoint (self , wait : bool = False , timeout_s = 300 ) -> int :
699+ def checkpoint (self , wait : bool = False , timeout_s : Optional [ float ] = None ) -> int :
700700 """
701701 Checkpoints this pipeline.
702702
@@ -718,7 +718,7 @@ def checkpoint(self, wait: bool = False, timeout_s=300) -> int:
718718
719719 while True :
720720 elapsed = time .monotonic () - start
721- if elapsed > timeout_s :
721+ if timeout_s is not None and elapsed > timeout_s :
722722 raise TimeoutError (
723723 f"""timeout ({ timeout_s } s) reached while waiting for \
724724 pipeline '{ self .name } ' to make checkpoint '{ seq } '"""
@@ -754,7 +754,9 @@ def checkpoint_status(self, seq: int) -> CheckpointStatus:
754754 if seq < success :
755755 return CheckpointStatus .Unknown
756756
757- def sync_checkpoint (self , wait : bool = False , timeout_s = 300 ) -> str :
757+ def sync_checkpoint (
758+ self , wait : bool = False , timeout_s : Optional [float ] = None
759+ ) -> str :
758760 """
759761 Syncs this checkpoint to object store.
760762
@@ -776,7 +778,7 @@ def sync_checkpoint(self, wait: bool = False, timeout_s=300) -> str:
776778
777779 while True :
778780 elapsed = time .monotonic () - start
779- if elapsed > timeout_s :
781+ if timeout_s is not None and elapsed > timeout_s :
780782 raise TimeoutError (
781783 f"""timeout ({ timeout_s } s) reached while waiting for \
782784 pipeline '{ self .name } ' to sync checkpoint '{ uuid } '"""
0 commit comments