11# Expense Workflow and Activities Specification
22
33## Overview
4- The Expense Processing System demonstrates a human-in-the-loop workflow pattern using Temporal. It processes expense requests through a multi-step approval workflow with asynchronous activity completion. The system is implemented in both Python and Go with identical business logic and behavior.
4+ The Expense Processing System demonstrates a human-in-the-loop workflow pattern using Temporal. It processes expense requests through a multi-step approval workflow with asynchronous activity completion.
55
66## Business Process Flow
77
@@ -12,7 +12,8 @@ The Expense Processing System demonstrates a human-in-the-loop workflow pattern
1212
1313### Decision Logic
1414- ** APPROVED** : Continue to payment processing → Return "COMPLETED"
15- - ** REJECTED** : Skip payment processing → Return empty string ""
15+ - ** Any other value** : Skip payment processing → Return empty string ""
16+ - This includes: "REJECTED", "DENIED", "PENDING", "CANCELLED", or any unknown value
1617- ** ERROR** : Propagate failure to workflow caller
1718
1819## Architecture Components
@@ -32,21 +33,16 @@ The Expense Processing System demonstrates a human-in-the-loop workflow pattern
3233
3334### Workflow Definition
3435
35- #### Python Implementation ( ` SampleExpenseWorkflow ` )
36+ #### ` SampleExpenseWorkflow `
3637``` python
3738@workflow.defn
3839class SampleExpenseWorkflow :
3940 @workflow.run
4041 async def run (self , expense_id : str ) -> str
4142```
4243
43- # ### Go Implementation (`SampleExpenseWorkflow`)
44- ```go
45- func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result string, err error)
46- ```
47-
4844** Input Parameters** :
49- - `expense_id` / `expenseID` : Unique identifier for the expense request
45+ - `expense_id` : Unique identifier for the expense request
5046
5147** Return Values** :
5248- Success (Approved): `" COMPLETED" `
@@ -64,8 +60,7 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
6460
6561** Purpose** : Initialize expense record in external system
6662
67- ** Python** : `create_expense_activity(expense_id: str ) -> None `
68- ** Go** : `CreateExpenseActivity(ctx context.Context, expenseID string) error`
63+ ** Function Signature** : `create_expense_activity(expense_id: str ) -> None `
6964
7065** Business Rules** :
7166- Validate expense_id is not empty
@@ -74,27 +69,27 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
7469- Any other response triggers exception
7570
7671** Error Handling** :
77- - Empty expense_id: `ValueError ` / `errors.New`
78- - HTTP errors: Propagated to workflow
79- - Unexpected response: Exception with response body
72+ - Empty expense_id: `ValueError ` with message " expense id is empty"
73+ - Whitespace- only expense_id: `ValueError ` (same as empty)
74+ - HTTP errors: `httpx.HTTPStatusError` propagated to workflow
75+ - Server error responses: `Exception ` with specific error message (e.g., " ERROR:ID_ALREADY_EXISTS" )
76+ - Network failures: Connection timeouts and DNS resolution errors propagated
8077
8178# ### 2. Wait for Decision Activity
8279
8380** Purpose** : Register for async completion and wait for human approval
8481
85- ** Python** : `wait_for_decision_activity(expense_id: str ) -> str `
86- ** Go** : `WaitForDecisionActivity(ctx context.Context, expenseID string) (string, error)`
82+ ** Function Signature** : `wait_for_decision_activity(expense_id: str ) -> str `
8783
8884** Async Completion Pattern** :
89- - ** Python** : Raises `activity.raise_complete_async()`
90- - ** Go** : Returns `activity.ErrResultPending`
85+ The activity demonstrates asynchronous activity completion. It registers itself for external completion using its task token, then calls `activity.raise_complete_async()` to signal that it will complete later without blocking the worker. This pattern enables human- in - the- loop workflows where activities can wait as long as necessary for external decisions without consuming worker resources or timing out.
9186
9287** Business Logic** :
93881 . Validate expense_id is not empty
94892 . Extract activity task token from context
95903 . Register callback with external system via HTTP POST
96- 4 . Signal async completion to Temporal
97- 5 . External system later completes activity with decision
91+ 4 . Call `activity.raise_complete_async()` to signal async completion
92+ 5 . When a human approves or rejects the expense, an external process uses the stored task token to call `workflow_client.get_async_activity_handle(task_token).complete()` , providing the decision result
9893
9994** HTTP Integration** :
10095- ** Endpoint** : POST `/ registerCallback? id ={expense_id}`
@@ -104,7 +99,8 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
10499** Completion Values** :
105100- `" APPROVED" ` : Expense approved for payment
106101- `" REJECTED" ` : Expense denied
107- - Other values: Treated as rejection
102+ - `" DENIED" ` , `" PENDING" ` , `" CANCELLED" ` : Also treated as rejection
103+ - Any other value: Treated as rejection (workflow returns empty string)
108104
109105** Error Scenarios** :
110106- Empty expense_id: Immediate validation error
@@ -115,8 +111,7 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
115111
116112** Purpose** : Process payment for approved expenses
117113
118- ** Python** : `payment_activity(expense_id: str ) -> None `
119- ** Go** : `PaymentActivity(ctx context.Context, expenseID string) error`
114+ ** Function Signature** : `payment_activity(expense_id: str ) -> None `
120115
121116** Business Rules** :
122117- Only called for approved expenses
@@ -125,9 +120,10 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
125120- Success condition: Response body equals " SUCCEED"
126121
127122** Error Handling** :
128- - Empty expense_id: `ValueError ` / `errors.New`
129- - HTTP errors: Propagated to workflow
130- - Payment failure: Exception with response body
123+ - Empty expense_id: `ValueError ` with message " expense id is empty"
124+ - HTTP errors: `httpx.HTTPStatusError` propagated to workflow
125+ - Payment failure: `Exception ` with specific error message (e.g., " ERROR:INSUFFICIENT_FUNDS" )
126+ - Network failures: Connection timeouts and DNS resolution errors propagated
131127
132128# # State Management
133129
@@ -164,8 +160,9 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
164160
165161# ## External System Errors
166162- ** Business Logic Errors** : Duplicate expense IDs, invalid states
167- - ** Response Format** : Error messages in HTTP response body
163+ - ** Response Format** : Error messages in HTTP response body (e.g., " ERROR:ID_ALREADY_EXISTS " )
168164- ** Handling** : Converted to application errors with descriptive messages
165+ - ** Tested Examples** : " ERROR:INVALID_ID" , " ERROR:INSUFFICIENT_FUNDS" , " ERROR:INVALID_STATE"
169166
170167# ## Async Completion Errors
171168- ** Registration Failure** : Activity fails immediately if callback registration fails
@@ -184,17 +181,14 @@ func SampleExpenseWorkflow(ctx workflow.Context, expenseID string) (result strin
184181- ** Retry** : Follows activity retry policy
185182- ** Workflow Impact** : Timeout failures propagate to workflow
186183
187- # ## Production Considerations
188- - ** Human Approval** : Consider longer timeouts for real- world approval processes
189- - ** Business Hours** : May need different timeouts based on operational hours
190- - ** Escalation** : Implement escalation workflows for timeout scenarios
184+
191185
192186# # Testing Patterns
193187
194188# ## Mock Testing Approach
195- Both implementations support comprehensive testing with mocked activities:
189+ The system supports comprehensive testing with mocked activities:
196190
197- # ### Python Test Patterns
191+ # ### Test Patterns
198192```python
199193@ activity.defn(name = " create_expense_activity" )
200194async def create_expense_mock(expense_id: str ) -> None :
@@ -203,66 +197,49 @@ async def create_expense_mock(expense_id: str) -> None:
203197@ activity.defn(name = " wait_for_decision_activity" )
204198async def wait_for_decision_mock(expense_id: str ) -> str :
205199 return " APPROVED" # Decision mock
206- ```
207200
208- # ### Go Test Patterns
209- ```go
210- env.OnActivity(CreateExpenseActivity, mock.Anything).Return(nil).Once()
211- env.OnActivity(WaitForDecisionActivity, mock.Anything).Return( " APPROVED " , nil).Once( )
201+ # Testing async completion behavior:
202+ from temporalio.activity import _CompleteAsyncError
203+ with pytest.raises(_CompleteAsyncError):
204+ await activity_env.run(wait_for_decision_activity, " test-expense " )
212205```
213206
214207# ## Test Scenarios
2152081 . ** Happy Path** : All activities succeed, expense approved
2162092 . ** Rejection Path** : Expense rejected, payment skipped
2172103 . ** Failure Scenarios** : Activity failures at each step
2182114 . ** Mock Server Testing** : HTTP interactions with test server
219- 5 . ** Async Completion Testing** : Simulated callback completion
212+ 5 . ** Async Completion Testing** : Simulated callback completion (expects `_CompleteAsyncError` )
213+ 6 . ** Decision Value Testing** : All possible decision values (APPROVED , REJECTED , DENIED , PENDING , CANCELLED , UNKNOWN )
214+ 7 . ** Retryable Failures** : Activities that fail temporarily and then succeed on retry
215+ 8 . ** Parameter Validation** : Empty and whitespace- only expense IDs
216+ 9 . ** Logging Behavior** : Verify activity logging works correctly
217+ 10 . ** Server Error Responses** : Specific error formats like " ERROR:ID_ALREADY_EXISTS"
220218
221219# ## Mock Server Integration
222- - ** Go Implementation** : Uses `httptest.NewServer` for HTTP mocking
223- - ** Python Implementation** : Can use similar patterns with test frameworks
220+ - ** HTTP Mocking** : Uses test frameworks to mock HTTP server responses
224221- ** Delayed Completion** : Simulates human approval delays in tests
225222
226- # # Cross-Language Compatibility
227-
228- # ## Functional Equivalence
229- Both Python and Go implementations provide identical:
230- - ** Business Logic** : Same workflow steps and decision points
231- - ** External Integration** : Same HTTP endpoints and payloads
232- - ** Timeout Configuration** : Same duration settings
233- - ** Error Handling** : Equivalent error scenarios and responses
234-
235- # ## Implementation Differences
236- - ** Async Patterns** : Language- specific async completion mechanisms
237- - ** Error Types** : Language- native exception/ error handling
238- - ** HTTP Libraries** : `httpx` (Python) vs `net/ http` (Go)
239- - ** Logging** : Framework- specific logging approaches
240-
241- # ## Interoperability
242- - ** Task Tokens** : Binary compatible between implementations
243- - ** HTTP Payloads** : Same format for external system integration
244- - ** Workflow Results** : Same return value semantics
245- - ** External System** : Single UI can serve both implementations
246-
247- # # Production Deployment Considerations
248-
249- # ## Scalability
250- - ** Stateless Activities** : No local state, horizontally scalable
251- - ** External System** : UI system should support concurrent requests
252- - ** Task Token Storage** : Consider persistent storage for production UI
253-
254- # ## Reliability
255- - ** Retry Policies** : Configure appropriate retry behavior for each activity
256- - ** Circuit Breakers** : Consider circuit breaker patterns for external HTTP calls
257- - ** Monitoring** : Implement metrics and alerting for workflow execution
258-
259- # ## Security
260- - ** Task Token Security** : Protect task tokens from unauthorized access
261- - ** HTTP Security** : Use HTTPS for production external system integration
262- - ** Input Validation** : Comprehensive validation of expense IDs and external inputs
263-
264- # ## Observability
265- - ** Workflow Tracing** : Temporal provides built- in workflow execution history
266- - ** Activity Metrics** : Monitor activity success rates and durations
267- - ** External System Integration** : Log HTTP interactions for debugging
268- - ** Human Approval Metrics** : Track approval rates and response times
223+ # ## Edge Case Testing
224+ Tests include comprehensive coverage of edge cases and error scenarios:
225+
226+ # ### Retry Behavior Testing
227+ - ** Transient Failures** : Activities that fail on first attempts but succeed after retries
228+ - ** Retry Counting** : Verification that activities retry the expected number of times
229+ - ** Mixed Scenarios** : Different activities failing and recovering independently
230+
231+ # ### Parameter Validation Testing
232+ - ** Empty Strings** : Expense IDs that are completely empty (`" " ` )
233+ - ** Whitespace- Only** : Expense IDs containing only spaces (`" " ` )
234+ - ** Non- Retryable Errors** : Validation failures that should not be retried
235+
236+ # ### Logging Verification
237+ - ** Activity Logging** : Ensures activity.logger.info() calls work correctly
238+ - ** Workflow Logging** : Verification of workflow- level logging behavior
239+ - ** Log Content** : Checking that log messages contain expected information
240+
241+ # ### Server Error Response Testing
242+ - ** Specific Error Codes** : Testing responses like " ERROR:ID_ALREADY_EXISTS"
243+ - ** HTTP Status Errors** : Network- level HTTP errors vs application errors
244+ - ** Error Message Propagation** : Ensuring error details reach the workflow caller
245+
0 commit comments