forked from singer-io/tap-github
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path tap_github.py
More file actions
135 lines (101 loc) · 3.9 KB
/
tap_github.py
File metadata and controls
135 lines (101 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import argparse
import json
import os
import sys

import requests
import singer
import singer.stats
# Logger provided by the singer library, shared by every function below.
logger = singer.get_logger()

# A single HTTP session reused for all API calls; do_sync() installs the
# authorization header on it before any stream is fetched.
session = requests.Session()
def authed_get(source, url):
    """GET ``url`` through the module-level ``session`` and time the call.

    The request is wrapped in ``singer.stats.Timer`` tagged with ``source``,
    and the HTTP status code is stored on the timer's stats object.

    Args:
        source: Stream name used to tag the timing metric.
        url: Absolute URL to fetch.

    Returns:
        The ``requests.Response`` of a successful request.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
            Raising here keeps callers from parsing an error body with
            ``response.json()`` as if it were a page of records.
    """
    with singer.stats.Timer(source=source) as stats:
        resp = session.request(method='get', url=url)
        # Store the status before raising so the timer's stats carry it
        # even when the request failed.
        stats.http_status_code = resp.status_code
        resp.raise_for_status()
    return resp
def authed_get_all_pages(source, url):
    """Yield each page of a paginated listing, starting at ``url``.

    Pages are fetched with ``authed_get``. After each page is yielded, the
    ``next`` entry of ``response.links`` (requests' parsed ``Link`` header)
    is followed. Iteration ends on the first page with no ``next`` link.

    Args:
        source: Stream name forwarded to ``authed_get`` for metrics.
        url: URL of the first page.

    Yields:
        One ``requests.Response`` per page.
    """
    next_url = url
    while True:
        page = authed_get(source, next_url)
        yield page
        links = page.links
        if 'next' not in links:
            return
        next_url = links['next']['url']
def get_abs_path(path):
    """Return ``path`` joined onto the real directory of this module file.

    Symlinks in this module's own location are resolved via ``realpath``.
    An absolute ``path`` is returned unchanged, per ``os.path.join``.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)
def load_schemas():
    """Load the JSON schema file for each stream the tap emits.

    Returns:
        A dict mapping stream name (``'commits'``, ``'issues'``) to the parsed
        JSON schema loaded from ``tap_github/<stream>.json`` next to this
        module.
    """
    schema_files = (
        ('commits', 'tap_github/commits.json'),
        ('issues', 'tap_github/issues.json'),
    )
    loaded = {}
    for stream_name, relative_path in schema_files:
        with open(get_abs_path(relative_path)) as schema_file:
            loaded[stream_name] = json.load(schema_file)
    return loaded
def get_all_commits(repo_path, state):
    """Emit commit records for ``repo_path`` and advance the commits bookmark.

    If ``state['commits']`` is set, it is sent as the ``since`` query
    parameter so only newer commits are requested. Every page is written with
    ``singer.write_records('commits', ...)`` after the top-level ``author`` and
    ``committer`` keys are dropped from each commit.

    The new bookmark is the committer date of the first commit on the first
    non-empty page. This assumes the API lists the newest commit first
    (TODO confirm against the GitHub API docs). If no commits are returned,
    the previous bookmark is left untouched instead of being reset to None.

    Args:
        repo_path: The ``{owner}/{repo}`` segment interpolated into
            ``https://api.github.com/repos/{}/commits``.
        state: Bookmark dict; ``state['commits']`` is read and possibly
            updated in place.

    Returns:
        The same ``state`` dict.
    """
    since = state.get('commits')
    query_string = '?since={}'.format(since) if since is not None else ''
    url = 'https://api.github.com/repos/{}/commits{}'.format(repo_path, query_string)

    latest_commit_time = None
    with singer.stats.Counter(source='commits') as stats:
        for response in authed_get_all_pages('commits', url):
            commits = response.json()
            for commit in commits:
                # Drop the top-level author/committer keys; the commit's own
                # author/committer data stays under commit['commit'].
                commit.pop('author', None)
                commit.pop('committer', None)
            stats.add(record_count=len(commits))
            singer.write_records('commits', commits)
            # Guard against an empty page: an incremental run with no new
            # commits returns [] and commits[0] would raise IndexError.
            if latest_commit_time is None and commits:
                latest_commit_time = commits[0]['commit']['committer']['date']

    # Only move the bookmark when something was seen; otherwise keep the old one.
    if latest_commit_time is not None:
        state['commits'] = latest_commit_time
    return state
def get_all_issues(repo_path, state):
    """Emit issue records for ``repo_path`` and advance the issues bookmark.

    Issues are requested sorted by ``updated`` ascending. If
    ``state['issues']`` is set, it is sent as the ``since`` parameter. Each
    page is written with ``singer.write_records('issues', ...)``.

    Because the sort is ascending, the ``updated_at`` of the last issue on the
    last non-empty page becomes the new bookmark. If no issues are returned,
    the previous bookmark is kept rather than being overwritten with None,
    which would force a full re-sync on the next run.

    NOTE(review): no ``state=`` filter is sent. If the API defaults to open
    issues only, closed issues are never replicated; confirm this is intended.

    Args:
        repo_path: The ``{owner}/{repo}`` segment interpolated into
            ``https://api.github.com/repos/{}/issues``.
        state: Bookmark dict; ``state['issues']`` is read and possibly
            updated in place.

    Returns:
        The same ``state`` dict.
    """
    since = state.get('issues')
    query_string = '&since={}'.format(since) if since is not None else ''
    url = 'https://api.github.com/repos/{}/issues?sort=updated&direction=asc{}'.format(
        repo_path, query_string)

    last_issue_time = None
    with singer.stats.Counter(source='issues') as stats:
        for response in authed_get_all_pages('issues', url):
            issues = response.json()
            stats.add(record_count=len(issues))
            if issues:
                last_issue_time = issues[-1]['updated_at']
            singer.write_records('issues', issues)

    if last_issue_time is not None:
        state['issues'] = last_issue_time
    return state
def do_sync(config, state):
    """Replicate the commits and issues streams for one repository.

    Installs the access token on the shared ``session`` and emits a schema
    message for each stream (key properties ``sha`` and ``id``). It then syncs
    commits and issues in turn, emitting a state message after each stream.

    Args:
        config: Parsed config dict containing ``access_token`` and
            ``repository`` (the ``{owner}/{repo}`` used in API URLs).
        state: Bookmark dict, possibly empty, with optional ``commits`` and
            ``issues`` entries. Updated by the stream functions.
    """
    access_token = config['access_token']
    repo_path = config['repository']
    schemas = load_schemas()
    session.headers.update({'authorization': 'token ' + access_token})
    if state:
        logger.info('Replicating commits since %s from %s', state, repo_path)
    else:
        logger.info('Replicating all commits from %s', repo_path)
    singer.write_schema('commits', schemas['commits'], 'sha')
    singer.write_schema('issues', schemas['issues'], 'id')

    state = get_all_commits(repo_path, state)
    # Checkpoint after each stream so an error while syncing issues does not
    # throw away the commits bookmark that was already reached.
    singer.write_state(state)

    state = get_all_issues(repo_path, state)
    singer.write_state(state)
def main():
    """Command-line entry point: parse arguments, load config/state, and sync.

    ``-c/--config`` (required) names a JSON config file that must contain
    ``access_token`` and ``repository``; the process exits with status 1
    if either is missing. ``-s/--state`` optionally names a state file of
    JSON lines. The last non-blank line is used, and blank lines are skipped
    so a trailing empty line does not make ``json.loads`` fail.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-c', '--config', help='Config file', required=True)
    parser.add_argument(
        '-s', '--state', help='State file')
    args = parser.parse_args()

    with open(args.config) as config_file:
        config = json.load(config_file)

    missing_keys = [key for key in ['access_token', 'repository'] if key not in config]
    if missing_keys:
        logger.fatal("Missing required configuration keys: {}".format(missing_keys))
        # sys.exit rather than the site-injected exit() builtin, which is not
        # guaranteed to exist (e.g. under `python -S` or frozen builds).
        sys.exit(1)

    state = {}
    if args.state:
        with open(args.state, 'r') as file:
            for line in file:
                line = line.strip()
                if line:
                    state = json.loads(line)
    do_sync(config, state)
# Start the tap only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()