# data_pipeline/activities.py
#
# Activity implementations for the data-pipeline sample: a classic
# validate -> extract -> transform -> load sequence over files on disk,
# plus a `poll` activity that simulates polling an external system.

import json
import os
import random
import shutil
from pathlib import Path

from temporalio import activity

from data_pipeline.dataobjects import DataPipelineParams


@activity.defn
async def validate(input: DataPipelineParams) -> bool:
    """Return False (reject the pipeline run) when the validation tag is "blue"."""
    # "blue" simulates data that fails validation; any other tag passes.
    return input.validation != "blue"


@activity.defn
async def extract(input: DataPipelineParams) -> str:
    """Copy the input file from <folder>/source into <folder>/working."""
    initialize(input.foldername)

    shutil.copy(
        input.foldername + "/source/" + input.input_filename,
        input.foldername + "/working/" + input.input_filename,
    )
    activity.heartbeat(input.input_filename)

    return "success"


@activity.defn
async def transform(input: DataPipelineParams) -> str:
    """Convert the extracted JSON namespace list into a CSV in the working folder."""
    namespaces = get_namespaces(input.foldername, input.input_filename)
    workingfilename = (
        input.foldername + "/working/" + Path(input.input_filename).stem + ".csv"
    )
    # `with` guarantees the file is closed even if a write raises
    # (the original left the handle open on error).
    with open(workingfilename, "w+") as namespaces_csv:
        namespaces_csv.write("Namespace,\n")
        for namespace in namespaces:
            namespaces_csv.write(f"{namespace},\n")

    activity.heartbeat(input.input_filename)

    return "success"


@activity.defn
async def load(input: DataPipelineParams) -> str:
    """Copy the transformed CSV from working/ to output/, then clean up working/."""
    csv_name = Path(input.input_filename).stem + ".csv"
    shutil.copy(
        input.foldername + "/working/" + csv_name,
        input.foldername + "/output/" + csv_name,
    )
    activity.heartbeat(input.input_filename)

    cleanup(input.foldername)

    return "success"


# This activity simulates polling for demo purposes.
# See https://community.temporal.io/t/what-is-the-best-practice-for-a-polling-activity/328/2
# It raises an exception 90% of the time (simulating "not found");
# 10% of the time it simulates "found" and returns.
@activity.defn
async def poll(input: DataPipelineParams) -> str:
    if random.randint(1, 10) > 9:
        return "polled successfully: found"
    raise Exception("Simulate polled: not found")


def initialize(datafolder: str) -> None:
    """Remove leftover artifacts from a previous run, if present.

    NOTE(review): file names are hard-coded to info.json / info.csv, so only
    the demo input is cleaned up — confirm before reusing with other inputs.
    """
    for leftover in (
        datafolder + "/working/info.json",
        datafolder + "/working/info.csv",
        datafolder + "/output/info.csv",
    ):
        if os.path.isfile(leftover):
            os.remove(leftover)


def cleanup(datafolder: str) -> None:
    """Remove intermediate files from the working folder (hard-coded demo names)."""
    for intermediate in (
        datafolder + "/working/info.json",
        datafolder + "/working/info.csv",
    ):
        if os.path.isfile(intermediate):
            os.remove(intermediate)


def get_namespaces(datafolder: str, filename: str) -> list:
    """Read <folder>/working/<filename> as JSON and return its "namespaces" list."""
    with open(datafolder + "/working/" + filename, "r") as namespaces_json:
        namespaces_dict = json.load(namespaces_json)

    return list(namespaces_dict["namespaces"])
# data_pipeline/data_pipeline_workflows.py

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

with workflow.unsafe.imports_passed_through():
    from data_pipeline.activities import extract, validate, transform, load, poll
    from data_pipeline.dataobjects import DataPipelineParams


@workflow.defn
class DataPipelineWorkflow:
    """Validate -> extract -> transform -> load pipeline.

    After loading, either polls an external system (poll_or_wait == "poll")
    or waits up to 45 seconds for a `load_complete` signal.
    """

    def __init__(self) -> None:
        # Renamed from `load_complete`: assigning self.load_complete here
        # shadowed the signal method of the same name on the instance.
        self._load_complete = False

    @workflow.run
    async def run(self, input: DataPipelineParams) -> str:
        # workflow.logger is replay-safe, unlike the stdlib root logger.
        workflow.logger.info("The data pipeline for %s beginning.", input)

        validation = await workflow.execute_activity(
            validate,
            input,
            start_to_close_timeout=timedelta(seconds=300),
            heartbeat_timeout=timedelta(seconds=20),
        )
        if not validation:
            workflow.logger.info("Validation rejected for: %s", input.input_filename)
            return "invalidated"

        activity_output = await workflow.execute_activity(
            extract,
            input,
            start_to_close_timeout=timedelta(seconds=300),
            heartbeat_timeout=timedelta(seconds=20),
        )
        workflow.logger.info(
            "Extract status: %s: %s", input.input_filename, activity_output
        )

        activity_output = await workflow.execute_activity(
            transform,
            input,
            start_to_close_timeout=timedelta(seconds=300),
            heartbeat_timeout=timedelta(seconds=20),
        )
        workflow.logger.info(
            "Transform status: %s: %s", input.input_filename, activity_output
        )

        activity_output = await workflow.execute_activity(
            load,
            input,
            start_to_close_timeout=timedelta(seconds=300),
            heartbeat_timeout=timedelta(seconds=20),
        )
        workflow.logger.info(
            "Load status: %s: %s", input.input_filename, activity_output
        )

        if input.poll_or_wait == "poll":
            # It is OK for this activity to fail repeatedly: it polls every
            # 2 seconds (backoff_coefficient=1 keeps the interval constant).
            # See https://community.temporal.io/t/what-is-the-best-practice-for-a-polling-activity/328/2
            activity_output = await workflow.execute_activity(
                poll,
                input,
                start_to_close_timeout=timedelta(seconds=3000),
                heartbeat_timeout=timedelta(seconds=20),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=2), backoff_coefficient=1
                ),
            )
            workflow.logger.info(
                "Poll status: %s: %s", input.input_filename, activity_output
            )
        else:
            try:
                await workflow.wait_condition(lambda: self._load_complete, timeout=45)
                workflow.logger.info(
                    "Received signal that load completed: %s load complete: %s",
                    input.input_filename,
                    self._load_complete,
                )
            except asyncio.TimeoutError:
                # Fail the workflow explicitly rather than returning a
                # status string the caller might miss.
                raise ApplicationError("Load did not complete before timeout")

        return f"Successfully processed: {input.input_filename}!"

    @workflow.signal
    async def load_complete(self, approval: str) -> None:
        """Mark the load as complete; the `approval` payload is currently unused."""
        self._load_complete = True
# data_pipeline/run_worker.py

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# Import via the package path, consistent with the other sample files;
# the bare `from activities import ...` loaded the same module under a
# second name.
from data_pipeline.activities import extract, load, poll, transform, validate
from data_pipeline.data_pipeline_workflows import DataPipelineWorkflow


async def main():
    """Run a worker hosting the data-pipeline workflow and its activities."""
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="data-pipeline-task-queue",
        workflows=[DataPipelineWorkflow],
        activities=[extract, validate, transform, load, poll],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
# data_pipeline/start_data_pipeline_wait_signal.py

import asyncio

from temporalio.client import Client

from data_pipeline.data_pipeline_workflows import DataPipelineWorkflow
from data_pipeline.dataobjects import DataPipelineParams


async def main():
    """Start the pipeline in "wait" mode.

    The workflow will block until it receives the load_complete signal
    (send it with signal_load_complete.py) or time out after 45 seconds.
    """
    client = await Client.connect("localhost:7233")
    datapipelineinput = DataPipelineParams(
        input_filename="info.json",
        foldername="./demodata",
        poll_or_wait="wait",
        validation="orange",
    )
    # Plain string id — the original used an f-string with no placeholder.
    result = await client.execute_workflow(
        DataPipelineWorkflow.run,
        datapipelineinput,
        id="datapipe-workflow",
        task_queue="data-pipeline-task-queue",
    )

    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
async def main() -> None:
    """Print info for every schedule registered with the server."""
    client = await Client.connect("localhost:7233")
    # list_schedules() is invoked once, in the loop header below; the patch's
    # extra bare `client.list_schedules()` call created an iterator that was
    # immediately discarded, so it is removed.
    async for schedule in await client.list_schedules():
        print(f"List Schedule Info: {schedule.info}.")
calendars=[ScheduleCalendarSpec(day_of_month=(ScheduleRange(1,),))] + ), + state=ScheduleState(note="Here's a note on my Schedule."), + ), + ) + + +if __name__ == "__main__": + asyncio.run(main())