Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions python/feldera/_callback_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,16 @@ def run(self):
for chunk in iterator:
self.to_callback(chunk)

except StopIteration:
# The server closed the stream before sending the first chunk;
# e.g., the pipeline stopped concurrently. The listener observes
# no output; this is not an error.
pass
except BaseException as e:
self.exception_callback(e)
finally:
# Always unblock the caller, including when the connection attempt
# fails before the first chunk arrives. The exception recorded by
# `exception_callback` above must be stored before the event is
# set, so that the woken caller can observe and surface it.
self.event.set()
12 changes: 11 additions & 1 deletion python/feldera/output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@ def exception_callback(exception: BaseException):

def start(self):
"""
Starts the output handler in a separate thread
Starts the output handler in a separate thread.

Blocks until the listener is connected to the change stream and ready
to receive data.

:raises BaseException: The exception that caused the connection
attempt to fail; e.g., a
:class:`feldera.rest.errors.FelderaAPIError` if the egress
request was rejected.
"""

self.handler.start()
_ = self.event.wait()
if self.exception is not None:
raise self.exception

def to_pandas(self, clear_buffer: bool = True):
"""
Expand Down
14 changes: 13 additions & 1 deletion python/feldera/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ def listen(self, view_name: str) -> OutputHandler:
using :meth:`start_paused`, attach listeners and unpause the pipeline using :meth:`resume`.

:param view_name: The name of the view to listen to.

:raises RuntimeError: If the pipeline is neither running nor paused.
:raises BaseException: The underlying error, if the connection to the
change stream cannot be established.
"""

if self.status() not in [PipelineStatus.PAUSED, PipelineStatus.RUNNING]:
Expand Down Expand Up @@ -324,17 +328,25 @@ def foreach_chunk(
.. note::
- The callback must be thread-safe as it will be run in a separate thread.

:raises RuntimeError: If the pipeline is neither running nor paused.
:raises BaseException: The underlying error, if the connection to the
change stream cannot be established.
"""

if self.status() not in [PipelineStatus.RUNNING, PipelineStatus.PAUSED]:
raise RuntimeError("Pipeline must be running or paused to listen to output")

event = Event()
# Collect the exception that aborted the listener, if any, so that a
# connection failure surfaces here instead of being silently dropped.
errors: List[BaseException] = []
handler = CallbackRunner(
self.client, self.name, view_name, callback, lambda exception: None, event
self.client, self.name, view_name, callback, errors.append, event
)
handler.start()
event.wait()
if errors:
raise errors[0]

def wait_for_completion(
self, force_stop: bool = False, timeout_s: float | None = None
Expand Down
44 changes: 31 additions & 13 deletions python/feldera/rest/_httprequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float:
"""
Compute the wait between retries. Branches by exception type:
- `Retry-After` header (if any) wins, capped at `max_backoff`.
- 502: probe `/cluster_healthz`. If the cluster is healthy the 502
is treated as spurious — return 0 so the retry runs immediately.
Otherwise return the configured `unhealthy_backoff` (a flat wait
while an upgrade or restart completes).
- 502: probe `/cluster_healthz`. If the cluster is unhealthy,
return the configured `unhealthy_backoff` (a flat wait while an
upgrade or restart completes). If the cluster is healthy, treat
the first 502 as spurious — return 0 so the retry runs
immediately. If 502s persist (e.g., a brief window while gateway
routes propagate after a pipeline restart), fall through to the
exponential backoff, so the remaining retries spread across the
outage instead of all firing inside it.
- Everything else: exponential backoff plus optional jitter.
"""
cfg = self.config.retry_config
Expand All @@ -115,14 +119,18 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float:
return min(float(retry_after), cfg.max_backoff)

if _is_502(exc):
if self._check_cluster_health():
if not self._check_cluster_health():
logging.info(
"Cluster unhealthy; backing off %.1fs before retrying 502",
cfg.unhealthy_backoff,
)
return cfg.unhealthy_backoff
if retry_state.attempt_number <= 1:
logging.info("Cluster healthy — treating 502 as spurious")
return 0.0
logging.info(
"Cluster unhealthy; backing off %.1fs before retrying 502",
cfg.unhealthy_backoff,
"Cluster healthy but 502s persist; falling back to exponential backoff"
)
return cfg.unhealthy_backoff

backoff = wait_exponential(
multiplier=cfg.initial_backoff,
Expand All @@ -140,11 +148,14 @@ def _do_single_request(
data: Any,
params: Optional[Mapping[str, Any]],
stream: bool,
timeout: Optional[tuple[Optional[float], Optional[float]]] = None,
) -> Any:
response = http_method(
request_path,
data=data,
timeout=(self.config.connection_timeout, self.config.timeout),
timeout=timeout
if timeout is not None
else (self.config.connection_timeout, self.config.timeout),
headers=self.headers,
params=params,
stream=stream,
Expand All @@ -165,6 +176,7 @@ def send_request(
params: Optional[Mapping[str, Any]] = None,
stream: bool = False,
serialize: bool = True,
timeout: Optional[tuple[Optional[float], Optional[float]]] = None,
) -> Any:
"""
:param http_method: The HTTP method to use. Takes the equivalent `requests.*` module. (Example: `requests.get`)
Expand All @@ -174,6 +186,9 @@ def send_request(
:param params: The query parameters part of this request.
:param stream: True if the response is expected to be a HTTP stream.
:param serialize: True if the body needs to be serialized to JSON.
:param timeout: A `(connection timeout, read timeout)` pair, in
seconds, overriding the client-wide timeout configuration for
this request only. `None` elements mean "wait indefinitely".

Send an HTTP request, retrying transient failures per the client's
`RetryConfig`.
Expand All @@ -182,9 +197,10 @@ def send_request(
- Status codes in `retry_config.retryable_status_codes` (default
408, 429, 502, 503, 504) and connection/read timeouts retry.
- 502 probes `/cluster_healthz` to distinguish a spurious gateway
error (cluster healthy → retry immediately) from a real outage
(cluster unhealthy → wait `unhealthy_backoff` seconds before
retrying).
error from a real outage. With a healthy cluster, the first 502
retries immediately and persistent 502s use exponential backoff;
with an unhealthy cluster, each retry waits `unhealthy_backoff`
seconds.
- Other retryable failures use exponential backoff with optional
jitter; a server-supplied `Retry-After` header overrides it
(capped at `max_backoff`).
Expand Down Expand Up @@ -220,7 +236,7 @@ def send_request(
for attempt in retryer:
with attempt:
return self._do_single_request(
http_method, request_path, data, params, stream
http_method, request_path, data, params, stream, timeout
)
except requests.exceptions.Timeout as err:
raise FelderaTimeoutError(str(err)) from err
Expand All @@ -245,6 +261,7 @@ def post(
params: Optional[Mapping[str, Any]] = None,
stream: bool = False,
serialize: bool = True,
timeout: Optional[tuple[Optional[float], Optional[float]]] = None,
) -> Any:
return self.send_request(
requests.post,
Expand All @@ -254,6 +271,7 @@ def post(
params,
stream=stream,
serialize=serialize,
timeout=timeout,
)

def patch(
Expand Down
9 changes: 9 additions & 0 deletions python/feldera/rest/feldera_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ def listen_to_pipeline(
array: bool = False,
timeout: Optional[float] = None,
case_sensitive: bool = False,
read_timeout: Optional[float] = 120.0,
):
"""
Listen for updates to views for pipeline, yields the chunks of data
Expand All @@ -1181,6 +1182,13 @@ def listen_to_pipeline(

:param timeout: The amount of time in seconds to listen to the stream for
:param case_sensitive: True if the table name is case sensitive or a reserved keyword, False by default
:param read_timeout: The maximum time in seconds to wait for the server
to produce the next piece of the stream, including the initial
response. The pipeline emits a heartbeat chunk every few seconds
even when there is no data, so this timeout fires only when the
connection or the server is stalled; it converts an otherwise
indefinite hang into an error. Set to None to wait indefinitely.
The default is 120 seconds.
"""

params = {
Expand All @@ -1200,6 +1208,7 @@ def listen_to_pipeline(
path=f"/pipelines/{quote(pipeline_name, safe='')}/egress/{quote(table_name, safe='')}",
params=params,
stream=True,
timeout=(self.config.connection_timeout, read_timeout),
)

end = time.monotonic() + timeout if timeout else None
Expand Down
11 changes: 7 additions & 4 deletions python/feldera/rest/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ class RetryConfig:
plus a uniform random `[0, jitter)` term, where `n` is the
zero-based retry index.
- 502 uses cluster-aware backoff: the client probes
`/cluster_healthz`; if the cluster is healthy, the 502 is treated as
spurious and the next retry runs immediately (wait = 0). If the
cluster reports unhealthy (e.g. an upgrade is in progress), the next
retry waits `unhealthy_backoff` seconds.
`/cluster_healthz`. If the cluster reports unhealthy (e.g. an
upgrade is in progress), the next retry waits `unhealthy_backoff`
seconds. If the cluster is healthy, the first 502 is treated as
spurious and retried immediately (wait = 0); if 502s persist
(e.g. a brief outage while gateway routes propagate), subsequent
retries use the exponential backoff above, so they spread across
the outage instead of all firing inside it.
- A server-supplied `Retry-After` header always overrides the computed
wait (capped at `max_backoff`).

Expand Down
31 changes: 31 additions & 0 deletions python/tests/unit/test_httprequests_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,37 @@ def test_spurious_502_retries_immediately(self):
assert health.call_count == 1
sleeper.assert_called_once_with(0.0)

def test_persistent_healthy_502_backs_off_exponentially(self):
"""
With a healthy cluster, only the first 502 retries immediately.
Persistent 502s (e.g., a brief outage while gateway routes
propagate) fall back to the exponential schedule, so the remaining
retries spread across the outage instead of all firing inside it.
"""
cfg = RetryConfig(
max_retries=3,
initial_backoff=1.0,
max_backoff=8.0,
multiplier=2.0,
jitter=0.0,
)
client = _make_client(cfg)
with patch_requests("get", [_make_response(502)] * 4) as m:
with mock.patch.object(
client, "_check_cluster_health", return_value=True
) as health:
with mock.patch("tenacity.nap.time.sleep") as sleeper:
with pytest.raises(FelderaAPIError) as exc_info:
client.get("/foo")
assert exc_info.value.status_code == 502
assert m.call_count == 4
# One probe per computed wait. Tenacity computes the wait once more
# after the final attempt before stopping, hence >= rather than ==.
assert health.call_count >= 3
slept = [call.args[0] for call in sleeper.call_args_list]
# Immediate first retry, then the documented exponential schedule.
assert slept == [0.0, 2.0, 4.0]

def test_unhealthy_502_uses_unhealthy_backoff(self):
"""Unhealthy cluster + 502 → wait function returns unhealthy_backoff."""
cfg = RetryConfig(
Expand Down
Loading
Loading