@@ -22,17 +22,41 @@ def inner(signal, f):
2222
2323
2424@pytest .fixture
25- def init_celery (sentry_init ):
26- def inner (propagate_traces = True , ** kwargs ):
25+ def init_celery (sentry_init , request ):
26+ def inner (propagate_traces = True , backend = "always_eager" , ** kwargs ):
2727 sentry_init (
2828 integrations = [CeleryIntegration (propagate_traces = propagate_traces )],
2929 ** kwargs
3030 )
3131 celery = Celery (__name__ )
32- if VERSION < (4 ,):
33- celery .conf .CELERY_ALWAYS_EAGER = True
32+
33+ if backend == "always_eager" :
34+ if VERSION < (4 ,):
35+ celery .conf .CELERY_ALWAYS_EAGER = True
36+ else :
37+ celery .conf .task_always_eager = True
38+ elif backend == "redis" :
39+ # broken on celery 3
40+ if VERSION < (4 ,):
41+ pytest .skip ("Redis backend broken for some reason" )
42+
43+ # this backend requires capture_events_forksafe
44+ celery .conf .worker_max_tasks_per_child = 1
45+ celery .conf .broker_url = "redis://127.0.0.1:6379"
46+ celery .conf .result_backend = "redis://127.0.0.1:6379"
47+ celery .conf .task_always_eager = False
48+
49+ Hub .main .bind_client (Hub .current .client )
50+ request .addfinalizer (lambda : Hub .main .bind_client (None ))
51+
52+ # Once we drop celery 3 we can use the celery_worker fixture
53+ w = worker .worker (app = celery )
54+ t = threading .Thread (target = w .run )
55+ t .daemon = True
56+ t .start ()
3457 else :
35- celery .conf .task_always_eager = True
58+ raise ValueError (backend )
59+
3660 return celery
3761
3862 return inner
@@ -273,15 +297,10 @@ def dummy_task(self):
273297
274298
275299@pytest .mark .forked
276- @pytest .mark .skipif (VERSION < (4 ,), reason = "in-memory backend broken" )
277- def test_transport_shutdown (request , celery , capture_events_forksafe , tmpdir ):
278- events = capture_events_forksafe ()
300+ def test_redis_backend (init_celery , capture_events_forksafe , tmpdir ):
301+ celery = init_celery (traces_sample_rate = 1.0 , backend = "redis" , debug = True )
279302
280- celery .conf .worker_max_tasks_per_child = 1
281- celery .conf .broker_url = "memory://localhost/"
282- celery .conf .broker_backend = "memory"
283- celery .conf .result_backend = "file://{}" .format (tmpdir .mkdir ("celery-results" ))
284- celery .conf .task_always_eager = False
303+ events = capture_events_forksafe ()
285304
286305 runs = []
287306
@@ -290,21 +309,26 @@ def dummy_task(self):
290309 runs .append (1 )
291310 1 / 0
292311
293- res = dummy_task .delay ()
294-
295- w = worker .worker (app = celery )
296- t = threading .Thread (target = w .run )
297- t .daemon = True
298- t .start ()
312+ # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
313+ res = dummy_task .apply_async ()
299314
300315 with pytest .raises (Exception ):
301316 # Celery 4.1 raises a gibberish exception
302317 res .wait ()
303318
319+ # if this is nonempty, the worker never really forked
320+ assert not runs
321+
304322 event = events .read_event ()
305323 (exception ,) = event ["exception" ]["values" ]
306324 assert exception ["type" ] == "ZeroDivisionError"
307325
326+ transaction = events .read_event ()
327+ assert (
328+ transaction ["contexts" ]["trace" ]["trace_id" ]
329+ == event ["contexts" ]["trace" ]["trace_id" ]
330+ )
331+
308332 events .read_flush ()
309333
310334 # if this is nonempty, the worker never really forked
0 commit comments