You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This sample workflow processes an expense request. The key part of this sample is to show how to complete an activity asynchronously.
3
+
This sample workflow processes an expense request. It demonstrates human-in-the loop processing and asynchronous activity completion.
4
4
5
-
## Sample Description
5
+
## Overview
6
6
7
-
* Create a new expense report.
8
-
* Wait for the expense report to be approved. This could take an arbitrary amount of time. So the activity's `execute` method has to return before it is actually approved. This is done by raising `activity.AsyncActivityCompleteError` so the framework knows the activity is not completed yet.
9
-
* When the expense is approved (or rejected), somewhere in the world needs to be notified, and it will need to call `client.get_async_activity_handle().complete()` to tell Temporal service that the activity is now completed.
10
-
In this sample case, the sample expense system does this job. In real world, you will need to register some listener to the expense system or you will need to have your own polling agent to check for the expense status periodically.
11
-
* After the wait activity is completed, it does the payment for the expense (UI step in this sample case).
7
+
This sample demonstrates the following workflow:
12
8
13
-
This sample relies on a sample expense system to work.
9
+
1.**Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.
10
+
11
+
2.**Wait for Decision**: The workflow calls `wait_for_decision_activity`, which demonstrates asynchronous activity completion. The activity registers itself for external completion using its task token, then calls `activity.raise_complete_async()` to signal that it will complete later without blocking the worker.
12
+
13
+
3.**Async Completion**: When a human approves or rejects the expense, an external process uses the stored task token to call `workflow_client.get_async_activity_handle(task_token).complete()`, notifying Temporal that the waiting activity has finished and providing the decision result.
14
+
15
+
4.**Process Payment**: Once the workflow receives the approval decision, it executes the `payment_activity` to complete the simulated expense processing.
16
+
17
+
This pattern enables human-in-the-loop workflows where activities can wait as long as necessary for external decisions without consuming worker resources or timing out.
14
18
15
19
## Steps To Run Sample
16
20
17
21
* You need a Temporal service running. See the main [README.md](../README.md) for more details.
18
22
* Start the sample expense system UI:
19
-
```bash
20
-
uv run -m expense.ui
21
-
```
23
+
```bash
24
+
uv run -m expense.ui
25
+
```
22
26
* Start workflow and activity workers:
23
-
```bash
24
-
uv run -m expense.worker
25
-
```
27
+
```bash
28
+
uv run -m expense.worker
29
+
```
26
30
* Start expense workflow execution:
27
-
```bash
28
-
uv run -m expense.starter
29
-
```
31
+
```bash
32
+
uv run -m expense.starter
33
+
```
30
34
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
31
35
* You should see the workflow complete after you approve the expense. You can also reject the expense.
32
-
* If you see the workflow failed, try to change to a different port number in `ui.py` and `activities.py`. Then rerun everything.
33
36
34
37
## Running Tests
35
38
@@ -43,17 +46,18 @@ uv run pytest expense/test_workflow.py::TestSampleExpenseWorkflow::test_workflow
43
46
44
47
## Key Concepts Demonstrated
45
48
46
-
***Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously
47
49
***Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
48
-
***External System Integration**: HTTP-based communication between activities and external systems
49
-
***Task Tokens**: Using task tokens to complete activities from external systems
50
-
***Web UI Integration**: FastAPI-based expense approval system
50
+
***Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously, then calling `complete()` on a handle to the async activity.
51
+
***External System Integration**: Communication between workflows and external systems via web services.
52
+
53
+
## Troubleshooting
54
+
55
+
If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.
51
56
52
57
## Files
53
58
54
59
*`workflow.py` - The main expense processing workflow
55
60
*`activities.py` - Three activities: create expense, wait for decision, process payment
56
-
*`ui.py` - FastAPI-based mock expense system with web UI
57
-
*`worker.py` - Worker to run workflows and activities
58
-
*`starter.py` - Client to start workflow executions
59
-
*`test_workflow.py` - Unit tests with mocked activities
61
+
*`ui.py` - A demonstration expense approval system web UI
62
+
*`worker.py` - Worker to run workflows
63
+
*`starter.py` - Client to start workflow executions by submitting an expense report
0 commit comments