Skip to content

Commit 68ba233

Browse files
authored
Fix custom_converter and infrequent_polling samples (temporalio#95)
Fixes temporalio#93 Fixes temporalio#90
1 parent ba5a87f commit 68ba233

7 files changed

Lines changed: 115 additions & 88 deletions

File tree

custom_converter/shared.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import dataclasses
2+
from typing import Any, Optional, Type
3+
4+
import temporalio.converter
5+
from temporalio.api.common.v1 import Payload
6+
from temporalio.converter import (
7+
CompositePayloadConverter,
8+
DefaultPayloadConverter,
9+
EncodingPayloadConverter,
10+
)
11+
12+
13+
class GreetingInput:
14+
def __init__(self, name: str) -> None:
15+
self.name = name
16+
17+
18+
class GreetingOutput:
19+
def __init__(self, result: str) -> None:
20+
self.result = result
21+
22+
23+
class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
24+
@property
25+
def encoding(self) -> str:
26+
return "text/my-greeting-encoding"
27+
28+
def to_payload(self, value: Any) -> Optional[Payload]:
29+
if isinstance(value, GreetingInput):
30+
return Payload(
31+
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
32+
data=value.name.encode(),
33+
)
34+
elif isinstance(value, GreetingOutput):
35+
return Payload(
36+
metadata={"encoding": self.encoding.encode()},
37+
data=value.result.encode(),
38+
)
39+
else:
40+
return None
41+
42+
def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
43+
if payload.metadata.get("is_input") == b"true":
44+
# Confirm proper type hint if present
45+
assert not type_hint or type_hint is GreetingInput
46+
return GreetingInput(payload.data.decode())
47+
else:
48+
assert not type_hint or type_hint is GreetingOutput
49+
return GreetingOutput(payload.data.decode())
50+
51+
52+
class GreetingPayloadConverter(CompositePayloadConverter):
53+
def __init__(self) -> None:
54+
# Just add ours as first before the defaults
55+
super().__init__(
56+
GreetingEncodingPayloadConverter(),
57+
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
58+
*DefaultPayloadConverter().converters.values(),
59+
)
60+
61+
62+
# Use the default data converter, but change the payload converter.
63+
greeting_data_converter = dataclasses.replace(
64+
temporalio.converter.default(),
65+
payload_converter_class=GreetingPayloadConverter,
66+
)

custom_converter/starter.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,22 @@
11
import asyncio
2-
import dataclasses
32

4-
import temporalio.converter
53
from temporalio.client import Client
64

7-
from custom_converter.worker import (
5+
from custom_converter.shared import (
86
GreetingInput,
97
GreetingOutput,
10-
GreetingPayloadConverter,
11-
GreetingWorkflow,
8+
greeting_data_converter,
129
)
10+
from custom_converter.workflow import GreetingWorkflow
1311

1412

1513
async def main():
1614
# Connect client
1715
client = await Client.connect(
1816
"localhost:7233",
19-
# Use the default data converter, but change the payload converter.
2017
# Without this we get:
2118
# TypeError: Object of type GreetingInput is not JSON serializable
22-
data_converter=dataclasses.replace(
23-
temporalio.converter.default(),
24-
payload_converter_class=GreetingPayloadConverter,
25-
),
19+
data_converter=greeting_data_converter,
2620
)
2721

2822
# Run workflow

custom_converter/worker.py

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,10 @@
11
import asyncio
2-
import dataclasses
3-
from typing import Any, Optional, Type
42

5-
import temporalio.converter
6-
from temporalio import workflow
7-
from temporalio.api.common.v1 import Payload
83
from temporalio.client import Client
9-
from temporalio.converter import (
10-
CompositePayloadConverter,
11-
DefaultPayloadConverter,
12-
EncodingPayloadConverter,
13-
)
144
from temporalio.worker import Worker
155

16-
17-
class GreetingInput:
18-
def __init__(self, name: str) -> None:
19-
self.name = name
20-
21-
22-
class GreetingOutput:
23-
def __init__(self, result: str) -> None:
24-
self.result = result
25-
26-
27-
@workflow.defn
28-
class GreetingWorkflow:
29-
@workflow.run
30-
async def run(self, input: GreetingInput) -> GreetingOutput:
31-
return GreetingOutput(f"Hello, {input.name}")
32-
33-
34-
class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
35-
@property
36-
def encoding(self) -> str:
37-
return "text/my-greeting-encoding"
38-
39-
def to_payload(self, value: Any) -> Optional[Payload]:
40-
if isinstance(value, GreetingInput):
41-
return Payload(
42-
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
43-
data=value.name.encode(),
44-
)
45-
elif isinstance(value, GreetingOutput):
46-
return Payload(
47-
metadata={"encoding": self.encoding.encode()},
48-
data=value.result.encode(),
49-
)
50-
else:
51-
return None
52-
53-
def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
54-
if payload.metadata.get("is_input") == b"true":
55-
# Confirm proper type hint if present
56-
assert not type_hint or type_hint is GreetingInput
57-
return GreetingInput(payload.data.decode())
58-
else:
59-
assert not type_hint or type_hint is GreetingOutput
60-
return GreetingOutput(payload.data.decode())
61-
62-
63-
class GreetingPayloadConverter(CompositePayloadConverter):
64-
def __init__(self) -> None:
65-
# Just add ours as first before the defaults
66-
super().__init__(
67-
GreetingEncodingPayloadConverter(),
68-
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
69-
*DefaultPayloadConverter().converters.values(),
70-
)
71-
6+
from custom_converter.shared import greeting_data_converter
7+
from custom_converter.workflow import GreetingWorkflow
728

739
interrupt_event = asyncio.Event()
7410

@@ -77,13 +13,9 @@ async def main():
7713
# Connect client
7814
client = await Client.connect(
7915
"localhost:7233",
80-
# Use the default data converter, but change the payload converter.
8116
# Without this, when trying to run a workflow, we get:
8217
# KeyError: 'Unknown payload encoding my-greeting-encoding
83-
data_converter=dataclasses.replace(
84-
temporalio.converter.default(),
85-
payload_converter_class=GreetingPayloadConverter,
86-
),
18+
data_converter=greeting_data_converter,
8719
)
8820

8921
# Run a worker for the workflow

custom_converter/workflow.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from temporalio import workflow
2+
3+
with workflow.unsafe.imports_passed_through():
4+
from custom_converter.shared import GreetingInput, GreetingOutput
5+
6+
7+
@workflow.defn
8+
class GreetingWorkflow:
9+
@workflow.run
10+
async def run(self, input: GreetingInput) -> GreetingOutput:
11+
return GreetingOutput(f"Hello, {input.name}")

polling/infrequent/activities.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from dataclasses import dataclass
32

43
from temporalio import activity
@@ -15,9 +14,6 @@ class ComposeGreetingInput:
1514
@activity.defn
1615
async def compose_greeting(input: ComposeGreetingInput) -> str:
1716
test_service = TestService()
18-
while True:
19-
try:
20-
result = test_service.get_service_result(input)
21-
return result
22-
except Exception:
23-
activity.heartbeat("Invoking activity")
17+
# If this raises an exception because it's not done yet, the activity will
18+
# continually be scheduled for retry
19+
return await test_service.get_service_result(input)

tests/custom_converter/__init__.py

Whitespace-only changes.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import uuid
2+
3+
from temporalio.client import Client
4+
from temporalio.worker import Worker
5+
6+
from custom_converter.shared import (
7+
GreetingInput,
8+
GreetingOutput,
9+
greeting_data_converter,
10+
)
11+
from custom_converter.workflow import GreetingWorkflow
12+
13+
14+
async def test_workflow_with_custom_converter(client: Client):
15+
# Replace data converter in client
16+
new_config = client.config()
17+
new_config["data_converter"] = greeting_data_converter
18+
client = Client(**new_config)
19+
task_queue = f"tq-{uuid.uuid4()}"
20+
async with Worker(client, task_queue=task_queue, workflows=[GreetingWorkflow]):
21+
result = await client.execute_workflow(
22+
GreetingWorkflow.run,
23+
GreetingInput("Temporal"),
24+
id=f"wf-{uuid.uuid4()}",
25+
task_queue=task_queue,
26+
)
27+
assert isinstance(result, GreetingOutput)
28+
assert result.result == "Hello, Temporal"

0 commit comments

Comments
 (0)