forked from singer-io/singer-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_singer.py
More file actions
99 lines (78 loc) · 3.97 KB
/
test_singer.py
File metadata and controls
99 lines (78 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import singer
import unittest
class TestSinger(unittest.TestCase):
def test_parse_message_record_good(self):
message = singer.parse_message(
'{"type": "RECORD", "record": {"name": "foo"}, "stream": "users"}')
self.assertEqual(
message,
singer.RecordMessage(record={'name': 'foo'}, stream='users'))
def test_parse_message_record_with_version_good(self):
message = singer.parse_message(
'{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2}')
self.assertEqual(
message,
singer.RecordMessage(record={'name': 'foo'}, stream='users', version=2))
def test_parse_message_record_missing_record(self):
with self.assertRaises(Exception):
singer.parse_message('{"type": "RECORD", "stream": "users"}')
def test_parse_message_record_missing_stream(self):
with self.assertRaises(Exception):
singer.parse_message(
'{"type": "RECORD", "record": {"name": "foo"}}')
def test_parse_message_schema_good(self):
message = singer.parse_message('{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "key_properties": ["name"]}') # nopep8
self.assertEqual(
message,
singer.SchemaMessage(
stream='users',
key_properties=['name'],
schema={'type': 'object',
'properties': {
'name': {'type': 'string'}}}))
def test_parse_message_schema_missing_stream(self):
with self.assertRaises(Exception):
message = singer.parse_message('{"type": "SCHEMA", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "key_properties": ["name"]}') # nopep8
def test_parse_message_schema_missing_schema(self):
with self.assertRaises(Exception):
message = singer.parse_message(
'{"type": "SCHEMA", "stream": "users", "key_properties": ["name"]}') # nopep8
def test_parse_message_schema_missing_key_properties(self):
with self.assertRaises(Exception):
message = singer.parse_message('{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"name": {"type": "string"}}}}') # nopep8
def test_parse_message_state_good(self):
message = singer.parse_message(
'{"type": "STATE", "value": {"seq": 1}}')
self.assertEqual(message, singer.StateMessage(value={'seq': 1}))
def test_parse_message_state_missing_value(self):
with self.assertRaises(Exception):
singer.parse_message('{"type": "STATE"}')
def test_round_trip(self):
record_message = singer.RecordMessage(
record={'name': 'foo'},
stream='users')
schema_message = singer.SchemaMessage(
stream='users',
key_properties=['name'],
schema={'type': 'object',
'properties': {
'name': {'type': 'string'}}})
state_message = singer.StateMessage(value={'seq': 1})
self.assertEqual(record_message,
singer.parse_message(singer.format_message(record_message)))
self.assertEqual(schema_message,
singer.parse_message(singer.format_message(schema_message)))
self.assertEqual(state_message,
singer.parse_message(singer.format_message(state_message)))
## These three tests just confirm that writing doesn't throw
def test_write_record(self):
singer.write_record("users", {"name": "mike"})
def test_write_schema(self):
schema={'type': 'object',
'properties': {
'name': {'type': 'string'}}}
singer.write_schema("users", schema, ["name"])
def test_write_state(self):
singer.write_state({"foo": 1})
if __name__ == '__main__':
unittest.main()