
Commit 1b6145a

Add a sample that uses a workflow to lock resources (temporalio#172)
* Add a sample resource locking workflow
* Fix a code snippet in README.md
* Clean up imports, exercise continue_as_new behavior
* Rename the workflows for clarity, improve README
* Remove spurious logging comment
* poe format
* poe lint
* Borrow the random signal name trick from other samples
* Simplify allocation algorithm
* Add 'async with' syntactic sugar
* Update readme
* poe lint/fmt
* Add a test
* method rename
* renames
* type hints for all returns
* handle substantive PR feedback
* clean up imports
* PR feedback (no move yet)
* Move the resource pool client
* Handle assignment signal failures
* format/lint
* PR feedback (move resource_pool_workflow, misc factoring)
1 parent 7a1dd4d commit 1b6145a

11 files changed: 636 additions & 0 deletions


resource_pool/README.md

Lines changed: 48 additions & 0 deletions
```md
# Resource Pool Sample

This sample shows how to use a long-lived `ResourcePoolWorkflow` to allocate `resources` to `ResourceUserWorkflows`.
Each `ResourceUserWorkflow` runs several activities while it has ownership of a resource. Note that
`ResourcePoolWorkflow` makes resource allocation decisions based on in-memory state.

Run the following from this directory to start the worker:

    uv run worker.py

Then, in another terminal, run the following to execute several `ResourceUserWorkflows`:

    uv run starter.py

You should see output indicating that the `ResourcePoolWorkflow` serialized access to each resource.

You can query the set of current resource holders with:

    tctl wf query -w resource_pool --qt get_current_holders

# Other approaches

There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
and limiting the number of activity slots on the workers. The golang SDK also offers [sessions](https://docs.temporal.io/develop/go/sessions)
that allow workflows to pin themselves to workers.

The technique in this sample is capable of more complex resource allocation than the options above, but it doesn't scale
as well. Specifically, it can:
- Manage access to a set of resources that is decoupled from the set of workers and task queues
- Run arbitrary code to place workloads on resources as they become available

# Caveats

This sample uses true locking (not leasing!) to avoid the complexity and scaling concerns associated with heartbeating via
signals. Locking carries a risk: a failure to unlock permanently removes a resource from the pool. However, with
Temporal's durable execution guarantees, this can only happen if:

- A `ResourceUserWorkflow` times out (prohibited in the sample code)
- An operator terminates a `ResourceUserWorkflow` (Temporal recommends canceling workflows instead of terminating them whenever possible)
- You shut down your workers and never restart them (unhandled, but irrelevant)

If a leak were to happen, you could discover the identity of the leaker using the query above, then:

    tctl wf signal -w resource_pool --name release_resource --input '{ "release_key": "<the key from the query above>" }'

Performance: A single `ResourcePoolWorkflow` scales to tens, but not hundreds, of request/release events per second. It is
best suited for allocating resources to long-running workflows. Actual performance will depend on your Temporal server's
persistence layer.
```

resource_pool/__init__.py

Whitespace-only changes.
Lines changed: 2 additions & 0 deletions

```python
from .resource_pool_client import ResourcePoolClient
from .resource_pool_workflow import ResourcePoolWorkflow
```
Lines changed: 101 additions & 0 deletions

```python
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import AsyncGenerator, Optional

from temporalio import workflow

from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
from resource_pool.shared import (
    AcquiredResource,
    AcquireRequest,
    AcquireResponse,
    DetachedResource,
)


# Use this class in workflow code that needs to run on locked resources.
class ResourcePoolClient:
    def __init__(self, pool_workflow_id: str) -> None:
        self.pool_workflow_id = pool_workflow_id
        self.acquired_resources: list[AcquiredResource] = []

        signal_name = f"assign_resource_{self.pool_workflow_id}"
        if workflow.get_signal_handler(signal_name) is None:
            workflow.set_signal_handler(signal_name, self._handle_acquire_response)
        else:
            raise RuntimeError(
                f"{signal_name} already registered - if you use multiple ResourcePoolClients within the "
                f"same workflow, they must use different pool_workflow_ids"
            )

    def _handle_acquire_response(self, response: AcquireResponse) -> None:
        self.acquired_resources.append(
            AcquiredResource(
                resource=response.resource, release_key=response.release_key
            )
        )

    async def _send_acquire_signal(self) -> None:
        await workflow.get_external_workflow_handle_for(
            ResourcePoolWorkflow.run, self.pool_workflow_id
        ).signal("acquire_resource", AcquireRequest(workflow.info().workflow_id))

    async def _send_release_signal(self, acquired_resource: AcquiredResource) -> None:
        await workflow.get_external_workflow_handle_for(
            ResourcePoolWorkflow.run, self.pool_workflow_id
        ).signal(
            "release_resource",
            AcquireResponse(
                resource=acquired_resource.resource,
                release_key=acquired_resource.release_key,
            ),
        )

    @asynccontextmanager
    async def acquire_resource(
        self,
        *,
        reattach: Optional[DetachedResource] = None,
        max_wait_time: timedelta = timedelta(minutes=5),
    ) -> AsyncGenerator[AcquiredResource, None]:
        _warn_when_workflow_has_timeouts()

        if reattach is None:
            await self._send_acquire_signal()
            await workflow.wait_condition(
                lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
            )
            resource = self.acquired_resources.pop(0)
        else:
            resource = AcquiredResource(
                resource=reattach.resource, release_key=reattach.release_key
            )

        # Can't happen, but the typechecker doesn't know about workflow.wait_condition
        if resource is None:
            raise RuntimeError("resource was None when it can't be")

        # During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
        # finally block will release the resource if an activity fails. This is why we asserted the lack of
        # workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
        try:
            yield resource
        finally:
            if not resource.detached:
                await self._send_release_signal(resource)


def _warn_when_workflow_has_timeouts() -> None:
    def has_timeout(timeout: Optional[timedelta]) -> bool:
        # After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
        # continue_as_new call).
        return timeout is not None and timeout > timedelta(0)

    if has_timeout(workflow.info().run_timeout):
        workflow.logger.warning(
            f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
        )
    if has_timeout(workflow.info().execution_timeout):
        workflow.logger.warning(
            f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
        )
```
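The heart of `ResourcePoolClient` is an async context manager whose `finally` block releases the resource even when the body fails. A minimal, Temporal-free sketch of that shape (all names here are hypothetical; plain asyncio stands in for signals and `workflow.wait_condition`):

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class TinyPool:
    """Hypothetical stand-in for the pool workflow: one free list, no signals."""

    def __init__(self, resources: list[str]) -> None:
        self.free = list(resources)
        self.held: set[str] = set()

    @asynccontextmanager
    async def acquire(self) -> AsyncGenerator[str, None]:
        while not self.free:  # crude stand-in for workflow.wait_condition
            await asyncio.sleep(0.01)
        resource = self.free.pop(0)
        self.held.add(resource)
        try:
            yield resource
        finally:
            # Runs even if the body raises - this mirrors the lock-release
            # guarantee the sample relies on (absent workflow-level timeouts).
            self.held.discard(resource)
            self.free.append(resource)


async def demo() -> tuple[int, int]:
    pool = TinyPool(["resource_a"])
    try:
        async with pool.acquire() as r:
            assert r == "resource_a"
            raise RuntimeError("activity failed")
    except RuntimeError:
        pass
    # After the failure, the resource is back in the free list
    return len(pool.free), len(pool.held)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, 0)
```

In the real sample the `finally` block sends a `release_resource` signal instead of appending to a list, which is why workflow-level timeouts (which skip `finally` blocks) would leak locks.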
Lines changed: 149 additions & 0 deletions

```python
from dataclasses import dataclass
from typing import Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from resource_pool.shared import AcquireRequest, AcquireResponse


# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
@dataclass
class InternalAcquireRequest(AcquireRequest):
    release_signal: Optional[str]


@dataclass
class ResourcePoolWorkflowInput:
    # Key is resource, value is current holder of the resource (None if not held)
    resources: dict[str, Optional[InternalAcquireRequest]]
    waiters: list[InternalAcquireRequest]


@workflow.defn
class ResourcePoolWorkflow:
    @workflow.init
    def __init__(self, input: ResourcePoolWorkflowInput) -> None:
        self.resources = input.resources
        self.waiters = input.waiters
        self.release_key_to_resource: dict[str, str] = {}

        for resource, holder in self.resources.items():
            if holder is not None and holder.release_signal is not None:
                self.release_key_to_resource[holder.release_signal] = resource

    @workflow.signal
    async def add_resources(self, resources: list[str]) -> None:
        for resource in resources:
            if resource in self.resources:
                workflow.logger.warning(
                    f"Ignoring attempt to add already-existing resource: {resource}"
                )
            else:
                self.resources[resource] = None

    @workflow.signal
    async def acquire_resource(self, request: AcquireRequest) -> None:
        self.waiters.append(
            InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
        )
        workflow.logger.info(
            f"workflow_id={request.workflow_id} is waiting for a resource"
        )

    @workflow.signal
    async def release_resource(self, acquire_response: AcquireResponse) -> None:
        release_key = acquire_response.release_key
        resource = self.release_key_to_resource.get(release_key)
        if resource is None:
            workflow.logger.warning(f"Ignoring unknown release_key: {release_key}")
            return

        holder = self.resources[resource]
        if holder is None:
            workflow.logger.warning(
                f"Ignoring request to release resource that is not held: {resource}"
            )
            return

        # Remove the current holder
        workflow.logger.info(
            f"workflow_id={holder.workflow_id} released resource {resource}"
        )
        self.resources[resource] = None
        del self.release_key_to_resource[release_key]

    @workflow.query
    def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
        return self.resources

    async def assign_resource(
        self, resource: str, internal_request: InternalAcquireRequest
    ) -> None:
        workflow.logger.info(
            f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
        )

        requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
        try:
            release_signal = str(workflow.uuid4())
            await requester.signal(
                f"assign_resource_{workflow.info().workflow_id}",
                AcquireResponse(release_key=release_signal, resource=resource),
            )

            internal_request.release_signal = release_signal
            self.resources[resource] = internal_request
            self.release_key_to_resource[release_signal] = resource
        except ApplicationError as e:
            if e.type == "ExternalWorkflowExecutionNotFound":
                workflow.logger.info(
                    f"Could not assign resource {resource} to {internal_request.workflow_id}: {e.message}"
                )
            else:
                raise e

    async def assign_next_resource(self) -> bool:
        if len(self.waiters) == 0:
            return False

        next_free_resource = self.get_free_resource()
        if next_free_resource is None:
            return False

        next_waiter = self.waiters.pop(0)
        await self.assign_resource(next_free_resource, next_waiter)
        return True

    def get_free_resource(self) -> Optional[str]:
        return next(
            (resource for resource, holder in self.resources.items() if holder is None),
            None,
        )

    def can_assign_resource(self) -> bool:
        return len(self.waiters) > 0 and self.get_free_resource() is not None

    def should_continue_as_new(self) -> bool:
        return (
            workflow.info().is_continue_as_new_suggested()
            and workflow.all_handlers_finished()
        )

    @workflow.run
    async def run(self, _: ResourcePoolWorkflowInput) -> None:
        while True:
            await workflow.wait_condition(
                lambda: self.can_assign_resource() or self.should_continue_as_new()
            )

            if await self.assign_next_resource():
                continue

            if self.should_continue_as_new():
                workflow.continue_as_new(
                    ResourcePoolWorkflowInput(
                        resources=self.resources,
                        waiters=self.waiters,
                    )
                )
```
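`assign_next_resource` implements a simple FIFO policy: the oldest waiter gets the first free resource, and assignment stops as soon as either side runs out. Stripped of signals and durability, the allocation loop reduces to something like this (a sketch with hypothetical names; holders are plain strings instead of `InternalAcquireRequest`):

```python
from typing import Optional


def get_free_resource(resources: dict[str, Optional[str]]) -> Optional[str]:
    # First resource whose holder is None, like the workflow's generator scan
    return next((r for r, holder in resources.items() if holder is None), None)


def drain(
    resources: dict[str, Optional[str]], waiters: list[str]
) -> list[tuple[str, str]]:
    """Assign free resources to waiters in FIFO order until one side runs out."""
    assignments: list[tuple[str, str]] = []
    while waiters:
        free = get_free_resource(resources)
        if free is None:
            break
        waiter = waiters.pop(0)  # oldest waiter first
        resources[free] = waiter
        assignments.append((waiter, free))
    return assignments


if __name__ == "__main__":
    resources = {"r1": None, "r2": None}
    waiters = ["wf-a", "wf-b", "wf-c"]
    print(drain(resources, waiters))  # [('wf-a', 'r1'), ('wf-b', 'r2')]
    print(waiters)  # ['wf-c'] keeps waiting for a release
```

The real workflow assigns one waiter per pass through its `wait_condition` loop rather than draining in a tight loop, which lets `continue_as_new` interleave with assignments.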
Lines changed: 88 additions & 0 deletions

```python
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional

from temporalio import activity, workflow

from resource_pool.pool_client import ResourcePoolClient
from resource_pool.shared import DetachedResource


@dataclass
class UseResourceActivityInput:
    resource: str
    iteration: str


@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
    info = activity.info()
    activity.logger.info(
        f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
    )
    await asyncio.sleep(3)
    activity.logger.info(
        f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
    )


@dataclass
class ResourceUserWorkflowInput:
    # The id of the resource pool workflow to request a resource from
    resource_pool_workflow_id: str

    # If set, this workflow will fail after the "first" or "second" activity.
    iteration_to_fail_after: Optional[str]

    # If True, this workflow will continue as new after the last activity. The next iteration will run more activities,
    # but will not continue as new.
    should_continue_as_new: bool

    # Used to transfer resource ownership between iterations during continue_as_new
    already_acquired_resource: Optional[DetachedResource] = field(default=None)


class FailWorkflowException(Exception):
    pass


# Wait this long for a resource before giving up
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)


@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceUserWorkflow:
    @workflow.run
    async def run(self, input: ResourceUserWorkflowInput) -> None:
        pool_client = ResourcePoolClient(input.resource_pool_workflow_id)

        async with pool_client.acquire_resource(
            reattach=input.already_acquired_resource
        ) as acquired_resource:
            for iteration in ["first", "second"]:
                await workflow.execute_activity(
                    use_resource,
                    UseResourceActivityInput(acquired_resource.resource, iteration),
                    start_to_close_timeout=timedelta(seconds=10),
                )

                if iteration == input.iteration_to_fail_after:
                    workflow.logger.info(
                        f"Failing after iteration {input.iteration_to_fail_after}"
                    )
                    raise FailWorkflowException()

            # This workflow only continues as new so it can demonstrate how to pass acquired resources across
            # iterations. Ordinarily, such a short workflow would not use continue as new.
            if input.should_continue_as_new:
                detached_resource = acquired_resource.detach()

                next_input = ResourceUserWorkflowInput(
                    resource_pool_workflow_id=input.resource_pool_workflow_id,
                    iteration_to_fail_after=input.iteration_to_fail_after,
                    should_continue_as_new=False,
                    already_acquired_resource=detached_resource,
                )

                workflow.continue_as_new(next_input)
```
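The detach/reattach handoff above keeps a lock held across continue_as_new: the old run calls `detach()` so the client's `finally` block skips the release signal, and the next run passes the detached resource back via `reattach=` instead of re-acquiring. A Temporal-free sketch of that state machine (the dataclasses here are hypothetical stand-ins for `AcquiredResource`/`DetachedResource` from `resource_pool/shared.py`):

```python
from dataclasses import dataclass


@dataclass
class DetachedRes:  # hypothetical stand-in for DetachedResource
    resource: str
    release_key: str


@dataclass
class AcquiredRes:  # hypothetical stand-in for AcquiredResource
    resource: str
    release_key: str
    detached: bool = False

    def detach(self) -> DetachedRes:
        # Once detached, the context manager's finally block must not release
        self.detached = True
        return DetachedRes(self.resource, self.release_key)


released: list[str] = []


def leave_scope(res: AcquiredRes) -> None:
    # Mirrors the client's finally block: release only if still attached
    if not res.detached:
        released.append(res.release_key)


# First run: acquire, detach for continue_as_new, leave scope without releasing
first = AcquiredRes("resource_a", "key-123")
handoff = first.detach()
leave_scope(first)
assert released == []

# Next run: reattach the same resource, finish normally, and the release fires
second = AcquiredRes(handoff.resource, handoff.release_key)
leave_scope(second)
assert released == ["key-123"]
```

The release key travels with the handoff, so the pool workflow sees exactly one release no matter how many continue_as_new hops the resource survives.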
