diff --git a/python/feldera/_callback_runner.py b/python/feldera/_callback_runner.py index d84a6154ed5..09a8c6d03d5 100644 --- a/python/feldera/_callback_runner.py +++ b/python/feldera/_callback_runner.py @@ -91,5 +91,16 @@ def run(self): for chunk in iterator: self.to_callback(chunk) + except StopIteration: + # The server closed the stream before sending the first chunk; + # e.g., the pipeline stopped concurrently. The listener observes + # no output; this is not an error. + pass except BaseException as e: self.exception_callback(e) + finally: + # Always unblock the caller, including when the connection attempt + # fails before the first chunk arrives. The exception recorded by + # `exception_callback` above must be stored before the event is + # set, so that the woken caller can observe and surface it. + self.event.set() diff --git a/python/feldera/output_handler.py b/python/feldera/output_handler.py index f7a4b9ad502..ac820607992 100644 --- a/python/feldera/output_handler.py +++ b/python/feldera/output_handler.py @@ -46,11 +46,21 @@ def exception_callback(exception: BaseException): def start(self): """ - Starts the output handler in a separate thread + Starts the output handler in a separate thread. + + Blocks until the listener is connected to the change stream and ready + to receive data. + + :raises BaseException: The exception that caused the connection + attempt to fail; e.g., a + :class:`feldera.rest.errors.FelderaAPIError` if the egress + request was rejected. """ self.handler.start() _ = self.event.wait() + if self.exception is not None: + raise self.exception def to_pandas(self, clear_buffer: bool = True): """ diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index d404bf139ef..6d19ac0547c 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -290,6 +290,10 @@ def listen(self, view_name: str) -> OutputHandler: using :meth:`start_paused`, attach listeners and unpause the pipeline using :meth:`resume`. :param view_name: The name of the view to listen to. + + :raises RuntimeError: If the pipeline is neither running nor paused. + :raises BaseException: The underlying error, if the connection to the + change stream cannot be established. """ if self.status() not in [PipelineStatus.PAUSED, PipelineStatus.RUNNING]: @@ -324,17 +328,25 @@ def foreach_chunk( .. note:: - The callback must be thread-safe as it will be run in a separate thread. + :raises RuntimeError: If the pipeline is neither running nor paused. + :raises BaseException: The underlying error, if the connection to the + change stream cannot be established. """ if self.status() not in [PipelineStatus.RUNNING, PipelineStatus.PAUSED]: raise RuntimeError("Pipeline must be running or paused to listen to output") event = Event() + # Collect the exception that aborted the listener, if any, so that a + # connection failure surfaces here instead of being silently dropped. + errors: List[BaseException] = [] handler = CallbackRunner( - self.client, self.name, view_name, callback, lambda exception: None, event + self.client, self.name, view_name, callback, errors.append, event ) handler.start() event.wait() + if errors: + raise errors[0] def wait_for_completion( self, force_stop: bool = False, timeout_s: float | None = None diff --git a/python/feldera/rest/_httprequests.py b/python/feldera/rest/_httprequests.py index 53f311bf542..d8f09e89530 100644 --- a/python/feldera/rest/_httprequests.py +++ b/python/feldera/rest/_httprequests.py @@ -101,10 +101,14 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float: """ Compute the wait between retries. Branches by exception type: - `Retry-After` header (if any) wins, capped at `max_backoff`. - - 502: probe `/cluster_healthz`. If the cluster is healthy the 502 - is treated as spurious — return 0 so the retry runs immediately. - Otherwise return the configured `unhealthy_backoff` (a flat wait - while an upgrade or restart completes). + - 502: probe `/cluster_healthz`. If the cluster is unhealthy, + return the configured `unhealthy_backoff` (a flat wait while an + upgrade or restart completes). If the cluster is healthy, treat + the first 502 as spurious — return 0 so the retry runs + immediately. If 502s persist (e.g., a brief window while gateway + routes propagate after a pipeline restart), fall through to the + exponential backoff, so the remaining retries spread across the + outage instead of all firing inside it. - Everything else: exponential backoff plus optional jitter. """ cfg = self.config.retry_config @@ -115,14 +119,18 @@ def _custom_wait(self, retry_state: "RetryCallState") -> float: return min(float(retry_after), cfg.max_backoff) if _is_502(exc): - if self._check_cluster_health(): + if not self._check_cluster_health(): + logging.info( + "Cluster unhealthy; backing off %.1fs before retrying 502", + cfg.unhealthy_backoff, + ) + return cfg.unhealthy_backoff + if retry_state.attempt_number <= 1: logging.info("Cluster healthy — treating 502 as spurious") return 0.0 logging.info( - "Cluster unhealthy; backing off %.1fs before retrying 502", - cfg.unhealthy_backoff, + "Cluster healthy but 502s persist; falling back to exponential backoff" ) - return cfg.unhealthy_backoff backoff = wait_exponential( multiplier=cfg.initial_backoff, @@ -140,11 +148,14 @@ def _do_single_request( data: Any, params: Optional[Mapping[str, Any]], stream: bool, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: response = http_method( request_path, data=data, - timeout=(self.config.connection_timeout, self.config.timeout), + timeout=timeout + if timeout is not None + else (self.config.connection_timeout, self.config.timeout), headers=self.headers, params=params, stream=stream, @@ -165,6 +176,7 @@ def send_request( params: Optional[Mapping[str, Any]] = None, stream: bool = False, serialize: bool = True, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: """ :param http_method: The HTTP method to use. Takes the equivalent `requests.*` module. (Example: `requests.get`) @@ -174,6 +186,9 @@ def send_request( :param params: The query parameters part of this request. :param stream: True if the response is expected to be a HTTP stream. :param serialize: True if the body needs to be serialized to JSON. + :param timeout: A `(connection timeout, read timeout)` pair, in + seconds, overriding the client-wide timeout configuration for + this request only. `None` elements mean "wait indefinitely". Send an HTTP request, retrying transient failures per the client's `RetryConfig`. @@ -182,9 +197,10 @@ def send_request( - Status codes in `retry_config.retryable_status_codes` (default 408, 429, 502, 503, 504) and connection/read timeouts retry. - 502 probes `/cluster_healthz` to distinguish a spurious gateway - error (cluster healthy → retry immediately) from a real outage - (cluster unhealthy → wait `unhealthy_backoff` seconds before - retrying). + error from a real outage. With a healthy cluster, the first 502 + retries immediately and persistent 502s use exponential backoff; + with an unhealthy cluster, each retry waits `unhealthy_backoff` + seconds. - Other retryable failures use exponential backoff with optional jitter; a server-supplied `Retry-After` header overrides it (capped at `max_backoff`). @@ -220,7 +236,7 @@ def send_request( for attempt in retryer: with attempt: return self._do_single_request( - http_method, request_path, data, params, stream + http_method, request_path, data, params, stream, timeout ) except requests.exceptions.Timeout as err: raise FelderaTimeoutError(str(err)) from err @@ -245,6 +261,7 @@ def post( params: Optional[Mapping[str, Any]] = None, stream: bool = False, serialize: bool = True, + timeout: Optional[tuple[Optional[float], Optional[float]]] = None, ) -> Any: return self.send_request( requests.post, @@ -254,6 +271,7 @@ def post( params, stream=stream, serialize=serialize, + timeout=timeout, ) def patch( diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index 90bca15f599..effb27d1cdc 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -1162,6 +1162,7 @@ def listen_to_pipeline( array: bool = False, timeout: Optional[float] = None, case_sensitive: bool = False, + read_timeout: Optional[float] = 120.0, ): """ Listen for updates to views for pipeline, yields the chunks of data @@ -1181,6 +1182,13 @@ def listen_to_pipeline( :param timeout: The amount of time in seconds to listen to the stream for :param case_sensitive: True if the table name is case sensitive or a reserved keyword, False by default + :param read_timeout: The maximum time in seconds to wait for the server + to produce the next piece of the stream, including the initial + response. The pipeline emits a heartbeat chunk every few seconds + even when there is no data, so this timeout fires only when the + connection or the server is stalled; it converts an otherwise + indefinite hang into an error. Set to None to wait indefinitely. + The default is 120 seconds. """ params = { @@ -1200,6 +1208,7 @@ def listen_to_pipeline( path=f"/pipelines/{quote(pipeline_name, safe='')}/egress/{quote(table_name, safe='')}", params=params, stream=True, + timeout=(self.config.connection_timeout, read_timeout), ) end = time.monotonic() + timeout if timeout else None diff --git a/python/feldera/rest/retry.py b/python/feldera/rest/retry.py index b2977042477..7205fc73beb 100644 --- a/python/feldera/rest/retry.py +++ b/python/feldera/rest/retry.py @@ -20,10 +20,13 @@ class RetryConfig: plus a uniform random `[0, jitter)` term, where `n` is the zero-based retry index. - 502 uses cluster-aware backoff: the client probes - `/cluster_healthz`; if the cluster is healthy, the 502 is treated as - spurious and the next retry runs immediately (wait = 0). If the - cluster reports unhealthy (e.g. an upgrade is in progress), the next - retry waits `unhealthy_backoff` seconds. + `/cluster_healthz`. If the cluster reports unhealthy (e.g. an + upgrade is in progress), the next retry waits `unhealthy_backoff` + seconds. If the cluster is healthy, the first 502 is treated as + spurious and retried immediately (wait = 0); if 502s persist + (e.g. a brief outage while gateway routes propagate), subsequent + retries use the exponential backoff above, so they spread across + the outage instead of all firing inside it. - A server-supplied `Retry-After` header always overrides the computed wait (capped at `max_backoff`). diff --git a/python/tests/unit/test_httprequests_retry.py b/python/tests/unit/test_httprequests_retry.py index c28cadb75a8..feda57688ab 100644 --- a/python/tests/unit/test_httprequests_retry.py +++ b/python/tests/unit/test_httprequests_retry.py @@ -351,6 +351,37 @@ def test_spurious_502_retries_immediately(self): assert health.call_count == 1 sleeper.assert_called_once_with(0.0) + def test_persistent_healthy_502_backs_off_exponentially(self): + """ + With a healthy cluster, only the first 502 retries immediately. + Persistent 502s (e.g., a brief outage while gateway routes + propagate) fall back to the exponential schedule, so the remaining + retries spread across the outage instead of all firing inside it. + """ + cfg = RetryConfig( + max_retries=3, + initial_backoff=1.0, + max_backoff=8.0, + multiplier=2.0, + jitter=0.0, + ) + client = _make_client(cfg) + with patch_requests("get", [_make_response(502)] * 4) as m: + with mock.patch.object( + client, "_check_cluster_health", return_value=True + ) as health: + with mock.patch("tenacity.nap.time.sleep") as sleeper: + with pytest.raises(FelderaAPIError) as exc_info: + client.get("/foo") + assert exc_info.value.status_code == 502 + assert m.call_count == 4 + # One probe per computed wait. Tenacity computes the wait once more + # after the final attempt before stopping, hence >= rather than ==. + assert health.call_count >= 3 + slept = [call.args[0] for call in sleeper.call_args_list] + # Immediate first retry, then the documented exponential schedule. + assert slept == [0.0, 2.0, 4.0] + def test_unhealthy_502_uses_unhealthy_backoff(self): """Unhealthy cluster + 502 → wait function returns unhealthy_backoff.""" cfg = RetryConfig( diff --git a/python/tests/unit/test_output_handler.py b/python/tests/unit/test_output_handler.py new file mode 100644 index 00000000000..cbd2f0a03d9 --- /dev/null +++ b/python/tests/unit/test_output_handler.py @@ -0,0 +1,180 @@ +"""Tests for the change-stream listener (`OutputHandler` / `CallbackRunner`). + +These tests exercise the connection phase of `Pipeline.listen()` and +`Pipeline.foreach_chunk()` against a mocked REST client. In particular, they +pin down the regression where a failure to establish the egress connection +left the caller blocked forever on an event that no one would ever set +(observed as a 1-hour `pytest` timeout in CI inside `pipeline.listen()`). +""" + +from __future__ import annotations + +from threading import Thread +from typing import Any, Callable, Iterable, Mapping, Optional +from unittest import mock + +import pytest + +from feldera.enums import PipelineStatus +from feldera.output_handler import OutputHandler +from feldera.pipeline import Pipeline +from feldera.rest.errors import FelderaTimeoutError +from feldera.rest.sql_table import SQLTable + +# Generous bound on calls that must complete quickly; reached only when the +# code under test hangs, which is precisely the regression being tested. +WATCHDOG_TIMEOUT_S = 60.0 + +ID_FIELD = { + "name": "id", + "case_sensitive": False, + "columntype": {"type": "INTEGER", "nullable": False}, +} + +HEARTBEAT_CHUNK: Mapping[str, Any] = {"sequence_number": 0, "snapshot": False} + +DATA_CHUNK: Mapping[str, Any] = { + "sequence_number": 1, + "snapshot": False, + "json_data": [{"insert": {"id": 1}}, {"insert": {"id": 2}}], +} + + +def _make_client( + listen_side_effect: Optional[BaseException] = None, + chunks: Optional[Iterable[Mapping[str, Any]]] = None, +) -> mock.Mock: + """ + Build a mock `FelderaClient` whose `listen_to_pipeline` either raises + `listen_side_effect` or returns a generator factory yielding `chunks`. + """ + + inner = mock.Mock() + inner.name = "test_pipeline" + inner.tables = [SQLTable("t1", fields=[ID_FIELD])] + inner.views = [] + + client = mock.Mock() + client.get_pipeline.return_value = inner + + if listen_side_effect is not None: + client.listen_to_pipeline.side_effect = listen_side_effect + else: + + def factory(): + yield from chunks or [] + + client.listen_to_pipeline.return_value = factory + + return client + + +def _call_with_watchdog(fn: Callable[[], Any]) -> Mapping[str, Any]: + """ + Run `fn` on a daemon thread and fail the test if it does not finish in + time, so that a hang in the code under test cannot hang the test suite. + + Returns {"returned": value} or {"raised": exception}. + """ + + outcome: dict[str, Any] = {} + + def target(): + try: + outcome["returned"] = fn() + except BaseException as e: + outcome["raised"] = e + + thread = Thread(target=target, daemon=True) + thread.start() + thread.join(WATCHDOG_TIMEOUT_S) + if thread.is_alive(): + pytest.fail("call did not complete: the listener is hanging") + return outcome + + +def test_start_raises_when_connection_fails(): + """ + An egress request that fails before the first chunk arrives must raise + from `start()`, not leave the caller blocked forever. + """ + + error = FelderaTimeoutError("egress request timed out") + client = _make_client(listen_side_effect=error) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert outcome.get("raised") is error + + +def test_start_raises_when_first_read_fails(): + """A connection that dies during the first body read must also raise.""" + + error = ConnectionError("connection reset by peer") + + def factory(): + raise error + yield # pragma: no cover -- makes `factory` a generator + + inner_client = _make_client(chunks=[]) + inner_client.listen_to_pipeline.return_value = factory + handler = OutputHandler(inner_client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert outcome.get("raised") is error + + +def test_start_returns_when_stream_ends_before_first_chunk(): + """ + A stream that closes cleanly before the first chunk (e.g., the pipeline + stopped concurrently) is not an error: `start()` returns and the listener + observes no output. + """ + + client = _make_client(chunks=[]) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + + assert "raised" not in outcome + assert handler.to_dict() == [] + + +def test_start_unblocks_on_heartbeat_and_buffers_data(): + """ + The first chunk is normally an empty heartbeat; it must unblock `start()` + without producing output, and subsequent data chunks must be buffered. + """ + + client = _make_client(chunks=[HEARTBEAT_CHUNK, DATA_CHUNK]) + handler = OutputHandler(client, "test_pipeline", "t1") + + outcome = _call_with_watchdog(handler.start) + assert "raised" not in outcome + + # The data chunk is processed asynchronously; wait for the runner thread + # to drain the (finite) stream before checking the buffer. + handler.handler.join(WATCHDOG_TIMEOUT_S) + assert not handler.handler.is_alive() + + assert handler.to_dict() == [ + {"id": 1, "insert_delete": 1}, + {"id": 2, "insert_delete": 1}, + ] + + +def test_foreach_chunk_raises_when_connection_fails(): + """`foreach_chunk` must propagate a failed egress connection, not hang.""" + + error = FelderaTimeoutError("egress request timed out") + client = _make_client(listen_side_effect=error) + pipeline = Pipeline._from_inner(client.get_pipeline.return_value, client) + + with mock.patch.object(Pipeline, "status", return_value=PipelineStatus.RUNNING): + outcome = _call_with_watchdog( + lambda: pipeline.foreach_chunk("t1", lambda df, seq: None) + ) + + assert outcome.get("raised") is error