Skip to content

Commit 2cb0fdd

Browse files
authored
Worker Versioning sample (temporalio#78)
* Upgrade sdk version to 1.3.0
1 parent 53c6063 commit 2cb0fdd

File tree

10 files changed

+259
-47
lines changed

10 files changed

+259
-47
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ Some examples require extra dependencies. See each sample's directory for specif
5151
while running.
5252
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
5353
<!-- Keep this list in alphabetical order -->
54-
* [activity_sticky_queue](activity_sticky_queue) - Uses unique task queues to ensure activities run on specific workers.
54+
* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers.
5555
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
5656
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
5757
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
@@ -63,6 +63,7 @@ Some examples require extra dependencies. See each sample's directory for specif
6363
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
6464
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
6565
* [sentry](sentry) - Report errors to Sentry.
66+
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
6667

6768
## Test
6869

poetry.lock

Lines changed: 9 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ packages = [
1717

1818
[tool.poetry.dependencies]
1919
python = "^3.7"
20-
temporalio = "^1.1.0"
20+
temporalio = "^1.3.0"
2121

2222
[tool.poetry.dev-dependencies]
2323
black = "^22.3.0"

worker_versioning/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Worker Versioning Sample
2+
3+
This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
4+
feature to deploy incompatible changes to workflow code more easily.
5+
6+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory:
7+
8+
poetry run python example.py
9+
10+
This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
11+
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
12+
see that only the workers only process Workflow Tasks assigned versions they are compatible with.

worker_versioning/__init__.py

Whitespace-only changes.

worker_versioning/activities.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from temporalio import activity
2+
3+
4+
@activity.defn
5+
async def greet(inp: str) -> str:
6+
return f"Hi from {inp}"
7+
8+
9+
@activity.defn
10+
async def super_greet(inp: str, some_number: int) -> str:
11+
return f"Hi from {inp} with {some_number}"

worker_versioning/example.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import asyncio
2+
import uuid
3+
4+
from temporalio.client import BuildIdOpAddNewCompatible, BuildIdOpAddNewDefault, Client
5+
from temporalio.worker import Worker
6+
7+
from worker_versioning.activities import greet, super_greet
8+
from worker_versioning.workflow_v1 import MyWorkflow as MyWorkflowV1
9+
from worker_versioning.workflow_v1_1 import MyWorkflow as MyWorkflowV1_1
10+
from worker_versioning.workflow_v2 import MyWorkflow as MyWorkflowV2
11+
12+
13+
async def main():
14+
client = await Client.connect("localhost:7233")
15+
task_queue = f"worker-versioning-{uuid.uuid4()}"
16+
17+
# Start a 1.0 worker
18+
async with Worker(
19+
client,
20+
task_queue=task_queue,
21+
workflows=[MyWorkflowV1],
22+
activities=[greet, super_greet],
23+
build_id="1.0",
24+
use_worker_versioning=True,
25+
):
26+
# Add 1.0 as the default version for the queue
27+
await client.update_worker_build_id_compatibility(
28+
task_queue, BuildIdOpAddNewDefault("1.0")
29+
)
30+
31+
# Start a workflow which will run on the 1.0 worker
32+
handle = await client.start_workflow(
33+
MyWorkflowV1.run,
34+
task_queue=task_queue,
35+
id=f"worker-versioning-v1-{uuid.uuid4()}",
36+
)
37+
# Signal the workflow to proceed
38+
await handle.signal(MyWorkflowV1.proceeder, "go")
39+
40+
# Give a chance for the worker to process the signal
41+
# TODO Better?
42+
await asyncio.sleep(1)
43+
44+
# Add 1.1 as the default version for the queue, compatible with 1.0
45+
await client.update_worker_build_id_compatibility(
46+
task_queue, BuildIdOpAddNewCompatible("1.1", "1.0")
47+
)
48+
49+
# Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the
50+
# 1.0 worker may continue to process tasks briefly after we make 1.1 the new default.
51+
async with Worker(
52+
client,
53+
task_queue=task_queue,
54+
workflows=[MyWorkflowV1_1],
55+
activities=[greet, super_greet],
56+
build_id="1.1",
57+
use_worker_versioning=True,
58+
):
59+
# Continue driving the workflow. Take note that the new version of the workflow run by the 1.1
60+
# worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is
61+
# processing a task as the version update happens. That's normal.
62+
await handle.signal(MyWorkflowV1.proceeder, "go")
63+
64+
# Add a new *incompatible* version to the task queue, which will become the new overall default for the queue.
65+
await client.update_worker_build_id_compatibility(
66+
task_queue, BuildIdOpAddNewDefault("2.0")
67+
)
68+
69+
# Start a 2.0 worker
70+
async with Worker(
71+
client,
72+
task_queue=task_queue,
73+
workflows=[MyWorkflowV2],
74+
activities=[greet, super_greet],
75+
build_id="2.0",
76+
use_worker_versioning=True,
77+
):
78+
# Start a new workflow. Note that it will run on the new 2.0 version, without the client invocation changing
79+
# at all! Note here we can use `MyWorkflowV1.run` because the signature of the workflow has not changed.
80+
handle2 = await client.start_workflow(
81+
MyWorkflowV1.run,
82+
task_queue=task_queue,
83+
id=f"worker-versioning-v2-{uuid.uuid4()}",
84+
)
85+
86+
# Drive both workflows once more before concluding them. The first workflow will continue running on the 1.1
87+
# worker.
88+
await handle.signal(MyWorkflowV1.proceeder, "go")
89+
await handle2.signal(MyWorkflowV1.proceeder, "go")
90+
await handle.signal(MyWorkflowV1.proceeder, "finish")
91+
await handle2.signal(MyWorkflowV1.proceeder, "finish")
92+
93+
# Wait for both workflows to complete
94+
await handle.result()
95+
await handle2.result()
96+
97+
# Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are ready to be
98+
# retired. There's more information in the documentation, but here's a quick example that shows us how to
99+
# tell when the 1.0 worker can be retired:
100+
101+
# There is a 5 minute buffer before we will consider IDs no longer reachable by new workflows, to
102+
# account for replication in multi-cluster setups. Uncomment the following line to wait long enough to see
103+
# the 1.0 worker become unreachable.
104+
# await asyncio.sleep(60 * 5)
105+
reachability = await client.get_worker_task_reachability(
106+
build_ids=["2.0", "1.0", "1.1"]
107+
)
108+
109+
if not reachability.build_id_reachability["1.0"].task_queue_reachability[
110+
task_queue
111+
]:
112+
print("1.0 is ready to be retired!")
113+
114+
115+
if __name__ == "__main__":
116+
asyncio.run(main())

worker_versioning/workflow_v1.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
5+
with workflow.unsafe.imports_passed_through():
6+
from worker_versioning.activities import greet
7+
8+
9+
@workflow.defn
10+
class MyWorkflow:
11+
"""The 1.0 version of the workflow we'll be making changes to"""
12+
13+
should_finish: bool = False
14+
15+
@workflow.run
16+
async def run(self) -> str:
17+
workflow.logger.info("Running workflow V1")
18+
await workflow.wait_condition(lambda: self.should_finish)
19+
return "Concluded workflow on V1"
20+
21+
@workflow.signal
22+
async def proceeder(self, inp: str):
23+
await workflow.execute_activity(
24+
greet, "V1", start_to_close_timeout=timedelta(seconds=5)
25+
)
26+
if inp == "finish":
27+
self.should_finish = True

worker_versioning/workflow_v1_1.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
5+
with workflow.unsafe.imports_passed_through():
6+
from worker_versioning.activities import greet, super_greet
7+
8+
9+
@workflow.defn
10+
class MyWorkflow:
11+
"""
12+
The 1.1 version of the workflow, which is compatible with the first version.
13+
14+
The compatible changes we've made are:
15+
- Altering the log lines
16+
- Using the `patched` API to properly introduce branching behavior while maintaining
17+
compatibility
18+
"""
19+
20+
should_finish: bool = False
21+
22+
@workflow.run
23+
async def run(self) -> str:
24+
workflow.logger.info("Running workflow V1.1")
25+
await workflow.wait_condition(lambda: self.should_finish)
26+
return "Concluded workflow on V1.1"
27+
28+
@workflow.signal
29+
async def proceeder(self, inp: str):
30+
if workflow.patched("different-activity"):
31+
await workflow.execute_activity(
32+
super_greet,
33+
args=["V1.1", 100],
34+
start_to_close_timeout=timedelta(seconds=5),
35+
)
36+
else:
37+
# Note it is a valid compatible change to alter the input to an activity. However, because
38+
# we're using the patched API, this branch would only be taken if the workflow was started on
39+
# a v1 worker.
40+
await workflow.execute_activity(
41+
greet, "V1.1", start_to_close_timeout=timedelta(seconds=5)
42+
)
43+
44+
if inp == "finish":
45+
self.should_finish = True

worker_versioning/workflow_v2.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import workflow
5+
6+
with workflow.unsafe.imports_passed_through():
7+
from worker_versioning.activities import greet
8+
9+
10+
@workflow.defn
11+
class MyWorkflow:
12+
"""
13+
The 2.0 version of the workflow, which is fully incompatible with the other workflows, since it
14+
alters the sequence of commands without using `patched`.
15+
"""
16+
17+
should_finish: bool = False
18+
19+
@workflow.run
20+
async def run(self) -> str:
21+
workflow.logger.info("Running workflow V2")
22+
await workflow.wait_condition(lambda: self.should_finish)
23+
return "Concluded workflow on V2"
24+
25+
@workflow.signal
26+
async def proceeder(self, inp: str):
27+
await asyncio.sleep(1)
28+
await workflow.execute_activity(
29+
greet, "V2", start_to_close_timeout=timedelta(seconds=5)
30+
)
31+
await workflow.execute_activity(
32+
greet, "V2", start_to_close_timeout=timedelta(seconds=5)
33+
)
34+
35+
if inp == "finish":
36+
self.should_finish = True

0 commit comments

Comments
 (0)