-
Notifications
You must be signed in to change notification settings - Fork 607
feat(redis): Support streaming spans #6083
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5a9efd3
fc29c65
263859c
2bee953
4bc2a5c
08e896a
f783ecc
032c7e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,34 +11,51 @@ | |
| _set_pipeline_data, | ||
| ) | ||
| from sentry_sdk.tracing import Span | ||
| from sentry_sdk.tracing_utils import has_span_streaming_enabled | ||
| from sentry_sdk.utils import capture_internal_exceptions | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Callable | ||
| from typing import Any | ||
| from typing import Any, Optional, Union | ||
| from sentry_sdk.traces import StreamedSpan | ||
|
|
||
|
|
||
| def patch_redis_pipeline( | ||
| pipeline_cls: "Any", | ||
| is_cluster: bool, | ||
| get_command_args_fn: "Any", | ||
| set_db_data_fn: "Callable[[Span, Any], None]", | ||
| set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]", | ||
| ) -> None: | ||
| old_execute = pipeline_cls.execute | ||
|
|
||
| from sentry_sdk.integrations.redis import RedisIntegration | ||
|
|
||
| def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": | ||
| if sentry_sdk.get_client().get_integration(RedisIntegration) is None: | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(RedisIntegration) is None: | ||
| return old_execute(self, *args, **kwargs) | ||
|
|
||
| with sentry_sdk.start_span( | ||
| op=OP.DB_REDIS, | ||
| name="redis.pipeline.execute", | ||
| origin=SPAN_ORIGIN, | ||
| ) as span: | ||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| span: "Union[Span, StreamedSpan]" | ||
| if span_streaming: | ||
| span = sentry_sdk.traces.start_span( | ||
| name="redis.pipeline.execute", | ||
| attributes={ | ||
| "sentry.origin": SPAN_ORIGIN, | ||
| "sentry.op": OP.DB_REDIS, | ||
| }, | ||
| ) | ||
| else: | ||
| span = sentry_sdk.start_span( | ||
| op=OP.DB_REDIS, | ||
| name="redis.pipeline.execute", | ||
| origin=SPAN_ORIGIN, | ||
| ) | ||
|
|
||
| with span: | ||
| with capture_internal_exceptions(): | ||
| command_seq = None | ||
| try: | ||
|
|
@@ -61,7 +78,9 @@ | |
|
|
||
|
|
||
| def patch_redis_client( | ||
| cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]" | ||
| cls: "Any", | ||
| is_cluster: bool, | ||
| set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]", | ||
| ) -> None: | ||
| """ | ||
| This function can be used to instrument custom redis client classes or | ||
|
|
@@ -74,45 +93,71 @@ | |
| def sentry_patched_execute_command( | ||
| self: "Any", name: str, *args: "Any", **kwargs: "Any" | ||
| ) -> "Any": | ||
| integration = sentry_sdk.get_client().get_integration(RedisIntegration) | ||
| client = sentry_sdk.get_client() | ||
| integration = client.get_integration(RedisIntegration) | ||
| if integration is None: | ||
| return old_execute_command(self, name, *args, **kwargs) | ||
|
|
||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| cache_properties = _compile_cache_span_properties( | ||
| name, | ||
| args, | ||
| kwargs, | ||
| integration, | ||
| ) | ||
|
|
||
| cache_span = None | ||
| cache_span: "Optional[Union[Span, StreamedSpan]]" = None | ||
| if cache_properties["is_cache_key"] and cache_properties["op"] is not None: | ||
| cache_span = sentry_sdk.start_span( | ||
| op=cache_properties["op"], | ||
| name=cache_properties["description"], | ||
| origin=SPAN_ORIGIN, | ||
| ) | ||
| if span_streaming: | ||
| cache_span = sentry_sdk.traces.start_span( | ||
| name=cache_properties["description"], | ||
| attributes={ | ||
| "sentry.op": cache_properties["op"], | ||
| "sentry.origin": SPAN_ORIGIN, | ||
| }, | ||
| ) | ||
| else: | ||
| cache_span = sentry_sdk.start_span( | ||
| op=cache_properties["op"], | ||
| name=cache_properties["description"], | ||
| origin=SPAN_ORIGIN, | ||
| ) | ||
| cache_span.__enter__() | ||
|
|
||
| db_properties = _compile_db_span_properties(integration, name, args) | ||
|
|
||
| db_span = sentry_sdk.start_span( | ||
| op=db_properties["op"], | ||
| name=db_properties["description"], | ||
| origin=SPAN_ORIGIN, | ||
| ) | ||
| db_span: "Union[Span, StreamedSpan]" | ||
| if span_streaming: | ||
| db_span = sentry_sdk.traces.start_span( | ||
| name=db_properties["description"], | ||
| attributes={ | ||
| "sentry.op": db_properties["op"], | ||
| "sentry.origin": SPAN_ORIGIN, | ||
| }, | ||
| ) | ||
| else: | ||
| db_span = sentry_sdk.start_span( | ||
| op=db_properties["op"], | ||
| name=db_properties["description"], | ||
| origin=SPAN_ORIGIN, | ||
| ) | ||
| db_span.__enter__() | ||
|
|
||
| set_db_data_fn(db_span, self) | ||
| _set_client_data(db_span, is_cluster, name, *args) | ||
| with capture_internal_exceptions(): | ||
| set_db_data_fn(db_span, self) | ||
| _set_client_data(db_span, is_cluster, name, *args) | ||
|
|
||
| value = old_execute_command(self, name, *args, **kwargs) | ||
| try: | ||
| value = old_execute_command(self, name, *args, **kwargs) | ||
| finally: | ||
| db_span.__exit__(None, None, None) | ||
|
|
||
| db_span.__exit__(None, None, None) | ||
| if cache_span: | ||
| with capture_internal_exceptions(): | ||
| _set_cache_data(cache_span, self, cache_properties, value) | ||
|
Check warning on line 158 in sentry_sdk/integrations/redis/_sync_common.py
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NameError raised when Redis command fails with exception In VerificationRead the full file at _sync_common.py, traced through the try/finally block logic. Verified that Identified by Warden There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix attempt detected (commit 032c7e2) The commit reorganized code into a try-finally block and wrapped the problematic The original issue appears unresolved. Please review and try again. Evaluated by Warden |
||
|
|
||
| if cache_span: | ||
| _set_cache_data(cache_span, self, cache_properties, value) | ||
| cache_span.__exit__(None, None, None) | ||
| cache_span.__exit__(None, None, None) | ||
|
|
||
| return value | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.