Skip to content

Commit f21615a

Browse files
authored
Ensure a strong reference to asyncio Task for auto-heartbeater (temporalio#43)
1 parent af94df0 commit f21615a

2 files changed

Lines changed: 21 additions & 15 deletions

File tree

custom_decorator/activity_utils.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,27 @@ def auto_heartbeater(fn: F) -> F:
1313
# available via our wrapper, so we use the functools wraps decorator
1414
@wraps(fn)
1515
async def wrapper(*args, **kwargs):
16-
done = asyncio.Event()
17-
# Heartbeat twice as often as the timeout
1816
heartbeat_timeout = activity.info().heartbeat_timeout
17+
heartbeat_task = None
1918
if heartbeat_timeout:
20-
asyncio.create_task(
21-
heartbeat_every(heartbeat_timeout.total_seconds() / 2, done)
19+
# Heartbeat twice as often as the timeout
20+
heartbeat_task = asyncio.create_task(
21+
heartbeat_every(heartbeat_timeout.total_seconds() / 2)
2222
)
2323
try:
2424
return await fn(*args, **kwargs)
2525
finally:
26-
done.set()
26+
if heartbeat_task:
27+
heartbeat_task.cancel()
28+
# Wait for heartbeat cancellation to complete
29+
await asyncio.wait([heartbeat_task])
2730

2831
return cast(F, wrapper)
2932

3033

31-
async def heartbeat_every(
32-
delay: float, done_event: asyncio.Event, *details: Any
33-
) -> None:
34+
async def heartbeat_every(delay: float, *details: Any) -> None:
3435
# Heartbeat every so often while not cancelled
35-
while not done_event.is_set():
36-
try:
37-
await asyncio.wait_for(done_event.wait(), delay)
38-
except asyncio.TimeoutError:
39-
print(f"Heartbeating at {datetime.now()}")
40-
activity.heartbeat(*details)
36+
while True:
37+
await asyncio.sleep(delay)
38+
print(f"Heartbeating at {datetime.now()}")
39+
activity.heartbeat(*details)

hello/hello_async_activity_completion.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@ async def compose_greeting(self, input: ComposeGreetingInput) -> str:
2222
# Schedule a task to complete this asynchronously. This could be done in
2323
# a completely different process or system.
2424
print("Completing activity asynchronously")
25-
asyncio.create_task(self.complete_greeting(activity.info().task_token, input))
25+
# Tasks stored by asyncio are weak references and therefore can get GC'd
26+
# which can cause warnings like "Task was destroyed but it is pending!".
27+
# So we store the tasks ourselves.
28+
# See https://docs.python.org/3/library/asyncio-task.html#creating-tasks,
29+
# https://bugs.python.org/issue21163 and others.
30+
_ = asyncio.create_task(
31+
self.complete_greeting(activity.info().task_token, input)
32+
)
2633

2734
# Raise the complete-async error which will complete this function but
2835
# does not consider the activity complete from the workflow perspective

0 commit comments

Comments
 (0)