forked from singer-io/tap-github
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_github_all_fields.py
More file actions
75 lines (57 loc) · 3.3 KB
/
test_github_all_fields.py
File metadata and controls
75 lines (57 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
from tap_tester import runner, connections, menagerie
from base import TestGithubBase
class TestGithubAllFields(TestGithubBase):
    """Test that, with all fields selected for a stream, automatic and available fields are replicated."""

    @staticmethod
    def name():
        return "tap_tester_github_all_fields"

    @staticmethod
    def _fields_from_catalog_metadata(catalog_entry):
        """Return the set of field names found in an annotated schema's field-level metadata.

        Entries with an empty breadcrumb (stream-level metadata) are skipped; for the
        remaining entries the field name is taken from the second breadcrumb element.
        """
        return {
            md_entry['breadcrumb'][1]
            for md_entry in catalog_entry['metadata']
            if md_entry['breadcrumb']
        }

    @staticmethod
    def _upserted_field_names(messages):
        """Return the union of `data` keys over every 'upsert' message in `messages`.

        Returns an empty set when there are no upsert messages, so callers get a clean
        assertion failure instead of an IndexError.
        """
        replicated = set()
        for message in messages:
            if message['action'] == 'upsert':
                replicated.update(message['data'].keys())
        return replicated

    def test_run(self):
        """
        Ensure running the tap with all streams and fields selected results in the
        replication of all fields.
        - Verify no unexpected streams were replicated
        - Verify that each stream replicates some records
        - Verify that more than just the automatic fields are replicated for each stream,
          and that the replicated fields match the catalog's field-level metadata.
        """
        expected_streams = self.expected_streams()

        # instantiate connection
        conn_id = connections.ensure_connection(self)

        # run check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # table and field selection
        test_catalogs_all_fields = [catalog for catalog in found_catalogs
                                    if catalog.get('stream_name') in expected_streams]
        self.perform_and_verify_table_and_field_selection(
            conn_id, test_catalogs_all_fields, select_all_fields=True,
        )

        # grab metadata after performing table-and-field selection to set expectations
        stream_to_all_catalog_fields = {}  # used for asserting all fields are replicated
        for catalog in test_catalogs_all_fields:
            catalog_entry = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
            stream_to_all_catalog_fields[catalog['stream_name']] = \
                self._fields_from_catalog_metadata(catalog_entry)

        # run initial sync
        record_count_by_stream = self.run_and_verify_sync(conn_id)
        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated
        synced_stream_names = set(synced_records.keys())
        self.assertSetEqual(expected_streams, synced_stream_names)

        expected_primary_keys = self.expected_primary_keys()
        for stream in expected_streams:
            with self.subTest(stream=stream):
                # Verify that you get some records for each stream. This runs first so
                # an empty stream fails here with a clear message.
                self.assertGreater(record_count_by_stream.get(stream, -1), 0)

                # expected values
                expected_automatic_keys = expected_primary_keys.get(stream)
                expected_all_keys = stream_to_all_catalog_fields[stream]

                # collect actual values: union over every upserted record rather than
                # only the first one, so a field missing from the first record but
                # present in later records still counts as replicated.
                actual_all_keys = self._upserted_field_names(
                    synced_records[stream]['messages'])

                # verify all fields for a stream were replicated
                self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
                self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')
                self.assertSetEqual(expected_all_keys, actual_all_keys)