Commit 82f5267

fix: update queue generation to be one sticky queue per worker host (temporalio#71)
1 parent d4e6c36 commit 82f5267

3 files changed: +22 -26 lines

activity_sticky_queues/README.md

Lines changed: 4 additions & 5 deletions

````diff
@@ -1,15 +1,16 @@
 # Sticky Activity Queues
 
-This sample is a Python implementation of the [TypeScript "Sticky Workers" example](https://github.com/temporalio/samples-typescript/tree/main/activities-sticky-queues), full credit for the design to the authors of that sample. A [sticky execution](https://docs.temporal.io/tasks#sticky-execution) is a job distribution design pattern where all workflow computational tasks are executed on a single worker. In the Go and Java SDKs this is explicitly supported via the Session option, but in other SDKs a different approach is required.
+This sample is a Python implementation of the [TypeScript "Sticky Workers" example](https://github.com/temporalio/samples-typescript/tree/main/activities-sticky-queues), full credit for the design to the authors of that sample. A [sticky execution](https://docs.temporal.io/tasks#sticky-execution) is a job distribution design pattern where all workflow computational tasks are executed on a single worker. In the Go and Java SDKs this is explicitly supported via the Session option, but in other SDKs a different approach is required.
 
 Typical use cases for sticky executions include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.
 
 This strategy is:
+
 - Create a `get_available_task_queue` activity that generates a unique task queue name, `unique_worker_task_queue`.
 - For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `unique_worker_task_queue`. This will be run on a series of `FileProcessing` workflows.
 - Execute workflows from the Client like normal. Check the Temporal Web UI to confirm tasks were staying with their respective worker.
 
-It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). Our example differs from the Node sample by running across 5 unique task queues.
+It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
 
 Activities have been artificially slowed with `time.sleep(3)` to simulate slow activities.
 
@@ -27,7 +28,7 @@ This will start the worker. Then, in another terminal, run the following to exec
 #### Example output:
 
 ```bash
-(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py
+(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py
 Output checksums:
 49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
 49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
@@ -48,5 +49,3 @@ All activities for the one workflow are running against the same task queue, whi
 ![image](./static/all-activitites-on-same-task-queue.png)
 
 </details>
-
-
````

activity_sticky_queues/tasks.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -63,6 +63,7 @@ async def get_available_task_queue() -> str:
 async def download_file_to_worker_filesystem(details: DownloadObj) -> str:
     """Simulates downloading a file to a local filesystem"""
     # FS ops
+    print(details.unique_worker_id, details.workflow_uuid)
     path = create_filepath(details.unique_worker_id, details.workflow_uuid)
     activity.logger.info(f"Downloading ${details.url} and saving to ${path}")
 
```
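The `create_filepath` helper used in this hunk is defined elsewhere in `tasks.py`. The following is only a guess at its shape, based on the README's statement that each worker writes files into its own folder under `demo_fs`; the name `create_filepath_sketch`, its signature, and the exact directory layout are assumptions, not repo code:

```python
from pathlib import Path

def create_filepath_sketch(unique_worker_id: str, workflow_uuid: str) -> Path:
    """Hypothetical reconstruction: one folder per worker under demo_fs,
    one entry per workflow run inside it (layout assumed, not copied)."""
    return Path("demo_fs") / unique_worker_id / workflow_uuid

# The resulting path separates output by worker, then by workflow run.
p = create_filepath_sketch("worker-1", "wf-123")
assert p.parts == ("demo_fs", "worker-1", "wf-123")
```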

activity_sticky_queues/worker.py

Lines changed: 17 additions & 21 deletions

```diff
@@ -21,15 +21,12 @@ async def main():
     random.seed(667)
 
     # Create random task queues and build task queue selection function
-    task_queues: List[str] = [
-        f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
-        for _ in range(5)
-    ]
+    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
 
     @activity.defn(name="get_available_task_queue")
-    async def select_task_queue_random() -> str:
+    async def select_task_queue() -> str:
         """Randomly assign the job to a queue"""
-        return random.choice(task_queues)
+        return task_queue
 
     # Start client
     client = await Client.connect("localhost:7233")
@@ -40,25 +37,24 @@ async def select_task_queue_random() -> str:
         client,
         task_queue="activity_sticky_queue-distribution-queue",
         workflows=[tasks.FileProcessing],
-        activities=[select_task_queue_random],
+        activities=[select_task_queue],
     )
     run_futures.append(handle.run())
     print("Base worker started")
 
-    # Run the workers for the individual task queues
-    for queue_id in task_queues:
-        handle = Worker(
-            client,
-            task_queue=queue_id,
-            activities=[
-                tasks.download_file_to_worker_filesystem,
-                tasks.work_on_file_in_worker_filesystem,
-                tasks.clean_up_file_from_worker_filesystem,
-            ],
-        )
-        run_futures.append(handle.run())
-        # Wait until interrupted
-        print(f"Worker {queue_id} started")
+    # Run unique task queue for this particular host
+    handle = Worker(
+        client,
+        task_queue=task_queue,
+        activities=[
+            tasks.download_file_to_worker_filesystem,
+            tasks.work_on_file_in_worker_filesystem,
+            tasks.clean_up_file_from_worker_filesystem,
+        ],
+    )
+    run_futures.append(handle.run())
+    # Wait until interrupted
+    print(f"Worker {task_queue} started")
 
     print("All workers started, ctrl+c to exit")
     await asyncio.gather(*run_futures)
```
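After this change, the routing pattern is: a workflow first asks the shared distribution queue for a host's queue name via `get_available_task_queue`, then pins every file activity to that one queue. A minimal pure-Python simulation of that routing follows; the dictionary, function names, and queue names here are illustrative only, and no Temporal server is involved:

```python
# Simulate the two-queue routing: a shared distribution queue answers
# "which host queue should I use?", then all sticky work goes there.
workers = {
    "host-a-queue": [],  # activities executed by host A
    "host-b-queue": [],  # activities executed by host B
}

def get_available_task_queue() -> str:
    # In the sample each worker host returns its own single queue name;
    # we pick host A here to keep the simulation deterministic.
    return "host-a-queue"

def run_file_processing_workflow() -> str:
    queue = get_available_task_queue()   # resolved via the shared queue
    for activity in ("download", "work", "clean_up"):
        workers[queue].append(activity)  # every step pinned to one host
    return queue

chosen = run_file_processing_workflow()
assert workers[chosen] == ["download", "work", "clean_up"]
assert workers["host-b-queue"] == []  # the other host saw nothing
```

Because all three file activities are registered only on the host's own queue, the Temporal server has no other worker to dispatch them to, which is what makes the execution "sticky" without SDK-level session support.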
