Commit 0344741

fix: prevent deadlock when main thread puts on full queue
The main thread is the sole consumer of the shared queue. In 3 code paths from _handle_item, it also produces into the queue via ConcurrentMessageRepository.emit_message() -> queue.put(). When the queue is full, this blocks forever: the main thread deadlocks on its own queue.

Fix: detect when the caller is the main thread (via threading.get_ident) and use non-blocking put(block=False). If the queue is Full, buffer the message in a deque. Buffered messages are drained via consume_queue(), which the main thread already calls after processing every queue item. Worker threads are unchanged; they still use blocking put() for normal backpressure.

Deadlock paths fixed:
1. PartitionCompleteSentinel -> _on_stream_is_done -> ensure_at_least_one_state_emitted -> emit_message -> queue.put(state)
2. PartitionGenerationCompletedSentinel -> _on_stream_is_done -> same
3. Partition -> on_partition -> emit_message(slice_log) -> queue.put(log)

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent 0b94cbe commit 0344741
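
The failure mode described above is easy to reproduce in isolation: if the only thread that ever calls get() also issues a blocking put() on a full queue, nothing can ever make room for it. A minimal standalone sketch (no Airbyte code; a short timeout stands in for "blocks forever" so the example terminates):

```python
from queue import Full, Queue

q: Queue = Queue(maxsize=1)
q.put("state-message")  # the single slot is now taken

# The sole consumer now tries to produce into its own queue. A plain
# q.put(...) would block forever, since no other thread ever calls
# q.get(); the timeout is only here so the sketch finishes.
try:
    q.put("log-message", timeout=0.2)
    print("put succeeded (unexpected)")
except Full:
    print("put() gave up: this is the self-deadlock the commit prevents")
```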

1 file changed

Lines changed: 46 additions & 10 deletions
@@ -1,11 +1,11 @@
 # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 import logging
-import os
-from queue import Queue
+import threading
+from collections import deque
+from queue import Full, Queue
 from typing import Callable, Iterable
 
 from airbyte_cdk.models import AirbyteMessage, Level
-from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository
 from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 
@@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository):
 
     This is particularly important for the connector builder which relies on grouping
     of messages to organize request/response, pages, and partitions.
+
+    DEADLOCK PREVENTION:
+    The main thread is the sole consumer of the shared queue. If it calls queue.put()
+    while the queue is full, it deadlocks — nobody else will drain the queue.
+    This happens in 3 code paths from _handle_item:
+    1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state)
+    2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path
+    3. Partition → on_partition → emit_message(slice_log) → queue.put(log)
+    To prevent this, the main thread uses non-blocking put(block=False). If the queue
+    is full, messages are buffered in _pending and drained via consume_queue(), which
+    the main thread calls after processing every queue item.
+    Worker threads continue using blocking put() for normal backpressure.
     """
 
     def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository):
         self._queue = queue
         self._decorated_message_repository = message_repository
+        # Capture the thread ID of the consumer (main thread) at construction time.
+        # This is always the main thread because ConcurrentSource.__init__ (and the
+        # declarative source that creates this repository) runs on the main thread.
+        self._consumer_thread_id = threading.get_ident()
+        # Buffer for messages that couldn't be put on the queue from the main thread
+        # because the queue was full. Drained by consume_queue().
+        # deque.append() and deque.popleft() are atomic in CPython (GIL-protected).
+        self._pending: deque[AirbyteMessage] = deque()
+
+    def _put_on_queue(self, message: AirbyteMessage) -> None:
+        """Put a message on the shared queue, with deadlock prevention for the main thread."""
+        if threading.get_ident() == self._consumer_thread_id:
+            # Main thread (consumer): non-blocking to prevent self-deadlock.
+            # If queue is full, buffer the message — it will be drained via consume_queue().
+            try:
+                self._queue.put(message, block=False)
+            except Full:
+                self._pending.append(message)
+        else:
+            # Worker thread: blocking put for normal backpressure.
+            self._queue.put(message)
 
     def emit_message(self, message: AirbyteMessage) -> None:
         self._decorated_message_repository.emit_message(message)
-        for message in self._decorated_message_repository.consume_queue():
-            self._queue.put(message)
+        for msg in self._decorated_message_repository.consume_queue():
+            self._put_on_queue(msg)
 
     def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
         self._decorated_message_repository.log_message(level, message_provider)
-        for message in self._decorated_message_repository.consume_queue():
-            self._queue.put(message)
+        for msg in self._decorated_message_repository.consume_queue():
+            self._put_on_queue(msg)
 
     def consume_queue(self) -> Iterable[AirbyteMessage]:
         """
-        This method shouldn't need to be called because as part of emit_message() we are already
-        loading messages onto the queue processed on the main thread.
+        Drain any messages that were buffered because the queue was full when the
+        main thread tried to put them. This is called by the main thread after
+        processing every queue item (in on_record, on_partition_complete_sentinel,
+        _on_stream_is_done), ensuring buffered messages are yielded promptly.
        """
-        yield from []
+        while self._pending:
+            yield self._pending.popleft()
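
The buffering pattern from this commit can be exercised standalone. A minimal sketch, with a hypothetical class name (BufferedQueueWriter) and no Airbyte dependencies, relying on the same CPython assumption the commit notes (deque append/popleft are GIL-protected):

```python
import threading
from collections import deque
from queue import Full, Queue


class BufferedQueueWriter:
    """Toy model of the commit's pattern: the designated consumer thread
    never blocks on put(); overflow goes to a deque the consumer drains."""

    def __init__(self, queue: Queue, consumer_thread_id: int) -> None:
        self._queue = queue
        self._consumer_thread_id = consumer_thread_id
        self._pending: deque = deque()

    def put(self, item) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            try:
                self._queue.put(item, block=False)  # never block the consumer
            except Full:
                self._pending.append(item)  # buffer instead of deadlocking
        else:
            self._queue.put(item)  # workers keep normal backpressure

    def drain_pending(self):
        while self._pending:
            yield self._pending.popleft()


# Demo: a full queue (maxsize=1) plus a second put from the consumer thread.
q: Queue = Queue(maxsize=1)
writer = BufferedQueueWriter(q, threading.get_ident())
writer.put("a")  # lands on the queue
writer.put("b")  # queue is full -> buffered, no deadlock
print(q.get(), list(writer.drain_pending()))  # -> a ['b']
```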
