Skip to content

Commit 089127c

Browse files
committed
cleanup
1 parent 437f80e commit 089127c

10 files changed

Lines changed: 387 additions & 294 deletions

File tree

expense/README.md

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,38 @@
11
# Expense
22

3-
This sample workflow processes an expense request. The key part of this sample is to show how to complete an activity asynchronously.
3+
This sample workflow processes an expense request. It demonstrates human-in-the loop processing and asynchronous activity completion.
44

5-
## Sample Description
5+
## Overview
66

7-
* Create a new expense report.
8-
* Wait for the expense report to be approved. This could take an arbitrary amount of time. So the activity's `execute` method has to return before it is actually approved. This is done by raising `activity.AsyncActivityCompleteError` so the framework knows the activity is not completed yet.
9-
* When the expense is approved (or rejected), somewhere in the world needs to be notified, and it will need to call `client.get_async_activity_handle().complete()` to tell Temporal service that the activity is now completed.
10-
In this sample case, the sample expense system does this job. In real world, you will need to register some listener to the expense system or you will need to have your own polling agent to check for the expense status periodically.
11-
* After the wait activity is completed, it does the payment for the expense (UI step in this sample case).
7+
This sample demonstrates the following workflow:
128

13-
This sample relies on a sample expense system to work.
9+
1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.
10+
11+
2. **Wait for Decision**: The workflow calls `wait_for_decision_activity`, which demonstrates asynchronous activity completion. The activity registers itself for external completion using its task token, then calls `activity.raise_complete_async()` to signal that it will complete later without blocking the worker.
12+
13+
3. **Async Completion**: When a human approves or rejects the expense, an external process uses the stored task token to call `workflow_client.get_async_activity_handle(task_token).complete()`, notifying Temporal that the waiting activity has finished and providing the decision result.
14+
15+
4. **Process Payment**: Once the workflow receives the approval decision, it executes the `payment_activity` to complete the simulated expense processing.
16+
17+
This pattern enables human-in-the-loop workflows where activities can wait as long as necessary for external decisions without consuming worker resources or timing out.
1418

1519
## Steps To Run Sample
1620

1721
* You need a Temporal service running. See the main [README.md](../README.md) for more details.
1822
* Start the sample expense system UI:
19-
```bash
20-
uv run -m expense.ui
21-
```
23+
```bash
24+
uv run -m expense.ui
25+
```
2226
* Start workflow and activity workers:
23-
```bash
24-
uv run -m expense.worker
25-
```
27+
```bash
28+
uv run -m expense.worker
29+
```
2630
* Start expense workflow execution:
27-
```bash
28-
uv run -m expense.starter
29-
```
31+
```bash
32+
uv run -m expense.starter
33+
```
3034
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
3135
* You should see the workflow complete after you approve the expense. You can also reject the expense.
32-
* If you see the workflow failed, try to change to a different port number in `ui.py` and `activities.py`. Then rerun everything.
3336

3437
## Running Tests
3538

@@ -43,17 +46,18 @@ uv run pytest expense/test_workflow.py::TestSampleExpenseWorkflow::test_workflow
4346

4447
## Key Concepts Demonstrated
4548

46-
* **Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously
4749
* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
48-
* **External System Integration**: HTTP-based communication between activities and external systems
49-
* **Task Tokens**: Using task tokens to complete activities from external systems
50-
* **Web UI Integration**: FastAPI-based expense approval system
50+
* **Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously, then calling `complete()` on a handle to the async activity.
51+
* **External System Integration**: Communication between workflows and external systems via web services.
52+
53+
## Troubleshooting
54+
55+
If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.
5156

5257
## Files
5358

5459
* `workflow.py` - The main expense processing workflow
5560
* `activities.py` - Three activities: create expense, wait for decision, process payment
56-
* `ui.py` - FastAPI-based mock expense system with web UI
57-
* `worker.py` - Worker to run workflows and activities
58-
* `starter.py` - Client to start workflow executions
59-
* `test_workflow.py` - Unit tests with mocked activities
61+
* `ui.py` - A demonstration expense approval system web UI
62+
* `worker.py` - Worker to run workflows
63+
* `starter.py` - Client to start workflow executions by submitting an expense report

expense/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
1+
EXPENSE_SERVER_HOST = "localhost"
2+
EXPENSE_SERVER_PORT = 8099
3+
EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"

expense/activities.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import httpx
22
from temporalio import activity
33

4-
EXPENSE_SERVER_HOST_PORT = "http://localhost:8099"
4+
from expense import EXPENSE_SERVER_HOST_PORT
55

66

77
@activity.defn
@@ -12,7 +12,7 @@ async def create_expense_activity(expense_id: str) -> None:
1212
async with httpx.AsyncClient() as client:
1313
response = await client.get(
1414
f"{EXPENSE_SERVER_HOST_PORT}/create",
15-
params={"is_api_call": "true", "id": expense_id}
15+
params={"is_api_call": "true", "id": expense_id},
1616
)
1717
response.raise_for_status()
1818
body = response.text
@@ -43,12 +43,12 @@ async def wait_for_decision_activity(expense_id: str) -> str:
4343
task_token = activity_info.task_token
4444

4545
register_callback_url = f"{EXPENSE_SERVER_HOST_PORT}/registerCallback"
46-
46+
4747
async with httpx.AsyncClient() as client:
4848
response = await client.post(
4949
register_callback_url,
5050
params={"id": expense_id},
51-
data={"task_token": task_token.hex()}
51+
data={"task_token": task_token.hex()},
5252
)
5353
response.raise_for_status()
5454
body = response.text
@@ -58,8 +58,11 @@ async def wait_for_decision_activity(expense_id: str) -> str:
5858
# register callback succeed
5959
logger.info(f"Successfully registered callback. ExpenseID: {expense_id}")
6060

61-
# Raise the complete-async error which will complete this function but
62-
# does not consider the activity complete from the workflow perspective
61+
# Raise the complete-async error which will return from this function but
62+
# does not mark the activity as complete from the workflow perspective.
63+
#
64+
# Activity completion is signaled in the `notify_expense_state_change`
65+
# function in `ui.py`.
6366
activity.raise_complete_async()
6467

6568
logger.warning(f"Register callback failed. ExpenseStatus: {status}")
@@ -74,7 +77,7 @@ async def payment_activity(expense_id: str) -> None:
7477
async with httpx.AsyncClient() as client:
7578
response = await client.get(
7679
f"{EXPENSE_SERVER_HOST_PORT}/action",
77-
params={"is_api_call": "true", "type": "payment", "id": expense_id}
80+
params={"is_api_call": "true", "type": "payment", "id": expense_id},
7881
)
7982
response.raise_for_status()
8083
body = response.text
@@ -83,4 +86,4 @@ async def payment_activity(expense_id: str) -> None:
8386
activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}")
8487
return
8588

86-
raise Exception(body)
89+
raise Exception(body)

expense/starter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ async def main():
1111
client = await Client.connect("localhost:7233")
1212

1313
expense_id = str(uuid.uuid4())
14-
14+
1515
# Start the workflow (don't wait for completion)
1616
handle = await client.start_workflow(
1717
SampleExpenseWorkflow.run,
@@ -24,4 +24,4 @@ async def main():
2424

2525

2626
if __name__ == "__main__":
27-
asyncio.run(main())
27+
asyncio.run(main())

expense/ui.py

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import asyncio
22
from enum import Enum
3-
from typing import Dict
3+
from typing import Dict, Optional
44

55
import uvicorn
66
from fastapi import FastAPI, Form, Query
77
from fastapi.responses import HTMLResponse, PlainTextResponse
88
from temporalio.client import Client
99

10+
from expense import EXPENSE_SERVER_HOST, EXPENSE_SERVER_PORT
11+
1012

1113
class ExpenseState(str, Enum):
1214
CREATED = "CREATED"
@@ -22,7 +24,7 @@ class ExpenseState(str, Enum):
2224
app = FastAPI()
2325

2426
# Global client - will be initialized when starting the server
25-
workflow_client: Client = None
27+
workflow_client: Optional[Client] = None
2628

2729

2830
@app.get("/", response_class=HTMLResponse)
@@ -35,7 +37,7 @@ async def list_handler():
3537
<table border=1>
3638
<tr><th>Expense ID</th><th>Status</th><th>Action</th></tr>
3739
"""
38-
40+
3941
# Sort keys for consistent display
4042
for expense_id in sorted(all_expenses.keys()):
4143
state = all_expenses[expense_id]
@@ -51,16 +53,14 @@ async def list_handler():
5153
</a>
5254
"""
5355
html += f"<tr><td>{expense_id}</td><td>{state}</td><td>{action_link}</td></tr>"
54-
56+
5557
html += "</table>"
5658
return html
5759

5860

5961
@app.get("/action", response_class=HTMLResponse)
6062
async def action_handler(
61-
type: str = Query(...),
62-
id: str = Query(...),
63-
is_api_call: str = Query("false")
63+
type: str = Query(...), id: str = Query(...), is_api_call: str = Query("false")
6464
):
6565
if id not in all_expenses:
6666
if is_api_call == "true":
@@ -69,7 +69,7 @@ async def action_handler(
6969
return PlainTextResponse("Invalid ID")
7070

7171
old_state = all_expenses[id]
72-
72+
7373
if type == "approve":
7474
all_expenses[id] = ExpenseState.APPROVED
7575
elif type == "reject":
@@ -84,34 +84,37 @@ async def action_handler(
8484

8585
if is_api_call == "true" or type == "payment":
8686
# For API calls and payment, just return success
87-
if old_state == ExpenseState.CREATED and all_expenses[id] in [ExpenseState.APPROVED, ExpenseState.REJECTED]:
87+
if old_state == ExpenseState.CREATED and all_expenses[id] in [
88+
ExpenseState.APPROVED,
89+
ExpenseState.REJECTED,
90+
]:
8891
# Report state change
8992
await notify_expense_state_change(id, all_expenses[id])
90-
93+
9194
print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
9295
return PlainTextResponse("SUCCEED")
9396
else:
9497
# For UI calls, notify and redirect to list
95-
if old_state == ExpenseState.CREATED and all_expenses[id] in [ExpenseState.APPROVED, ExpenseState.REJECTED]:
98+
if old_state == ExpenseState.CREATED and all_expenses[id] in [
99+
ExpenseState.APPROVED,
100+
ExpenseState.REJECTED,
101+
]:
96102
await notify_expense_state_change(id, all_expenses[id])
97-
103+
98104
print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
99105
return await list_handler()
100106

101107

102108
@app.get("/create")
103-
async def create_handler(
104-
id: str = Query(...),
105-
is_api_call: str = Query("false")
106-
):
109+
async def create_handler(id: str = Query(...), is_api_call: str = Query("false")):
107110
if id in all_expenses:
108111
if is_api_call == "true":
109112
return PlainTextResponse("ERROR:ID_ALREADY_EXISTS")
110113
else:
111114
return PlainTextResponse("ID already exists")
112115

113116
all_expenses[id] = ExpenseState.CREATED
114-
117+
115118
if is_api_call == "true":
116119
print(f"Created new expense id: {id}")
117120
return PlainTextResponse("SUCCEED")
@@ -131,13 +134,10 @@ async def status_handler(id: str = Query(...)):
131134

132135

133136
@app.post("/registerCallback")
134-
async def callback_handler(
135-
id: str = Query(...),
136-
task_token: str = Form(...)
137-
):
137+
async def callback_handler(id: str = Query(...), task_token: str = Form(...)):
138138
if id not in all_expenses:
139139
return PlainTextResponse("ERROR:INVALID_ID")
140-
140+
141141
curr_state = all_expenses[id]
142142
if curr_state != ExpenseState.CREATED:
143143
return PlainTextResponse("ERROR:INVALID_STATE")
@@ -158,6 +158,10 @@ async def notify_expense_state_change(expense_id: str, state: str):
158158
print(f"Invalid id: {expense_id}")
159159
return
160160

161+
if workflow_client is None:
162+
print("Workflow client not initialized")
163+
return
164+
161165
token = token_map[expense_id]
162166
try:
163167
handle = workflow_client.get_async_activity_handle(task_token=token)
@@ -169,22 +173,21 @@ async def notify_expense_state_change(expense_id: str, state: str):
169173

170174
async def main():
171175
global workflow_client
172-
176+
173177
# Initialize the workflow client
174178
workflow_client = await Client.connect("localhost:7233")
175-
176-
print("Expense system UI available at http://localhost:8099")
177-
179+
180+
print(
181+
f"Expense system UI available at http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
182+
)
183+
178184
# Start the FastAPI server
179185
config = uvicorn.Config(
180-
app,
181-
host="0.0.0.0",
182-
port=8099,
183-
log_level="info"
186+
app, host="0.0.0.0", port=EXPENSE_SERVER_PORT, log_level="info"
184187
)
185188
server = uvicorn.Server(config)
186189
await server.serve()
187190

188191

189192
if __name__ == "__main__":
190-
asyncio.run(main())
193+
asyncio.run(main())

expense/worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
from temporalio.client import Client
44
from temporalio.worker import Worker
55

6-
from .activities import create_expense_activity, payment_activity, wait_for_decision_activity
6+
from .activities import (
7+
create_expense_activity,
8+
payment_activity,
9+
wait_for_decision_activity,
10+
)
711
from .workflow import SampleExpenseWorkflow
812

913

@@ -28,4 +32,4 @@ async def main():
2832

2933

3034
if __name__ == "__main__":
31-
asyncio.run(main())
35+
asyncio.run(main())

0 commit comments

Comments
 (0)