Skip to content
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 132 additions & 87 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

if TYPE_CHECKING:
Expand All @@ -45,10 +46,12 @@
raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
from typing import Any, AsyncIterator, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, Iterator, List, Optional, Union, Callable
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand All @@ -72,6 +75,129 @@
Messages.create = _wrap_message_create(Messages.create)
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

Stream.__iter__ = _wrap_stream_iter(Stream.__iter__)
AsyncStream.__aiter__ = _wrap_async_stream_aiter(AsyncStream.__aiter__)


def _wrap_stream_iter(
f: "Callable[..., Iterator[RawMessageStreamEvent]]",
) -> "Callable[..., Iterator[RawMessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""

@wraps(f)
def _patched_iter(self: "Stream") -> "Iterator[RawMessageStreamEvent]":
if not hasattr(self, "_sentry_span"):
for event in f(self):
yield event
Comment thread
alexander-alderman-webb marked this conversation as resolved.
Outdated
return
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in f(self):
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

Check warning on line 112 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

Stream iterator lacks exception handling, causing span leaks on errors

The `_wrap_stream_iter` function does not wrap the iteration loop in try/except/finally. If an exception occurs during `for event in f(self):` or in `_collect_ai_data`, the span created by `_sentry_patched_create_common` will never be closed (`span.__exit__` won't be called), and the exception won't be captured. This contrasts with `_wrap_message_create` which properly handles exceptions using try/except and calls `_capture_exception`. The same issue affects `_wrap_async_stream_aiter`.
Comment thread
github-actions[bot] marked this conversation as resolved.
Outdated

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

span = self._sentry_span
integration = self._integration

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

return f(self)

Check warning on line 137 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

Unreachable return statement creates new iterator after stream exhaustion

In `_wrap_stream_iter`, after the generator yields all events and calls `_set_output_data`, line 137 executes `return f(self)`. In a generator function, `return` with a value sets `StopIteration.value`, which is typically ignored by consumers using `for event in stream:`. This return statement creates a new iterator by calling the original `__iter__` on the already-exhausted stream object, which is both wasteful and semantically incorrect. The async version (`_wrap_async_stream_aiter`) correctly omits any return statement after `_set_output_data`.

Check failure on line 137 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

Return statement in generator calls f(self) again, causing double iteration attempt and lost return value

In `_patched_iter`, after yielding all events and calling `_set_output_data`, the code executes `return f(self)` at line 137. This is problematic for two reasons: (1) In a generator function, the return value is placed in `StopIteration.value` which is ignored when iterating with `for ... in`, so the value is effectively lost; (2) Calling `f(self)` attempts to restart iteration on the stream, which will either fail (most streams are single-use) or return an empty iterator. This line should simply be removed.
Comment thread
github-actions[bot] marked this conversation as resolved.
Outdated

return _patched_iter


def _wrap_async_stream_aiter(
f: "Callable[..., AsyncIterator[RawMessageStreamEvent]]",
) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""

@wraps(f)
async def _patched_aiter(
self: "AsyncStream",
) -> "AsyncIterator[RawMessageStreamEvent]":
if not hasattr(self, "_sentry_span"):
async for event in f(self):
yield event
return

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in f(self):
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

Check warning on line 174 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

[2AS-F8T] Stream iterator lacks exception handling, causing span leaks on errors (additional location)

The `_wrap_stream_iter` function does not wrap the iteration loop in try/except/finally. If an exception occurs during `for event in f(self):` or in `_collect_ai_data`, the span created by `_sentry_patched_create_common` will never be closed (`span.__exit__` won't be called), and the exception won't be captured. This contrasts with `_wrap_message_create` which properly handles exceptions using try/except and calls `_capture_exception`. The same issue affects `_wrap_async_stream_aiter`.

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

span = self._sentry_span
integration = self._integration

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

return _patched_aiter


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -415,6 +541,11 @@

result = yield f, args, kwargs

if isinstance(result, Stream) or isinstance(result, AsyncStream):
result._sentry_span = span
result._integration = integration
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +575,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading