forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_workflow.py
More file actions
110 lines (88 loc) · 3.56 KB
/
batch_workflow.py
File metadata and controls
110 lines (88 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List
from temporalio import workflow
from temporalio.common import WorkflowIDReusePolicy
from temporalio.exceptions import ApplicationError
from batch_sliding_window.record_loader_activity import RecordLoader
from batch_sliding_window.sliding_window_workflow import (
SlidingWindowWorkflow,
SlidingWindowWorkflowInput,
)
@dataclass
class ProcessBatchWorkflowInput:
    """Input for the ProcessBatchWorkflow.

    A single input structure is preferred to multiple workflow arguments
    to simplify backward compatible API changes.
    """

    # Number of children started by a single sliding window workflow run.
    page_size: int
    # Maximum number of children to run in parallel.
    sliding_window_size: int
    # How many sliding windows to run in parallel.
    partitions: int
@workflow.defn
class ProcessBatchWorkflow:
    """Sample workflow that partitions the data set into continuous ranges.

    A real application can choose any other way to divide the records
    into multiple collections.
    """

    @workflow.run
    async def run(self, input: ProcessBatchWorkflowInput) -> int:
        """Fan out record processing across partitioned sliding-window children.

        Splits the full record range into `input.partitions` contiguous
        slices, runs one SlidingWindowWorkflow child per slice, and returns
        the total number of records processed by all children.

        Raises:
            ApplicationError: if `partitions` is less than 1, or if
                `sliding_window_size` is smaller than `partitions` (each
                partition needs a window of at least 1).
        """
        # Get total record count; it drives how the key space is partitioned.
        record_count: int = await workflow.execute_activity_method(
            RecordLoader.get_record_count,
            start_to_close_timeout=timedelta(seconds=5),
        )
        # Validate inputs up front with a clear failure instead of letting
        # _divide_into_partitions raise a bare ZeroDivisionError.
        if input.partitions < 1:
            raise ApplicationError("partitions must be at least 1")
        if input.sliding_window_size < input.partitions:
            raise ApplicationError(
                "SlidingWindowSize cannot be less than number of partitions"
            )
        partitions = self._divide_into_partitions(record_count, input.partitions)
        window_sizes = self._divide_into_partitions(
            input.sliding_window_size, input.partitions
        )
        workflow.logger.info(
            "ProcessBatchWorkflow started",  # plain string: no placeholders used
            extra={
                "input": input,
                "record_count": record_count,
                "partitions": partitions,
                "window_sizes": window_sizes,
            },
        )
        # Start child workflows for each partition
        tasks = []
        offset = 0
        for i in range(input.partitions):
            # Make child id more user-friendly
            child_id = f"{workflow.info().workflow_id}/{i}"
            # Define partition boundaries. Clamp defensively even though the
            # partition sizes sum exactly to record_count.
            maximum_partition_offset = offset + partitions[i]
            if maximum_partition_offset > record_count:
                maximum_partition_offset = record_count
            child_input = SlidingWindowWorkflowInput(
                page_size=input.page_size,
                sliding_window_size=window_sizes[i],
                offset=offset,  # inclusive
                maximum_offset=maximum_partition_offset,  # exclusive
                progress=0,
                current_records=None,
            )
            task = workflow.execute_child_workflow(
                SlidingWindowWorkflow.run,
                child_input,
                id=child_id,
                id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
            )
            tasks.append(task)
            offset += partitions[i]
        # Wait for all child workflows to complete and sum their record counts.
        results = await asyncio.gather(*tasks)
        return sum(results)

    def _divide_into_partitions(self, number: int, n: int) -> List[int]:
        """Divide `number` into `n` partitions as evenly as possible.

        The first `number % n` partitions get one extra unit so that the
        partition sizes always sum exactly to `number`.
        """
        base = number // n
        remainder = number % n
        partitions = [base] * n
        for i in range(remainder):
            partitions[i] += 1
        return partitions