Skip to content

Commit 4a602ff

Browse files
authored
More Python samples (temporalio#65)
Fixes temporalio#53 Fixes temporalio#58 Fixes temporalio#59
1 parent 867a870 commit 4a602ff

16 files changed

+476
-0
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ Some examples require extra dependencies. See each sample's directory for specif
2727
* [hello_activity](hello/hello_activity.py) - Execute an activity from a workflow.
2828
* [hello_activity_choice](hello/hello_activity_choice.py) - Execute certain activities inside a workflow based on
2929
dynamic input.
30+
* [hello_activity_method](hello/hello_activity_method.py) - Demonstrate an activity that is an instance method on a
31+
class and can access class state.
3032
* [hello_activity_multiprocess](hello/hello_activity_multiprocess.py) - Execute a synchronous activity on a process
3133
pool.
3234
* [hello_activity_retry](hello/hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number
@@ -55,6 +57,8 @@ Some examples require extra dependencies. See each sample's directory for specif
5557
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
5658
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
5759
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
60+
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
61+
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
5862
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
5963
* [sentry](sentry) - Report errors to Sentry.
6064

hello/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Replace `hello_activity.py` in the command with any other example filename to r
1919
* [hello_activity](hello_activity.py) - Execute an activity from a workflow.
2020
* [hello_activity_choice](hello_activity_choice.py) - Execute certain activities inside a workflow based on dynamic
2121
input.
22+
* [hello_activity_method](hello/hello_activity_method.py) - Demonstrate an activity that is an instance method on a
23+
class and can access class state.
2224
* [hello_activity_multiprocess](hello_activity_multiprocess.py) - Execute a synchronous activity on a process pool.
2325
* [hello_activity_retry](hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number of
2426
attempts.

hello/hello_activity_method.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import activity, workflow
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
9+
class MyDatabaseClient:
10+
async def run_database_update(self) -> None:
11+
print("Database update executed")
12+
13+
14+
class MyActivities:
15+
def __init__(self, db_client: MyDatabaseClient) -> None:
16+
self.db_client = db_client
17+
18+
@activity.defn
19+
async def do_database_thing(self) -> None:
20+
await self.db_client.run_database_update()
21+
22+
23+
@workflow.defn
24+
class MyWorkflow:
25+
@workflow.run
26+
async def run(self) -> None:
27+
await workflow.execute_activity_method(
28+
MyActivities.do_database_thing,
29+
start_to_close_timeout=timedelta(seconds=10),
30+
)
31+
32+
33+
async def main():
34+
# Start client
35+
client = await Client.connect("localhost:7233")
36+
37+
# Create our database client that can then be used in the activity
38+
db_client = MyDatabaseClient()
39+
# Instantiate our class containing state that can be referenced from
40+
# activity methods
41+
my_activities = MyActivities(db_client)
42+
43+
# Run a worker for the workflow
44+
async with Worker(
45+
client,
46+
task_queue="hello-activity-method-task-queue",
47+
workflows=[MyWorkflow],
48+
activities=[my_activities.do_database_thing],
49+
):
50+
51+
# While the worker is running, use the client to run the workflow and
52+
# print out its result. Note, in many production setups, the client
53+
# would be in a completely separate process from the worker.
54+
await client.execute_workflow(
55+
MyWorkflow.run,
56+
id="hello-activity-method-workflow-id",
57+
task_queue="hello-activity-method-task-queue",
58+
)
59+
60+
61+
if __name__ == "__main__":
62+
asyncio.run(main())

patching/README.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Patching Sample
2+
3+
This sample shows how to safely alter a workflow using `patched` and `deprecate_patch` in stages.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then follow the patching stages below.
6+
7+
### Stage 1 - Initial code
8+
9+
This stage is for existing running workflows. To simulate our initial workflow, run the worker in a separate terminal:
10+
11+
poetry run python worker.py --workflow initial
12+
13+
Now we can start this workflow:
14+
15+
poetry run python starter.py --start-workflow initial-workflow-id
16+
17+
This will output "Started workflow with ID initial-workflow-id and ...". Now query this workflow:
18+
19+
poetry run python starter.py --query-workflow initial-workflow-id
20+
21+
This will output "Query result for ID initial-workflow-id: pre-patch".
22+
23+
### Stage 2 - Patch the workflow
24+
25+
This stage is for needing to run old and new workflows at the same time. To simulate our patched workflow, stop the
26+
worker from before and start it again with the patched workflow:
27+
28+
poetry run python worker.py --workflow patched
29+
30+
Now let's start another workflow with this patched code:
31+
32+
poetry run python starter.py --start-workflow patched-workflow-id
33+
34+
This will output "Started workflow with ID patched-workflow-id and ...". Now query the old workflow that's still
35+
running:
36+
37+
poetry run python starter.py --query-workflow initial-workflow-id
38+
39+
This will output "Query result for ID initial-workflow-id: pre-patch" since it is pre-patch. But if we execute a query
40+
against the new code:
41+
42+
poetry run python starter.py --query-workflow patched-workflow-id
43+
44+
We get "Query result for ID patched-workflow-id: post-patch". This is how old workflow code can take old paths and new
45+
workflow code can take new paths.
46+
47+
### Stage 3 - Deprecation
48+
49+
Once we know that all workflows that started with the initial code from "Stage 1" are no longer running, we don't need
50+
the patch so we can deprecate it. To use the patch deprecated workflow, stop the workflow from before and start it again
51+
with:
52+
53+
poetry run python worker.py --workflow patch-deprecated
54+
55+
All workflows in "Stage 2" and any new workflows will work. Now let's start another workflow with this patch deprecated
56+
code:
57+
58+
poetry run python starter.py --start-workflow patch-deprecated-workflow-id
59+
60+
This will output "Started workflow with ID patch-deprecated-workflow-id and ...". Now query the patched workflow that's
61+
still running:
62+
63+
poetry run python starter.py --query-workflow patched-workflow-id
64+
65+
This will output "Query result for ID patched-workflow-id: post-patch". And if we execute a query against the latest
66+
workflow:
67+
68+
poetry run python starter.py --query-workflow patch-deprecated-workflow-id
69+
70+
As expected, this will output "Query result for ID patch-deprecated-workflow-id: post-patch".
71+
72+
### Stage 4 - Patch complete
73+
74+
Once we know we don't even have any workflows running on "Stage 2" or before (i.e. the workflow with the patch with
75+
both code paths), we can just remove the patch deprecation altogether. To use the patch complete workflow, stop the
76+
workflow from before and start it again with:
77+
78+
poetry run python worker.py --workflow patch-complete
79+
80+
All workflows in "Stage 3" and any new workflows will work. Now let's start another workflow with this patch complete
81+
code:
82+
83+
poetry run python starter.py --start-workflow patch-complete-workflow-id
84+
85+
This will output "Started workflow with ID patch-complete-workflow-id and ...". Now query the patch deprecated workflow
86+
that's still running:
87+
88+
poetry run python starter.py --query-workflow patch-deprecated-workflow-id
89+
90+
This will output "Query result for ID patch-deprecated-workflow-id: post-patch". And if we execute a query against the
91+
latest workflow:
92+
93+
poetry run python starter.py --query-workflow patch-complete-workflow-id
94+
95+
As expected, this will output "Query result for ID patch-complete-workflow-id: post-patch".
96+
97+
Following these stages, we have successfully altered our workflow code.

patching/__init__.py

Whitespace-only changes.

patching/activities.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from temporalio import activity
2+
3+
4+
@activity.defn
5+
async def pre_patch_activity() -> str:
6+
return "pre-patch"
7+
8+
9+
@activity.defn
10+
async def post_patch_activity() -> str:
11+
return "post-patch"

patching/starter.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import argparse
2+
import asyncio
3+
4+
from temporalio.client import Client
5+
6+
# Since it's just used for typing purposes, it doesn't matter which one we
7+
# import
8+
from patching.workflow_1_initial import MyWorkflow
9+
10+
11+
async def main():
12+
parser = argparse.ArgumentParser(description="Run worker")
13+
parser.add_argument("--start-workflow", help="Start workflow with this ID")
14+
parser.add_argument("--query-workflow", help="Query workflow with this ID")
15+
args = parser.parse_args()
16+
if not args.start_workflow and not args.query_workflow:
17+
raise RuntimeError("Either --start-workflow or --query-workflow is required")
18+
19+
# Connect client
20+
client = await Client.connect("localhost:7233")
21+
22+
if args.start_workflow:
23+
handle = await client.start_workflow(
24+
MyWorkflow.run, id=args.start_workflow, task_queue="patching-task-queue"
25+
)
26+
print(f"Started workflow with ID {handle.id} and run ID {handle.result_run_id}")
27+
if args.query_workflow:
28+
handle = client.get_workflow_handle_for(MyWorkflow.run, args.query_workflow)
29+
result = await handle.query(MyWorkflow.result)
30+
print(f"Query result for ID {handle.id}: {result}")
31+
32+
33+
if __name__ == "__main__":
34+
asyncio.run(main())

patching/worker.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import argparse
2+
import asyncio
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from patching.activities import post_patch_activity, pre_patch_activity
8+
9+
interrupt_event = asyncio.Event()
10+
11+
12+
async def main():
13+
# Import which workflow based on CLI arg
14+
parser = argparse.ArgumentParser(description="Run worker")
15+
parser.add_argument(
16+
"--workflow",
17+
help="Which workflow. Can be 'initial', 'patched', 'patch-deprecated', or 'patch-complete'",
18+
required=True,
19+
)
20+
args = parser.parse_args()
21+
if args.workflow == "initial":
22+
from patching.workflow_1_initial import MyWorkflow
23+
elif args.workflow == "patched":
24+
from patching.workflow_2_patched import MyWorkflow # type: ignore
25+
elif args.workflow == "patch-deprecated":
26+
from patching.workflow_3_patch_deprecated import MyWorkflow # type: ignore
27+
elif args.workflow == "patch-complete":
28+
from patching.workflow_4_patch_complete import MyWorkflow # type: ignore
29+
else:
30+
raise RuntimeError("Unrecognized workflow")
31+
32+
# Connect client
33+
client = await Client.connect("localhost:7233")
34+
35+
# Run a worker for the workflow
36+
async with Worker(
37+
client,
38+
task_queue="patching-task-queue",
39+
workflows=[MyWorkflow],
40+
activities=[pre_patch_activity, post_patch_activity],
41+
):
42+
# Wait until interrupted
43+
print("Worker started")
44+
await interrupt_event.wait()
45+
print("Shutting down")
46+
47+
48+
if __name__ == "__main__":
49+
loop = asyncio.new_event_loop()
50+
try:
51+
loop.run_until_complete(main())
52+
except KeyboardInterrupt:
53+
interrupt_event.set()
54+
loop.run_until_complete(loop.shutdown_asyncgens())

patching/workflow_1_initial.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
5+
with workflow.unsafe.imports_passed_through():
6+
from patching.activities import pre_patch_activity
7+
8+
9+
@workflow.defn
10+
class MyWorkflow:
11+
@workflow.run
12+
async def run(self) -> None:
13+
self._result = await workflow.execute_activity(
14+
pre_patch_activity,
15+
schedule_to_close_timeout=timedelta(minutes=5),
16+
)
17+
18+
@workflow.query
19+
def result(self) -> str:
20+
return self._result

patching/workflow_2_patched.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
5+
with workflow.unsafe.imports_passed_through():
6+
from patching.activities import post_patch_activity, pre_patch_activity
7+
8+
9+
@workflow.defn
10+
class MyWorkflow:
11+
@workflow.run
12+
async def run(self) -> None:
13+
if workflow.patched("my-patch"):
14+
self._result = await workflow.execute_activity(
15+
post_patch_activity,
16+
schedule_to_close_timeout=timedelta(minutes=5),
17+
)
18+
else:
19+
self._result = await workflow.execute_activity(
20+
pre_patch_activity,
21+
schedule_to_close_timeout=timedelta(minutes=5),
22+
)
23+
24+
@workflow.query
25+
def result(self) -> str:
26+
return self._result

0 commit comments

Comments
 (0)