fix: prevent deadlock when main thread puts on full queue#977
devin-ai-integration[bot] wants to merge 1 commit into `main` from `devin/1775123605-deadlock-fix-nonblocking-put`
Conversation
The main thread is the sole consumer of the shared queue. In three code paths from `_handle_item`, it also produces into the queue via `ConcurrentMessageRepository.emit_message()` -> `queue.put()`. When the queue is full, this blocks forever — the main thread deadlocks on its own queue.

Fix: detect when the caller is the main thread (via `threading.get_ident`) and use non-blocking `put(block=False)`. If the queue is `Full`, buffer the message in a `deque`. Buffered messages are drained via `consume_queue()`, which the main thread already calls after processing every queue item. Worker threads are unchanged — they still use blocking `put()` for normal backpressure.

Deadlock paths fixed:
1. `PartitionCompleteSentinel` -> `_on_stream_is_done` -> `ensure_at_least_one_state_emitted` -> `emit_message` -> `queue.put(state)`
2. `PartitionGenerationCompletedSentinel` -> `_on_stream_is_done` -> same
3. `Partition` -> `on_partition` -> `emit_message(slice_log)` -> `queue.put(log)`

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
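The pattern described in the commit message can be sketched roughly as follows. This is a minimal illustration with hypothetical names (`BufferedRepository`, `emit_message` on a bare `queue.Queue`); the real `ConcurrentMessageRepository` differs in detail:

```python
import queue
import threading
from collections import deque


class BufferedRepository:
    """Hypothetical sketch of the fix: the consumer (main) thread never
    blocks on the queue it is itself responsible for draining."""

    def __init__(self, maxsize: int = 10) -> None:
        self._queue = queue.Queue(maxsize=maxsize)
        self._pending = deque()
        # Assumes __init__ runs on the consuming (main) thread.
        self._consumer_thread_id = threading.get_ident()

    def emit_message(self, message) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            # Main thread: a blocking put() here could never be unblocked,
            # so fail fast and buffer on overflow instead of deadlocking.
            try:
                self._queue.put(message, block=False)
            except queue.Full:
                self._pending.append(message)
        else:
            # Worker threads keep the blocking put() for backpressure.
            self._queue.put(message)

    def consume_queue(self):
        # Called by the main thread after each processed item; drains the
        # overflow buffer so no buffered message is lost.
        while self._pending:
            yield self._pending.popleft()
```

Because `consume_queue()` runs on the main thread after every processed queue item, a buffered message is only delayed by at most one iteration of the read loop.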
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123605-deadlock-fix-nonblocking-put#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123605-deadlock-fix-nonblocking-put
```

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
- /prerelease
Anatolii Yatsuk (@tolik0): This is great! Have you been able to test this CDK change on specific connections that were experiencing deadlock?
Summary
Fixes a deadlock in the concurrent source where the main thread (sole queue consumer) blocks on `queue.put()` when the shared queue is full. Since the main thread is the only consumer, it can never unblock itself — classic self-deadlock.

Root cause: Three code paths in `_handle_item` cause the main thread to produce into its own queue via `ConcurrentMessageRepository.emit_message()` → `queue.put()`:
- `PartitionCompleteSentinel` → `_on_stream_is_done` → `ensure_at_least_one_state_emitted` → `queue.put(state)`
- `PartitionGenerationCompletedSentinel` → same path
- `Partition` → `on_partition` → `emit_message(slice_log)` → `queue.put(log)`

Fix: The main thread now uses `put(block=False)`. If the queue is `Full`, the message is buffered in a `deque` (`_pending`) and drained via `consume_queue()`, which the main thread already calls after processing every queue item. Worker threads are unchanged (blocking `put()` for backpressure). Thread detection uses `threading.get_ident()` captured at construction time.

Review & Testing Checklist for Human
- `_pending` is main-thread-only: `_put_on_queue` only appends to `_pending` when `get_ident() == _consumer_thread_id`, and `consume_queue()` is only called from the main thread's processing loop. If this invariant holds, no lock is needed. If a worker thread could ever call `consume_queue()`, this would be a data race.
- `consume_queue()` is called frequently enough: previously a no-op (`yield from []`), it now yields buffered messages. Confirm callers in `concurrent_read_processor.py` (`on_record`, `on_partition_complete_sentinel`, `_on_stream_is_done`) all do `yield from self._message_repository.consume_queue()` so buffered messages are drained promptly.
- A unit test calls `emit_message` from the "main" thread and verifies the message lands in `_pending` and is yielded by `consume_queue()`.

Notes
- `threading.get_ident()` is captured once in `__init__`, which is assumed to always run on the main thread. This is true today because `ConcurrentSource.__init__` and the declarative source constructor both run on the main thread.
- The `deque` thread-safety comment references CPython's GIL, but in practice `_pending` is only accessed from a single thread (main), so it's safe regardless of GIL guarantees.
- `consume_queue()` was previously a dead no-op. It now has real behavior — any code that calls it will start receiving buffered messages, which is the intended design.

Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf
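The checklist's no-lock argument rests on `_pending` being touched only by the consumer thread. One way to make that invariant explicit is a defensive assertion in `consume_queue()`. This is a hypothetical sketch, not necessarily what the PR implements:

```python
import threading
from collections import deque


class GuardedRepository:
    """Hypothetical sketch: fail loudly if a worker thread ever calls
    consume_queue(), instead of silently racing on the unsynchronized
    _pending deque."""

    def __init__(self) -> None:
        self._pending = deque()
        # Assumes construction happens on the consuming (main) thread.
        self._consumer_thread_id = threading.get_ident()

    def consume_queue(self):
        # Guard the single-thread invariant before touching _pending.
        assert threading.get_ident() == self._consumer_thread_id, (
            "consume_queue() must only run on the consumer thread; "
            "_pending is unsynchronized"
        )
        while self._pending:
            yield self._pending.popleft()
```

A worker thread calling `consume_queue()` would then trip the assertion as soon as the generator is iterated, turning a latent data race into an immediate, diagnosable failure.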