Skip to content

Commit 72e9240

Browse files
fix#4834 (canvas): fix chord error handling when body is a chain (#10247)
* fix(canvas): fix chord error handling when body is a chain (#4834) Three bugs caused chord_error_from_stack to crash with TypeError when a chord member failed and the chord body was a chain (created by chain(chain(group(...), task), task)): 1. prepare_steps set self.id = last_task_id (the input parameter, None) instead of the last task's actual UUID from results[0].id. The chord body chain had id=None, causing get_key_for_task(None) to crash in the error handler. 2. Errbacks set on the outer chain were applied to the chord task but not to the chord body. chord_error_from_stack reads errbacks from the body (request.chord), so link_error handlers never fired. 3. No defensive guard in chord_error_from_stack for callback.id being None. Fix: - canvas.py prepare_steps: set chain ID to results[0].id - canvas.py prepare_steps: propagate link_error to chord body - backends/base.py chord_error_from_stack: generate UUID if callback.id is None Fixes #4834 * style: shorten inline comments * test: add integration test for chord error in nested chain Verifies that chain(chain(group(ok, failing), task), task) does not crash with TypeError in chord_error_from_stack when a chord member fails. The test checks that no 'task_id must not be empty' error is logged during error propagation. * fix: narrow exception assertion in integration test Replace broad pytest.raises((ExpectedException, Exception)) with explicit checks that the exception is not TypeError or the internal ValueError from get_key_for_task. The chord failure itself is expected, but internal errors are not. --------- Co-authored-by: Asif Saif Uddin {"Auvi":"অভি"} <auvipy@gmail.com>
1 parent 1a39f8c commit 72e9240

5 files changed

Lines changed: 185 additions & 3 deletions

File tree

celery/backends/base.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,13 @@ def chord_error_from_stack(self, callback, exc=None):
317317
# Handle group callbacks specially to prevent hanging body tasks
318318
if isinstance(callback, group):
319319
return self._handle_group_chord_error(group_callback=callback, backend=backend, exc=exc)
320+
321+
# Generate an ID if missing so the error can be stored.
322+
callback_id = callback.id
323+
if not callback_id:
324+
from kombu.utils.uuid import uuid
325+
callback_id = callback.options['task_id'] = uuid()
326+
320327
# We have to make a fake request since either the callback failed or
321328
# we're pretending it did since we don't have information about the
322329
# chord part(s) which failed. This request is constructed as a best
@@ -330,9 +337,9 @@ def chord_error_from_stack(self, callback, exc=None):
330337
try:
331338
self._call_task_errbacks(fake_request, exc, None)
332339
except Exception as eb_exc: # pylint: disable=broad-except
333-
return backend.fail_from_current_stack(callback.id, exc=eb_exc)
340+
return backend.fail_from_current_stack(callback_id, exc=eb_exc)
334341
else:
335-
return backend.fail_from_current_stack(callback.id, exc=exc)
342+
return backend.fail_from_current_stack(callback_id, exc=exc)
336343

337344
def _handle_group_chord_error(self, group_callback, backend, exc=None):
338345
"""Handle chord errors when the callback is a group.

celery/canvas.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1260,6 +1260,9 @@ def prepare_steps(self, args, kwargs, tasks,
12601260
if link_error:
12611261
for errback in maybe_list(link_error):
12621262
task.link_error(errback)
1263+
# Propagate to chord body for chord_error_from_stack.
1264+
if isinstance(task, chord) and task.body:
1265+
task.body.link_error(errback)
12631266

12641267
tasks.append(task)
12651268
results.append(res)
@@ -1277,7 +1280,8 @@ def prepare_steps(self, args, kwargs, tasks,
12771280
while node.parent:
12781281
node = node.parent
12791282
prev_res = node
1280-
self.id = last_task_id
1283+
# Use the last task's actual ID, not the input parameter.
1284+
self.id = results[0].id if results else last_task_id
12811285
return tasks, results
12821286

12831287
def apply(self, args=None, kwargs=None, **options):

t/integration/test_canvas.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3269,6 +3269,47 @@ def test_chord_error_propagation_with_different_body_types(
32693269
error_found = check_for_logs(caplog=caplog, message="ValueError: task_id must not be empty")
32703270
assert not error_found, "The 'task_id must not be empty' error was found in the logs"
32713271

3272+
@flaky
3273+
def test_chord_error_in_nested_chain_does_not_crash(self, manager, caplog):
3274+
"""Chord error with chain body must not raise internal TypeError.
3275+
3276+
Regression test for https://github.com/celery/celery/issues/4834
3277+
chain(chain(group(ok, failing), task), task) crashed with
3278+
TypeError in chord_error_from_stack because the chain body
3279+
had id=None.
3280+
"""
3281+
try:
3282+
manager.app.backend.ensure_chords_allowed()
3283+
except NotImplementedError as e:
3284+
raise pytest.skip(e.args[0])
3285+
3286+
c = chain(
3287+
chain(
3288+
group(add.si(1, 1), fail.si()),
3289+
identity.s(),
3290+
),
3291+
identity.s(),
3292+
)
3293+
result = c.apply_async()
3294+
3295+
try:
3296+
result.get(timeout=TIMEOUT)
3297+
except Exception as exc:
3298+
assert not isinstance(exc, TypeError), (
3299+
f"Internal TypeError raised: {exc}"
3300+
)
3301+
assert "task_id must not be empty" not in str(exc), (
3302+
f"Internal ValueError raised: {exc}"
3303+
)
3304+
3305+
error_found = check_for_logs(
3306+
caplog=caplog,
3307+
message="task_id must not be empty",
3308+
)
3309+
assert not error_found, (
3310+
"chord_error_from_stack crashed with 'task_id must not be empty'"
3311+
)
3312+
32723313

32733314
class test_signature_serialization:
32743315
"""

t/unit/backends/test_base.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,36 @@ def test_chord_error_from_stack_backend_fallback(self):
910910
# Verify self was used as fallback backend
911911
backend.fail_from_current_stack.assert_called_once()
912912

913+
def test_chord_error_from_stack_generates_id_when_callback_id_is_none(self):
914+
"""chord_error_from_stack must not crash when callback.id is None.
915+
916+
Regression test for https://github.com/celery/celery/issues/4834
917+
When a chord body is a chain without an explicit task_id, the
918+
error handler must generate an ID so the error result can be
919+
stored and errback handlers can fire.
920+
"""
921+
backend = self.b
922+
923+
callback = MagicMock(name='callback')
924+
callback.id = None
925+
callback.options = {'task_id': None, 'link_error': []}
926+
callback.keys.return_value = []
927+
callback.task = 'nonexistent.task'
928+
929+
backend._call_task_errbacks = Mock()
930+
backend.fail_from_current_stack = Mock()
931+
932+
backend.chord_error_from_stack(callback, exc=ValueError('test'))
933+
934+
# fail_from_current_stack must be called with a non-None ID
935+
call_args = backend.fail_from_current_stack.call_args
936+
actual_id = call_args[0][0]
937+
assert actual_id is not None, (
938+
"chord_error_from_stack must generate an ID when callback.id is None"
939+
)
940+
# The generated ID must also be stored on the callback
941+
assert callback.options['task_id'] is not None
942+
913943
def _create_mock_frozen_group(self, group_id="group-id", task_ids=None, task_names=None):
914944
"""Helper to create mock frozen group with results."""
915945
if task_ids is None:

t/unit/tasks/test_canvas.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,106 @@ def test_upgrade_to_chord_on_chain(self):
860860
assert isinstance(final_task.tasks[0].body, chord)
861861
assert final_task.tasks[0].body.body == chain1
862862

863+
def test_chain_body_gets_id_when_used_as_chord_body(self):
864+
"""Chain used as chord body must have a non-None ID after freeze.
865+
866+
Regression test for https://github.com/celery/celery/issues/4834
867+
When chain(chain(group(...), task), task) is frozen, the chord
868+
body is a chain. Previously, the chain's ID stayed None because
869+
prepare_steps set self.id = last_task_id (the input parameter,
870+
None) instead of the last task's actual UUID.
871+
"""
872+
# Build chord with a chain body: group becomes chord,
873+
# the two following tasks become the body chain.
874+
g = group(self.add.si(1, 1), self.add.si(2, 2))
875+
body_chain = chord(g, self.add.s(10), app=self.app)
876+
canvas = chain(body_chain, self.add.s(20), app=self.app)
877+
canvas.freeze()
878+
879+
# Find the chord
880+
chords = [t for t in canvas.tasks if isinstance(t, chord)]
881+
assert chords, "Expected a chord in the frozen chain"
882+
chord_task = chords[0]
883+
884+
# The chord body (a chain) must have a non-None ID
885+
assert chord_task.body.id is not None, (
886+
"Chord body chain must have an ID after freeze"
887+
)
888+
889+
def test_chain_body_id_propagated_to_header_tasks(self):
890+
"""Header tasks must carry the chord body's ID in request.chord.
891+
892+
Regression test for https://github.com/celery/celery/issues/4834
893+
The header tasks store a reference to the chord body. After
894+
freeze, this reference must have a valid task_id so that
895+
chord_error_from_stack can store the error result.
896+
"""
897+
g = group(self.add.si(1, 1), self.add.si(2, 2))
898+
body_chain = chord(g, self.add.s(10), app=self.app)
899+
canvas = chain(body_chain, self.add.s(20), app=self.app)
900+
canvas.freeze()
901+
902+
chords = [t for t in canvas.tasks if isinstance(t, chord)]
903+
chord_task = chords[0]
904+
905+
# Every header task's chord option must have the body's ID
906+
body_id = chord_task.body.id
907+
for header_task in chord_task.tasks.tasks:
908+
header_chord = header_task.options.get('chord')
909+
assert header_chord is not None, "Header task must have chord option"
910+
assert header_chord.id == body_id, (
911+
f"Header chord ID {header_chord.id} != body ID {body_id}"
912+
)
913+
914+
def test_chain_errbacks_propagated_to_chord_body(self):
915+
"""Errbacks on a chain must propagate to the chord body.
916+
917+
Regression test for https://github.com/celery/celery/issues/4834
918+
chord_error_from_stack reads errbacks from the chord body
919+
(request.chord), not from the chord task. Without propagation,
920+
link_error handlers set on a chain containing a chord never fire.
921+
"""
922+
@self.app.task(shared=False)
923+
def on_error(*args):
924+
pass
925+
926+
g = group(self.add.si(1, 1), self.add.si(2, 2))
927+
body_chord = chord(g, self.add.s(10), app=self.app)
928+
c = chain(body_chord, self.add.s(20), app=self.app)
929+
930+
# Simulate what apply_async does: pass link_error to prepare_steps
931+
errback = on_error.s()
932+
c.prepare_steps(
933+
c.args, c.kwargs, c.tasks, app=self.app,
934+
link_error=[errback], clone=False,
935+
)
936+
937+
# The chord body must have the errback
938+
chords = [t for t in c.tasks if isinstance(t, chord)]
939+
assert chords
940+
body_errbacks = chords[0].body.options.get('link_error', [])
941+
assert len(body_errbacks) >= 1, (
942+
"Errback must be propagated to chord body"
943+
)
944+
945+
def test_chain_id_matches_result_after_freeze(self):
946+
"""Chain.id must equal the returned result ID after freeze.
947+
948+
After freeze(), a chain's ID should reflect the last task's
949+
result (since that is what the chain's result represents).
950+
"""
951+
c = self.add.s(1, 1) | self.add.s(2) | self.add.s(3)
952+
result = c.freeze()
953+
assert c.id is not None
954+
assert c.id == result.id
955+
956+
def test_chain_id_with_explicit_id_preserved(self):
957+
"""Chain.freeze(explicit_id) must set chain.id to that value."""
958+
c = self.add.s(1, 1) | self.add.s(2)
959+
result = c.freeze('my-explicit-id')
960+
assert c.id == 'my-explicit-id'
961+
assert result.id == 'my-explicit-id'
962+
863963

864964
class test_group(CanvasCase):
865965
def test_repr(self):

0 commit comments

Comments
 (0)