Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 60 additions & 21 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Union
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Expand All @@ -26,21 +28,36 @@
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
try:
command_seq = self._execution_strategy._command_queue
Expand All @@ -67,7 +84,7 @@
def patch_redis_async_client(
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
is_cluster: bool,
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute_command = cls.execute_command

Expand All @@ -76,36 +93,58 @@
async def _sentry_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)

Check failure on line 147 in sentry_sdk/integrations/redis/_async_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Missing exception handling in async Redis client causes span leak on error

In `_sentry_execute_command`, if `old_execute_command` raises an exception, the manually-entered `db_span` and `cache_span` are never exited via `__exit__`. The sync version properly wraps this in a `try/finally` block (lines 151-161 in `_sync_common.py`), but the async version does not. This leads to resource leaks and orphaned spans when Redis commands fail.

Check failure on line 147 in sentry_sdk/integrations/redis/_async_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[HSQ-4YN] Missing exception handling in async Redis client causes span leak on error (additional location)

In `_sentry_execute_command`, if `old_execute_command` raises an exception, the manually-entered `db_span` and `cache_span` are never exited via `__exit__`. The sync version properly wraps this in a `try/finally` block (lines 151-161 in `_sync_common.py`), but the async version does not. This leads to resource leaks and orphaned spans when Redis commands fail.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
_set_client_data(db_span, is_cluster, name, *args)

value = await old_execute_command(self, name, *args, **kwargs)
Expand Down
101 changes: 73 additions & 28 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,51 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan


def patch_redis_pipeline(
pipeline_cls: "Any",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
command_seq = None
try:
Expand All @@ -61,7 +78,9 @@


def patch_redis_client(
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
cls: "Any",
is_cluster: bool,
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
"""
This function can be used to instrument custom redis client classes or
Expand All @@ -74,45 +93,71 @@
def sentry_patched_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)
with capture_internal_exceptions():
set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

value = old_execute_command(self, name, *args, **kwargs)
try:
value = old_execute_command(self, name, *args, **kwargs)
finally:
db_span.__exit__(None, None, None)

db_span.__exit__(None, None, None)
if cache_span:
with capture_internal_exceptions():
_set_cache_data(cache_span, self, cache_properties, value)

Check warning on line 158 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: code-review

Unbound variable 'value' used in finally block when exception occurs

When `old_execute_command` raises an exception, the `value` variable is never assigned. The finally block then attempts to use `value` in `_set_cache_data(cache_span, self, cache_properties, value)`, which will raise a `NameError`. While `capture_internal_exceptions()` will catch this error silently, it prevents proper cache span data from being set and masks the actual exception behavior. The variable should be initialized to `None` before the try block.

Check failure on line 158 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[HSQ-4YN] Missing exception handling in async Redis client causes span leak on error (additional location)

In `_sentry_execute_command`, if `old_execute_command` raises an exception, the manually-entered `db_span` and `cache_span` are never exited via `__exit__`. The sync version properly wraps this in a `try/finally` block (lines 151-161 in `_sync_common.py`), but the async version does not. This leads to resource leaks and orphaned spans when Redis commands fail.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NameError raised when Redis command fails with exception

In sentry_patched_execute_command, when old_execute_command raises an exception, the value variable is never assigned. The finally block then attempts to use value in _set_cache_data(cache_span, self, cache_properties, value), causing a NameError. While capture_internal_exceptions() catches this, it results in unnecessary internal exception noise and obscures the real error from monitoring.

Verification

Read the full file at _sync_common.py, traced through the try/finally block logic. Verified that value is only assigned on line 152 inside the try block. If an exception is raised, execution jumps to finally where line 158 references value which would be undefined.

Identified by Warden code-review · XZX-J22

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix attempt detected (commit 032c7e2)

The commit reorganized code into a try-finally block and wrapped the problematic _set_cache_data call with capture_internal_exceptions(), but the core issue persists: if old_execute_command raises an exception, value remains undefined when accessed on line 158, causing the same NameError (just silently caught rather than fixed)

The original issue appears unresolved. Please review and try again.

Evaluated by Warden


if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
cache_span.__exit__(None, None, None)
cache_span.__exit__(None, None, None)

return value

Expand Down
24 changes: 15 additions & 9 deletions sentry_sdk/integrations/redis/modules/caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions

GET_COMMANDS = ("get", "mget")
Expand All @@ -14,7 +15,7 @@
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional
from typing import Any, Optional, Union


def _get_op(name: str) -> "Optional[str]":
Expand Down Expand Up @@ -80,25 +81,30 @@ def _get_cache_span_description(


def _set_cache_data(
span: "Span",
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

with capture_internal_exceptions():
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
set_on_span(SPANDATA.CACHE_KEY, properties["key"])

if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
span.set_data(SPANDATA.CACHE_HIT, True)
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
span.set_data(SPANDATA.CACHE_HIT, False)
set_on_span(SPANDATA.CACHE_HIT, False)

elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
Expand All @@ -107,7 +113,7 @@ def _set_cache_data(
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)

try:
connection_params = redis_client.connection_pool.connection_kwargs
Expand All @@ -122,8 +128,8 @@ def _set_cache_data(

host = connection_params.get("host")
if host is not None:
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)

port = connection_params.get("port")
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)
Loading
Loading