Skip to content

Commit 231a8de

Browse files
committed
switch from async activity completion to signals
1 parent 2f5d5d3 commit 231a8de

16 files changed

Lines changed: 755 additions & 348 deletions

expense/README.md

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
# Expense
22

3-
This sample workflow processes an expense request. It demonstrates human-in-the loop processing and asynchronous activity completion.
3+
This sample workflow processes an expense request. It demonstrates human-in-the loop processing using Temporal's signal mechanism.
44

55
## Overview
66

77
This sample demonstrates the following workflow:
88

99
1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.
1010

11-
2. **Wait for Decision**: The workflow calls `wait_for_decision_activity`, which demonstrates asynchronous activity completion. The activity registers itself for external completion using its task token, then calls `activity.raise_complete_async()` to signal that it will complete later without blocking the worker.
11+
2. **Register for Decision**: The workflow calls `register_for_decision_activity`, which registers the workflow with the external UI system so it can receive signals when decisions are made.
1212

13-
3. **Async Completion**: When a human approves or rejects the expense, an external process uses the stored task token to call `workflow_client.get_async_activity_handle(task_token).complete()`, notifying Temporal that the waiting activity has finished and providing the decision result.
13+
3. **Wait for Signal**: The workflow uses `workflow.wait_condition()` to wait for an external signal containing the approval/rejection decision.
1414

15-
4. **Process Payment**: Once the workflow receives the approval decision, it executes the `payment_activity` to complete the simulated expense processing.
15+
4. **Signal-Based Completion**: When a human approves or rejects the expense, the external UI system sends a signal to the workflow using `workflow_handle.signal()`, providing the decision result.
1616

17-
This pattern enables human-in-the-loop workflows where activities can wait as long as necessary for external decisions without consuming worker resources or timing out.
17+
5. **Process Payment**: Once the workflow receives the approval decision via signal, it executes the `payment_activity` to complete the simulated expense processing.
18+
19+
This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's durable signal mechanism.
1820

1921
## Steps To Run Sample
2022

@@ -29,35 +31,53 @@ This pattern enables human-in-the-loop workflows where activities can wait as lo
2931
```
3032
* Start expense workflow execution:
3133
```bash
34+
# Start workflow and return immediately (default)
3235
uv run -m expense.starter
36+
37+
# Start workflow and wait for completion
38+
uv run -m expense.starter --wait
39+
40+
# Start workflow with custom expense ID
41+
uv run -m expense.starter --expense-id "my-expense-123"
42+
43+
# Start workflow with custom ID and wait for completion
44+
uv run -m expense.starter --wait --expense-id "my-expense-123"
3345
```
3446
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
3547
* You should see the workflow complete after you approve the expense. You can also reject the expense.
3648

3749
## Running Tests
3850

3951
```bash
40-
# Run all tests
41-
uv run pytest expense/test_workflow.py -v
52+
# Run all expense tests
53+
uv run -m pytest tests/expense/ -v
54+
55+
# Run specific test categories
56+
uv run -m pytest tests/expense/test_expense_workflow.py -v # Workflow tests
57+
uv run -m pytest tests/expense/test_expense_activities.py -v # Activity tests
58+
uv run -m pytest tests/expense/test_expense_integration.py -v # Integration tests
59+
uv run -m pytest tests/expense/test_ui.py -v # UI tests
4260

4361
# Run a specific test
44-
uv run pytest expense/test_workflow.py::TestSampleExpenseWorkflow::test_workflow_with_mock_activities -v
62+
uv run -m pytest tests/expense/test_expense_workflow.py::TestWorkflowPaths::test_workflow_approved_complete_flow -v
4563
```
4664

4765
## Key Concepts Demonstrated
4866

4967
* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
50-
* **Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously, then calling `complete()` on a handle to the async activity.
51-
* **External System Integration**: Communication between workflows and external systems via web services.
68+
* **Workflow Signals**: Using `workflow.signal()` and `workflow.wait_condition()` for external communication
69+
* **Signal-Based Completion**: External systems sending signals to workflows for asynchronous decision-making
70+
* **External System Integration**: Communication between workflows and external systems via web services and signals
71+
* **HTTP Client Lifecycle Management**: Proper resource management with worker-scoped HTTP clients
5272

5373
## Troubleshooting
5474

5575
If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.
5676

5777
## Files
5878

59-
* `workflow.py` - The main expense processing workflow
60-
* `activities.py` - Three activities: create expense, wait for decision, process payment
61-
* `ui.py` - A demonstration expense approval system web UI
62-
* `worker.py` - Worker to run workflows
63-
* `starter.py` - Client to start workflow executions by submitting an expense report
79+
* `workflow.py` - The main expense processing workflow with signal handling
80+
* `activities.py` - Three activities: create expense, register for decision, process payment
81+
* `ui.py` - A demonstration expense approval system web UI with signal sending
82+
* `worker.py` - Worker to run workflows and activities with HTTP client lifecycle management
83+
* `starter.py` - Client to start workflow executions with optional completion waiting

expense/activities.py

Lines changed: 65 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,58 @@
11
import httpx
22
from temporalio import activity
3+
from temporalio.exceptions import ApplicationError
34

45
from expense import EXPENSE_SERVER_HOST_PORT
56

7+
# Module-level HTTP client, managed by worker lifecycle
8+
_http_client: httpx.AsyncClient | None = None
9+
10+
11+
async def initialize_http_client() -> None:
12+
"""Initialize the global HTTP client. Called by worker setup."""
13+
global _http_client
14+
if _http_client is None:
15+
_http_client = httpx.AsyncClient()
16+
17+
18+
async def cleanup_http_client() -> None:
19+
"""Cleanup the global HTTP client. Called by worker shutdown."""
20+
global _http_client
21+
if _http_client is not None:
22+
await _http_client.aclose()
23+
_http_client = None
24+
25+
26+
def get_http_client() -> httpx.AsyncClient:
27+
"""Get the global HTTP client."""
28+
if _http_client is None:
29+
raise RuntimeError(
30+
"HTTP client not initialized. Call initialize_http_client() first."
31+
)
32+
return _http_client
33+
634

735
@activity.defn
836
async def create_expense_activity(expense_id: str) -> None:
937
if not expense_id:
1038
raise ValueError("expense id is empty")
1139

12-
async with httpx.AsyncClient() as client:
40+
client = get_http_client()
41+
try:
1342
response = await client.get(
1443
f"{EXPENSE_SERVER_HOST_PORT}/create",
1544
params={"is_api_call": "true", "id": expense_id},
1645
)
1746
response.raise_for_status()
18-
body = response.text
47+
except httpx.HTTPStatusError as e:
48+
if 400 <= e.response.status_code < 500:
49+
raise ApplicationError(
50+
f"Client error: {e.response.status_code} {e.response.text}",
51+
non_retryable=True,
52+
) from e
53+
raise
54+
55+
body = response.text
1956

2057
if body == "SUCCEED":
2158
activity.logger.info(f"Expense created. ExpenseID: {expense_id}")
@@ -25,62 +62,56 @@ async def create_expense_activity(expense_id: str) -> None:
2562

2663

2764
@activity.defn
28-
async def wait_for_decision_activity(expense_id: str) -> str:
65+
async def register_for_decision_activity(expense_id: str) -> None:
2966
"""
30-
Wait for the expense decision. This activity will complete asynchronously. When this function
31-
calls activity.raise_complete_async(), the Temporal Python SDK recognizes this and won't mark this activity
32-
as failed or completed. The Temporal server will wait until Client.complete_activity() is called or timeout happened
33-
whichever happen first. In this sample case, the complete_activity() method is called by our sample expense system when
34-
the expense is approved.
67+
Register the expense for decision. This activity registers the workflow
68+
with the external system so it can receive signals when decisions are made.
3569
"""
3670
if not expense_id:
3771
raise ValueError("expense id is empty")
3872

3973
logger = activity.logger
74+
http_client = get_http_client()
4075

41-
# Save current activity info so it can be completed asynchronously when expense is approved/rejected
76+
# Get workflow info to register with the UI system
4277
activity_info = activity.info()
43-
task_token = activity_info.task_token
44-
45-
register_callback_url = f"{EXPENSE_SERVER_HOST_PORT}/registerCallback"
78+
workflow_id = activity_info.workflow_id
4679

47-
async with httpx.AsyncClient() as client:
48-
response = await client.post(
49-
register_callback_url,
80+
# Register the workflow ID with the UI system so it can send signals
81+
try:
82+
response = await http_client.post(
83+
f"{EXPENSE_SERVER_HOST_PORT}/registerWorkflow",
5084
params={"id": expense_id},
51-
data={"task_token": task_token.hex()},
85+
data={"workflow_id": workflow_id},
5286
)
5387
response.raise_for_status()
54-
body = response.text
55-
56-
status = body
57-
if status == "SUCCEED":
58-
# register callback succeed
59-
logger.info(f"Successfully registered callback. ExpenseID: {expense_id}")
60-
61-
# Raise the complete-async error which will return from this function but
62-
# does not mark the activity as complete from the workflow perspective.
63-
#
64-
# Activity completion is signaled in the `notify_expense_state_change`
65-
# function in `ui.py`.
66-
activity.raise_complete_async()
67-
68-
logger.warning(f"Register callback failed. ExpenseStatus: {status}")
69-
raise Exception(f"register callback failed status: {status}")
88+
logger.info(f"Registered expense for decision. ExpenseID: {expense_id}")
89+
except Exception as e:
90+
logger.error(f"Failed to register workflow with UI system: {e}")
91+
raise
7092

7193

7294
@activity.defn
7395
async def payment_activity(expense_id: str) -> None:
7496
if not expense_id:
7597
raise ValueError("expense id is empty")
7698

77-
async with httpx.AsyncClient() as client:
99+
client = get_http_client()
100+
try:
78101
response = await client.get(
79102
f"{EXPENSE_SERVER_HOST_PORT}/action",
80103
params={"is_api_call": "true", "type": "payment", "id": expense_id},
81104
)
82105
response.raise_for_status()
83-
body = response.text
106+
except httpx.HTTPStatusError as e:
107+
if 400 <= e.response.status_code < 500:
108+
raise ApplicationError(
109+
f"Client error: {e.response.status_code} {e.response.text}",
110+
non_retryable=True,
111+
) from e
112+
raise
113+
114+
body = response.text
84115

85116
if body == "SUCCEED":
86117
activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}")

expense/starter.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import argparse
12
import asyncio
23
import uuid
34

@@ -7,20 +8,44 @@
78

89

910
async def main():
11+
parser = argparse.ArgumentParser(description="Start an expense workflow")
12+
parser.add_argument(
13+
"--wait",
14+
action="store_true",
15+
help="Wait for workflow completion (default: start and return immediately)",
16+
)
17+
parser.add_argument(
18+
"--expense-id",
19+
type=str,
20+
help="Expense ID to use (default: generate random UUID)",
21+
)
22+
args = parser.parse_args()
23+
1024
# The client is a heavyweight object that should be created once per process.
1125
client = await Client.connect("localhost:7233")
1226

13-
expense_id = str(uuid.uuid4())
27+
expense_id = args.expense_id or str(uuid.uuid4())
28+
workflow_id = f"expense_{expense_id}"
1429

15-
# Start the workflow (don't wait for completion)
30+
# Start the workflow
1631
handle = await client.start_workflow(
1732
SampleExpenseWorkflow.run,
1833
expense_id,
19-
id=f"expense_{expense_id}",
34+
id=workflow_id,
2035
task_queue="expense",
2136
)
2237

2338
print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}")
39+
print(f"Workflow will register itself with UI system for expense {expense_id}")
40+
41+
if args.wait:
42+
print("Waiting for workflow to complete...")
43+
result = await handle.result()
44+
print(f"Workflow completed with result: {result}")
45+
return result
46+
else:
47+
print("Workflow started. Use --wait flag to wait for completion.")
48+
return None
2449

2550

2651
if __name__ == "__main__":

expense/ui.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class ExpenseState(str, Enum):
1919

2020
# Use memory store for this sample expense system
2121
all_expenses: Dict[str, ExpenseState] = {}
22-
token_map: Dict[str, bytes] = {}
22+
workflow_map: Dict[str, str] = {} # Maps expense_id to workflow_id
2323

2424
app = FastAPI()
2525

@@ -133,42 +133,39 @@ async def status_handler(id: str = Query(...)):
133133
return PlainTextResponse(state.value)
134134

135135

136-
@app.post("/registerCallback")
137-
async def callback_handler(id: str = Query(...), task_token: str = Form(...)):
136+
@app.post("/registerWorkflow")
137+
async def register_workflow_handler(id: str = Query(...), workflow_id: str = Form(...)):
138138
if id not in all_expenses:
139139
return PlainTextResponse("ERROR:INVALID_ID")
140140

141141
curr_state = all_expenses[id]
142142
if curr_state != ExpenseState.CREATED:
143143
return PlainTextResponse("ERROR:INVALID_STATE")
144144

145-
# Convert hex string back to bytes
146-
try:
147-
task_token_bytes = bytes.fromhex(task_token)
148-
except ValueError:
149-
return PlainTextResponse("ERROR:INVALID_FORM_DATA")
150-
151-
print(f"Registered callback for ID={id}, token={task_token}")
152-
token_map[id] = task_token_bytes
145+
print(f"Registered workflow for ID={id}, workflow_id={workflow_id}")
146+
workflow_map[id] = workflow_id
153147
return PlainTextResponse("SUCCEED")
154148

155149

156150
async def notify_expense_state_change(expense_id: str, state: str):
157-
if expense_id not in token_map:
151+
if expense_id not in workflow_map:
158152
print(f"Invalid id: {expense_id}")
159153
return
160154

161155
if workflow_client is None:
162156
print("Workflow client not initialized")
163157
return
164158

165-
token = token_map[expense_id]
159+
workflow_id = workflow_map[expense_id]
166160
try:
167-
handle = workflow_client.get_async_activity_handle(task_token=token)
168-
await handle.complete(state)
169-
print(f"Successfully complete activity: {token.hex()}")
161+
# Send signal to workflow
162+
handle = workflow_client.get_workflow_handle(workflow_id)
163+
await handle.signal("expense_decision_signal", state)
164+
print(
165+
f"Successfully sent signal to workflow: {workflow_id} with decision: {state}"
166+
)
170167
except Exception as err:
171-
print(f"Failed to complete activity with error: {err}")
168+
print(f"Failed to send signal to workflow with error: {err}")
172169

173170

174171
async def main():

0 commit comments

Comments
 (0)