Skip to content

Commit f5d9380

Browse files
authored
Added sample for demonstrating how to patch a workflow (temporalio#57)
1 parent a4f0a05 commit f5d9380

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed

hello/hello_patch.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import asyncio
2+
import logging
3+
import sys
4+
from dataclasses import dataclass
5+
from datetime import timedelta
6+
7+
from temporalio import activity, exceptions, workflow
8+
from temporalio.client import Client
9+
from temporalio.worker import Worker
10+
11+
12+
# While we could use multiple parameters in the activity, Temporal strongly
13+
# encourages using a single dataclass instead which can have fields added to it
14+
# in a backwards-compatible way.
15+
@dataclass
16+
class ComposeGreetingInput:
17+
greeting: str
18+
name: str
19+
20+
21+
# Basic activity that logs and does string concatenation
22+
@activity.defn
23+
async def compose_greeting(input: ComposeGreetingInput) -> str:
24+
activity.logger.info("Running activity with parameter %s" % input)
25+
return f"{input.greeting}, {input.name}!"
26+
27+
28+
# V1 of patch-workflow
29+
@workflow.defn(name="patch-workflow")
30+
class MyWorkflow:
31+
@workflow.run
32+
async def run(self, name: str) -> str:
33+
workflow.logger.info("Running patch-workflow with parameter %s" % name)
34+
greeting = await workflow.execute_activity(
35+
compose_greeting,
36+
ComposeGreetingInput("Hello", name),
37+
start_to_close_timeout=timedelta(seconds=70),
38+
)
39+
return greeting
40+
41+
42+
# V2 of patch-workflow using patched where we have changed newly started
43+
# workflow behavior without changing the behavior of currently running workflows
44+
@workflow.defn(name="patch-workflow")
45+
class MyWorkflowPatched:
46+
@workflow.run
47+
async def run(self, name: str) -> str:
48+
workflow.logger.info("Running patch-workflow with parameter %s" % name)
49+
if workflow.patched("my-patch-v2"):
50+
greeting = await workflow.execute_activity(
51+
compose_greeting,
52+
ComposeGreetingInput("Goodbye", name),
53+
start_to_close_timeout=timedelta(seconds=70),
54+
)
55+
56+
await asyncio.sleep(10)
57+
return greeting
58+
else:
59+
greeting = await workflow.execute_activity(
60+
compose_greeting,
61+
ComposeGreetingInput("Hello", name),
62+
start_to_close_timeout=timedelta(seconds=70),
63+
)
64+
return greeting
65+
66+
67+
# V3 of patch-workflow using deprecate_patch where all the old V1 workflows
68+
# have completed, we no longer need to preserve V1 and now just have V2
69+
@workflow.defn(name="patch-worklow")
70+
class MyWorkflowPatchDeprecated:
71+
@workflow.run
72+
async def run(self, name: str) -> str:
73+
workflow.logger.info("Running patch-workflow with parameter %s" % name)
74+
workflow.deprecate_patch("my-patch-v2")
75+
greeting = await workflow.execute_activity(
76+
compose_greeting,
77+
ComposeGreetingInput("Goodbye", name),
78+
start_to_close_timeout=timedelta(seconds=70),
79+
)
80+
81+
await asyncio.sleep(10)
82+
return greeting
83+
84+
85+
async def main():
86+
# Check Args
87+
if len(sys.argv) > 2:
88+
print(f"Incorrect arguments: {sys.argv[0]} v1|v2|v3")
89+
exit()
90+
if len(sys.argv) <= 1:
91+
print(f"Incorrect arguments: {sys.argv[0]} v1|v2|v3v3")
92+
exit()
93+
if sys.argv[1] != "v1" and sys.argv[1] != "v2" and sys.argv[1] != "v3":
94+
print(f"Incorrect arguments: {sys.argv[0]} v1|v2|v3")
95+
exit()
96+
97+
version = sys.argv[1]
98+
99+
# Uncomment the line below to see logging
100+
# logging.basicConfig(level=logging.INFO)
101+
102+
# Start client
103+
client = await Client.connect("localhost:7233")
104+
105+
# Set workflow_class to the proper class based on version
106+
workflow_class = ""
107+
if version == "v1":
108+
workflow_class = MyWorkflow # type: ignore
109+
elif version == "v2":
110+
workflow_class = MyWorkflowPatched # type: ignore
111+
elif version == "v3":
112+
workflow_class = MyWorkflowPatchDeprecated # type: ignore
113+
else:
114+
print(f"Incorrect arguments: {sys.argv[0]} v1|v2|v3")
115+
exit()
116+
117+
# While the worker is running, use the client to run the workflow and
118+
# print out its result. Check if the workflow is already running and
119+
# if so wait for the existing run to complete. Note, in many production setups,
120+
# the client would be in a completely separate process from the worker.
121+
async with Worker(
122+
client,
123+
task_queue="hello-patch-task-queue",
124+
workflows=[workflow_class], # type: ignore
125+
activities=[compose_greeting],
126+
):
127+
try:
128+
result = await client.execute_workflow(
129+
workflow_class.run, # type: ignore
130+
"World",
131+
id="hello-patch-workflow-id",
132+
task_queue="hello-patch-task-queue",
133+
)
134+
print(f"Result: {result}")
135+
except exceptions.WorkflowAlreadyStartedError:
136+
print(f"Workflow already running")
137+
result = await client.get_workflow_handle(
138+
"hello-patch-workflow-id"
139+
).result()
140+
print(f"Result: {result}")
141+
142+
143+
if __name__ == "__main__":
144+
asyncio.run(main())

0 commit comments

Comments
 (0)