forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhello_signal_test.py
More file actions
22 lines (16 loc) · 915 Bytes
/
hello_signal_test.py
File metadata and controls
22 lines (16 loc) · 915 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import uuid
from temporalio.client import Client, WorkflowExecutionStatus
from temporalio.worker import Worker
from hello.hello_signal import GreetingWorkflow
async def test_signal_workflow(client: Client):
task_queue_name = str(uuid.uuid4())
async with Worker(client, task_queue=task_queue_name, workflows=[GreetingWorkflow]):
handle = await client.start_workflow(
GreetingWorkflow.run, id=str(uuid.uuid4()), task_queue=task_queue_name
)
await handle.signal(GreetingWorkflow.submit_greeting, "user1")
await handle.signal(GreetingWorkflow.submit_greeting, "user2")
assert WorkflowExecutionStatus.RUNNING == (await handle.describe()).status
await handle.signal(GreetingWorkflow.exit)
assert ["Hello, user1", "Hello, user2"] == await handle.result()
assert WorkflowExecutionStatus.COMPLETED == (await handle.describe()).status