forked from singer-io/singer-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcatalog.py
More file actions
111 lines (92 loc) · 3.76 KB
/
catalog.py
File metadata and controls
111 lines (92 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
'''Provides an object model for a Singer Catalog.'''
import json
import sys
from singer.schema import Schema
# pylint: disable=too-many-instance-attributes
class CatalogEntry(object):
    '''One stream entry of a Singer catalog.

    Every attribute is optional and defaults to None. `to_dict` produces the
    JSON-compatible dictionary form of the entry; note that `database` and
    `table` are serialized under the keys 'database_name' and 'table_name'.
    '''

    def __init__(self, tap_stream_id=None, stream=None,
                 key_properties=None, schema=None, replication_key=None,
                 is_view=None, database=None, table=None, row_count=None,
                 stream_alias=None, metadata=None):
        '''Store the given values as attributes of the same name.

        `schema`, when provided, must expose `to_dict()` (used by `to_dict`)
        and a `selected` attribute (used by `is_selected`).
        '''
        self.tap_stream_id = tap_stream_id
        self.stream = stream
        self.key_properties = key_properties
        self.schema = schema
        self.replication_key = replication_key
        self.is_view = is_view
        self.database = database
        self.table = table
        self.row_count = row_count
        self.stream_alias = stream_alias
        self.metadata = metadata

    def __str__(self):
        '''Return the string form of the instance attribute dictionary.'''
        return str(self.__dict__)

    def __eq__(self, other):
        '''Compare two entries attribute by attribute.

        Returns NotImplemented for objects that are not CatalogEntry
        instances, so comparisons such as `entry == None` evaluate to False
        instead of raising AttributeError on a missing `__dict__`.
        '''
        if not isinstance(other, CatalogEntry):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def is_selected(self):
        '''Return the `selected` attribute of this entry's schema.

        Raises:
            AttributeError: if `schema` is None.
        '''
        return self.schema.selected  # pylint: disable=no-member

    def to_dict(self):
        '''Return the entry as a plain dictionary.

        'tap_stream_id', 'database_name' and 'table_name' are included only
        when the corresponding attribute is truthy; every other key is
        included whenever its attribute is not None. The schema is converted
        through its own `to_dict()`.
        '''
        result = {}
        if self.tap_stream_id:
            result['tap_stream_id'] = self.tap_stream_id
        if self.database:
            result['database_name'] = self.database
        if self.table:
            result['table_name'] = self.table
        if self.replication_key is not None:
            result['replication_key'] = self.replication_key
        if self.key_properties is not None:
            result['key_properties'] = self.key_properties
        if self.schema is not None:
            schema = self.schema.to_dict()  # pylint: disable=no-member
            result['schema'] = schema
        if self.is_view is not None:
            result['is_view'] = self.is_view
        if self.stream is not None:
            result['stream'] = self.stream
        if self.row_count is not None:
            result['row_count'] = self.row_count
        # stream_alias is accepted by __init__ and must survive
        # serialization like every other attribute.
        if self.stream_alias is not None:
            result['stream_alias'] = self.stream_alias
        if self.metadata is not None:
            result['metadata'] = self.metadata
        return result
class Catalog(object):
    '''Container for the stream entries of a Singer catalog.'''

    def __init__(self, streams):
        '''Store `streams`, the collection of catalog entries.'''
        self.streams = streams

    def __str__(self):
        '''Return the string form of the instance attribute dictionary.'''
        return str(self.__dict__)

    def __eq__(self, other):
        '''Compare two catalogs attribute by attribute.

        Returns NotImplemented for objects that are not Catalog instances,
        so comparisons such as `catalog == None` evaluate to False instead
        of raising AttributeError on a missing `__dict__`.
        '''
        if not isinstance(other, Catalog):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @classmethod
    def load(cls, filename):
        '''Read the JSON file at `filename` and build a catalog from it.'''
        with open(filename) as fp:  # pylint: disable=invalid-name
            # Dispatch through cls so subclasses get their own type back.
            return cls.from_dict(json.load(fp))

    @classmethod
    def from_dict(cls, data):
        '''Build a catalog from its dictionary form.

        Args:
            data: mapping with a 'streams' key holding an iterable of
                per-stream mappings, as produced by `to_dict`.

        Raises:
            KeyError: if `data` has no 'streams' key.
        '''
        # TODO: We may want to store streams as a dict where the key is a
        # tap_stream_id and the value is a CatalogEntry. This will allow
        # faster lookup based on tap_stream_id. This would be a breaking
        # change, since callers typically access the streams property
        # directly.
        streams = []
        for stream in data['streams']:
            entry = CatalogEntry()
            entry.tap_stream_id = stream.get('tap_stream_id')
            entry.stream = stream.get('stream')
            entry.replication_key = stream.get('replication_key')
            entry.key_properties = stream.get('key_properties')
            entry.database = stream.get('database_name')
            entry.table = stream.get('table_name')
            entry.schema = Schema.from_dict(stream.get('schema'))
            entry.is_view = stream.get('is_view')
            entry.stream_alias = stream.get('stream_alias')
            # row_count and metadata are written by CatalogEntry.to_dict,
            # so read them back to keep the round trip lossless.
            entry.row_count = stream.get('row_count')
            entry.metadata = stream.get('metadata')
            streams.append(entry)
        return cls(streams)

    def to_dict(self):
        '''Return the catalog as {'streams': [entry.to_dict(), ...]}.'''
        return {'streams': [stream.to_dict() for stream in self.streams]}

    def dump(self):
        '''Write the catalog to standard output as indented JSON.'''
        json.dump(self.to_dict(), sys.stdout, indent=2)

    def get_stream(self, tap_stream_id):
        '''Return the first entry whose tap_stream_id matches, else None.'''
        for stream in self.streams:
            if stream.tap_stream_id == tap_stream_id:
                return stream
        return None