forked from getsentry/sentry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery.py
More file actions
203 lines (155 loc) · 6.45 KB
/
celery.py
File metadata and controls
203 lines (155 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
from __future__ import absolute_import
import functools
import sys
from celery.exceptions import ( # type: ignore
SoftTimeLimitExceeded,
Retry,
Ignore,
Reject,
)
from sentry_sdk.hub import Hub
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
from sentry_sdk.tracing import Span
from sentry_sdk._compat import reraise
from sentry_sdk.integrations import Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk._types import MYPY
if MYPY:
from typing import Any
CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)
class CeleryIntegration(Integration):
identifier = "celery"
def __init__(self, propagate_traces=True):
# type: (bool) -> None
self.propagate_traces = propagate_traces
@staticmethod
def setup_once():
# type: () -> None
import celery.app.trace as trace # type: ignore
old_build_tracer = trace.build_tracer
def sentry_build_tracer(name, task, *args, **kwargs):
if not getattr(task, "_sentry_is_patched", False):
# Need to patch both methods because older celery sometimes
# short-circuits to task.run if it thinks it's safe.
task.__call__ = _wrap_task_call(task, task.__call__)
task.run = _wrap_task_call(task, task.run)
task.apply_async = _wrap_apply_async(task, task.apply_async)
# `build_tracer` is apparently called for every task
# invocation. Can't wrap every celery task for every invocation
# or we will get infinitely nested wrapper functions.
task._sentry_is_patched = True
return _wrap_tracer(task, old_build_tracer(name, task, *args, **kwargs))
trace.build_tracer = sentry_build_tracer
_patch_worker_exit()
# This logger logs every status of every task that ran on the worker.
# Meaning that every task's breadcrumbs are full of stuff like "Task
# <foo> raised unexpected <bar>".
ignore_logger("celery.worker.job")
def _wrap_apply_async(task, f):
@functools.wraps(f)
def apply_async(*args, **kwargs):
hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is not None and integration.propagate_traces:
headers = None
for key, value in hub.iter_trace_propagation_headers():
if headers is None:
headers = dict(kwargs.get("headers") or {})
headers[key] = value
if headers is not None:
kwargs["headers"] = headers
with hub.span(op="celery.submit", description=task.name):
return f(*args, **kwargs)
else:
return f(*args, **kwargs)
return apply_async
def _wrap_tracer(task, f):
# Need to wrap tracer for pushing the scope before prerun is sent, and
# popping it after postrun is sent.
#
# This is the reason we don't use signals for hooking in the first place.
# Also because in Celery 3, signal dispatch returns early if one handler
# crashes.
@functools.wraps(f)
def _inner(*args, **kwargs):
hub = Hub.current
if hub.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)
with hub.push_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
span = Span.continue_from_headers(args[3].get("headers") or {})
span.transaction = "unknown celery task"
with capture_internal_exceptions():
# Celery task objects are not a thing to be trusted. Even
# something such as attribute access can fail.
span.transaction = task.name
with hub.span(span):
return f(*args, **kwargs)
return _inner
def _wrap_task_call(task, f):
# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.
# functools.wraps is important here because celery-once looks at this
# method's name.
# https://github.com/getsentry/sentry-python/issues/421
@functools.wraps(f)
def _inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(task, exc_info)
reraise(*exc_info)
return _inner
def _make_event_processor(task, uuid, args, kwargs, request=None):
def event_processor(event, hint):
with capture_internal_exceptions():
extra = event.setdefault("extra", {})
extra["celery-job"] = {
"task_name": task.name,
"args": args,
"kwargs": kwargs,
}
if "exc_info" in hint:
with capture_internal_exceptions():
if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
event["fingerprint"] = [
"celery",
"SoftTimeLimitExceeded",
getattr(task, "name", task),
]
return event
return event_processor
def _capture_exception(task, exc_info):
hub = Hub.current
if hub.get_integration(CeleryIntegration) is None:
return
if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
return
if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
return
# If an integration is there, a client has to be there.
client = hub.client # type: Any
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "celery", "handled": False},
)
hub.capture_event(event, hint=hint)
def _patch_worker_exit():
# Need to flush queue before worker shutdown because a crashing worker will
# call os._exit
from billiard.pool import Worker # type: ignore
old_workloop = Worker.workloop
def sentry_workloop(*args, **kwargs):
try:
return old_workloop(*args, **kwargs)
finally:
with capture_internal_exceptions():
hub = Hub.current
if hub.get_integration(CeleryIntegration) is not None:
hub.flush()
Worker.workloop = sentry_workloop