Skip to content

Commit e0e5fc7

Browse files
authored
Add message representation (singer-io#6)
* Working on message classes * Use namedtuple for message classes * Add unit tests * Pep8 * Add circle.yml * Use vars * Don't use namedtuple * Don't use deprecated method * Pep8
1 parent 0c7d48f commit e0e5fc7

4 files changed

Lines changed: 169 additions & 12 deletions

File tree

circle.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
machine:
2+
python:
3+
version: 3.4.4
4+
5+

singer/__init__.py

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,61 @@
44
import logging
55
import logging.config
66

7-
from collections import namedtuple
87

8+
class Message(object):
9+
def __init__(self, **kwargs):
10+
for k in self.attr_list:
11+
if k not in kwargs:
12+
raise ValueError("missing {}".format(k))
13+
setattr(self, k, kwargs[k])
914

10-
def _writeline(s):
11-
sys.stdout.write(s + '\n')
15+
def asdict(self):
16+
res = {k: getattr(self, k) for k in self.attr_list}
17+
res['type'] = self._type
18+
return res
19+
20+
def __eq__(self, other):
21+
return self.asdict() == other.asdict()
22+
23+
def __repr__(self):
24+
attrstr = ", ".join(
25+
"{}={}".format(k, getattr(self, k)) for k in self.attr_list)
26+
return "{}({})".format(self.__class__.__name__, attrstr)
27+
28+
def tojson(self):
29+
return json.dumps(self.asdict())
30+
31+
32+
class RecordMessage(Message):
33+
_type = 'RECORD'
34+
attr_list = ['stream', 'record']
35+
36+
37+
class SchemaMessage(Message):
38+
_type = 'SCHEMA'
39+
attr_list = ['stream', 'schema', 'key_properties']
40+
41+
42+
class StateMessage(Message):
43+
_type = 'STATE'
44+
attr_list = ['value']
45+
46+
47+
def to_json(message):
48+
m = vars(message)
49+
if isinstance(message, RecordMessage):
50+
m['type'] = 'RECORD'
51+
elif isinstance(message, SchemaMessage):
52+
m['type'] = 'SCHEMA'
53+
elif isinstance(message, StateMessage):
54+
m['type'] = 'STATE'
55+
else:
56+
raise Exception('Unrecognized message {}'.format(message))
57+
return json.dumps(m)
58+
59+
60+
def _write_message(message):
61+
sys.stdout.write(to_json(message) + '\n')
1262
sys.stdout.flush()
1363

1464

@@ -17,9 +67,7 @@ def write_record(stream_name, record):
1767
1868
>>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"})
1969
"""
20-
_writeline(json.dumps({'type': 'RECORD',
21-
'stream': stream_name,
22-
'record': record}))
70+
_write_message(RecordMessage(stream=stream_name, record=record))
2371

2472

2573
def write_records(stream_name, records):
@@ -45,19 +93,44 @@ def write_schema(stream_name, schema, key_properties):
4593
key_properties = [key_properties]
4694
if not isinstance(key_properties, list):
4795
raise Exception("key_properties must be a string or list of strings")
48-
_writeline(json.dumps({'type': 'SCHEMA',
49-
'stream': stream_name,
50-
'key_properties': key_properties,
51-
'schema': schema}))
96+
_write_message(
97+
SchemaMessage(
98+
stream=stream_name,
99+
schema=schema,
100+
key_properties=key_properties))
52101

53102

54103
def write_state(value):
55104
"""Write a state message.
56105
57106
>>> write_state({'last_updated_at': '2017-02-14T09:21:00'})
58107
"""
59-
_writeline(json.dumps({'type': 'STATE',
60-
'value': value}))
108+
_write_message(StateMessage(value))
109+
110+
111+
def _required_key(msg, k):
112+
if k not in msg:
113+
raise Exception("Message is missing required key '{}': {}".format(
114+
k, msg))
115+
return msg[k]
116+
117+
118+
def parse_message(s):
119+
"""Parse a message string into a Message object."""
120+
o = json.loads(s)
121+
t = _required_key(o, 'type')
122+
123+
if t == 'RECORD':
124+
return RecordMessage(stream=_required_key(o, 'stream'),
125+
record=_required_key(o, 'record'))
126+
127+
elif t == 'SCHEMA':
128+
return SchemaMessage(stream=_required_key(o, 'stream'),
129+
schema=_required_key(o, 'schema'),
130+
key_properties=_required_key(o, 'key_properties'))
131+
132+
elif t == 'STATE':
133+
return StateMessage(value=_required_key(o, 'value'))
61134

62135

63136
def get_logger():

tests/__init__.py

Whitespace-only changes.

tests/test_singer.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import singer
2+
import unittest
3+
4+
5+
class TestSinger(unittest.TestCase):
6+
def test_parse_message_record_good(self):
7+
message = singer.parse_message(
8+
'{"type": "RECORD", "record": {"name": "foo"}, "stream": "users"}')
9+
self.assertEqual(
10+
message,
11+
singer.RecordMessage(record={'name': 'foo'}, stream='users'))
12+
13+
def test_parse_message_record_missing_record(self):
14+
with self.assertRaises(Exception):
15+
singer.parse_message('{"type": "RECORD", "stream": "users"}')
16+
17+
def test_parse_message_record_missing_stream(self):
18+
with self.assertRaises(Exception):
19+
singer.parse_message(
20+
'{"type": "RECORD", "record": {"name": "foo"}}')
21+
22+
def test_parse_message_schema_good(self):
23+
message = singer.parse_message('{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "key_properties": ["name"]}') # nopep8
24+
self.assertEqual(
25+
message,
26+
singer.SchemaMessage(
27+
stream='users',
28+
key_properties=['name'],
29+
schema={'type': 'object',
30+
'properties': {
31+
'name': {'type': 'string'}}}))
32+
33+
def test_parse_message_schema_missing_stream(self):
34+
with self.assertRaises(Exception):
35+
message = singer.parse_message('{"type": "SCHEMA", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "key_properties": ["name"]}') # nopep8
36+
37+
def test_parse_message_schema_missing_schema(self):
38+
with self.assertRaises(Exception):
39+
message = singer.parse_message(
40+
'{"type": "SCHEMA", "stream": "users", "key_properties": ["name"]}') # nopep8
41+
42+
def test_parse_message_schema_missing_key_properties(self):
43+
with self.assertRaises(Exception):
44+
message = singer.parse_message('{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}}') # nopep8
45+
46+
def test_parse_message_state_good(self):
47+
message = singer.parse_message(
48+
'{"type": "STATE", "value": {"seq": 1}}')
49+
self.assertEqual(message, singer.StateMessage(value={'seq': 1}))
50+
51+
def test_parse_message_state_missing_value(self):
52+
with self.assertRaises(Exception):
53+
singer.parse_message('{"type": "STATE"}')
54+
55+
def test_round_trip(self):
56+
57+
record_message = singer.RecordMessage(
58+
record={'name': 'foo'},
59+
stream='users')
60+
61+
schema_message = singer.SchemaMessage(
62+
stream='users',
63+
key_properties=['name'],
64+
schema={'type': 'object',
65+
'properties': {
66+
'name': {'type': 'string'}}})
67+
68+
state_message = singer.StateMessage(value={'seq': 1})
69+
70+
self.assertEqual(record_message,
71+
singer.parse_message(singer.to_json(record_message)))
72+
self.assertEqual(schema_message,
73+
singer.parse_message(singer.to_json(schema_message)))
74+
self.assertEqual(state_message,
75+
singer.parse_message(singer.to_json(state_message)))
76+
77+
78+
if __name__ == '__main__':
79+
unittest.main()

0 commit comments

Comments
 (0)