-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpolling.py
More file actions
118 lines (92 loc) · 3.54 KB
/
polling.py
File metadata and controls
118 lines (92 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
from typing import Any, Union, TypeVar, Callable, Optional
from dataclasses import dataclass
T = TypeVar("T")
@dataclass
class PollingConfig:
    """Tunable limits for a client-side polling loop.

    Attributes:
        interval_seconds: Pause, in seconds, between consecutive attempts.
        max_attempts: Number of attempts after which polling gives up.
        timeout_seconds: Overall time budget in seconds. ``None`` (the
            default) disables the time-based limit, leaving only
            ``max_attempts`` in effect.
    """

    interval_seconds: float = 1.0
    max_attempts: int = 120
    timeout_seconds: Optional[float] = None
class PollingTimeout(Exception):
    """Raised when polling exceeds max attempts or timeout.

    Attributes:
        message: The reason polling stopped, without the last-value suffix.
        last_value: The most recent value obtained before giving up; may be
            ``None`` if nothing had been retrieved yet.
    """

    def __init__(self, message: str, last_value: Any):
        self.message = message
        self.last_value = last_value
        super().__init__(f"{message}. Last retrieved value: {last_value}")

    def __reduce__(self):
        # The default exception reduction replays ``self.args`` (here a single
        # pre-formatted string) into ``__init__``, which requires two
        # arguments, so unpickling or copying would raise TypeError. Rebuild
        # from the original constructor arguments instead, and carry the
        # instance dict along so any extra attributes survive the round trip.
        return (type(self), (self.message, self.last_value), self.__dict__)
def poll_until(
    retriever: Callable[[], T],
    is_terminal: Callable[[T], bool],
    config: Optional[PollingConfig] = None,
    on_error: Optional[Callable[[Exception], T]] = None,
) -> T:
    """
    Poll until a condition is met or timeout/max attempts are reached.

    The retriever is always called at least once. After every non-terminal
    result the attempt limit is checked first, then the time limit, and only
    then does the loop sleep before trying again.

    Args:
        retriever: Callable that returns the object to check
        is_terminal: Callable that returns True when polling should stop
        config: Optional polling configuration; a default ``PollingConfig()``
            is used when omitted
        on_error: Optional error handler that can return a value to continue polling
            or re-raise the exception to stop polling. The returned value is
            passed to ``is_terminal`` like any retrieved value. Without a
            handler, exceptions from ``retriever`` propagate unchanged.

    Returns:
        The final state of the polled object

    Raises:
        PollingTimeout: When max attempts or timeout is reached
    """
    if config is None:
        config = PollingConfig()

    attempts = 0
    # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
    # and would make the elapsed-time computation unreliable.
    start_time = time.monotonic()

    while True:
        try:
            last_result = retriever()
        except Exception as e:
            if on_error is None:
                raise
            last_result = on_error(e)

        if is_terminal(last_result):
            return last_result

        attempts += 1
        if attempts >= config.max_attempts:
            raise PollingTimeout(f"Exceeded maximum attempts ({config.max_attempts})", last_result)

        delay = config.interval_seconds
        if config.timeout_seconds is not None:
            elapsed = time.monotonic() - start_time
            if elapsed >= config.timeout_seconds:
                raise PollingTimeout(f"Exceeded timeout of {config.timeout_seconds} seconds", last_result)
            # Never sleep past the deadline: otherwise a long interval would
            # overshoot the configured timeout by up to a full interval.
            delay = min(delay, config.timeout_seconds - elapsed)

        time.sleep(delay)
def retry_server_poll_until(
    retriever: Callable[[float], T],
    is_terminal: Callable[[T], bool],
    timeout_seconds: float = 30.0,
    on_error: Optional[Callable[[Exception], T]] = None,
) -> T:
    """
    Retry a server-side long-poll until a condition is met or max timeout is reached.

    There is no client-side sleep between calls: the retriever is expected to
    do the waiting itself, bounded by the remaining time it is given.

    Args:
        retriever: Callable that takes the remaining timeout (seconds) and
            returns the object to check.
        is_terminal: Callable that returns True when polling should stop
        timeout_seconds: Total time to wait. Must be > 0; with a non-positive
            value the retriever is never called and PollingTimeout is raised
            immediately with a last value of ``None``.
        on_error: Optional error handler that can return a value to continue polling
            or re-raise the exception to stop polling. Without a handler,
            exceptions from ``retriever`` propagate unchanged.

    Returns:
        The final state of the polled object

    Raises:
        PollingTimeout: When the total timeout is reached before a terminal
            result is seen
    """
    last_result: Union[T, None] = None
    # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
    # and would corrupt the remaining-time budget handed to the retriever.
    start_time = time.monotonic()

    while True:
        remaining_time = timeout_seconds - (time.monotonic() - start_time)
        if remaining_time <= 0:
            raise PollingTimeout(f"Exceeded timeout of {timeout_seconds} seconds", last_result)

        try:
            last_result = retriever(remaining_time)
        except Exception as e:
            if on_error is None:
                raise
            last_result = on_error(e)

        if is_terminal(last_result):
            return last_result