From c2d7dd1e2b5c0a0e832d296a97587623870b85fe Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Fri, 12 Jun 2026 11:23:27 -0700 Subject: [PATCH 1/2] py: don't hang forever when listen() fails to connect When the egress request failed before the first chunk arrived, the CallbackRunner thread recorded the exception but never set the event that unblocks the caller, leaving Pipeline.listen() and foreach_chunk() blocked forever and the error invisible. In CI, where runtime tests run against a remote deployment, a transient failure right after a pipeline restart turned into a 1-hour pytest timeout inside listen() (test_transactions.py::test_dynamic_output_connector). - Always set the event when the listener thread exits, and re-raise the recorded exception from OutputHandler.start(), so listen() fails loudly instead of hanging. - Propagate listener exceptions from foreach_chunk(), which previously discarded them. - Treat a stream that closes cleanly before the first chunk as "no output" rather than an error. - Bound the egress stream reads with a 120s read timeout (the server emits a heartbeat chunk every ~3s, so silence that long means a dead or stalled connection). Overridable via the new read_timeout parameter of listen_to_pipeline(). Co-Authored-By: Claude Fable 5 --- python/feldera/_callback_runner.py | 11 ++ python/feldera/output_handler.py | 12 +- python/feldera/pipeline.py | 14 +- python/feldera/rest/_httprequests.py | 13 +- python/feldera/rest/feldera_client.py | 9 ++ python/tests/unit/test_output_handler.py | 180 +++++++++++++++++++++++ 6 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 python/tests/unit/test_output_handler.py diff --git a/python/feldera/_callback_runner.py b/python/feldera/_callback_runner.py index d84a6154ed5..09a8c6d03d5 100644 --- a/python/feldera/_callback_runner.py +++ b/python/feldera/_callback_runner.py @@ -91,5 +91,16 @@ def run(self): for chunk in iterator: self.to_callback(chunk) + except StopIteration: + # The server closed the stream before sending the first chunk; + # e.g., the pipeline stopped concurrently. The listener observes + # no output; this is not an error. + pass except BaseException as e: self.exception_callback(e) + finally: + # Always unblock the caller, including when the connection attempt + # fails before the first chunk arrives. The exception recorded by + # `exception_callback` above must be stored before the event is + # set, so that the woken caller can observe and surface it. + self.event.set() diff --git a/python/feldera/output_handler.py b/python/feldera/output_handler.py index f7a4b9ad502..ac820607992 100644 --- a/python/feldera/output_handler.py +++ b/python/feldera/output_handler.py @@ -46,11 +46,21 @@ def exception_callback(exception: BaseException): def start(self): """ - Starts the output handler in a separate thread + Starts the output handler in a separate thread. + + Blocks until the listener is connected to the change stream and ready + to receive data. + + :raises BaseException: The exception that caused the connection + attempt to fail; e.g., a + :class:`feldera.rest.errors.FelderaAPIError` if the egress + request was rejected. """ self.handler.start() _ = self.event.wait() + if self.exception is not None: + raise self.exception def to_pandas(self, clear_buffer: bool = True): """ diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index d404bf139ef..6d19ac0547c 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -290,6 +290,10 @@ def listen(self, view_name: str) -> OutputHandler: using :meth:`start_paused`, attach listeners and unpause the pipeline using :meth:`resume`. :param view_name: The name of the view to listen to. + + :raises RuntimeError: If the pipeline is neither running nor paused. + :raises BaseException: The underlying error, if the connection to the + change stream cannot be established. """ if self.status() not in [PipelineStatus.PAUSED, PipelineStatus.RUNNING]: @@ -324,17 +328,25 @@ def foreach_chunk( .. note:: - The callback must be thread-safe as it will be run in a separate thread. + :raises RuntimeError: If the pipeline is neither running nor paused. + :raises BaseException: The underlying error, if the connection to the + change stream cannot be established. """ if self.status() not in [PipelineStatus.RUNNING, PipelineStatus.PAUSED]: raise RuntimeError("Pipeline must be running or paused to listen to output") event = Event() + # Collect the exception that aborted the listener, if any, so that a + # connection failure surfaces here instead of being silently dropped. + errors: List[BaseException] = [] handler = CallbackRunner( - self.client, self.name, view_name, callback, lambda exception: None, event + self.client, self.name, view_name, callback, errors.append, event ) handler.start() event.wait() + if errors: + raise errors[0] def wait_for_completion( self, force_stop: bool = False, timeout_s: float | None = None diff --git a/python/feldera/rest/_httprequests.py b/python/feldera/rest/_httprequests.py index 53f311bf542..c55caec7b5a 100644 --- a/python/feldera/rest/_httprequests.py +++ b/python/feldera/rest/_httprequests.py @@ -140,11 +140,14 @@ def _do_single_request( data: Any, params: Optional[Mapping[str, Any]], stream: bool, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: response = http_method( request_path, data=data, - timeout=(self.config.connection_timeout, self.config.timeout), + timeout=timeout + if timeout is not None + else (self.config.connection_timeout, self.config.timeout), headers=self.headers, params=params, stream=stream, @@ -165,6 +168,7 @@ def send_request( params: Optional[Mapping[str, Any]] = None, stream: bool = False, serialize: bool = True, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: """ :param http_method: The HTTP method to use. Takes the equivalent `requests.*` module. (Example: `requests.get`) @@ -174,6 +178,9 @@ def send_request( :param params: The query parameters part of this request. :param stream: True if the response is expected to be a HTTP stream. :param serialize: True if the body needs to be serialized to JSON. + :param timeout: A `(connection timeout, read timeout)` pair, in + seconds, overriding the client-wide timeout configuration for + this request only. `None` elements mean "wait indefinitely". Send an HTTP request, retrying transient failures per the client's `RetryConfig`. @@ -220,7 +227,7 @@ def send_request( for attempt in retryer: with attempt: return self._do_single_request( - http_method, request_path, data, params, stream + http_method, request_path, data, params, stream, timeout ) except requests.exceptions.Timeout as err: raise FelderaTimeoutError(str(err)) from err @@ -245,6 +252,7 @@ def post( params: Optional[Mapping[str, Any]] = None, stream: bool = False, serialize: bool = True, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: return self.send_request( requests.post, @@ -254,6 +262,7 @@ def post( params, stream=stream, serialize=serialize, + timeout=timeout, ) def patch( diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index 90bca15f599..effb27d1cdc 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -1162,6 +1162,7 @@ def listen_to_pipeline( array: bool = False, timeout: Optional[float] = None, case_sensitive: bool = False, + read_timeout: Optional[float] = 120.0, ): """ Listen for updates to views for pipeline, yields the chunks of data @@ -1181,6 +1182,13 @@ def listen_to_pipeline( :param timeout: The amount of time in seconds to listen to the stream for :param case_sensitive: True if the table name is case sensitive or a reserved keyword, False by default + :param read_timeout: The maximum time in seconds to wait for the server + to produce the next piece of the stream, including the initial + response. The pipeline emits a heartbeat chunk every few seconds + even when there is no data, so this timeout fires only when the + connection or the server is stalled; it converts an otherwise + indefinite hang into an error. Set to None to wait indefinitely. + The default is 120 seconds. """ params = { @@ -1200,6 +1208,7 @@ def listen_to_pipeline( path=f"/pipelines/{quote(pipeline_name, safe='')}/egress/{quote(table_name, safe='')}", params=params, stream=True, + timeout=(self.config.connection_timeout, read_timeout), ) end = time.monotonic() + timeout if timeout else None diff --git a/python/tests/unit/test_output_handler.py b/python/tests/unit/test_output_handler.py new file mode 100644 index 00000000000..cbd2f0a03d9 --- /dev/null +++ b/python/tests/unit/test_output_handler.py @@ -0,0 +1,180 @@ +"""Tests for the change-stream listener (`OutputHandler` / `CallbackRunner`). + +These tests exercise the connection phase of `Pipeline.listen()` and +`Pipeline.foreach_chunk()` against a mocked REST client. In particular, they +pin down the regression where a failure to establish the egress connection +left the caller blocked forever on an event that no one would ever set +(observed as a 1-hour `pytest` timeout in CI inside `pipeline.listen()`). +""" + +from __future__ import annotations + +from threading import Thread +from typing import Any, Callable, Iterable, Mapping, Optional +from unittest import mock + +import pytest + +from feldera.enums import PipelineStatus +from feldera.output_handler import OutputHandler +from feldera.pipeline import Pipeline +from feldera.rest.errors import FelderaTimeoutError +from feldera.rest.sql_table import SQLTable + +# Generous bound on calls that must complete quickly; reached only when the +# code under test hangs, which is precisely the regression being tested. +WATCHDOG_TIMEOUT_S = 60.0 + +ID_FIELD = { + "name": "id", + "case_sensitive": False, + "columntype": {"type": "INTEGER", "nullable": False}, +} + +HEARTBEAT_CHUNK: Mapping[str, Any] = {"sequence_number": 0, "snapshot": False} + +DATA_CHUNK: Mapping[str, Any] = { + "sequence_number": 1, + "snapshot": False, + "json_data": [{"insert": {"id": 1}}, {"insert": {"id": 2}}], +} + + +def _make_client( + listen_side_effect: Optional[BaseException] = None, + chunks: Optional[Iterable[Mapping[str, Any]]] = None, +) -> mock.Mock: + """ + Build a mock `FelderaClient` whose `listen_to_pipeline` either raises + `listen_side_effect` or returns a generator factory yielding `chunks`. + """ + + inner = mock.Mock() + inner.name = "test_pipeline" + inner.tables = [SQLTable("t1", fields=[ID_FIELD])] + inner.views = [] + + client = mock.Mock() + client.get_pipeline.return_value = inner + + if listen_side_effect is not None: + client.listen_to_pipeline.side_effect = listen_side_effect + else: + + def factory(): + yield from chunks or [] + + client.listen_to_pipeline.return_value = factory + + return client + + +def _call_with_watchdog(fn: Callable[[], Any]) -> Mapping[str, Any]: + """ + Run `fn` on a daemon thread and fail the test if it does not finish in + time, so that a hang in the code under test cannot hang the test suite. + + Returns {"returned": value} or {"raised": exception}. + """ + + outcome: dict[str, Any] = {} + + def target(): + try: + outcome["returned"] = fn() + except BaseException as e: + outcome["raised"] = e + + thread = Thread(target=target, daemon=True) + thread.start() + thread.join(WATCHDOG_TIMEOUT_S) + if thread.is_alive(): + pytest.fail("call did not complete: the listener is hanging") + return outcome + + +def test_start_raises_when_connection_fails(): + """ + An egress request that fails before the first chunk arrives must raise + from `start()`, not leave the caller blocked forever. + """ + + error = FelderaTimeoutError("egress request timed out") + client = _make_client(listen_side_effect=error) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert outcome.get("raised") is error + + +def test_start_raises_when_first_read_fails(): + """A connection that dies during the first body read must also raise.""" + + error = ConnectionError("connection reset by peer") + + def factory(): + raise error + yield # pragma: no cover -- makes `factory` a generator + + inner_client = _make_client(chunks=[]) + inner_client.listen_to_pipeline.return_value = factory + handler = OutputHandler(inner_client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert outcome.get("raised") is error + + +def test_start_returns_when_stream_ends_before_first_chunk(): + """ + A stream that closes cleanly before the first chunk (e.g., the pipeline + stopped concurrently) is not an error: `start()` returns and the listener + observes no output. + """ + + client = _make_client(chunks=[]) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert "raised" not in outcome + assert handler.to_dict() == [] + + +def test_start_unblocks_on_heartbeat_and_buffers_data(): + """ + The first chunk is normally an empty heartbeat; it must unblock `start()` + without producing output, and subsequent data chunks must be buffered. + """ + + client = _make_client(chunks=[HEARTBEAT_CHUNK, DATA_CHUNK]) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + assert "raised" not in outcome + + # The data chunk is processed asynchronously; wait for the runner thread + # to drain the (finite) stream before checking the buffer. + handler.handler.join(WATCHDOG_TIMEOUT_S) + assert not handler.handler.is_alive() + + assert handler.to_dict() == [ + {"id": 1, "insert_delete": 1}, + {"id": 2, "insert_delete": 1}, + ] + + +def test_foreach_chunk_raises_when_connection_fails(): + """`foreach_chunk` must propagate a failed egress connection, not hang.""" + + error = FelderaTimeoutError("egress request timed out") + client = _make_client(listen_side_effect=error) + pipeline = Pipeline._from_inner(client.get_pipeline.return_value, client) + + with mock.patch.object(Pipeline, "status", return_value=PipelineStatus.RUNNING): + outcome = _call_with_watchdog( + lambda: pipeline.foreach_chunk("t1", lambda df, seq: None) + ) + + assert outcome.get("raised") is error From 67b6237ae8e2c73f994ec4dd2abe7e3e80de8f19 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Fri, 12 Jun 2026 11:26:58 -0700 Subject: [PATCH 2/2] py: back off when 502s persist on a healthy cluster The 502 retry path treated every 502 as spurious while the cluster reported healthy and retried with zero wait, so all retries could fire inside the same sub-second 502 window (e.g., while gateway routes propagate after a pipeline restart) and exhaust without ever giving the outage time to clear -- the very case the retries exist for. Keep the immediate retry for the first 502, which preserves the cheap recovery from a one-off gateway hiccup, but fall back to the standard exponential backoff when 502s persist, so the remaining retries spread across the outage window. Co-Authored-By: Claude Fable 5 --- python/feldera/rest/_httprequests.py | 31 +++++++++++++------- python/feldera/rest/retry.py | 11 ++++--- python/tests/unit/test_httprequests_retry.py | 31 ++++++++++++++++++++ 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/python/feldera/rest/_httprequests.py b/python/feldera/rest/_httprequests.py index c55caec7b5a..d8f09e89530 100644 --- a/python/feldera/rest/_httprequests.py +++ b/python/feldera/rest/_httprequests.py @@ -101,10 +101,14 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float: """ Compute the wait between retries. Branches by exception type: - `Retry-After` header (if any) wins, capped at `max_backoff`. - - 502: probe `/cluster_healthz`. If the cluster is healthy the 502 - is treated as spurious — return 0 so the retry runs immediately. - Otherwise return the configured `unhealthy_backoff` (a flat wait - while an upgrade or restart completes). + - 502: probe `/cluster_healthz`. If the cluster is unhealthy, + return the configured `unhealthy_backoff` (a flat wait while an + upgrade or restart completes). If the cluster is healthy, treat + the first 502 as spurious — return 0 so the retry runs + immediately. If 502s persist (e.g., a brief window while gateway + routes propagate after a pipeline restart), fall through to the + exponential backoff, so the remaining retries spread across the + outage instead of all firing inside it. - Everything else: exponential backoff plus optional jitter. """ cfg = self.config.retry_config @@ -115,14 +119,18 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float: return min(float(retry_after), cfg.max_backoff) if _is_502(exc): - if self._check_cluster_health(): + if not self._check_cluster_health(): + logging.info( + "Cluster unhealthy; backing off %.1fs before retrying 502", + cfg.unhealthy_backoff, + ) + return cfg.unhealthy_backoff + if retry_state.attempt_number <= 1: logging.info("Cluster healthy — treating 502 as spurious") return 0.0 logging.info( - "Cluster unhealthy; backing off %.1fs before retrying 502", - cfg.unhealthy_backoff, + "Cluster healthy but 502s persist; falling back to exponential backoff" ) - return cfg.unhealthy_backoff backoff = wait_exponential( multiplier=cfg.initial_backoff, @@ -189,9 +197,10 @@ def send_request( - Status codes in `retry_config.retryable_status_codes` (default 408, 429, 502, 503, 504) and connection/read timeouts retry. - 502 probes `/cluster_healthz` to distinguish a spurious gateway - error (cluster healthy → retry immediately) from a real outage - (cluster unhealthy → wait `unhealthy_backoff` seconds before - retrying). + error from a real outage. With a healthy cluster, the first 502 + retries immediately and persistent 502s use exponential backoff; + with an unhealthy cluster, each retry waits `unhealthy_backoff` + seconds. - Other retryable failures use exponential backoff with optional jitter; a server-supplied `Retry-After` header overrides it (capped at `max_backoff`). diff --git a/python/feldera/rest/retry.py b/python/feldera/rest/retry.py index b2977042477..7205fc73beb 100644 --- a/python/feldera/rest/retry.py +++ b/python/feldera/rest/retry.py @@ -20,10 +20,13 @@ class RetryConfig: plus a uniform random `[0, jitter)` term, where `n` is the zero-based retry index. - 502 uses cluster-aware backoff: the client probes - `/cluster_healthz`; if the cluster is healthy, the 502 is treated as - spurious and the next retry runs immediately (wait = 0). If the - cluster reports unhealthy (e.g. an upgrade is in progress), the next - retry waits `unhealthy_backoff` seconds. + `/cluster_healthz`. If the cluster reports unhealthy (e.g. an + upgrade is in progress), the next retry waits `unhealthy_backoff` + seconds. If the cluster is healthy, the first 502 is treated as + spurious and retried immediately (wait = 0); if 502s persist + (e.g. a brief outage while gateway routes propagate), subsequent + retries use the exponential backoff above, so they spread across + the outage instead of all firing inside it. - A server-supplied `Retry-After` header always overrides the computed wait (capped at `max_backoff`). diff --git a/python/tests/unit/test_httprequests_retry.py b/python/tests/unit/test_httprequests_retry.py index c28cadb75a8..feda57688ab 100644 --- a/python/tests/unit/test_httprequests_retry.py +++ b/python/tests/unit/test_httprequests_retry.py @@ -351,6 +351,37 @@ def test_spurious_502_retries_immediately(self): assert health.call_count == 1 sleeper.assert_called_once_with(0.0) + def test_persistent_healthy_502_backs_off_exponentially(self): + """ + With a healthy cluster, only the first 502 retries immediately. + Persistent 502s (e.g., a brief outage while gateway routes + propagate) fall back to the exponential schedule, so the remaining + retries spread across the outage instead of all firing inside it. + """ + cfg = RetryConfig( + max_retries=3, + initial_backoff=1.0, + max_backoff=8.0, + multiplier=2.0, + jitter=0.0, + ) + client = _make_client(cfg) + with patch_requests("get", [_make_response(502)] * 4) as m: + with mock.patch.object( + client, "_check_cluster_health", return_value=True + ) as health: + with mock.patch("tenacity.nap.time.sleep") as sleeper: + with pytest.raises(FelderaAPIError) as exc_info: + client.get("/foo") + assert exc_info.value.status_code == 502 + assert m.call_count == 4 + # One probe per computed wait. Tenacity computes the wait once more + # after the final attempt before stopping, hence >= rather than ==. + assert health.call_count >= 3 + slept = [call.args[0] for call in sleeper.call_args_list] + # Immediate first retry, then the documented exponential schedule. + assert slept == [0.0, 2.0, 4.0] + def test_unhealthy_502_uses_unhealthy_backoff(self): """Unhealthy cluster + 502 → wait function returns unhealthy_backoff.""" cfg = RetryConfig(