@@ -113,133 +113,109 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
113113 return monitor_config
114114
115115
116- def _patch_beat_apply_entry ( ):
117- # type: () -> None
116+ def _apply_crons_data_to_schedule_entry ( scheduler , schedule_entry , integration ):
117+ # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration ) -> None
118118 """
119- Makes sure that the Sentry Crons information is set in the Celery Beat task's
120- headers so that is is monitored with Sentry Crons.
121-
122- This is only called by Celery Beat. After apply_entry is called
123- Celery will call apply_async to put the task in the queue.
119+ Add Sentry Crons information to the schedule_entry headers.
124120 """
125- from sentry_sdk .integrations .celery import CeleryIntegration
126-
127- original_apply_entry = Scheduler .apply_entry
128-
129- def sentry_apply_entry (* args , ** kwargs ):
130- # type: (*Any, **Any) -> None
131- scheduler , schedule_entry = args
132- app = scheduler .app
133-
134- celery_schedule = schedule_entry .schedule
135- monitor_name = schedule_entry .name
136-
137- integration = sentry_sdk .get_client ().get_integration (CeleryIntegration )
138- if integration is None :
139- return original_apply_entry (* args , ** kwargs )
140-
141- if match_regex_list (monitor_name , integration .exclude_beat_tasks ):
142- return original_apply_entry (* args , ** kwargs )
121+ if not integration .monitor_beat_tasks :
122+ return
143123
144- # Tasks started by Celery Beat start a new Trace
145- scope = Scope .get_isolation_scope ()
146- scope .set_new_propagation_context ()
147- scope ._name = "celery-beat"
124+ monitor_name = schedule_entry .name
148125
149- monitor_config = _get_monitor_config (celery_schedule , app , monitor_name )
126+ task_should_be_excluded = match_regex_list (
127+ monitor_name , integration .exclude_beat_tasks
128+ )
129+ if task_should_be_excluded :
130+ return
150131
151- is_supported_schedule = bool (monitor_config )
152- if is_supported_schedule :
153- headers = schedule_entry .options .pop ("headers" , {})
154- headers .update (
155- {
156- "sentry-monitor-slug" : monitor_name ,
157- "sentry-monitor-config" : monitor_config ,
158- }
159- )
132+ celery_schedule = schedule_entry .schedule
133+ app = scheduler .app
160134
161- check_in_id = capture_checkin (
162- monitor_slug = monitor_name ,
163- monitor_config = monitor_config ,
164- status = MonitorStatus .IN_PROGRESS ,
165- )
166- headers .update ({"sentry-monitor-check-in-id" : check_in_id })
135+ monitor_config = _get_monitor_config (celery_schedule , app , monitor_name )
167136
168- # Set the Sentry configuration in the options of the ScheduleEntry.
169- # Those will be picked up in `apply_async` and added to the headers.
170- schedule_entry . options [ "headers" ] = headers
137+ is_supported_schedule = bool ( monitor_config )
138+ if not is_supported_schedule :
139+ return
171140
172- return original_apply_entry (* args , ** kwargs )
141+ headers = schedule_entry .options .pop ("headers" , {})
142+ headers .update (
143+ {
144+ "sentry-monitor-slug" : monitor_name ,
145+ "sentry-monitor-config" : monitor_config ,
146+ }
147+ )
173148
174- Scheduler .apply_entry = sentry_apply_entry
149+ check_in_id = capture_checkin (
150+ monitor_slug = monitor_name ,
151+ monitor_config = monitor_config ,
152+ status = MonitorStatus .IN_PROGRESS ,
153+ )
154+ headers .update ({"sentry-monitor-check-in-id" : check_in_id })
175155
156+ # Set the Sentry configuration in the options of the ScheduleEntry.
157+ # Those will be picked up in `apply_async` and added to the headers.
158+ schedule_entry .options ["headers" ] = headers
176159
177- def _patch_redbeat_maybe_due ():
178- # type: () -> None
179160
180- if RedBeatScheduler is None :
181- return
161+ def _wrap_beat_scheduler (original_function ):
162+ # type: (Callable[..., Any]) -> Callable[..., Any]
163+ """
164+ Makes sure that:
165+ - a new Sentry trace is started for each task started by Celery Beat and
166+ it is propagated to the task.
167+ - the Sentry Crons information is set in the Celery Beat task's
168+ headers so that is is monitored with Sentry Crons.
169+
170+ After the patched function is called,
171+ Celery Beat will call apply_async to put the task in the queue.
172+ """
173+ # Patch only once
174+ # Can't use __name__ here, because some of our tests mock original_apply_entry
175+ already_patched = "sentry_patched_scheduler" in str (original_function )
176+ if already_patched :
177+ return original_function
182178
183179 from sentry_sdk .integrations .celery import CeleryIntegration
184180
185- original_maybe_due = RedBeatScheduler .maybe_due
186-
187- def sentry_maybe_due (* args , ** kwargs ):
181+ def sentry_patched_scheduler (* args , ** kwargs ):
188182 # type: (*Any, **Any) -> None
189- scheduler , schedule_entry = args
190- app = scheduler .app
191-
192- celery_schedule = schedule_entry .schedule
193- monitor_name = schedule_entry .name
194-
195183 integration = sentry_sdk .get_client ().get_integration (CeleryIntegration )
196184 if integration is None :
197- return original_maybe_due (* args , ** kwargs )
198-
199- task_should_be_excluded = match_regex_list (
200- monitor_name , integration .exclude_beat_tasks
201- )
202- if task_should_be_excluded :
203- return original_maybe_due (* args , ** kwargs )
185+ return original_function (* args , ** kwargs )
204186
205187 # Tasks started by Celery Beat start a new Trace
206188 scope = Scope .get_isolation_scope ()
207189 scope .set_new_propagation_context ()
208190 scope ._name = "celery-beat"
209191
210- monitor_config = _get_monitor_config (celery_schedule , app , monitor_name )
211-
212- is_supported_schedule = bool (monitor_config )
213- if is_supported_schedule :
214- headers = schedule_entry .options .pop ("headers" , {})
215- headers .update (
216- {
217- "sentry-monitor-slug" : monitor_name ,
218- "sentry-monitor-config" : monitor_config ,
219- }
220- )
192+ scheduler , schedule_entry = args
193+ _apply_crons_data_to_schedule_entry (scheduler , schedule_entry , integration )
221194
222- check_in_id = capture_checkin (
223- monitor_slug = monitor_name ,
224- monitor_config = monitor_config ,
225- status = MonitorStatus .IN_PROGRESS ,
226- )
227- headers .update ({"sentry-monitor-check-in-id" : check_in_id })
195+ return original_function (* args , ** kwargs )
228196
229- # Set the Sentry configuration in the options of the ScheduleEntry.
230- # Those will be picked up in `apply_async` and added to the headers.
231- schedule_entry .options ["headers" ] = headers
197+ return sentry_patched_scheduler
232198
233- return original_maybe_due (* args , ** kwargs )
234199
235- RedBeatScheduler .maybe_due = sentry_maybe_due
200+ def _patch_beat_apply_entry ():
201+ # type: () -> None
202+ Scheduler .apply_entry = _wrap_beat_scheduler (Scheduler .apply_entry )
236203
237204
238- def _setup_celery_beat_signals ():
205+ def _patch_redbeat_maybe_due ():
239206 # type: () -> None
240- task_success .connect (crons_task_success )
241- task_failure .connect (crons_task_failure )
242- task_retry .connect (crons_task_retry )
207+ if RedBeatScheduler is None :
208+ return
209+
210+ RedBeatScheduler .maybe_due = _wrap_beat_scheduler (RedBeatScheduler .maybe_due )
211+
212+
213+ def _setup_celery_beat_signals (monitor_beat_tasks ):
214+ # type: (bool) -> None
215+ if monitor_beat_tasks :
216+ task_success .connect (crons_task_success )
217+ task_failure .connect (crons_task_failure )
218+ task_retry .connect (crons_task_retry )
243219
244220
245221def crons_task_success (sender , ** kwargs ):
0 commit comments