forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_test.py
More file actions
31 lines (26 loc) · 1018 Bytes
/
workflow_test.py
File metadata and controls
31 lines (26 loc) · 1018 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import uuid
import pytest
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from polling.infrequent.activities import compose_greeting
from polling.infrequent.workflows import GreetingWorkflow
async def test_infrequent_polling_workflow(client: Client, env: WorkflowEnvironment):
if not env.supports_time_skipping:
pytest.skip("Too slow to test with time-skipping disabled")
# Start a worker that hosts the workflow and activity implementations.
task_queue = f"tq-{uuid.uuid4()}"
async with Worker(
client,
task_queue=task_queue,
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
handle = await client.start_workflow(
GreetingWorkflow.run,
"Temporal",
id=f"infrequent-polling-{uuid.uuid4()}",
task_queue=task_queue,
)
result = await handle.result()
assert result == "Hello, Temporal!"