forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhello_activity_heartbeat.py
More file actions
70 lines (58 loc) · 2.13 KB
/
hello_activity_heartbeat.py
File metadata and controls
70 lines (58 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
    """Input payload handed to the ``compose_greeting`` activity."""

    # Salutation placed before the name, e.g. "Hello".
    greeting: str
    # Who is being greeted.
    name: str
@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
    """Build the greeting after a short, heartbeated delay.

    Heartbeats once per second for three seconds (like all long-running
    activities should do), then returns ``"<greeting>, <name>!"``.
    """
    heartbeat_count = 3
    pause_seconds = 1
    for _ in range(heartbeat_count):
        print("Heartbeating activity")
        activity.heartbeat()
        time.sleep(pause_seconds)
    greeting, name = input.greeting, input.name
    return f"{greeting}, {name}!"
@workflow.defn
class GreetingWorkflow:
    """Workflow that greets ``name`` via the ``compose_greeting`` activity."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Execute ``compose_greeting`` for ``name`` and return its result."""
        activity_input = ComposeGreetingInput("Hello", name)
        greeting = await workflow.execute_activity(
            compose_greeting,
            activity_input,
            start_to_close_timeout=timedelta(seconds=10),
            # Always set a heartbeat timeout for long-running activities
            heartbeat_timeout=timedelta(seconds=2),
        )
        return greeting
async def main():
    """Start a worker and run ``GreetingWorkflow`` once, printing its result.

    Connection settings come from ``ClientConfig.load_client_connect_config()``;
    ``target_host`` falls back to ``localhost:7233`` when it is not configured.
    """
    # Shared by the worker and the workflow start call so the two cannot drift.
    task_queue = "hello-activity-heartbeating-task-queue"

    # Start client
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(**config)

    # Per the Temporal Worker documentation, a caller-supplied activity
    # executor must be shut down by the caller. Owning it in a ``with`` block
    # releases its threads; it is the outer context so that shutdown happens
    # only after the worker itself has stopped.
    with ThreadPoolExecutor(5) as activity_executor:
        # Run a worker for the workflow
        async with Worker(
            client,
            task_queue=task_queue,
            workflows=[GreetingWorkflow],
            activities=[compose_greeting],
            activity_executor=activity_executor,
        ):
            # While the worker is running, use the client to run the workflow
            # and print out its result. Note, in many production setups, the
            # client would be in a completely separate process from the worker.
            result = await client.execute_workflow(
                GreetingWorkflow.run,
                "World",
                id="hello-activity-heartbeating-workflow-id",
                task_queue=task_queue,
            )
            print(f"Result: {result}")
# Run the sample only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())