1+ from pathlib import Path
2+ import random
3+ import subprocess
4+ import os
5+ import json
6+ import shutil
7+
8+ from temporalio import activity
9+ from data_pipeline .dataobjects import YourParams , DataPipelineParams
10+
11+
12+ @activity .defn
13+ async def your_activity (input : YourParams ) -> str :
14+ return f"{ input .greeting } , { input .name } !"
15+
16+ @activity .defn
17+ async def validate (input : DataPipelineParams ) -> bool :
18+ if (input .validation == "blue" ):
19+ return False
20+ else :
21+ return True
22+
23+ @activity .defn
24+ async def extract (input : DataPipelineParams ) -> str :
25+ initialize (input .foldername )
26+
27+ shutil .copy (input .foldername + "/source/" + input .input_filename , input .foldername + "/working/" + input .input_filename )
28+ activity .heartbeat (input .input_filename )
29+
30+ return "success"
31+
32+ @activity .defn
33+ async def transform (input : DataPipelineParams ) -> str :
34+ namespaces = get_namespaces (input .foldername , input .input_filename )
35+ workingfilename = input .foldername + "/working/" + Path (input .input_filename ).stem + ".csv"
36+ namespacesCSVFile = open (workingfilename , "w+" )
37+ namespacesCSVFile .write (f"Namespace,\n " )
38+ for i in namespaces :
39+ namespacesCSVFile .write (f"{ i } ,\n " )
40+
41+ namespacesCSVFile .close ()
42+
43+ activity .heartbeat (input .input_filename )
44+
45+ return "success"
46+
47+ @activity .defn
48+ async def load (input : DataPipelineParams ) -> str :
49+
50+ shutil .copy (input .foldername + "/working/" + Path (input .input_filename ).stem + ".csv" , input .foldername + "/output/" + Path (input .input_filename ).stem + ".csv" )
51+ activity .heartbeat (input .input_filename )
52+
53+ cleanup (input .foldername )
54+
55+ return "success"
56+
57+ # this activity simulates polling for demo purposes
58+ # see https://community.temporal.io/t/what-is-the-best-practice-for-a-polling-activity/328/2
59+ # it throws an exception 90% of the time (simulating "not found")
60+ # 10% of the time it simulates "found" and returns
61+ @activity .defn
62+ async def poll (input : DataPipelineParams ) -> str :
63+ if random .randint (1 , 10 ) > 9 :
64+ return "polled successfully: found"
65+ raise Exception ("Simulate polled: not found" )
66+
67+
68+
69+
70+ def initialize (datafolder : str ):
71+ if (os .path .isfile (datafolder + "/working/" + "info.json" )):
72+ os .remove (datafolder + "/working/" + "info.json" )
73+ if (os .path .isfile (datafolder + "/working/" + "info.csv" )):
74+ os .remove (datafolder + "/working/" + "info.csv" )
75+ if (os .path .isfile (datafolder + "/output/" + "info.csv" )):
76+ os .remove (datafolder + "/output/" + "info.csv" )
77+
78+ def cleanup (datafolder : str ):
79+ if (os .path .isfile (datafolder + "/working/" + "info.json" )):
80+ os .remove (datafolder + "/working/" + "info.json" )
81+ if (os .path .isfile (datafolder + "/working/" + "info.csv" )):
82+ os .remove (datafolder + "/working/" + "info.csv" )
83+
84+
85+ def get_namespaces (datafolder : str , filename : str ):
86+ namespaces = []
87+
88+ namespacesJSONFile = open (datafolder + "/working/" + filename , "r" )
89+ namespacesDict = json .load (namespacesJSONFile )
90+ namespacesJSONFile .close ()
91+
92+ for i in namespacesDict ['namespaces' ]:
93+ namespaces .append (i )
94+ return namespaces
0 commit comments