py: don't hang forever when listen() fails to connect #6458
When the egress request failed before the first chunk arrived, the CallbackRunner thread recorded the exception but never set the event that unblocks the caller, leaving Pipeline.listen() and foreach_chunk() blocked forever and the error invisible. In CI, where runtime tests run against a remote deployment, a transient failure right after a pipeline restart turned into a 1-hour pytest timeout inside listen() (test_transactions.py::test_dynamic_output_connector).

- Always set the event when the listener thread exits, and re-raise the recorded exception from OutputHandler.start(), so listen() fails loudly instead of hanging.
- Propagate listener exceptions from foreach_chunk(), which previously discarded them.
- Treat a stream that closes cleanly before the first chunk as "no output" rather than an error.
- Bound the egress stream reads with a 120s read timeout (the server emits a heartbeat chunk every ~3s, so silence that long means a dead or stalled connection). Overridable via the new read_timeout parameter of listen_to_pipeline().

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The 502 retry path treated every 502 as spurious while the cluster reported healthy and retried with zero wait, so all retries could fire inside the same sub-second 502 window (e.g., while gateway routes propagate after a pipeline restart) and exhaust without ever giving the outage time to clear -- the very case the retries exist for. Keep the immediate retry for the first 502, which preserves the cheap recovery from a one-off gateway hiccup, but fall back to the standard exponential backoff when 502s persist, so the remaining retries spread across the outage window.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
mythical-fred
left a comment
High-level review (draft, Claude-generated patch — flagging things @abhizer should look at before taking over):
Real bug fix, correct shape. The core issue — OutputHandler.start() blocks on event.wait() forever when the listener thread dies before producing a chunk — is genuinely fixed by the finally: self.event.set() in _callback_runner.run() plus surfacing self.exception in OutputHandler.start(). The ordering ("exception stored before event is set") is correct because exception_callback(e) runs before the finally block. pipeline.foreach_chunk got the same treatment via a one-element list collector. Good.
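A minimal sketch of the pattern described above, using only the standard library. `ListenerThread`, `start`, and `failing_stream` are hypothetical stand-ins written for illustration; they are not the PR's `CallbackRunner` or `OutputHandler` code, and the 5-second wait is an arbitrary assumption.

```python
import threading


class ListenerThread(threading.Thread):
    """Hypothetical stand-in for the listener thread; not the real class."""

    def __init__(self, chunks, event):
        super().__init__(daemon=True)
        self.chunks = chunks      # any iterable; iterating it may raise
        self.event = event        # unblocks the waiting caller
        self.exception = None
        self.received = []

    def run(self):
        try:
            for chunk in self.chunks:
                self.received.append(chunk)
                self.event.set()  # first chunk arrived: caller may proceed
        except Exception as e:
            self.exception = e    # recorded before the finally block runs
        finally:
            self.event.set()      # always unblock the caller on exit


def start(chunks, timeout=5.0):
    """Wait until the listener is ready, re-raising any recorded failure."""
    event = threading.Event()
    listener = ListenerThread(chunks, event)
    listener.start()
    if not event.wait(timeout):
        raise TimeoutError("listener did not become ready")
    if listener.exception is not None:
        raise listener.exception
    return listener


def failing_stream():
    """Simulates an egress request that fails before the first chunk."""
    raise ConnectionError("egress request failed")
    yield  # unreachable; its presence makes this function a generator
```

With this shape, `start(failing_stream())` raises `ConnectionError` instead of blocking, and `start(iter([]))` returns normally, which corresponds to the "stream closed cleanly before the first chunk" case.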
StopIteration handling. Catching StopIteration from the iterator (server closed stream before first chunk, e.g., pipeline stopped) and treating it as a no-op is reasonable, but worth confirming this is the actual exception the requests/iter_chunks stream raises in that scenario — requests is more likely to yield nothing and exit the for loop normally. If StopIteration never actually fires here, the branch is dead and should be dropped; if it does, a comment explaining how requests produces it would help.
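The distinction can be illustrated with plain Python iterator semantics. This sketch says nothing about what the requests stream actually does; the two helper functions are hypothetical and only show that `StopIteration` reaches the caller when `next()` is called directly, while a `for` loop absorbs it.

```python
def first_chunk_with_next(chunks):
    """Calling next() directly: an empty stream raises StopIteration."""
    try:
        return next(iter(chunks))
    except StopIteration:
        return None  # stream closed before the first chunk


def first_chunk_with_for(chunks):
    """Using a for loop: an empty stream simply skips the loop body."""
    for chunk in chunks:
        return chunk
    return None  # reached without any exception being raised
```

If the listener consumes the stream with a `for` loop only, an `except StopIteration` branch around it would never run.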
Default read_timeout=120.0 on listen_to_pipeline is a behavior change. Previously this method blocked indefinitely on a stalled stream; now any 2-minute gap raises. The docstring claims "the pipeline emits a heartbeat chunk every few seconds even when there is no data," and that's true for /egress — but worth double-checking that the heartbeat interval is well under 120s under load/backpressure and that no downstream user (CI, examples, customer scripts) relies on indefinite blocking. If you want to be conservative, default to None (preserve old behavior) and have listen()/foreach_chunk opt in to a finite timeout. At minimum, this default-change needs a release-note line.
502 retry control flow got more subtle. The new _custom_wait for 502 is: unhealthy → return unhealthy_backoff; healthy & first retry → return 0; healthy & subsequent retries → fall through to exponential. That's correct, but the fall-through (no explicit return) is the kind of thing that breaks the next time someone edits it. Either return the computed backoff explicitly inside the 502 branch, or restructure as a single match. Also: retry_state.attempt_number <= 1 — tenacity's attempt_number is 1-based and the first call is 1, so <= 1 means "the very first retry", which matches the intent. Worth a one-line comment.
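One way to make every branch return explicitly, sketched as a standalone function. The function name, parameters, and the numeric defaults (`unhealthy_backoff`, `base`, `cap`) are assumptions for illustration and are not taken from the PR; `attempt_number` is assumed to be 1-based, as described above.

```python
def wait_before_retry(status_code, cluster_healthy, attempt_number,
                      unhealthy_backoff=10.0, base=1.0, cap=30.0):
    """Return the number of seconds to wait before the next retry.

    All defaults are hypothetical. attempt_number is 1 for the wait
    that precedes the first retry.
    """
    # Standard exponential backoff, capped: 1s, 2s, 4s, ... up to `cap`.
    exponential = min(cap, base * 2 ** (attempt_number - 1))

    if status_code != 502:
        return exponential
    if not cluster_healthy:
        return unhealthy_backoff
    if attempt_number <= 1:
        # First 502 while healthy: likely a one-off gateway hiccup.
        return 0.0
    # Persistent 502s while healthy: spread retries across the outage.
    return exponential
```

Because no branch relies on falling through, a later edit to one case cannot silently change the wait returned by another.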
Test consolidation, as you flagged. test_output_handler.py adds 180 lines; if each test spins up a pipeline, that's expensive. These are unit-style tests of OutputHandler/CallbackRunner exception plumbing — they should be mockable against a fake client and not need a real pipeline at all. Same for test_httprequests_retry.py.
Minor. except BaseException as e: in _callback_runner.run() is pre-existing but worth narrowing to Exception while you're touching the block — catching KeyboardInterrupt/SystemExit here is a smell.
Not blocking anything since this is a draft; flagging for the takeover.
@abhizer, this PR was created by Claude while investigating https://github.com/feldera/feldera/actions/runs/27343742976/job/80788349384.
I did not try to validate what it did. Can you take over this PR please? One thing I notice is that it added quite a few tests, each creating a pipeline. Hopefully there's a way to consolidate them into fewer pipelines.