forked from singer-io/singer-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransform.py
More file actions
146 lines (122 loc) · 5.65 KB
/
transform.py
File metadata and controls
146 lines (122 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import datetime
import pendulum
from singer import utils
# pylint: disable=line-too-long
def _transform_object(data, prop_schema, integer_datetime_fmt, path, error_paths):
result = {}
successes = []
for key, value in data.items():
if key in prop_schema:
success, subdata, _, error_paths = transform_recur(value, prop_schema[key], integer_datetime_fmt, path + [key], error_paths)
successes.append(success)
result[key] = subdata
return all(successes), result, path, error_paths
def _transform_array(data, item_schema, integer_datetime_fmt, path, error_paths):
result = []
successes = []
for i, row in enumerate(data):
success, subdata, _, error_paths = transform_recur(row, item_schema, integer_datetime_fmt, path + [i], error_paths)
successes.append(success)
result.append(subdata)
return all(successes), result, path, error_paths
def unix_milliseconds_to_datetime(value):
    """
    Format a Unix timestamp expressed in milliseconds.

    ``value`` is coerced with ``int()``, scaled to seconds, read as a UTC
    timestamp, and the resulting datetime is formatted by ``utils.strftime``.
    """
    seconds = int(value) * 0.001
    moment = datetime.datetime.utcfromtimestamp(seconds)
    return utils.strftime(moment)
def unix_seconds_to_datetime(value):
    """
    Format a Unix timestamp expressed in whole seconds.

    ``value`` is coerced with ``int()``, read as a UTC timestamp, and the
    resulting datetime is formatted by ``utils.strftime``.
    """
    seconds = int(value)
    moment = datetime.datetime.utcfromtimestamp(seconds)
    return utils.strftime(moment)
def string_to_datetime(value):
    """
    Parse ``value`` with ``pendulum.parse`` and format the result with
    ``utils.strftime``.
    """
    parsed = pendulum.parse(value)
    return utils.strftime(parsed)
def _transform_datetime(value, integer_datetime_fmt):
    """
    Convert ``value`` to a formatted datetime according to ``integer_datetime_fmt``.

    With NO_INTEGER_DATETIME_PARSING the value is parsed as a datetime string.
    With UNIX_SECONDS_INTEGER_DATETIME_PARSING or
    UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING the value is first treated as
    an integer Unix timestamp; if that conversion fails, it falls back to
    parsing the value as a datetime string.

    Raises:
        Exception: if ``integer_datetime_fmt`` is not one of the three
            recognised options. Errors from the string parser propagate to
            the caller.
    """
    if integer_datetime_fmt == NO_INTEGER_DATETIME_PARSING:
        return string_to_datetime(value)

    if integer_datetime_fmt == UNIX_SECONDS_INTEGER_DATETIME_PARSING:
        parse_integer = unix_seconds_to_datetime
    elif integer_datetime_fmt == UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING:
        parse_integer = unix_milliseconds_to_datetime
    else:
        raise Exception("Invalid integer datetime parsing option")

    try:
        return parse_integer(value)
    except Exception:
        # The value was not usable as an integer timestamp; try it as a
        # datetime string instead. ``Exception`` (rather than a bare
        # ``except``) lets KeyboardInterrupt and SystemExit propagate.
        return string_to_datetime(value)
# Accepted values for the ``integer_datetime_fmt`` argument; any other value
# makes ``_transform_datetime`` raise.
# Date-time values are parsed as strings only (the default used by ``transform``).
NO_INTEGER_DATETIME_PARSING = "no-integer-datetime-parsing"
# Date-time values are first tried as integer Unix timestamps in seconds.
UNIX_SECONDS_INTEGER_DATETIME_PARSING = "unix-seconds-integer-datetime-parsing"
# Date-time values are first tried as integer Unix timestamps in milliseconds.
UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING = "unix-milliseconds-integer-datetime-parsing"
def _transform(data, typ, schema, integer_datetime_fmt, path, error_paths):
if typ == "null":
if data is None or data == "":
return True, None, path, error_paths
else:
return False, None, path, error_paths + [path]
elif schema.get("format") == "date-time":
return True, _transform_datetime(data, integer_datetime_fmt), path, error_paths
elif typ == "object":
return _transform_object(data, schema["properties"], integer_datetime_fmt, path, error_paths)
elif typ == "array":
return _transform_array(data, schema["items"], integer_datetime_fmt, path, error_paths)
elif typ == "string":
if data != None:
return True, str(data), path, error_paths
else:
return False, None, path, error_paths + [path]
elif typ == "integer":
if isinstance(data, str):
data = data.replace(',', '')
return True, int(data), path, error_paths
elif typ == "number":
if isinstance(data, str):
data = data.replace(',', '')
return True, float(data), path, error_paths
elif typ == "boolean":
return True, bool(data), path, error_paths
else:
return False, None, path, error_paths + [path]
def transform(data, schema, integer_datetime_fmt=NO_INTEGER_DATETIME_PARSING):
    """
    Transform ``data`` so that each field has the type given by ``schema``.

    Types listed in the schema are tried in order, except that 'null' is
    always tried last. The valid types are: integer, number, boolean, array,
    object, null, string, and string with date-time format.

    When an ``integer_datetime_fmt`` is supplied, integer values in fields
    with date-time format are parsed as unix seconds or unix milliseconds.

    Returns the transformed data. Raises an Exception listing the failing
    paths if some field matches none of its schema types.
    """
    ok, converted, _, failed_paths = transform_recur(data, schema, integer_datetime_fmt, [], [])
    if not ok:
        raise Exception("Errors at paths {} in data {} for schema {}".format(failed_paths, data, schema))
    return converted
def transform_recur(data, schema, integer_datetime_fmt, path, error_paths):
    """
    Transform ``data`` against ``schema``, trying each of the schema's types in turn.

    This function (and several of its helper functions) returns a tuple:
    (success, data, path, error_paths)

    success is a boolean flag indicating whether data was successfully transformed with schema
    data is the transformed data (None on failure)
    path is the current path in the tree traversal of the data and schema
    error_paths is a list of paths where the data could not be transformed according to the schema

    ``schema["type"]`` may be a single type or a list of types. The types are
    tried in order, with "null" moved to the end, and the first successful
    transformation wins. The schema itself is never modified, and every
    attempt starts from the original ``data``. An exception raised while
    trying a type counts as a failure of that type.
    """
    types = schema["type"]
    # Work on a private list so that reordering "null" below cannot alter the
    # caller's schema.
    types = list(types) if isinstance(types, list) else [types]

    if "null" in types:
        types.remove("null")
        types.append("null")

    last_index = len(types) - 1
    for i, typ in enumerate(types):
        try:
            # The result is bound to a separate name so that a failed attempt
            # cannot replace ``data`` before the next type is tried.
            success, transformed, _, error_paths = _transform(
                data, typ, schema, integer_datetime_fmt, path, error_paths
            )
        except Exception:
            # Conversion helpers signal a mismatch by raising (ValueError,
            # TypeError, KeyError, parser errors, ...); treat that as "this
            # type does not fit" and move on to the next candidate.
            if i == last_index:
                return False, None, path, error_paths + [path]
            continue

        if success:
            return success, transformed, path, error_paths
        if i == last_index:
            return False, None, path, error_paths

    # Only reachable when the schema lists no types at all: nothing can match.
    return False, None, path, error_paths + [path]