forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhello_query_test.py
More file actions
26 lines (21 loc) · 956 Bytes
/
hello_query_test.py
File metadata and controls
26 lines (21 loc) · 956 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import uuid
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from hello.hello_query import GreetingWorkflow
async def test_query_workflow():
task_queue_name = str(uuid.uuid4())
# start manual time skipping
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client, task_queue=task_queue_name, workflows=[GreetingWorkflow]
):
handle = await env.client.start_workflow(
GreetingWorkflow.run,
"World",
id=str(uuid.uuid4()),
task_queue=task_queue_name,
)
assert "Hello, World!" == await handle.query(GreetingWorkflow.greeting)
# manually skip 3 seconds. This will allow the workflow execution to move forward
await env.sleep(3)
assert "Goodbye, World!" == await handle.query(GreetingWorkflow.greeting)