Skip to content

Commit 0b1c586

Browse files
authored
Simple samples (temporalio#4)
1 parent a5b67e4 commit 0b1c586

23 files changed

Lines changed: 1328 additions & 294 deletions

README.md

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,36 @@ With this repository cloned, run the following at the root of the directory:
2020

2121
That loads all dependencies. Then to run a sample, usually you just run it in Python. For example:
2222

23-
poetry run python hello_world/hello_world.py
23+
poetry run python hello/hello_activity.py
2424

2525
See each sample's directory for specific instructions.
2626

2727
## Samples
2828

29-
* [Activity Worker](activity_worker) - Use Python activities from a workflow in another language
30-
* [Hello World](hello_world) - Basic hello world workflow and activity
29+
* [hello](hello) - All of the basic features.
30+
<!-- Keep this list in alphabetical order and in sync on hello/README.md and root README.md -->
31+
* [hello_activity](hello/hello_activity.py) - Execute an activity from a workflow.
32+
* [hello_activity_choice](hello/hello_activity_choice.py) - Execute certain activities inside a workflow based on
33+
dynamic input.
34+
* [hello_activity_multiprocess](hello/hello_activity_multiprocess.py) - Execute a synchronous activity on a process
35+
pool.
36+
* [hello_activity_retry](hello/hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number
37+
of attempts.
38+
* [hello_activity_threaded](hello/hello_activity_threaded.py) - Execute a synchronous activity on a thread pool.
39+
* [hello_async_activity_completion](hello/hello_async_activity_completion.py) - Complete an activity outside of the
40+
function that was called.
41+
* [hello_cancellation](hello/hello_cancellation.py) - Manually react to cancellation inside workflows and activities.
42+
* [hello_child_workflow](hello/hello_child_workflow.py) - Execute a child workflow from a workflow.
43+
* [hello_continue_as_new](hello/hello_continue_as_new.py) - Use continue as new to restart a workflow.
44+
* [hello_cron](hello/hello_cron.py) - Execute a workflow once a minute.
45+
* [hello_exception](hello/hello_exception.py) - Execute an activity that raises an error out of the workflow and out
46+
of the program.
47+
* [hello_local_activity](hello/hello_local_activity.py) - Execute a local activity from a workflow.
48+
* [hello_mtls](hello/hello_mtls.py) - Accept URL, namespace, and certificate info as CLI args and use mTLS for
49+
connecting to server.
50+
* [hello_parallel_activity](hello/hello_parallel_activity.py) - Execute multiple activities at once.
51+
* [hello_query](hello/hello_query.py) - Invoke queries on a workflow.
52+
* [hello_search_attributes](hello/hello_search_attributes.py) - Start workflow with search attributes then change
53+
while running.
54+
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
55+
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.

activity_worker/activity_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async def say_hello_activity(name: str) -> str:
1818

1919
async def main():
2020
# Create client to localhost on default namespace
21-
client = await Client.connect("http://localhost:7233")
21+
client = await Client.connect("localhost:7233")
2222

2323
# Run activity worker
2424
async with Worker(client, task_queue=task_queue, activities=[say_hello_activity]):

hello/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Hello Samples
2+
3+
These samples show basic workflow and activity features.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to run the
6+
`hello_activity.py` sample:
7+
8+
poetry run python hello_activity.py
9+
10+
The result will be:
11+
12+
Result: Hello, World!
13+
14+
Replace `hello_activity.py` in the command with any other example filename to run it instead.
15+
16+
## Samples
17+
18+
<!-- Keep this list in alphabetical order and in sync on hello/README.md and root README.md -->
19+
* [hello_activity](hello_activity.py) - Execute an activity from a workflow.
20+
* [hello_activity_choice](hello_activity_choice.py) - Execute certain activities inside a workflow based on dynamic
21+
input.
22+
* [hello_activity_multiprocess](hello_activity_multiprocess.py) - Execute a synchronous activity on a process pool.
23+
* [hello_activity_retry](hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number of
24+
attempts.
25+
* [hello_activity_threaded](hello_activity_threaded.py) - Execute a synchronous activity on a thread pool.
26+
* [hello_async_activity_completion](hello_async_activity_completion.py) - Complete an activity outside of the function
27+
that was called.
28+
* [hello_cancellation](hello_cancellation.py) - Manually react to cancellation inside workflows and activities.
29+
* [hello_child_workflow](hello_child_workflow.py) - Execute a child workflow from a workflow.
30+
* [hello_continue_as_new](hello_continue_as_new.py) - Use continue as new to restart a workflow.
31+
* [hello_cron](hello_cron.py) - Execute a workflow once a minute.
32+
* [hello_exception](hello_exception.py) - Execute an activity that raises an error out of the workflow and out of the
33+
program.
34+
* [hello_local_activity](hello_local_activity.py) - Execute a local activity from a workflow.
35+
* [hello_mtls](hello_mtls.py) - Accept URL, namespace, and certificate info as CLI args and use mTLS for connecting to
36+
server.
37+
* [hello_parallel_activity](hello_parallel_activity.py) - Execute multiple activities at once.
38+
* [hello_query](hello_query.py) - Invoke queries on a workflow.
39+
* [hello_search_attributes](hello_search_attributes.py) - Start workflow with search attributes then change while
40+
running.
41+
* [hello_signal](hello_signal.py) - Send signals to a workflow.
Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,39 @@
11
import asyncio
22
import logging
3+
from dataclasses import dataclass
34
from datetime import timedelta
45

56
from temporalio import activity, workflow
67
from temporalio.client import Client
78
from temporalio.worker import Worker
89

910

11+
# While we could use multiple parameters in the activity, Temporal strongly
12+
# encourages using a single dataclass instead which can have fields added to it
13+
# in a backwards-compatible way.
14+
@dataclass
15+
class ComposeGreetingInput:
16+
greeting: str
17+
name: str
18+
19+
1020
# Basic activity that logs and does string concatenation
1121
@activity.defn
12-
async def say_hello_activity(name: str) -> str:
13-
activity.logger.info("Running activity with parameter %s" % name)
14-
return f"Hello, {name}!"
22+
async def compose_greeting(input: ComposeGreetingInput) -> str:
23+
activity.logger.info("Running activity with parameter %s" % input)
24+
return f"{input.greeting}, {input.name}!"
1525

1626

1727
# Basic workflow that logs and invokes an activity
1828
@workflow.defn
19-
class SayHelloWorkflow:
29+
class GreetingWorkflow:
2030
@workflow.run
2131
async def run(self, name: str) -> str:
2232
workflow.logger.info("Running workflow with parameter %s" % name)
2333
return await workflow.execute_activity(
24-
say_hello_activity, name, start_to_close_timeout=timedelta(seconds=10)
34+
compose_greeting,
35+
ComposeGreetingInput("Hello", name),
36+
start_to_close_timeout=timedelta(seconds=10),
2537
)
2638

2739

@@ -30,24 +42,24 @@ async def main():
3042
# logging.basicConfig(level=logging.INFO)
3143

3244
# Start client
33-
client = await Client.connect("http://localhost:7233")
45+
client = await Client.connect("localhost:7233")
3446

3547
# Run a worker for the workflow
3648
async with Worker(
3749
client,
38-
task_queue="my-task-queue",
39-
workflows=[SayHelloWorkflow],
40-
activities=[say_hello_activity],
50+
task_queue="hello-activity-task-queue",
51+
workflows=[GreetingWorkflow],
52+
activities=[compose_greeting],
4153
):
4254

4355
# While the worker is running, use the client to run the workflow and
4456
# print out its result. Note, in many production setups, the client
4557
# would be in a completely separate process from the worker.
4658
result = await client.execute_workflow(
47-
SayHelloWorkflow.run,
48-
"Temporal",
49-
id="my-workflow-id",
50-
task_queue="my-task-queue",
59+
GreetingWorkflow.run,
60+
"World",
61+
id="hello-activity-workflow-id",
62+
task_queue="hello-activity-task-queue",
5163
)
5264
print(f"Result: {result}")
5365

hello/hello_activity_choice.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
from enum import IntEnum
5+
from typing import List
6+
7+
from temporalio import activity, workflow
8+
from temporalio.client import Client
9+
from temporalio.worker import Worker
10+
11+
# Activities that will be called by the workflow
12+
13+
14+
@activity.defn
15+
async def order_apples(amount: int) -> str:
16+
return f"Ordered {amount} Apples..."
17+
18+
19+
@activity.defn
20+
async def order_bananas(amount: int) -> str:
21+
return f"Ordered {amount} Bananas..."
22+
23+
24+
@activity.defn
25+
async def order_cherries(amount: int) -> str:
26+
return f"Ordered {amount} Cherries..."
27+
28+
29+
@activity.defn
30+
async def order_oranges(amount: int) -> str:
31+
return f"Ordered {amount} Oranges..."
32+
33+
34+
# We have to make enumerates IntEnum to be JSON serializable
35+
class Fruit(IntEnum):
36+
APPLE = 1
37+
BANANA = 2
38+
CHERRY = 3
39+
ORANGE = 4
40+
41+
42+
@dataclass
43+
class ShoppingListItem:
44+
fruit: Fruit
45+
amount: int
46+
47+
48+
@dataclass
49+
class ShoppingList:
50+
items: List[ShoppingListItem]
51+
52+
53+
# Basic workflow that logs and invokes different activities based on input
54+
@workflow.defn
55+
class PurchaseFruitsWorkflow:
56+
@workflow.run
57+
async def run(self, list: ShoppingList) -> str:
58+
# Order each thing on the list
59+
ordered: List[str] = []
60+
for item in list.items:
61+
if item.fruit is Fruit.APPLE:
62+
ordered.append(
63+
await workflow.execute_activity(
64+
order_apples,
65+
item.amount,
66+
start_to_close_timeout=timedelta(seconds=5),
67+
)
68+
)
69+
elif item.fruit is Fruit.BANANA:
70+
ordered.append(
71+
await workflow.execute_activity(
72+
order_bananas,
73+
item.amount,
74+
start_to_close_timeout=timedelta(seconds=5),
75+
)
76+
)
77+
elif item.fruit is Fruit.CHERRY:
78+
ordered.append(
79+
await workflow.execute_activity(
80+
order_cherries,
81+
item.amount,
82+
start_to_close_timeout=timedelta(seconds=5),
83+
)
84+
)
85+
elif item.fruit is Fruit.ORANGE:
86+
ordered.append(
87+
await workflow.execute_activity(
88+
order_oranges,
89+
item.amount,
90+
start_to_close_timeout=timedelta(seconds=5),
91+
)
92+
)
93+
else:
94+
raise ValueError(f"Unrecognized fruit: {item.fruit}")
95+
return "".join(ordered)
96+
97+
98+
async def main():
99+
# Start client
100+
client = await Client.connect("localhost:7233")
101+
102+
# Run a worker for the workflow
103+
async with Worker(
104+
client,
105+
task_queue="hello-activity-choice-task-queue",
106+
workflows=[PurchaseFruitsWorkflow],
107+
activities=[order_apples, order_bananas, order_cherries, order_oranges],
108+
):
109+
110+
# While the worker is running, use the client to run the workflow and
111+
# print out its result. Note, in many production setups, the client
112+
# would be in a completely separate process from the worker.
113+
result = await client.execute_workflow(
114+
PurchaseFruitsWorkflow.run,
115+
ShoppingList(
116+
[
117+
ShoppingListItem(Fruit.APPLE, 8),
118+
ShoppingListItem(Fruit.BANANA, 5),
119+
ShoppingListItem(Fruit.CHERRY, 1),
120+
ShoppingListItem(Fruit.ORANGE, 4),
121+
]
122+
),
123+
id="hello-activity-choice-workflow-id",
124+
task_queue="hello-activity-choice-task-queue",
125+
)
126+
print(f"Order result: {result}")
127+
128+
129+
if __name__ == "__main__":
130+
asyncio.run(main())
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import asyncio
2+
import multiprocessing
3+
import os
4+
import time
5+
from concurrent.futures import ProcessPoolExecutor
6+
from dataclasses import dataclass
7+
from datetime import timedelta
8+
9+
from temporalio import activity, workflow
10+
from temporalio.client import Client
11+
from temporalio.worker import SharedStateManager, Worker
12+
13+
14+
@dataclass
15+
class ComposeGreetingInput:
16+
greeting: str
17+
name: str
18+
19+
20+
@activity.defn
21+
def compose_greeting(input: ComposeGreetingInput) -> str:
22+
# We'll wait for 3 seconds, heartbeating in between (like all long-running
23+
# activities should do), then return the greeting
24+
for _ in range(0, 3):
25+
print(f"Heartbeating activity on PID {os.getpid()}")
26+
activity.heartbeat()
27+
time.sleep(1)
28+
return f"{input.greeting}, {input.name}!"
29+
30+
31+
@workflow.defn
32+
class GreetingWorkflow:
33+
@workflow.run
34+
async def run(self, name: str) -> str:
35+
return await workflow.execute_activity(
36+
compose_greeting,
37+
ComposeGreetingInput("Hello", name),
38+
start_to_close_timeout=timedelta(seconds=10),
39+
# Always set a heartbeat timeout for long-running activities
40+
heartbeat_timeout=timedelta(seconds=2),
41+
)
42+
43+
44+
async def main():
45+
# Start client
46+
client = await Client.connect("localhost:7233")
47+
48+
# Run a worker for the workflow
49+
async with Worker(
50+
client,
51+
task_queue="hello-activity-multiprocess-task-queue",
52+
workflows=[GreetingWorkflow],
53+
activities=[compose_greeting],
54+
# Synchronous activities are not allowed unless we provide some kind of
55+
# executor. Here we are giving a process pool executor which means the
56+
# activity will actually run in a separate process. This same executor
57+
# could be passed to multiple workers if desired.
58+
activity_executor=ProcessPoolExecutor(5),
59+
# Since we are using an executor that is not a thread pool executor,
60+
# Temporal needs some kind of manager to share state such as
61+
# cancellation info and heartbeat info between the host and the
62+
# activity. Therefore, we must provide a shared_state_manager here. A
63+
# helper is provided to create it from a multiprocessing manager.
64+
shared_state_manager=SharedStateManager.create_from_multiprocessing(
65+
multiprocessing.Manager()
66+
),
67+
):
68+
69+
# While the worker is running, use the client to run the workflow and
70+
# print out its result. Note, in many production setups, the client
71+
# would be in a completely separate process from the worker.
72+
result = await client.execute_workflow(
73+
GreetingWorkflow.run,
74+
"World",
75+
id="hello-activity-multiprocess-workflow-id",
76+
task_queue="hello-activity-multiprocess-task-queue",
77+
)
78+
print(f"Result on PID {os.getpid()}: {result}")
79+
80+
81+
if __name__ == "__main__":
82+
asyncio.run(main())

0 commit comments

Comments
 (0)