|
| 1 | +import asyncio |
| 2 | +from dataclasses import dataclass |
| 3 | +from datetime import timedelta |
| 4 | +from typing import List |
| 5 | + |
| 6 | +from temporalio import workflow |
| 7 | +from temporalio.common import WorkflowIDReusePolicy |
| 8 | +from temporalio.exceptions import ApplicationError |
| 9 | + |
| 10 | +from batch_sliding_window.record_loader_activity import RecordLoader |
| 11 | +from batch_sliding_window.sliding_window_workflow import ( |
| 12 | + SlidingWindowWorkflow, |
| 13 | + SlidingWindowWorkflowInput, |
| 14 | +) |
| 15 | + |
| 16 | + |
@dataclass
class ProcessBatchWorkflowInput:
    """Arguments accepted by ``ProcessBatchWorkflow``.

    Everything is bundled into one structure instead of being passed as
    separate workflow arguments, so that fields can be added later without
    breaking backward compatibility of the workflow API.
    """

    # Number of children started by a single sliding window workflow run.
    page_size: int

    # Maximum number of children to run in parallel.
    sliding_window_size: int

    # How many sliding windows to run in parallel.
    partitions: int
| 28 | + |
| 29 | + |
@workflow.defn
class ProcessBatchWorkflow:
    """Sample workflow that partitions the data set into continuous ranges.

    Each range is handed to its own ``SlidingWindowWorkflow`` child and the
    integers returned by the children are summed.

    A real application can choose any other way to divide the records
    into multiple collections.
    """

    @workflow.run
    async def run(self, input: ProcessBatchWorkflowInput) -> int:
        """Fan the record set out over ``input.partitions`` child workflows.

        The total record count is fetched through the
        ``RecordLoader.get_record_count`` activity. Both the records and the
        overall sliding-window budget are then split as evenly as possible
        across the partitions, and one ``SlidingWindowWorkflow`` child is
        started per partition.

        Args:
            input: Page size, total sliding window size and partition count.

        Returns:
            The sum of the values returned by all child workflows.

        Raises:
            ApplicationError: If ``input.partitions`` is less than 1, or if
                ``input.sliding_window_size`` is smaller than
                ``input.partitions``.
        """
        # Get total record count
        record_count: int = await workflow.execute_activity_method(
            RecordLoader.get_record_count,
            start_to_close_timeout=timedelta(seconds=5),
        )

        # A partition count of zero would divide by zero when splitting the
        # records, and a negative count would start no children at all and
        # silently return 0. Reject both explicitly.
        if input.partitions < 1:
            raise ApplicationError("Number of partitions must be at least 1")

        # Every partition needs a window of at least one child.
        if input.sliding_window_size < input.partitions:
            raise ApplicationError(
                "SlidingWindowSize cannot be less than number of partitions"
            )

        partitions = self._divide_into_partitions(record_count, input.partitions)
        window_sizes = self._divide_into_partitions(
            input.sliding_window_size, input.partitions
        )

        workflow.logger.info(
            "ProcessBatchWorkflow started",
            extra={
                "input": input,
                "record_count": record_count,
                "partitions": partitions,
                "window_sizes": window_sizes,
            },
        )

        # Start child workflows for each partition
        parent_id = workflow.info().workflow_id
        tasks = []
        offset = 0

        for i, (partition_size, window_size) in enumerate(
            zip(partitions, window_sizes)
        ):
            # Make child id more user-friendly
            child_id = f"{parent_id}/{i}"

            # Define partition boundaries; never run past the last record.
            maximum_partition_offset = min(offset + partition_size, record_count)

            child_input = SlidingWindowWorkflowInput(
                page_size=input.page_size,
                sliding_window_size=window_size,
                offset=offset,  # inclusive
                maximum_offset=maximum_partition_offset,  # exclusive
                progress=0,
                current_records=None,
            )

            tasks.append(
                workflow.execute_child_workflow(
                    SlidingWindowWorkflow.run,
                    child_input,
                    id=child_id,
                    id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
                )
            )
            offset += partition_size

        # Wait for all child workflows to complete
        results = await asyncio.gather(*tasks)
        return sum(results)

    def _divide_into_partitions(self, number: int, n: int) -> List[int]:
        """Divide a number into n partitions as evenly as possible.

        The first ``number % n`` partitions receive one extra unit, so the
        returned sizes add up to ``number``.

        Args:
            number: The total amount to distribute.
            n: How many partitions to produce; must not be zero (``run``
                rejects non-positive partition counts before calling this).

        Returns:
            A list of ``n`` partition sizes.
        """
        base, remainder = divmod(number, n)
        return [base + 1 if i < remainder else base for i in range(n)]
0 commit comments