Commit 0ddc00e

Translated Go sample of sliding window to be written in python (temporalio#210)

* translated Go sample of sliding window to be written in python
* refactor, change folder name, fix imports and update readme to match style
* Cleaned things up and got everything functioning
* Run linter
* sort imports
* fixed undeclared types
* Change Readme to instruct running from the root directory
* force commit to try to fix required actions

Co-authored-by: tconley1428 <timothy.conley@temporal.io>

1 parent a5ee599 commit 0ddc00e

File tree: 9 files changed, +539 −0 lines changed

README.md

Lines changed: 1 addition & 0 deletions

@@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
+* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows.
 * [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
 * [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
 * [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.

batch_sliding_window/README.md

Lines changed: 21 additions & 0 deletions

# Batch Sliding Window

This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows.

A `SlidingWindowWorkflow` starts a configured number (the sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record. When a child completes, a new child is started.

The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. A `RecordProcessorWorkflow` reports its completion through a signal to its parent, which allows notification of a parent that called continue-as-new.

A single instance of `SlidingWindowWorkflow` has limited window size and throughput. To support a larger window size and higher overall throughput, multiple instances of `SlidingWindowWorkflow` run in parallel.

### Running This Sample

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory to start the worker:

    uv run batch_sliding_window/worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

    uv run batch_sliding_window/starter.py

The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration.

batch_sliding_window/__init__.py

Lines changed: 40 additions & 0 deletions

"""Sliding Window Batch Processing Sample.

This sample demonstrates a batch processing workflow that maintains a sliding window
of record processing workflows. It includes:

- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows
- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new
- RecordProcessorWorkflow: Processes individual records
- RecordLoader: Activity for loading records from external sources
"""

from batch_sliding_window.batch_workflow import (
    ProcessBatchWorkflow,
    ProcessBatchWorkflowInput,
)
from batch_sliding_window.record_loader_activity import (
    GetRecordsInput,
    GetRecordsOutput,
    RecordLoader,
    SingleRecord,
)
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
from batch_sliding_window.sliding_window_workflow import (
    SlidingWindowState,
    SlidingWindowWorkflow,
    SlidingWindowWorkflowInput,
)

__all__ = [
    "ProcessBatchWorkflow",
    "ProcessBatchWorkflowInput",
    "SlidingWindowWorkflow",
    "SlidingWindowWorkflowInput",
    "SlidingWindowState",
    "RecordProcessorWorkflow",
    "RecordLoader",
    "GetRecordsInput",
    "GetRecordsOutput",
    "SingleRecord",
]
batch_sliding_window/batch_workflow.py

Lines changed: 110 additions & 0 deletions

import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow
from temporalio.common import WorkflowIDReusePolicy
from temporalio.exceptions import ApplicationError

from batch_sliding_window.record_loader_activity import RecordLoader
from batch_sliding_window.sliding_window_workflow import (
    SlidingWindowWorkflow,
    SlidingWindowWorkflowInput,
)


@dataclass
class ProcessBatchWorkflowInput:
    """Input for the ProcessBatchWorkflow.

    A single input structure is preferred to multiple workflow arguments
    to simplify backward compatible API changes.
    """

    page_size: int  # Number of children started by a single sliding window workflow run
    sliding_window_size: int  # Maximum number of children to run in parallel
    partitions: int  # How many sliding windows to run in parallel


@workflow.defn
class ProcessBatchWorkflow:
    """Sample workflow that partitions the data set into continuous ranges.

    A real application can choose any other way to divide the records
    into multiple collections.
    """

    @workflow.run
    async def run(self, input: ProcessBatchWorkflowInput) -> int:
        # Get total record count
        record_count: int = await workflow.execute_activity_method(
            RecordLoader.get_record_count,
            start_to_close_timeout=timedelta(seconds=5),
        )

        if input.sliding_window_size < input.partitions:
            raise ApplicationError(
                "SlidingWindowSize cannot be less than number of partitions"
            )

        partitions = self._divide_into_partitions(record_count, input.partitions)
        window_sizes = self._divide_into_partitions(
            input.sliding_window_size, input.partitions
        )

        workflow.logger.info(
            "ProcessBatchWorkflow started",
            extra={
                "input": input,
                "record_count": record_count,
                "partitions": partitions,
                "window_sizes": window_sizes,
            },
        )

        # Start child workflows for each partition
        tasks = []
        offset = 0

        for i in range(input.partitions):
            # Make child id more user-friendly
            child_id = f"{workflow.info().workflow_id}/{i}"

            # Define partition boundaries
            maximum_partition_offset = offset + partitions[i]
            if maximum_partition_offset > record_count:
                maximum_partition_offset = record_count

            child_input = SlidingWindowWorkflowInput(
                page_size=input.page_size,
                sliding_window_size=window_sizes[i],
                offset=offset,  # inclusive
                maximum_offset=maximum_partition_offset,  # exclusive
                progress=0,
                current_records=None,
            )

            task = workflow.execute_child_workflow(
                SlidingWindowWorkflow.run,
                child_input,
                id=child_id,
                id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
            )
            tasks.append(task)
            offset += partitions[i]

        # Wait for all child workflows to complete
        results = await asyncio.gather(*tasks)
        return sum(results)

    def _divide_into_partitions(self, number: int, n: int) -> List[int]:
        """Divide a number into n partitions as evenly as possible."""
        base = number // n
        remainder = number % n
        partitions = [base] * n

        for i in range(remainder):
            partitions[i] += 1

        return partitions
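The partitioning arithmetic above is worth seeing in isolation. This is a standalone sketch (no Temporal dependency) of the same even-split logic, applied to the configuration the sample's README describes: 90 records and a window of 10 spread over 3 partitions.

```python
from typing import List


def divide_into_partitions(number: int, n: int) -> List[int]:
    """Split `number` across `n` partitions as evenly as possible,
    giving the first `number % n` partitions one extra unit each."""
    base, remainder = divmod(number, n)
    partitions = [base] * n
    for i in range(remainder):
        partitions[i] += 1
    return partitions


# With the starter's configuration (90 records, window size 10, 3 partitions):
print(divide_into_partitions(90, 3))  # [30, 30, 30] records per partition
print(divide_into_partitions(10, 3))  # [4, 3, 3] window size per partition
```

Each partition therefore gets a contiguous, non-overlapping offset range, and the sum of the per-partition window sizes equals the configured sliding window size.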
batch_sliding_window/record_loader_activity.py

Lines changed: 59 additions & 0 deletions

from dataclasses import dataclass
from typing import List

from temporalio import activity


@dataclass
class GetRecordsInput:
    """Input for the GetRecords activity."""

    page_size: int
    offset: int
    max_offset: int


@dataclass
class SingleRecord:
    """Represents a single record to be processed."""

    id: int


@dataclass
class GetRecordsOutput:
    """Output from the GetRecords activity."""

    records: List[SingleRecord]


class RecordLoader:
    """Activities for loading records from an external data source."""

    def __init__(self, record_count: int):
        self.record_count = record_count

    @activity.defn
    async def get_record_count(self) -> int:
        """Get the total record count.

        Used to partition processing across parallel sliding windows.
        The sample implementation just returns a fake value passed during worker initialization.
        """
        return self.record_count

    @activity.defn
    async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
        """Get records loaded from an external data source.

        The sample returns fake records.
        """
        if input.max_offset > self.record_count:
            raise ValueError(
                f"max_offset({input.max_offset}) > record_count({self.record_count})"
            )

        limit = min(input.offset + input.page_size, input.max_offset)
        records = [SingleRecord(id=i) for i in range(input.offset, limit)]

        return GetRecordsOutput(records=records)
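The key behavior in `get_records` is that a page is clipped at `max_offset`, so no page ever crosses a partition boundary. A plain-Python sketch of that pagination logic (Temporal decorators omitted; `FakeLoader` is a hypothetical stand-in for the activity class):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SingleRecord:
    id: int


class FakeLoader:
    def __init__(self, record_count: int):
        self.record_count = record_count

    def get_records(self, page_size: int, offset: int, max_offset: int) -> List[SingleRecord]:
        if max_offset > self.record_count:
            raise ValueError(f"max_offset({max_offset}) > record_count({self.record_count})")
        # A page never extends past the partition boundary (max_offset is exclusive)
        limit = min(offset + page_size, max_offset)
        return [SingleRecord(id=i) for i in range(offset, limit)]


loader = FakeLoader(record_count=90)
# First page of the partition [0, 30): records 0..4
print([r.id for r in loader.get_records(page_size=5, offset=0, max_offset=30)])  # [0, 1, 2, 3, 4]
# Last page is clipped at the boundary: offset 28 yields only records 28 and 29
print([r.id for r in loader.get_records(page_size=5, offset=28, max_offset=30)])  # [28, 29]
```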
batch_sliding_window/record_processor_workflow.py

Lines changed: 33 additions & 0 deletions

from temporalio import workflow

from batch_sliding_window.record_loader_activity import SingleRecord


@workflow.defn
class RecordProcessorWorkflow:
    """Workflow that implements processing of a single record."""

    @workflow.run
    async def run(self, record: SingleRecord) -> None:
        await self._process_record(record)

        # Notify parent about completion via signal
        parent = workflow.info().parent

        # This workflow is always expected to have a parent.
        # But for unit testing it might be useful to skip the notification if there is none.
        if parent:
            # Don't specify run_id as the parent calls continue-as-new
            handle = workflow.get_external_workflow_handle(parent.workflow_id)
            await handle.signal("report_completion", record.id)

    async def _process_record(self, record: SingleRecord) -> None:
        """Simulate application-specific record processing."""
        # Use workflow.random() to get a random number to ensure workflow determinism
        sleep_duration = workflow.random().randint(1, 10)
        await workflow.sleep(sleep_duration)

        workflow.logger.info(f"Processed record {record}")
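The `workflow.random()` call above matters because Temporal re-executes workflow code during replay, and any nondeterminism would diverge from the recorded history. A minimal sketch of the underlying idea, using a seeded `random.Random` as a stand-in for the deterministically seeded generator the SDK provides:

```python
import random


def pick_sleep_duration(seed: int) -> int:
    # A seeded PRNG produces the same sequence every time, so a replay
    # of the workflow makes exactly the same choice as the original run.
    return random.Random(seed).randint(1, 10)


first_run = pick_sleep_duration(42)
replay = pick_sleep_duration(42)
print(first_run == replay)  # True: replay reproduces the original choice
```

Using the module-level `random` module directly inside a workflow would break this property, which is why the sample routes all randomness through `workflow.random()`.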
