-
-
Notifications
You must be signed in to change notification settings - Fork 902
Expand file tree
/
Copy pathcheck_repo_ci_jobs.py
More file actions
104 lines (78 loc) · 2.92 KB
/
check_repo_ci_jobs.py
File metadata and controls
104 lines (78 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import pathlib
import shutil
import zipfile
import requests
# To test this locally, set these environment variables
REPO_OWNER = os.environ.get("REPO_OWNER", "IfcOpenShell/IfcOpenShell")
MY_WORKFLOW = os.environ.get("MY_WORKFLOW", "ci-ifcopenshell-conda-daily")
WORKFLOW_RUN_ID = os.environ.get("WORKFLOW_RUN_ID", None)
TOKEN = os.environ.get("GITHUB_TOKEN", None)
# To test this locally create a fine-grained personal access token for this repo with permissions "actions:read"
# See https://github.com/settings/tokens?type=beta
# This is a list of strings that indicate that a job has stopped abruptly.
SIGNS_OF_STOPPAGE = [
"Error: The operation was canceled.",
"fatal error C1060: compiler is out of heap space",
]
def start_request_session():
s = requests.Session()
headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
}
if TOKEN:
headers["Authorization"] = f"Bearer {TOKEN}"
s.headers = headers
return s
def get_ci_run(repo_name, run_id):
url = f"https://api.github.com/repos/{repo_name}/actions/runs/{run_id}"
s = start_request_session()
response = s.get(url)
return response.json()
def evaluate_log_file_for_abrupt_stop(log_file):
with open(log_file) as f:
for i, line in enumerate(f):
for sign in SIGNS_OF_STOPPAGE:
if sign in line:
print(f"Found sign of abrupt stoppage in [line {i}]: '{sign}'")
return True
return False
def get_ci_specific_run_specific_failure_details(repo_name, run_id):
url = f"https://api.github.com/repos/{repo_name}/actions/runs/{run_id}/attempts/1/logs"
s = start_request_session()
response = s.get(url)
# SAVE zip file
with open("logs.zip", "wb") as f:
f.write(response.content)
# unzip file
with zipfile.ZipFile("logs.zip", "r") as zip_ref:
zip_ref.extractall("logs")
failed_logs = []
for file in pathlib.Path("logs").iterdir():
if file.is_dir():
continue
if evaluate_log_file_for_abrupt_stop(file):
failed_logs.append(file)
return failed_logs
def restart_job(repo_name, job_id):
url = f"https://api.github.com/repos/{repo_name}/actions/runs/{job_id}/rerun-failed-jobs"
s = start_request_session()
response = s.post(url)
return response.json()
def eval_jobs():
run = get_ci_run(REPO_OWNER, WORKFLOW_RUN_ID)
if run["run_attempt"] > 1:
print("This is not the first attempt. Exiting")
return
failed_logs = get_ci_specific_run_specific_failure_details(REPO_OWNER, WORKFLOW_RUN_ID)
if len(failed_logs) == 0:
print("No runs exhibit signs of abrupt stoppage")
return
print("restarting job", WORKFLOW_RUN_ID)
r = restart_job(REPO_OWNER, WORKFLOW_RUN_ID)
print(r)
shutil.rmtree("logs")
os.remove("logs.zip")
if __name__ == "__main__":
eval_jobs()