Skip to content

Commit 3e5aba0

Browse files
committed
adding an example data pipeline
1 parent 3ee5ef7 commit 3e5aba0

8 files changed

Lines changed: 256 additions & 0 deletions

File tree

data_pipeline/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Data Pipeline example
2+
3+
These samples show how one might create a data pipeline.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, from this directory, run the following two commands — each in its own terminal, since the worker runs until stopped — to start the `data_pipeline/` sample:
6+
7+
poetry run python run_worker.py
8+
poetry run python start_data_pipeline.py
9+

data_pipeline/activities.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
from pathlib import Path
2+
import random
3+
import subprocess
4+
import os
5+
import json
6+
import shutil
7+
8+
from temporalio import activity
9+
from data_pipeline.dataobjects import YourParams, DataPipelineParams
10+
11+
12+
@activity.defn
async def your_activity(input: YourParams) -> str:
    """Format and return a greeting built from the supplied parameters."""
    greeting, name = input.greeting, input.name
    return f"{greeting}, {name}!"
15+
16+
@activity.defn
async def validate(input: DataPipelineParams) -> bool:
    """Validate the pipeline input.

    Only the literal validation token "blue" is rejected; every other value
    passes.
    """
    return input.validation != "blue"
22+
23+
@activity.defn
async def extract(input: DataPipelineParams) -> str:
    """Extract step: stage the source file into the working folder.

    Clears leftovers from any previous run, then copies
    <foldername>/source/<input_filename> to <foldername>/working/.
    """
    initialize(input.foldername)

    source_path = input.foldername + "/source/" + input.input_filename
    working_path = input.foldername + "/working/" + input.input_filename
    shutil.copy(source_path, working_path)
    # Heartbeat so the workflow's heartbeat_timeout can detect a stalled copy.
    activity.heartbeat(input.input_filename)

    return "success"
31+
32+
@activity.defn
async def transform(input: DataPipelineParams) -> str:
    """Transform step: convert the staged JSON namespace list into a CSV file.

    Reads the working copy of the input file via get_namespaces and writes
    <foldername>/working/<input stem>.csv with a "Namespace," header row
    followed by one "<namespace>," row per entry.

    Returns:
        "success" on completion.
    """
    namespaces = get_namespaces(input.foldername, input.input_filename)

    working_filename = input.foldername + "/working/" + Path(input.input_filename).stem + ".csv"
    # Context manager guarantees the file is closed even if a write raises;
    # the original open()/close() pair leaked the handle on error.
    with open(working_filename, "w+") as namespaces_csv_file:
        namespaces_csv_file.write("Namespace,\n")
        for namespace in namespaces:
            namespaces_csv_file.write(f"{namespace},\n")

    # Heartbeat so the workflow's heartbeat_timeout can detect a stalled write.
    activity.heartbeat(input.input_filename)

    return "success"
46+
47+
@activity.defn
async def load(input: DataPipelineParams) -> str:
    """Load step: publish the transformed CSV from working/ to output/.

    After publishing, intermediate files in working/ are removed.
    """
    csv_name = Path(input.input_filename).stem + ".csv"
    shutil.copy(
        input.foldername + "/working/" + csv_name,
        input.foldername + "/output/" + csv_name,
    )
    # Heartbeat so the workflow's heartbeat_timeout can detect a stalled copy.
    activity.heartbeat(input.input_filename)

    cleanup(input.foldername)

    return "success"
56+
57+
# This activity simulates polling for demo purposes; see
# https://community.temporal.io/t/what-is-the-best-practice-for-a-polling-activity/328/2
# It raises an exception 90% of the time (simulating "not found");
# 10% of the time it simulates "found" and returns.
@activity.defn
async def poll(input: DataPipelineParams) -> str:
    """Simulated poll: raise ~90% of the time, return "found" ~10%."""
    found = random.randint(1, 10) > 9
    if not found:
        raise Exception("Simulate polled: not found")
    return "polled successfully: found"
66+
67+
68+
69+
70+
def initialize(datafolder: str, basename: str = "info") -> None:
    """Remove leftover artifacts from a previous pipeline run.

    Deletes <basename>.json and <basename>.csv from the working folder and
    <basename>.csv from the output folder, if present, so a fresh run starts
    from a clean slate. Missing files are silently ignored.

    Args:
        datafolder: Root data folder containing working/ and output/.
        basename: File stem of the pipeline artifacts. Defaults to "info",
            matching the sample's hard-coded info.json input, so existing
            callers are unaffected.
    """
    # One pass over every artifact a prior run may have left behind,
    # instead of three copy-pasted isfile/remove pairs.
    for leftover in (
        datafolder + "/working/" + basename + ".json",
        datafolder + "/working/" + basename + ".csv",
        datafolder + "/output/" + basename + ".csv",
    ):
        if os.path.isfile(leftover):
            os.remove(leftover)
77+
78+
def cleanup(datafolder: str, basename: str = "info") -> None:
    """Remove intermediate files from the working folder after a load.

    Unlike initialize, the output folder is intentionally left untouched —
    the published CSV is the pipeline's result. Missing files are ignored.

    Args:
        datafolder: Root data folder containing working/.
        basename: File stem of the pipeline artifacts. Defaults to "info",
            matching the sample's hard-coded info.json input, so existing
            callers are unaffected.
    """
    for intermediate in (
        datafolder + "/working/" + basename + ".json",
        datafolder + "/working/" + basename + ".csv",
    ):
        if os.path.isfile(intermediate):
            os.remove(intermediate)
83+
84+
85+
def get_namespaces(datafolder: str, filename: str) -> list:
    """Parse the working copy of *filename* and return its namespace names.

    Expects <datafolder>/working/<filename> to be JSON with a top-level
    "namespaces" array (see the sample info.json).

    Args:
        datafolder: Root data folder containing working/.
        filename: Name of the JSON file inside the working folder.

    Returns:
        The "namespaces" entries as a plain list.

    Raises:
        FileNotFoundError: If the working copy does not exist.
        KeyError: If the JSON has no "namespaces" key.
    """
    # Context manager closes the file even if json.load raises; the original
    # open()/close() pair leaked the handle on a parse error.
    with open(datafolder + "/working/" + filename, "r") as namespaces_json_file:
        namespaces_dict = json.load(namespaces_json_file)

    # Copy into a fresh list (replaces the manual append loop).
    return list(namespaces_dict['namespaces'])
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import logging
2+
from datetime import timedelta
3+
4+
from temporalio import workflow
5+
6+
from temporalio.common import RetryPolicy
7+
8+
with workflow.unsafe.imports_passed_through():
9+
from data_pipeline.activities import your_activity, extract, validate, transform, load, poll
10+
from data_pipeline.dataobjects import YourParams, DataPipelineParams
11+
12+
13+
@workflow.defn
class YourSchedulesWorkflow:
    """Minimal workflow: greet *name* via a single activity invocation."""

    @workflow.run
    async def run(self, name: str) -> str:
        params = YourParams("Hello", name)
        greeting = await workflow.execute_activity(
            your_activity,
            params,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return greeting
22+
23+
@workflow.defn
class DataPipelineWorkflow:
    """Run a validate -> extract -> transform -> load -> poll pipeline.

    Each stage is a separate activity taking the same DataPipelineParams.
    A failed validation short-circuits the workflow with "invalidated".
    """

    # TODO: support multiple files for the heartbeats?
    # TODO: add a wait-for-signal variant of the polling step.

    @workflow.run
    async def run(self, input: DataPipelineParams) -> str:
        logging.info(f"The data pipeline for {input} beginning.")

        # Every ETL stage shares the same timeouts; hoisted so they can be
        # tuned in one place instead of four copy-pasted literals.
        stage_timeout = timedelta(seconds=300)
        heartbeat = timedelta(seconds=20)

        validation = await workflow.execute_activity(
            validate, input, start_to_close_timeout=stage_timeout, heartbeat_timeout=heartbeat
        )
        if not validation:  # idiomatic truth test (was "validation == False", PEP8 E712)
            logging.info(f"Validation rejected for: {input.input_filename}")
            return "invalidated"

        activity_output = await workflow.execute_activity(
            extract, input, start_to_close_timeout=stage_timeout, heartbeat_timeout=heartbeat
        )
        logging.info(f"Extract status: {input.input_filename}: {activity_output}")

        activity_output = await workflow.execute_activity(
            transform, input, start_to_close_timeout=stage_timeout, heartbeat_timeout=heartbeat
        )
        logging.info(f"Transform status: {input.input_filename}: {activity_output}")

        activity_output = await workflow.execute_activity(
            load, input, start_to_close_timeout=stage_timeout, heartbeat_timeout=heartbeat
        )
        logging.info(f"Load status: {input.input_filename}: {activity_output}")

        # It's ok if this activity fails repeatedly: it is polling every 2
        # seconds (backoff_coefficient=1 disables exponential backoff).
        # See https://community.temporal.io/t/what-is-the-best-practice-for-a-polling-activity/328/2
        activity_output = await workflow.execute_activity(
            poll,
            input,
            start_to_close_timeout=timedelta(seconds=3000),
            heartbeat_timeout=heartbeat,
            retry_policy=RetryPolicy(initial_interval=timedelta(seconds=2), backoff_coefficient=1),
        )
        logging.info(f"Poll status: {input.input_filename}: {activity_output}")

        return f"Successfully processed: {input.input_filename}!"

data_pipeline/dataobjects.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
class YourParams:
    """Input for your_activity: the two pieces of a greeting message."""

    greeting: str  # salutation, e.g. "Hello"
    name: str  # recipient name appended after the salutation
8+
9+
10+
@dataclass
class DataPipelineParams:
    """Input shared by DataPipelineWorkflow and all of its activities."""

    input_filename: str  # file staged from <foldername>/source/, e.g. "info.json"
    poll_or_wait: str  # presumably selects polling vs. waiting for the final step — not read in the visible code; TODO confirm
    foldername: str  # root data folder containing source/, working/ and output/ subfolders
    validation: str  # token checked by the validate activity; the value "blue" is rejected
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Namespace,
2+
aa123-namespace.a123x,
3+
aa-mrn-test.a123x,
4+
aa-mrn-test1.a123x,
5+
aa-mrn-test123.a123x,
6+
aa-mr-test.a123x,
7+
aa-test-1.a123x,
8+
aa-test-draftenv.a123x,
9+
aa-test-mr-fun.a123x,
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"namespaces": [
3+
"aa123-namespace.a123x",
4+
"aa-mrn-test.a123x",
5+
"aa-mrn-test1.a123x",
6+
"aa-mrn-test123.a123x",
7+
"aa-mr-test.a123x",
8+
"aa-test-1.a123x",
9+
"aa-test-draftenv.a123x",
10+
"aa-test-mr-fun.a123x"
11+
],
12+
"nextPageToken": ""
13+
}

data_pipeline/run_worker.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import asyncio
2+
3+
from temporalio.client import Client
4+
from temporalio.worker import Worker
5+
from activities import your_activity, extract, validate, transform, load, poll
6+
from data_pipeline.data_pipeline_workflows import YourSchedulesWorkflow, DataPipelineWorkflow
7+
8+
9+
async def main():
    """Connect to Temporal and serve the data-pipeline task queue.

    Registers both sample workflows and every pipeline activity, then runs
    the worker until the process is stopped.
    """
    client = await Client.connect("localhost:7233")
    data_pipeline_worker = Worker(
        client,
        task_queue="data-pipeline-task-queue",
        workflows=[YourSchedulesWorkflow, DataPipelineWorkflow],
        activities=[your_activity, extract, validate, transform, load, poll],
    )
    # Blocks here, polling the task queue, until shut down.
    await data_pipeline_worker.run()


if __name__ == "__main__":
    asyncio.run(main())
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio.client import (
5+
Client,
6+
Schedule,
7+
ScheduleActionStartWorkflow,
8+
ScheduleIntervalSpec,
9+
ScheduleSpec,
10+
ScheduleState,
11+
)
12+
from data_pipeline.data_pipeline_workflows import YourSchedulesWorkflow, DataPipelineWorkflow
13+
14+
from data_pipeline.dataobjects import YourParams, DataPipelineParams
15+
16+
17+
async def main():
    """Start one DataPipelineWorkflow execution and print its result.

    Requires a Temporal server on localhost:7233 and a running worker
    (run_worker.py) on the same task queue.
    """
    client = await Client.connect("localhost:7233")
    datapipelineinput = DataPipelineParams(
        input_filename="info.json",
        foldername="./demodata",
        poll_or_wait="poll",
        validation="orange",  # any value other than "blue" passes validation
    )
    result = await client.execute_workflow(
        DataPipelineWorkflow.run,
        datapipelineinput,
        # Plain string: the original f"datapipe-workflow" had no placeholders
        # (F541). Note the fixed id means repeat runs reuse one workflow id.
        id="datapipe-workflow",
        task_queue="data-pipeline-task-queue",
    )

    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)