forked from openai/openai-agents-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpr_labels.py
More file actions
435 lines (348 loc) · 13 KB
/
Copy pathpr_labels.py
File metadata and controls
435 lines (348 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import pathlib
import subprocess
import sys
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Final
# Complete set of labels this script may put on a pull request; labels
# suggested by the model are filtered against it before use.
ALLOWED_LABELS: Final[set[str]] = {
    "documentation",
    "project",
    "bug",
    "enhancement",
    "dependencies",
    "feature:chat-completions",
    "feature:core",
    "feature:extensions",
    "feature:mcp",
    "feature:realtime",
    "feature:sessions",
    "feature:tracing",
    "feature:voice",
}

# Labels computed from the changed files / diff only, never from model output.
DETERMINISTIC_LABELS: Final[set[str]] = {
    "documentation",
    "project",
    "dependencies",
}

# Intent labels: they come from the PR title prefix or from the model output.
MODEL_ONLY_LABELS: Final[set[str]] = {
    "bug",
    "enhancement",
}

# Everything left over once the two groups above are removed, i.e. the
# "feature:*" area labels.
FEATURE_LABELS: Final[set[str]] = ALLOWED_LABELS - (DETERMINISTIC_LABELS | MODEL_ONLY_LABELS)
# Feature label -> source directory prefixes whose modification implies it.
SOURCE_FEATURE_PREFIXES: Final[dict[str, tuple[str, ...]]] = {
    "feature:extensions": ("src/agents/extensions/",),
    "feature:realtime": ("src/agents/realtime/",),
    "feature:voice": ("src/agents/voice/",),
    "feature:mcp": ("src/agents/mcp/",),
    "feature:tracing": ("src/agents/tracing/",),
    "feature:sessions": ("src/agents/memory/",),
}

# Paths under src/agents/ that do NOT count as a "core" change.
CORE_EXCLUDED_PREFIXES: Final[tuple[str, ...]] = (
    "src/agents/realtime/",
    "src/agents/voice/",
    "src/agents/mcp/",
    "src/agents/tracing/",
    "src/agents/memory/",
    "src/agents/extensions/",
    "src/agents/models/",
)

# Default location of the JSON file holding the PR title/body.
PR_CONTEXT_DEFAULT_PATH = ".tmp/pr-labels/pr-context.json"
@dataclass(frozen=True)
class PRContext:
    """Immutable holder for the pull-request text used when inferring labels."""

    # Pull-request title; defaults to the empty string.
    title: str = ""
    # Pull-request description; defaults to the empty string.
    body: str = ""
def read_file_at(commit: str | None, path: str) -> str | None:
if not commit:
return None
try:
return subprocess.check_output(["git", "show", f"{commit}:{path}"], text=True)
except subprocess.CalledProcessError:
return None
def _toml_structure(line: str) -> str:
    """Return *line* with quoted strings and any trailing ``#`` comment removed.

    Only the structural characters (brackets, ``=``, commas, bare keys) remain,
    so bracket checks are not fooled by text such as ``"pkg[extra]"``.
    """
    kept: list[str] = []
    quote = ""
    escaped = False
    for char in line:
        if quote:
            if escaped:
                escaped = False
            elif char == "\\" and quote == '"':
                # Backslash escapes exist only in basic (double-quoted) strings.
                escaped = True
            elif char == quote:
                quote = ""
            continue
        if char in ('"', "'"):
            quote = char
        elif char == "#":
            break
        else:
            kept.append(char)
    return "".join(kept)


def dependency_lines_for_pyproject(text: str) -> set[int]:
    """Return the 1-based line numbers of *text* that declare dependencies.

    The scan is line-oriented rather than a full TOML parse. A line is counted
    when it is:

    * the header of, or any line inside, a ``[project.optional-dependencies]``
      or ``[dependency-groups]`` table; or
    * inside the ``[project]`` table, a line starting with ``dependencies``
      that contains ``=``, plus every following line up to and including the
      one that closes a multi-line array.

    Args:
        text: Full content of a ``pyproject.toml`` file.

    Returns:
        The set of matching line numbers (empty for empty input).
    """
    dependency_sections = ("project.optional-dependencies", "dependency-groups")
    dependency_lines: set[int] = set()
    current_section: str | None = None
    in_project_dependencies = False

    for line_number, raw_line in enumerate(text.splitlines(), start=1):
        stripped = raw_line.strip()

        # Table header: "[name]" or array-of-tables "[[name]]".
        if stripped.startswith("[") and stripped.endswith("]"):
            if stripped.startswith("[[") and stripped.endswith("]]"):
                current_section = stripped[2:-2].strip()
            else:
                current_section = stripped[1:-1].strip()
            in_project_dependencies = False
            if current_section in dependency_sections:
                dependency_lines.add(line_number)
            continue

        if current_section in dependency_sections:
            dependency_lines.add(line_number)
            continue

        if current_section != "project":
            continue

        if in_project_dependencies:
            dependency_lines.add(line_number)
            # Look for the closing bracket outside of strings/comments so that
            # extras like "requests[socks]" do not end the array prematurely.
            if "]" in _toml_structure(stripped):
                in_project_dependencies = False
            continue

        if stripped.startswith("dependencies") and "=" in stripped:
            dependency_lines.add(line_number)
            structure = _toml_structure(stripped)
            # An opening bracket without its partner means the array continues
            # on the following lines.
            if "[" in structure and "]" not in structure:
                in_project_dependencies = True

    return dependency_lines
def pyproject_dependency_changed(
    diff_text: str,
    *,
    base_sha: str | None,
    head_sha: str | None,
) -> bool:
    """Report whether *diff_text* adds or removes a pyproject.toml dependency line.

    The base and head versions of ``pyproject.toml`` are read with
    ``read_file_at`` and classified with ``dependency_lines_for_pyproject``.
    The unified diff is then walked hunk by hunk, tracking the current base
    and head line numbers, and the first removed (base side) or added (head
    side) line that lands on a dependency line yields ``True``.

    Args:
        diff_text: Unified diff (git format) covering the pull request.
        base_sha: Base commit, or ``None`` if unknown.
        head_sha: Head commit, or ``None`` if unknown.

    Returns:
        ``True`` if a dependency line of ``pyproject.toml`` was touched,
        ``False`` otherwise, including when neither version could be read.
    """
    import re

    base_text = read_file_at(base_sha, "pyproject.toml")
    head_text = read_file_at(head_sha, "pyproject.toml")
    if base_text is None and head_text is None:
        return False

    base_dependency_lines = dependency_lines_for_pyproject(base_text) if base_text else set()
    head_dependency_lines = dependency_lines_for_pyproject(head_text) if head_text else set()

    hunk_re = re.compile(r"@@ -(\d+)(?:,\d+)? \+(\d+)(?:,\d+)? @@")
    in_pyproject = False
    # True while reading a file's header lines, i.e. before its first hunk.
    in_header = True
    base_line: int | None = None
    head_line: int | None = None

    for line in diff_text.splitlines():
        if line.startswith("diff --git "):
            # A new file section starts: forget the previous file's hunk state
            # so its "--- a/..." header is not mistaken for a removed line.
            in_pyproject = False
            in_header = True
            base_line = None
            head_line = None
            continue

        if in_header and line.startswith("--- a/"):
            # Recognising the old-side name also covers a deleted
            # pyproject.toml, whose new side is "+++ /dev/null".
            in_pyproject = line[len("--- a/") :].strip() == "pyproject.toml"
            continue

        if line.startswith("+++ b/"):
            current_file = line[len("+++ b/") :].strip()
            in_pyproject = current_file == "pyproject.toml"
            base_line = None
            head_line = None
            continue

        if line.startswith("@@ "):
            in_header = False
            if in_pyproject:
                match = hunk_re.match(line)
                if match:
                    base_line = int(match.group(1))
                    head_line = int(match.group(2))
            continue

        if not in_pyproject or base_line is None or head_line is None:
            continue

        if line.startswith(" "):
            # Context line: present on both sides.
            base_line += 1
            head_line += 1
        elif line.startswith("-"):
            if base_line in base_dependency_lines:
                return True
            base_line += 1
        elif line.startswith("+"):
            if head_line in head_dependency_lines:
                return True
            head_line += 1

    return False
def infer_specific_feature_labels(changed_files: Sequence[str]) -> set[str]:
    """Map changed source paths to the non-core feature labels they imply.

    Only paths starting with ``src/`` are considered. A label from
    ``SOURCE_FEATURE_PREFIXES`` is included when any such path starts with one
    of its prefixes. ``feature:chat-completions`` is included when a path under
    the models directories contains ``chatcmpl`` or ``chatcompletions``.

    Args:
        changed_files: Paths touched by the pull request.

    Returns:
        The set of matching feature labels (possibly empty).
    """
    src_paths = [candidate for candidate in changed_files if candidate.startswith("src/")]

    matched = {
        feature
        for feature, roots in SOURCE_FEATURE_PREFIXES.items()
        if any(candidate.startswith(roots) for candidate in src_paths)
    }

    model_roots = ("src/agents/models/", "src/agents/extensions/models/")
    for candidate in src_paths:
        if not candidate.startswith(model_roots):
            continue
        if "chatcmpl" in candidate or "chatcompletions" in candidate:
            matched.add("feature:chat-completions")
            break

    return matched
def infer_feature_labels(changed_files: Sequence[str]) -> set[str]:
    """Infer feature labels from changed paths, collapsing to core when ambiguous.

    A change is "core" when some ``src/agents/`` path lies outside every
    ``CORE_EXCLUDED_PREFIXES`` directory. If core is touched and the specific
    labels are not exactly one, the result is ``{"feature:core"}``; otherwise
    the specific labels are returned unchanged.

    Args:
        changed_files: Paths touched by the pull request.

    Returns:
        The inferred feature labels.
    """
    src_paths = [entry for entry in changed_files if entry.startswith("src/")]
    area_labels = infer_specific_feature_labels(src_paths)

    touches_core = False
    for entry in src_paths:
        if entry.startswith("src/agents/") and not entry.startswith(CORE_EXCLUDED_PREFIXES):
            touches_core = True
            break

    if not touches_core or len(area_labels) == 1:
        return area_labels
    return {"feature:core"}
def infer_fallback_labels(changed_files: Sequence[str]) -> set[str]:
    """Return the path-based feature labels used as the fallback choice.

    This simply delegates to ``infer_feature_labels``.
    """
    fallback = infer_feature_labels(changed_files)
    return fallback
def load_json(path: pathlib.Path) -> Any:
    """Parse the JSON document stored at *path*.

    The file is decoded explicitly as UTF-8 instead of relying on the
    platform's locale encoding, so the result is the same on every runner.

    Args:
        path: File to read.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the content is not valid JSON.
        OSError: If the file cannot be read.
    """
    return json.loads(path.read_text(encoding="utf-8"))
def load_pr_context(path: pathlib.Path) -> PRContext:
    """Build a ``PRContext`` from the JSON file at *path*, tolerating bad input.

    An empty ``PRContext`` is returned when the file does not exist, is not
    valid JSON, or does not hold a JSON object. A ``title`` or ``body`` value
    that is missing or not a string is replaced by the empty string.

    Args:
        path: Location of the PR context JSON file.

    Returns:
        The populated (or empty) context.
    """
    empty = PRContext()
    if not path.exists():
        return empty

    try:
        data = load_json(path)
    except json.JSONDecodeError:
        return empty

    if not isinstance(data, dict):
        return empty

    def _text(key: str) -> str:
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    return PRContext(title=_text("title"), body=_text("body"))
def load_codex_labels(path: pathlib.Path) -> tuple[list[str], bool]:
    """Read the model-produced label list from the JSON file at *path*.

    The file is read once, decoded as UTF-8, and the text already in hand is
    parsed directly. This avoids a second read of the same file.

    Args:
        path: Location of the model output; expected to hold an object with a
            ``labels`` key whose value is a list of strings.

    Returns:
        ``(labels, True)`` when the file holds a well-formed payload, otherwise
        ``([], False)``: missing file, blank content, invalid JSON, non-object
        payload, or a ``labels`` value that is not a list of strings.
    """
    invalid: tuple[list[str], bool] = ([], False)

    if not path.exists():
        return invalid

    raw = path.read_text(encoding="utf-8").strip()
    if not raw:
        return invalid

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return invalid

    if not isinstance(payload, dict):
        return invalid

    labels = payload.get("labels")
    if not isinstance(labels, list):
        return invalid
    if not all(isinstance(label, str) for label in labels):
        return invalid

    return list(labels), True
def fetch_existing_labels(pr_number: str) -> set[str]:
    """Return the label names currently on pull request *pr_number*.

    Runs ``gh pr view`` with a ``--jq`` filter that prints one label name per
    line and collects the non-empty lines.

    Raises:
        subprocess.CalledProcessError: If the ``gh`` command exits non-zero.
    """
    command = ["gh", "pr", "view", pr_number, "--json", "labels", "--jq", ".labels[].name"]
    output = subprocess.check_output(command, text=True)
    names = output.strip().splitlines()
    return {name for name in names if name}
def infer_title_intent_labels(pr_context: PRContext) -> set[str]:
    """Derive an intent label from the pull-request title prefix.

    The title is stripped and lower-cased, then matched against known
    prefixes; bug prefixes are checked before enhancement prefixes.

    Args:
        pr_context: Context whose ``title`` is inspected.

    Returns:
        ``{"bug"}``, ``{"enhancement"}``, or an empty set when no prefix matches.
    """
    title = pr_context.title.strip().lower()
    intent_by_prefixes = (
        ("bug", ("fix:", "fix(", "bug:", "bugfix:", "hotfix:", "regression:")),
        ("enhancement", ("feat:", "feat(", "feature:", "enhancement:")),
    )
    for intent, prefixes in intent_by_prefixes:
        if title.startswith(prefixes):
            return {intent}
    return set()
def compute_desired_labels(
    *,
    pr_context: PRContext,
    changed_files: Sequence[str],
    diff_text: str,
    codex_ran: bool,
    codex_output_valid: bool,
    codex_labels: Sequence[str],
    base_sha: str | None,
    head_sha: str | None,
) -> set[str]:
    """Compute the full set of labels the pull request should carry.

    Rules, in order:

    * ``project`` when ``pyproject.toml`` changed; ``documentation`` when any
      ``docs/`` path changed.
    * ``dependencies`` when ``uv.lock`` changed or the pyproject diff touches a
      dependency line.
    * Feature labels from the model when it ran, produced valid output and
      proposed at least one; otherwise the path-based fallback.
    * Intent labels from the title prefix when present; otherwise the model's
      intent labels, provided the model ran with valid output.

    Model labels outside ``ALLOWED_LABELS`` are ignored.

    Returns:
        The desired label set.
    """
    model_labels = set(codex_labels) & ALLOWED_LABELS
    model_features = model_labels & FEATURE_LABELS
    model_intents = model_labels & MODEL_ONLY_LABELS
    path_features = infer_fallback_labels(changed_files)
    title_intents = infer_title_intent_labels(pr_context)

    model_usable = codex_ran and codex_output_valid
    pyproject_touched = "pyproject.toml" in changed_files

    labels: set[str] = set()

    if pyproject_touched:
        labels.add("project")

    if any(changed.startswith("docs/") for changed in changed_files):
        labels.add("documentation")

    deps_changed = "uv.lock" in changed_files
    if pyproject_touched:
        # Evaluate the diff check first so it runs whenever pyproject.toml changed.
        deps_changed = (
            pyproject_dependency_changed(diff_text, base_sha=base_sha, head_sha=head_sha)
            or deps_changed
        )
    if deps_changed:
        labels.add("dependencies")

    labels |= model_features if (model_usable and model_features) else path_features

    if title_intents:
        labels |= title_intents
    elif model_usable:
        labels |= model_intents

    return labels
def compute_managed_labels(
    *,
    pr_context: PRContext,
    codex_ran: bool,
    codex_output_valid: bool,
    codex_labels: Sequence[str],
) -> set[str]:
    """Return the labels this run is allowed to remove from the pull request.

    Deterministic and feature labels are always managed. The intent labels in
    ``MODEL_ONLY_LABELS`` are managed only when the title yields an intent, or
    the model ran with valid output and proposed at least one of them.

    Returns:
        A new set of managed label names.
    """
    title_intents = infer_title_intent_labels(pr_context)
    model_intents = [label for label in codex_labels if label in MODEL_ONLY_LABELS]
    model_vouches = codex_ran and codex_output_valid and bool(model_intents)

    if title_intents or model_vouches:
        return DETERMINISTIC_LABELS | FEATURE_LABELS | MODEL_ONLY_LABELS
    return DETERMINISTIC_LABELS | FEATURE_LABELS
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Parse command-line options, each defaulting to an environment variable.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The parsed namespace.
    """
    # (flag, environment variable, fallback when the variable is unset)
    env_backed_options = (
        ("--pr-number", "PR_NUMBER", ""),
        ("--base-sha", "PR_BASE_SHA", ""),
        ("--head-sha", "PR_HEAD_SHA", ""),
        ("--codex-output-path", "CODEX_OUTPUT_PATH", ".tmp/codex/outputs/pr-labels.json"),
        ("--codex-conclusion", "CODEX_CONCLUSION", ""),
        ("--pr-context-path", "PR_CONTEXT_PATH", PR_CONTEXT_DEFAULT_PATH),
        ("--changed-files-path", "CHANGED_FILES_PATH", ".tmp/pr-labels/changed-files.txt"),
        ("--changes-diff-path", "CHANGES_DIFF_PATH", ".tmp/pr-labels/changes.diff"),
    )

    parser = argparse.ArgumentParser()
    for flag, env_var, fallback in env_backed_options:
        parser.add_argument(flag, default=os.environ.get(env_var, fallback))
    return parser.parse_args(argv)
def main(argv: Sequence[str] | None = None) -> int:
    """Synchronise the pull request's labels with the computed desired set.

    Loads the PR context, changed-file list, diff and model output from the
    configured paths, computes the desired and managed labels, then issues a
    single ``gh pr edit`` call adding the missing labels and removing managed
    labels that are no longer desired.

    Args:
        argv: Command-line arguments; ``None`` uses ``sys.argv``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If no PR number was supplied.
        subprocess.CalledProcessError: If a ``gh`` invocation fails.
    """
    args = parse_args(argv)
    if not args.pr_number:
        raise SystemExit("Missing PR number.")

    # The model counts as having run unless its conclusion is blank or "skipped".
    conclusion = args.codex_conclusion.strip().lower()
    codex_ran = conclusion not in ("", "skipped")

    pr_context = load_pr_context(pathlib.Path(args.pr_context_path))

    changed_files: list[str] = []
    changed_files_path = pathlib.Path(args.changed_files_path)
    if changed_files_path.exists():
        entries = (entry.strip() for entry in changed_files_path.read_text().splitlines())
        changed_files = [entry for entry in entries if entry]

    diff_text = ""
    changes_diff_path = pathlib.Path(args.changes_diff_path)
    if changes_diff_path.exists():
        diff_text = changes_diff_path.read_text()

    codex_labels, codex_output_valid = load_codex_labels(pathlib.Path(args.codex_output_path))
    if codex_ran and not codex_output_valid:
        print(
            "Codex output missing or invalid; using fallback feature labels and preserving "
            "model-only labels."
        )

    desired = compute_desired_labels(
        pr_context=pr_context,
        changed_files=changed_files,
        diff_text=diff_text,
        codex_ran=codex_ran,
        codex_output_valid=codex_output_valid,
        codex_labels=codex_labels,
        base_sha=args.base_sha or None,
        head_sha=args.head_sha or None,
    )
    existing = fetch_existing_labels(args.pr_number)
    managed_labels = compute_managed_labels(
        pr_context=pr_context,
        codex_ran=codex_ran,
        codex_output_valid=codex_output_valid,
        codex_labels=codex_labels,
    )

    additions = sorted(desired - existing)
    # Only labels this run manages may be removed; anything else is left alone.
    removals = sorted(
        label for label in existing if label in managed_labels and label not in desired
    )

    if not (additions or removals):
        print("Labels already up to date.")
        return 0

    edit_cmd = ["gh", "pr", "edit", args.pr_number]
    if additions:
        edit_cmd.extend(("--add-label", ",".join(additions)))
    if removals:
        edit_cmd.extend(("--remove-label", ",".join(removals)))
    subprocess.check_call(edit_cmd)
    return 0
# Script entry point: run the labeler and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())