Skip to content

Commit 4a65149

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: add recursion guard to StreamSlicerMeta.__instancecheck__
The metaclass __instancecheck__ method could recurse infinitely when isinstance() was called on objects with wrapped_slicer attributes that chain or cycle back. This adds a thread-safe recursion guard using a set of in-progress object IDs and uses object.__getattribute__ to bypass custom __getattr__ methods that could trigger further isinstance calls. Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 4aaafcf commit 4a65149

2 files changed

Lines changed: 71 additions & 5 deletions

File tree

airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

3+
import threading
34
from abc import ABC, ABCMeta, abstractmethod
45
from typing import Any, Iterable
56

@@ -16,12 +17,28 @@ class StreamSlicerMeta(ABCMeta):
1617
isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
1718
"""
1819

19-
def __instancecheck__(cls, instance: Any) -> bool:
20-
# Check if it's our wrapper with matching wrapped class
21-
if hasattr(instance, "wrapped_slicer"):
22-
return isinstance(instance.wrapped_slicer, cls)
20+
_checking: threading.local = threading.local()
2321

24-
return super().__instancecheck__(instance)
22+
def __instancecheck__(cls, instance: Any) -> bool:
23+
if not hasattr(cls._checking, "in_progress"):
24+
cls._checking.in_progress = set()
25+
26+
instance_id = id(instance)
27+
if instance_id in cls._checking.in_progress:
28+
return super().__instancecheck__(instance)
29+
30+
# Use object.__getattribute__ to bypass any custom __getattr__ that
31+
# could trigger further isinstance() calls and cause recursion.
32+
try:
33+
wrapped = object.__getattribute__(instance, "wrapped_slicer")
34+
except AttributeError:
35+
return super().__instancecheck__(instance)
36+
37+
cls._checking.in_progress.add(instance_id)
38+
try:
39+
return isinstance(wrapped, cls)
40+
finally:
41+
cls._checking.in_progress.discard(instance_id)
2542

2643

2744
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):

unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44
from datetime import timedelta
5+
from typing import Any, Iterable
56
from unittest.mock import Mock
67

78
import pytest
@@ -251,3 +252,51 @@ def test_slice_limiting_functionality():
251252
slices = list(wrapped_slicer.stream_slices())
252253
assert len(slices) == 3
253254
assert slices == mock_slicer.stream_slices.return_value[:3]
255+
256+
257+
def test_no_recursion_error_with_wrapped_slicer_attribute_on_non_decorator():
258+
"""Verify that isinstance does not cause RecursionError on objects with a wrapped_slicer attribute and custom __getattr__."""
259+
260+
class SlicerWithGetattr(StreamSlicer):
261+
def __init__(self, inner: StreamSlicer) -> None:
262+
self.wrapped_slicer = inner
263+
264+
def stream_slices(self) -> Iterable[StreamSlice]:
265+
return self.wrapped_slicer.stream_slices()
266+
267+
def __getattr__(self, name: str) -> Any:
268+
return getattr(self.wrapped_slicer, name)
269+
270+
inner = SinglePartitionRouter(parameters={})
271+
outer = SlicerWithGetattr(inner)
272+
273+
# This would raise RecursionError before the fix
274+
assert isinstance(outer, StreamSlicer)
275+
assert isinstance(outer, SlicerWithGetattr)
276+
assert not isinstance(outer, SubstreamPartitionRouter)
277+
278+
279+
def test_no_recursion_error_with_nested_decorators():
280+
"""Verify that double-wrapping a slicer does not cause RecursionError."""
281+
inner = SinglePartitionRouter(parameters={})
282+
first_wrap = StreamSlicerTestReadDecorator(wrapped_slicer=inner, maximum_number_of_slices=5)
283+
second_wrap = StreamSlicerTestReadDecorator(
284+
wrapped_slicer=first_wrap, maximum_number_of_slices=3
285+
)
286+
287+
assert isinstance(second_wrap, SinglePartitionRouter)
288+
assert isinstance(second_wrap, StreamSlicerTestReadDecorator)
289+
assert isinstance(first_wrap, SinglePartitionRouter)
290+
assert not isinstance(second_wrap, SubstreamPartitionRouter)
291+
292+
293+
def test_no_recursion_error_with_self_referencing_wrapped_slicer():
294+
"""Verify that a circular wrapped_slicer reference does not cause RecursionError."""
295+
mock_slicer = Mock(spec=StreamSlicer)
296+
wrapped = StreamSlicerTestReadDecorator(wrapped_slicer=mock_slicer, maximum_number_of_slices=5)
297+
298+
# Simulate a pathological cycle: wrapped_slicer points back to itself
299+
object.__setattr__(wrapped, "wrapped_slicer", wrapped)
300+
301+
# The key assertion: this must not raise RecursionError
302+
assert isinstance(wrapped, StreamSlicerTestReadDecorator)

0 commit comments

Comments
 (0)