Skip to content
2 changes: 1 addition & 1 deletion sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
res: "dict[str, Any]" = {
"trace_id": item.trace_id,
"span_id": item.span_id,
"name": item._name,
"name": item._name if item._name is not None else "<unknown>",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guarding against sending null span names also here, directly in the batcher, since it'd be a protocol violation

With the fix for task_name above, there are no known cases where this would happen, but better to be safe

"status": item._status,
"is_segment": item._is_segment(),
"start_timestamp": item._start_timestamp.timestamp(),
Expand Down
217 changes: 151 additions & 66 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
)
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource
from sentry_sdk.tracing_utils import Baggage
from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
reraise,
)
Expand Down Expand Up @@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":


def _update_celery_task_headers(
original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool
original_headers: "dict[str, Any]",
span: "Optional[Union[StreamedSpan, Span]]",
monitor_beat_tasks: bool,
) -> "dict[str, Any]":
"""
Updates the headers of the Celery task with the tracing information
Expand Down Expand Up @@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F":
def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(CeleryIntegration)
if integration is None:
return f(*args, **kwargs)

Expand All @@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
else:
task_name = "<unknown Celery task>"

span_streaming = has_span_streaming_enabled(client.options)

task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"

span_mgr: "Union[Span, NoOpMgr]" = (
sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
)
if not task_started_from_beat
else NoOpMgr()
)
span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()
if span_streaming:
if not task_started_from_beat and sentry_sdk.get_current_span() is not None:
span_mgr = sentry_sdk.traces.start_span(
name=task_name,
attributes={
"sentry.op": OP.QUEUE_SUBMIT_CELERY,
"sentry.origin": CeleryIntegration.origin,
},
)

else:
if not task_started_from_beat:
span_mgr = sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
)

with span_mgr as span:
kwargs["headers"] = _update_celery_task_headers(
Expand All @@ -303,50 +317,74 @@ def _wrap_tracer(task: "Any", f: "F") -> "F":
# Also because in Celery 3, signal dispatch returns early if one handler
# crashes.
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

with isolation_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))

transaction = None
custom_sampling_context = {
"celery_job": {
"task": task.name,
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
"args": list(args[1]),
"kwargs": args[2],
}
}

span: "Union[Span, StreamedSpan]"
span_ctx: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()

# Celery task objects are not a thing to be trusted. Even
# something such as attribute access can fail.
with capture_internal_exceptions():
headers = args[3].get("headers") or {}
transaction = continue_trace(
headers,
op=OP.QUEUE_TASK_CELERY,
name="unknown celery task",
source=TransactionSource.TASK,
origin=CeleryIntegration.origin,
)
transaction.name = task.name
transaction.set_status(SPANSTATUS.OK)
if span_streaming:
sentry_sdk.traces.continue_trace(headers)
scope.set_custom_sampling_context(custom_sampling_context)
span = sentry_sdk.traces.start_span(
name=task.name,
parent_span=None, # make this a segment
attributes={
"sentry.origin": CeleryIntegration.origin,
"sentry.span.source": TransactionSource.TASK.value,
"sentry.op": OP.QUEUE_TASK_CELERY,
},
)

if transaction is None:
return f(*args, **kwargs)
span_ctx = span

else:
span = continue_trace(
headers,
op=OP.QUEUE_TASK_CELERY,
name=task.name,
source=TransactionSource.TASK,
origin=CeleryIntegration.origin,
)
span.set_status(SPANSTATUS.OK)

span_ctx = sentry_sdk.start_transaction(
span,
custom_sampling_context=custom_sampling_context,
)

with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={
"celery_job": {
"task": task.name,
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
"args": list(args[1]),
"kwargs": args[2],
}
},
):
with span_ctx:
return f(*args, **kwargs)

return _inner # type: ignore


def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
def _set_messaging_destination_name(
task: "Any", span: "Union[StreamedSpan, Span]"
) -> None:
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
Expand All @@ -355,26 +393,47 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
if isinstance(span, StreamedSpan):
span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
else:
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)


def _wrap_task_call(task: "Any", f: "F") -> "F":
# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.

# functools.wraps is important here because celery-once looks at this
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
# https://github.com/getsentry/sentry-python/issues/421
@ensure_integration_enabled(CeleryIntegration, f)
@wraps(f)
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

try:
with sentry_sdk.start_span(
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
) as span:
span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name=task.name,
attributes={
"sentry.op": OP.QUEUE_PROCESS,
"sentry.origin": CeleryIntegration.origin,
},
)
else:
span = sentry_sdk.start_span(
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
)

with span:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

_set_messaging_destination_name(task, span)

latency = None
Expand All @@ -389,19 +448,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":

if latency is not None:
latency *= 1000 # milliseconds
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)

with capture_internal_exceptions():
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)

with capture_internal_exceptions():
span.set_data(
set_on_span(
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
)

with capture_internal_exceptions():
with task.app.connection() as conn:
span.set_data(
set_on_span(
SPANDATA.MESSAGING_SYSTEM,
conn.transport.driver_type,
)
Expand Down Expand Up @@ -476,8 +535,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any":
def _patch_producer_publish() -> None:
original_publish = Producer.publish

@ensure_integration_enabled(CeleryIntegration, original_publish)
def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return original_publish(self, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

kwargs_headers = kwargs.get("headers", {})
if not isinstance(kwargs_headers, Mapping):
# Ensure kwargs_headers is a Mapping, so we can safely call get().
Expand All @@ -487,31 +551,52 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
# method will still work.
kwargs_headers = {}

task_name = kwargs_headers.get("task")
task_name = kwargs_headers.get("task") or "<unknown Celery task>"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for naming the span and spans v2 must always have a name, name=None is not supported.

task_id = kwargs_headers.get("id")
retries = kwargs_headers.get("retries")

routing_key = kwargs.get("routing_key")
exchange = kwargs.get("exchange")

with sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
name=task_name,
origin=CeleryIntegration.origin,
) as span:
span: "Union[StreamedSpan, Span, None]" = None
if span_streaming:
if sentry_sdk.get_current_span() is not None:
span = sentry_sdk.traces.start_span(
name=task_name,
attributes={
"sentry.op": OP.QUEUE_PUBLISH,
"sentry.origin": CeleryIntegration.origin,
},
)
else:
span = sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
name=task_name,
origin=CeleryIntegration.origin,
)

if span is None:
return original_publish(self, *args, **kwargs)

with span:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

if task_id is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id)

if exchange == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning messages are
# routed to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)

if retries is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)

with capture_internal_exceptions():
span.set_data(
set_on_span(
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
)

Expand Down
13 changes: 13 additions & 0 deletions sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,19 @@ def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None:
if transaction.source:
self._transaction_info["source"] = transaction.source

# Also set _transaction and _transaction_info in streaming mode as this
# is used for populating events and linking them to segments
if (
isinstance(span, StreamedSpan)
and not isinstance(span, NoOpStreamedSpan)
and span._is_segment()
):
self._transaction = span.name
if span._attributes.get("sentry.span.source"):
self._transaction_info["source"] = str(
span._attributes["sentry.span.source"]
Comment on lines +905 to +906
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're wondering what the str() is doing here, the answer is mypy 😎 😭

)

@property
def profile(self) -> "Optional[Profile]":
return self._profile
Expand Down
4 changes: 3 additions & 1 deletion sentry_sdk/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def __init__(
):
self._name: str = name
self._active: bool = active
self._attributes: "Attributes" = {}
self._attributes: "Attributes" = {
"sentry.origin": "manual",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an oversight in the core logic -- we were never setting sentry.origin for custom instrumentation

}

if attributes:
for attribute, value in attributes.items():
Expand Down
Loading
Loading