Skip to content

Commit 8ccd571

Browse files
authored
Update SDK and custom converter/decorator samples (temporalio#19)
Fixes temporalio#15 Fixes temporalio#14
1 parent ac44173 commit 8ccd571

12 files changed

Lines changed: 362 additions & 59 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,6 @@ Some examples require extra dependencies. See each sample's directory for specif
5353
while running.
5454
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
5555
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
56+
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
57+
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
5658
* [encryption](encryption) - Apply end-to-end encryption for all input/output.

custom_converter/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Custom Converter Sample
2+
3+
This sample shows how to make a custom payload converter for a type not natively supported by Temporal.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
6+
worker:
7+
8+
poetry run python worker.py
9+
10+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
11+
12+
poetry run python starter.py
13+
14+
The workflow should complete with the hello result. If the custom converter was not set for the custom input and output
15+
classes, we would get an error on the client side and on the worker side.

custom_converter/__init__.py

Whitespace-only changes.

custom_converter/starter.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import asyncio
2+
import dataclasses
3+
4+
import temporalio.converter
5+
from temporalio.client import Client
6+
7+
from custom_converter.worker import (
8+
GreetingInput,
9+
GreetingOutput,
10+
GreetingPayloadConverter,
11+
GreetingWorkflow,
12+
)
13+
14+
15+
async def main():
16+
# Connect client
17+
client = await Client.connect(
18+
"localhost:7233",
19+
# Use the default data converter, but change the payload converter.
20+
# Without this we get:
21+
# TypeError: Object of type GreetingInput is not JSON serializable
22+
data_converter=dataclasses.replace(
23+
temporalio.converter.default(),
24+
payload_converter_class=GreetingPayloadConverter,
25+
),
26+
)
27+
28+
# Run workflow
29+
result = await client.execute_workflow(
30+
GreetingWorkflow.run,
31+
GreetingInput("Temporal"),
32+
id=f"custom_converter-workflow-id",
33+
task_queue="custom_converter-task-queue",
34+
)
35+
assert isinstance(result, GreetingOutput)
36+
print(f"Workflow result: {result.result}")
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

custom_converter/worker.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import asyncio
2+
import dataclasses
3+
from typing import Any, Optional, Type
4+
5+
import temporalio.converter
6+
from temporalio import workflow
7+
from temporalio.api.common.v1 import Payload
8+
from temporalio.client import Client
9+
from temporalio.converter import (
10+
CompositePayloadConverter,
11+
DefaultPayloadConverter,
12+
EncodingPayloadConverter,
13+
)
14+
from temporalio.worker import Worker
15+
16+
17+
class GreetingInput:
18+
def __init__(self, name: str) -> None:
19+
self.name = name
20+
21+
22+
class GreetingOutput:
23+
def __init__(self, result: str) -> None:
24+
self.result = result
25+
26+
27+
@workflow.defn
28+
class GreetingWorkflow:
29+
@workflow.run
30+
async def run(self, input: GreetingInput) -> GreetingOutput:
31+
return GreetingOutput(f"Hello, {input.name}")
32+
33+
34+
class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
35+
@property
36+
def encoding(self) -> str:
37+
return "text/my-greeting-encoding"
38+
39+
def to_payload(self, value: Any) -> Optional[Payload]:
40+
if isinstance(value, GreetingInput):
41+
return Payload(
42+
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
43+
data=value.name.encode(),
44+
)
45+
elif isinstance(value, GreetingOutput):
46+
return Payload(
47+
metadata={"encoding": self.encoding.encode()},
48+
data=value.result.encode(),
49+
)
50+
else:
51+
return None
52+
53+
def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
54+
if payload.metadata.get("is_input") == b"true":
55+
# Confirm proper type hint if present
56+
assert not type_hint or type_hint is GreetingInput
57+
return GreetingInput(payload.data.decode())
58+
else:
59+
assert not type_hint or type_hint is GreetingOutput
60+
return GreetingOutput(payload.data.decode())
61+
62+
63+
class GreetingPayloadConverter(CompositePayloadConverter):
64+
def __init__(self) -> None:
65+
# Just add ours as first before the defaults
66+
super().__init__(
67+
GreetingEncodingPayloadConverter(),
68+
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
69+
*DefaultPayloadConverter().converters.values(),
70+
)
71+
72+
73+
interrupt_event = asyncio.Event()
74+
75+
76+
async def main():
77+
# Connect client
78+
client = await Client.connect(
79+
"localhost:7233",
80+
# Use the default data converter, but change the payload converter.
81+
# Without this, when trying to run a workflow, we get:
82+
# KeyError: 'Unknown payload encoding my-greeting-encoding
83+
data_converter=dataclasses.replace(
84+
temporalio.converter.default(),
85+
payload_converter_class=GreetingPayloadConverter,
86+
),
87+
)
88+
89+
# Run a worker for the workflow
90+
async with Worker(
91+
client,
92+
task_queue="custom_converter-task-queue",
93+
workflows=[GreetingWorkflow],
94+
):
95+
# Wait until interrupted
96+
print("Worker started, ctrl+c to exit")
97+
await interrupt_event.wait()
98+
print("Shutting down")
99+
100+
101+
if __name__ == "__main__":
102+
loop = asyncio.new_event_loop()
103+
try:
104+
loop.run_until_complete(main())
105+
except KeyboardInterrupt:
106+
interrupt_event.set()
107+
loop.run_until_complete(loop.shutdown_asyncgens())

custom_decorator/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Custom Decorator Sample
2+
3+
This sample shows a custom decorator can help with Temporal code reuse. Specifically, this makes a `@auto_heartbeater`
4+
decorator that automatically configures an activity to heartbeat twice as frequently as the heartbeat timeout is set to.
5+
6+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
7+
worker:
8+
9+
poetry run python worker.py
10+
11+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
12+
13+
poetry run python starter.py
14+
15+
The workflow will be started, and then after 5 seconds will be sent a signal to cancel its forever-running activity.
16+
The activity has a heartbeat timeout set to 2s, so since it has the `@auto_heartbeater` decorator set, it will heartbeat
17+
every second. If this was not set, the workflow would fail with an activity heartbeat timeout failure.

custom_decorator/__init__.py

Whitespace-only changes.

custom_decorator/activity_utils.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import asyncio
2+
from datetime import datetime
3+
from functools import wraps
4+
from typing import Any, Awaitable, Callable, TypeVar, cast
5+
6+
from temporalio import activity
7+
8+
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
9+
10+
11+
def auto_heartbeater(fn: F) -> F:
12+
# We want to ensure that the type hints from the original callable are
13+
# available via our wrapper, so we use the functools wraps decorator
14+
@wraps(fn)
15+
async def wrapper(*args, **kwargs):
16+
done = asyncio.Event()
17+
# Heartbeat twice as often as the timeout
18+
heartbeat_timeout = activity.info().heartbeat_timeout
19+
if heartbeat_timeout:
20+
asyncio.create_task(
21+
heartbeat_every(heartbeat_timeout.total_seconds() / 2, done)
22+
)
23+
try:
24+
return await fn(*args, **kwargs)
25+
finally:
26+
done.set()
27+
28+
return cast(F, wrapper)
29+
30+
31+
async def heartbeat_every(
32+
delay: float, done_event: asyncio.Event, *details: Any
33+
) -> None:
34+
# Heartbeat every so often while not cancelled
35+
while not done_event.is_set():
36+
try:
37+
await asyncio.wait_for(done_event.wait(), delay)
38+
except asyncio.TimeoutError:
39+
print(f"Heartbeating at {datetime.now()}")
40+
activity.heartbeat(*details)

custom_decorator/starter.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
3+
from temporalio.client import Client
4+
5+
from custom_decorator.worker import WaitForCancelWorkflow
6+
7+
8+
async def main():
9+
# Connect client
10+
client = await Client.connect("localhost:7233")
11+
12+
# Start the workflow
13+
handle = await client.start_workflow(
14+
WaitForCancelWorkflow.run,
15+
id=f"custom_decorator-workflow-id",
16+
task_queue="custom_decorator-task-queue",
17+
)
18+
print("Started workflow, waiting 5 seconds before cancelling")
19+
await asyncio.sleep(5)
20+
21+
# Send a signal asking workflow to cancel the activity
22+
await handle.signal(WaitForCancelWorkflow.cancel_activity)
23+
24+
# Wait and expect to be told about the activity being cancelled. If we did
25+
# not have the automatic heartbeater decorator, the signal would have failed
26+
# because the workflow would already be completed as failed with activity
27+
# heartbeat timeout.
28+
result = await handle.result()
29+
print(f"Result: {result}")
30+
31+
32+
if __name__ == "__main__":
33+
asyncio.run(main())

custom_decorator/worker.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import activity, workflow
5+
from temporalio.client import Client
6+
from temporalio.common import RetryPolicy
7+
from temporalio.worker import Worker
8+
9+
from custom_decorator.activity_utils import auto_heartbeater
10+
11+
12+
# Here we use our automatic heartbeater decorator. If this wasn't present, our
13+
# activity would timeout since it isn't heartbeating.
14+
@activity.defn
15+
@auto_heartbeater
16+
async def wait_for_cancel_activity() -> str:
17+
# Wait forever, catch the cancel, and return some value
18+
try:
19+
await asyncio.Future()
20+
raise RuntimeError("unreachable")
21+
except asyncio.CancelledError:
22+
return "activity cancelled!"
23+
24+
25+
@workflow.defn
26+
class WaitForCancelWorkflow:
27+
@workflow.run
28+
async def run(self) -> str:
29+
# Start activity and wait on it (it will get cancelled from signal)
30+
self.activity = workflow.start_activity(
31+
wait_for_cancel_activity,
32+
start_to_close_timeout=timedelta(hours=20),
33+
# We set a heartbeat timeout so Temporal knows if the activity
34+
# failed/crashed. If we don't heartbeat within this time, Temporal
35+
# will consider the activity failed.
36+
heartbeat_timeout=timedelta(seconds=2),
37+
# Tell the activity not to retry for demonstration purposes only
38+
retry_policy=RetryPolicy(maximum_attempts=1),
39+
# Tell the workflow to wait for the post-cancel result
40+
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
41+
)
42+
return await self.activity
43+
44+
@workflow.signal
45+
def cancel_activity(self) -> None:
46+
self.activity.cancel()
47+
48+
49+
interrupt_event = asyncio.Event()
50+
51+
52+
async def main():
53+
# Connect client
54+
client = await Client.connect("localhost:7233")
55+
56+
# Run a worker for the workflow
57+
async with Worker(
58+
client,
59+
task_queue="custom_decorator-task-queue",
60+
workflows=[WaitForCancelWorkflow],
61+
activities=[wait_for_cancel_activity],
62+
):
63+
# Wait until interrupted
64+
print("Worker started, ctrl+c to exit")
65+
await interrupt_event.wait()
66+
print("Shutting down")
67+
68+
69+
if __name__ == "__main__":
70+
loop = asyncio.new_event_loop()
71+
try:
72+
loop.run_until_complete(main())
73+
except KeyboardInterrupt:
74+
interrupt_event.set()
75+
loop.run_until_complete(loop.shutdown_asyncgens())

0 commit comments

Comments
 (0)