-
Notifications
You must be signed in to change notification settings - Fork 606
feat(celery): Support span streaming #6074
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5c20a0e
943c12b
a7d7af6
2a12715
c2d1ced
e0564ca
b8fefb2
0eb003b
593871c
35fa49c
ba5a988
71a6728
80a585a
ae964eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,11 +14,11 @@ | |
| ) | ||
| from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch | ||
| from sentry_sdk.integrations.logging import ignore_logger | ||
| from sentry_sdk.traces import StreamedSpan | ||
| from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource | ||
| from sentry_sdk.tracing_utils import Baggage | ||
| from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled | ||
| from sentry_sdk.utils import ( | ||
| capture_internal_exceptions, | ||
| ensure_integration_enabled, | ||
| event_from_exception, | ||
| reraise, | ||
| ) | ||
|
|
@@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]": | |
|
|
||
|
|
||
| def _update_celery_task_headers( | ||
| original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool | ||
| original_headers: "dict[str, Any]", | ||
| span: "Optional[Union[StreamedSpan, Span]]", | ||
| monitor_beat_tasks: bool, | ||
| ) -> "dict[str, Any]": | ||
| """ | ||
| Updates the headers of the Celery task with the tracing information | ||
|
|
@@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F": | |
| def apply_async(*args: "Any", **kwargs: "Any") -> "Any": | ||
| # Note: kwargs can contain headers=None, so no setdefault! | ||
| # Unsure which backend though. | ||
| integration = sentry_sdk.get_client().get_integration(CeleryIntegration) | ||
| client = sentry_sdk.get_client() | ||
| integration = client.get_integration(CeleryIntegration) | ||
| if integration is None: | ||
| return f(*args, **kwargs) | ||
|
|
||
|
|
@@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any": | |
| else: | ||
| task_name = "<unknown Celery task>" | ||
|
|
||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat" | ||
|
|
||
| span_mgr: "Union[Span, NoOpMgr]" = ( | ||
| sentry_sdk.start_span( | ||
| op=OP.QUEUE_SUBMIT_CELERY, | ||
| name=task_name, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
| if not task_started_from_beat | ||
| else NoOpMgr() | ||
| ) | ||
| span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() | ||
| if span_streaming: | ||
| if not task_started_from_beat and sentry_sdk.get_current_span() is not None: | ||
| span_mgr = sentry_sdk.traces.start_span( | ||
| name=task_name, | ||
| attributes={ | ||
| "sentry.op": OP.QUEUE_SUBMIT_CELERY, | ||
| "sentry.origin": CeleryIntegration.origin, | ||
| }, | ||
| ) | ||
|
|
||
| else: | ||
| if not task_started_from_beat: | ||
| span_mgr = sentry_sdk.start_span( | ||
| op=OP.QUEUE_SUBMIT_CELERY, | ||
| name=task_name, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
|
|
||
| with span_mgr as span: | ||
| kwargs["headers"] = _update_celery_task_headers( | ||
|
|
@@ -303,50 +317,74 @@ def _wrap_tracer(task: "Any", f: "F") -> "F": | |
| # Also because in Celery 3, signal dispatch returns early if one handler | ||
| # crashes. | ||
| @wraps(f) | ||
| @ensure_integration_enabled(CeleryIntegration, f) | ||
| def _inner(*args: "Any", **kwargs: "Any") -> "Any": | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(CeleryIntegration) is None: | ||
| return f(*args, **kwargs) | ||
|
|
||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| with isolation_scope() as scope: | ||
| scope._name = "celery" | ||
| scope.clear_breadcrumbs() | ||
| scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) | ||
|
|
||
| transaction = None | ||
| custom_sampling_context = { | ||
| "celery_job": { | ||
| "task": task.name, | ||
| # for some reason, args[1] is a list if non-empty but a | ||
| # tuple if empty | ||
| "args": list(args[1]), | ||
| "kwargs": args[2], | ||
| } | ||
| } | ||
|
|
||
| span: "Union[Span, StreamedSpan]" | ||
| span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr() | ||
|
|
||
| # Celery task objects are not a thing to be trusted. Even | ||
| # something such as attribute access can fail. | ||
| with capture_internal_exceptions(): | ||
| headers = args[3].get("headers") or {} | ||
| transaction = continue_trace( | ||
| headers, | ||
| op=OP.QUEUE_TASK_CELERY, | ||
| name="unknown celery task", | ||
| source=TransactionSource.TASK, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
| transaction.name = task.name | ||
| transaction.set_status(SPANSTATUS.OK) | ||
| if span_streaming: | ||
| sentry_sdk.traces.continue_trace(headers) | ||
| scope.set_custom_sampling_context(custom_sampling_context) | ||
| span = sentry_sdk.traces.start_span( | ||
| name=task.name, | ||
| parent_span=None, # make this a segment | ||
| attributes={ | ||
| "sentry.origin": CeleryIntegration.origin, | ||
| "sentry.span.source": TransactionSource.TASK.value, | ||
| "sentry.op": OP.QUEUE_TASK_CELERY, | ||
| }, | ||
| ) | ||
|
|
||
| if transaction is None: | ||
| return f(*args, **kwargs) | ||
| span_ctx = span | ||
|
|
||
| else: | ||
| span = continue_trace( | ||
| headers, | ||
| op=OP.QUEUE_TASK_CELERY, | ||
| name=task.name, | ||
| source=TransactionSource.TASK, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
| span.set_status(SPANSTATUS.OK) | ||
|
|
||
| span_ctx = sentry_sdk.start_transaction( | ||
| span, | ||
| custom_sampling_context=custom_sampling_context, | ||
| ) | ||
|
|
||
| with sentry_sdk.start_transaction( | ||
| transaction, | ||
| custom_sampling_context={ | ||
| "celery_job": { | ||
| "task": task.name, | ||
| # for some reason, args[1] is a list if non-empty but a | ||
| # tuple if empty | ||
| "args": list(args[1]), | ||
| "kwargs": args[2], | ||
| } | ||
| }, | ||
| ): | ||
| with span_ctx: | ||
| return f(*args, **kwargs) | ||
|
|
||
| return _inner # type: ignore | ||
|
|
||
|
|
||
| def _set_messaging_destination_name(task: "Any", span: "Span") -> None: | ||
| def _set_messaging_destination_name( | ||
| task: "Any", span: "Union[StreamedSpan, Span]" | ||
| ) -> None: | ||
| """Set "messaging.destination.name" tag for span""" | ||
| with capture_internal_exceptions(): | ||
| delivery_info = task.request.delivery_info | ||
|
|
@@ -355,26 +393,47 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None: | |
| if delivery_info.get("exchange") == "" and routing_key is not None: | ||
| # Empty exchange indicates the default exchange, meaning the tasks | ||
| # are sent to the queue with the same name as the routing key. | ||
| span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) | ||
| if isinstance(span, StreamedSpan): | ||
| span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) | ||
| else: | ||
| span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) | ||
|
|
||
|
|
||
| def _wrap_task_call(task: "Any", f: "F") -> "F": | ||
| # Need to wrap task call because the exception is caught before we get to | ||
| # see it. Also celery's reported stacktrace is untrustworthy. | ||
|
|
||
| # functools.wraps is important here because celery-once looks at this | ||
| # method's name. @ensure_integration_enabled internally calls functools.wraps, | ||
| # but if we ever remove the @ensure_integration_enabled decorator, we need | ||
| # to add @functools.wraps(f) here. | ||
| # https://github.com/getsentry/sentry-python/issues/421 | ||
| @ensure_integration_enabled(CeleryIntegration, f) | ||
| @wraps(f) | ||
| def _inner(*args: "Any", **kwargs: "Any") -> "Any": | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(CeleryIntegration) is None: | ||
| return f(*args, **kwargs) | ||
|
|
||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| try: | ||
| with sentry_sdk.start_span( | ||
| op=OP.QUEUE_PROCESS, | ||
| name=task.name, | ||
| origin=CeleryIntegration.origin, | ||
| ) as span: | ||
| span: "Union[Span, StreamedSpan]" | ||
| if span_streaming: | ||
| span = sentry_sdk.traces.start_span( | ||
| name=task.name, | ||
| attributes={ | ||
| "sentry.op": OP.QUEUE_PROCESS, | ||
| "sentry.origin": CeleryIntegration.origin, | ||
| }, | ||
| ) | ||
| else: | ||
| span = sentry_sdk.start_span( | ||
| op=OP.QUEUE_PROCESS, | ||
| name=task.name, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
|
|
||
| with span: | ||
| if isinstance(span, StreamedSpan): | ||
| set_on_span = span.set_attribute | ||
| else: | ||
| set_on_span = span.set_data | ||
|
|
||
| _set_messaging_destination_name(task, span) | ||
|
|
||
| latency = None | ||
|
|
@@ -389,19 +448,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": | |
|
|
||
| if latency is not None: | ||
| latency *= 1000 # milliseconds | ||
| span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) | ||
| set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) | ||
|
|
||
| with capture_internal_exceptions(): | ||
| span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) | ||
| set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) | ||
|
|
||
| with capture_internal_exceptions(): | ||
| span.set_data( | ||
| set_on_span( | ||
| SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries | ||
| ) | ||
|
|
||
| with capture_internal_exceptions(): | ||
| with task.app.connection() as conn: | ||
| span.set_data( | ||
| set_on_span( | ||
| SPANDATA.MESSAGING_SYSTEM, | ||
| conn.transport.driver_type, | ||
| ) | ||
|
|
@@ -476,8 +535,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any": | |
| def _patch_producer_publish() -> None: | ||
| original_publish = Producer.publish | ||
|
|
||
| @ensure_integration_enabled(CeleryIntegration, original_publish) | ||
| def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(CeleryIntegration) is None: | ||
| return original_publish(self, *args, **kwargs) | ||
|
|
||
| span_streaming = has_span_streaming_enabled(client.options) | ||
|
|
||
| kwargs_headers = kwargs.get("headers", {}) | ||
| if not isinstance(kwargs_headers, Mapping): | ||
| # Ensure kwargs_headers is a Mapping, so we can safely call get(). | ||
|
|
@@ -487,31 +551,52 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any": | |
| # method will still work. | ||
| kwargs_headers = {} | ||
|
|
||
| task_name = kwargs_headers.get("task") | ||
| task_name = kwargs_headers.get("task") or "<unknown Celery task>" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is used for naming the span and spans v2 must always have a name, |
||
| task_id = kwargs_headers.get("id") | ||
| retries = kwargs_headers.get("retries") | ||
|
|
||
| routing_key = kwargs.get("routing_key") | ||
| exchange = kwargs.get("exchange") | ||
|
|
||
| with sentry_sdk.start_span( | ||
| op=OP.QUEUE_PUBLISH, | ||
| name=task_name, | ||
| origin=CeleryIntegration.origin, | ||
| ) as span: | ||
| span: "Union[StreamedSpan, Span, None]" = None | ||
| if span_streaming: | ||
| if sentry_sdk.get_current_span() is not None: | ||
| span = sentry_sdk.traces.start_span( | ||
| name=task_name, | ||
| attributes={ | ||
| "sentry.op": OP.QUEUE_PUBLISH, | ||
| "sentry.origin": CeleryIntegration.origin, | ||
| }, | ||
| ) | ||
| else: | ||
| span = sentry_sdk.start_span( | ||
| op=OP.QUEUE_PUBLISH, | ||
| name=task_name, | ||
| origin=CeleryIntegration.origin, | ||
| ) | ||
|
|
||
| if span is None: | ||
| return original_publish(self, *args, **kwargs) | ||
|
|
||
| with span: | ||
| if isinstance(span, StreamedSpan): | ||
| set_on_span = span.set_attribute | ||
| else: | ||
| set_on_span = span.set_data | ||
|
|
||
| if task_id is not None: | ||
| span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id) | ||
| set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id) | ||
|
|
||
| if exchange == "" and routing_key is not None: | ||
| # Empty exchange indicates the default exchange, meaning messages are | ||
| # routed to the queue with the same name as the routing key. | ||
| span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) | ||
| set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) | ||
|
|
||
| if retries is not None: | ||
| span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) | ||
| set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries) | ||
|
|
||
| with capture_internal_exceptions(): | ||
| span.set_data( | ||
| set_on_span( | ||
| SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type | ||
| ) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -893,6 +893,19 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None: | |
| if transaction.source: | ||
| self._transaction_info["source"] = transaction.source | ||
|
|
||
| # Also set _transaction and _transaction_info in streaming mode as this | ||
| # is used for populating events and linking them to segments | ||
| if ( | ||
| isinstance(span, StreamedSpan) | ||
| and not isinstance(span, NoOpStreamedSpan) | ||
| and span._is_segment() | ||
| ): | ||
| self._transaction = span.name | ||
| if span._attributes.get("sentry.span.source"): | ||
| self._transaction_info["source"] = str( | ||
| span._attributes["sentry.span.source"] | ||
|
Comment on lines
+905
to
+906
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you're wondering what the |
||
| ) | ||
|
|
||
| @property | ||
| def profile(self) -> "Optional[Profile]": | ||
| return self._profile | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -258,7 +258,9 @@ def __init__( | |
| ): | ||
| self._name: str = name | ||
| self._active: bool = active | ||
| self._attributes: "Attributes" = {} | ||
| self._attributes: "Attributes" = { | ||
| "sentry.origin": "manual", | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was an oversight in the core logic -- we were never setting |
||
| } | ||
|
|
||
| if attributes: | ||
| for attribute, value in attributes.items(): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guarding against sending null span names also here, directly in the batcher, since it'd be a protocol violation
With the fix for
task_nameabove, there are no known cases where this would happen, but better to be safe