forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_test.py
More file actions
28 lines (24 loc) · 898 Bytes
/
workflow_test.py
File metadata and controls
28 lines (24 loc) · 898 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import uuid
from temporalio.client import Client
from temporalio.worker import Worker
from custom_converter.shared import (
GreetingInput,
GreetingOutput,
greeting_data_converter,
)
from custom_converter.workflow import GreetingWorkflow
async def test_workflow_with_custom_converter(client: Client):
    """Run GreetingWorkflow end to end through the custom data converter.

    The incoming ``client`` is used only as a source of connection
    configuration; a second client is built from that configuration with
    ``greeting_data_converter`` substituted in, and that client both hosts
    the worker and starts the workflow.
    """
    # Clone the existing client's config, overriding only the data converter.
    converter_config = {
        **client.config(),
        "data_converter": greeting_data_converter,
    }
    converter_client = Client(**converter_config)

    # A random task queue name keeps this run separate from any other worker.
    queue_name = f"tq-{uuid.uuid4()}"
    worker = Worker(
        converter_client,
        task_queue=queue_name,
        workflows=[GreetingWorkflow],
    )

    async with worker:
        greeting_input = GreetingInput("Temporal")
        workflow_id = f"wf-{uuid.uuid4()}"
        output = await converter_client.execute_workflow(
            GreetingWorkflow.run,
            greeting_input,
            id=workflow_id,
            task_queue=queue_name,
        )

        # The result must come back as the typed output object, not a raw
        # payload, and must carry the expected greeting text.
        assert isinstance(output, GreetingOutput)
        assert output.result == "Hello, Temporal"