#!/usr/bin/env python3
"""Compare bytecode between CPython and RustPython.

Compiles all Python files under Lib/ with both interpreters and reports
differences in the generated bytecode instructions.

Usage:
    python scripts/compare_bytecode.py
    python scripts/compare_bytecode.py --detail
    python scripts/compare_bytecode.py --filter "asyncio/*.py"
    python scripts/compare_bytecode.py --summary-json report.json
"""

import argparse
import fnmatch
import json
import os
import random
import subprocess
import sys
import tempfile
from collections import defaultdict

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
DIS_DUMP = os.path.join(SCRIPT_DIR, "dis_dump.py")
DEFAULT_REPORT = os.path.join(PROJECT_ROOT, "compare_bytecode.report")
# Single source of truth for the per-process dump timeout (seconds); used
# both for communicate() and the error message so they cannot drift apart.
DUMP_TIMEOUT = 600


def find_rustpython():
    """Locate the RustPython binary, allowing release builds only.

    Returns the path from the RUSTPYTHON environment variable (rejecting
    debug builds), else the conventional release-build path if it is an
    executable file, else None.

    Raises:
        ValueError: if RUSTPYTHON points at target/debug/rustpython.
    """
    if "RUSTPYTHON" in os.environ:
        path = os.environ["RUSTPYTHON"]
        normalized = os.path.normpath(path)
        # Reject debug builds: they are too slow for a full-Lib comparison
        # and could skew timeout behavior.
        debug_fragment = os.path.join("target", "debug", "rustpython")
        if normalized.endswith(debug_fragment):
            raise ValueError(
                "RUSTPYTHON must point to a release binary, not target/debug/rustpython"
            )
        return path
    path = os.path.join(PROJECT_ROOT, "target", "release", "rustpython")
    if os.path.isfile(path) and os.access(path, os.X_OK):
        return path
    return None


def collect_targets(lib_dir, pattern=None):
    """Collect Python files to compare, relative to lib_dir.

    Args:
        lib_dir: directory tree to walk.
        pattern: optional fnmatch glob applied to the relative path.

    Returns:
        Sorted list of (relpath, abspath) tuples for every matching .py file.
    """
    targets = []
    for root, dirs, files in os.walk(lib_dir):
        # Prune caches/hidden dirs in place; sorting keeps the walk
        # deterministic across runs.
        dirs[:] = sorted(
            d for d in dirs if d != "__pycache__" and not d.startswith(".")
        )
        for fname in sorted(files):
            if not fname.endswith(".py"):
                continue
            fpath = os.path.join(root, fname)
            relpath = os.path.relpath(fpath, lib_dir)
            if pattern and not fnmatch.fnmatch(relpath, pattern):
                continue
            targets.append((relpath, fpath))
    return targets


def _start_one(interpreter, targets, base_dir):
    """Start a single dis_dump.py subprocess.

    Writes the target file list to a temp file (delete=False: the path must
    survive for the child to read; _finish_one removes it) and launches the
    dumper with stdout piped for JSON capture.

    Returns:
        (Popen, temp file name) on success; cleans up the temp file on error.
    """
    env = os.environ.copy()
    if interpreter != sys.executable:
        # RustPython resolves stdlib imports through RUSTPYTHONPATH.
        env["RUSTPYTHONPATH"] = base_dir
    files_file = tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", delete=False, dir=PROJECT_ROOT
    )
    try:
        for _, path in targets:
            files_file.write(path)
            files_file.write("\n")
        files_file.close()
        cmd = [
            interpreter,
            DIS_DUMP,
            "--base-dir",
            base_dir,
            "--files-from",
            files_file.name,
            "--progress",
            "10",
        ]
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=None,  # inherit stderr so progress dots appear on terminal
            env=env,
            cwd=PROJECT_ROOT,
        )
        return proc, files_file.name
    except Exception:
        # Close before unlink: removing a still-open file fails on Windows.
        files_file.close()
        os.unlink(files_file.name)
        raise


def _finish_one(proc, files_file):
    """Wait for a single dis_dump.py process and return parsed JSON.

    Always removes the temp file list. Returns {} on timeout, non-JSON
    output, or empty output (warnings go to stderr) so callers can merge
    results unconditionally.
    """
    try:
        stdout = proc.communicate(timeout=DUMP_TIMEOUT)[0]
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.communicate()  # reap the killed child
        print(" Timeout (%ds)" % DUMP_TIMEOUT, file=sys.stderr)
        os.unlink(files_file)
        return {}
    finally:
        # The timeout branch unlinks early; guard against a double unlink.
        if os.path.exists(files_file):
            os.unlink(files_file)
    if proc.returncode != 0:
        print(" Warning: exited with code %d" % proc.returncode, file=sys.stderr)
    content = stdout.decode(errors="replace").strip()
    if not content:
        return {}
    try:
        return json.loads(content)
    except json.JSONDecodeError as e:
        print(" JSON parse error: %s" % e, file=sys.stderr)
        return {}


def start_dump(interpreter, targets, base_dir, num_workers=1):
    """Start dis_dump.py under the given interpreter, split across workers.

    Targets are dealt round-robin into num_workers chunks; a single process
    is used when parallelism would not pay off.

    Returns:
        List of (Popen, temp file name) pairs for finish_dump().
    """
    if num_workers <= 1 or len(targets) <= num_workers:
        proc, ff = _start_one(interpreter, targets, base_dir)
        return [(proc, ff)]
    chunks = [[] for _ in range(num_workers)]
    for i, t in enumerate(targets):
        chunks[i % num_workers].append(t)
    return [_start_one(interpreter, chunk, base_dir) for chunk in chunks if chunk]


def finish_dump(procs):
    """Wait for all dis_dump.py processes and merge results."""
    merged = {}
    for proc, files_file in procs:
        merged.update(_finish_one(proc, files_file))
    return merged


def compare_insts(cp_insts, rp_insts):
    """Compare two instruction lists. Returns list of (index, cp, rp) diffs.

    Missing slots (length mismatch) are reported with None on the shorter
    side.
    """
    diffs = []
    for i in range(max(len(cp_insts), len(rp_insts))):
        cp = cp_insts[i] if i < len(cp_insts) else None
        rp = rp_insts[i] if i < len(rp_insts) else None
        if cp != rp:
            diffs.append((i, cp, rp))
    return diffs


def compare_code(cp_code, rp_code, path=""):
    """Recursively compare code objects. Returns [(code_path, diffs)].

    Children are matched by name (and by position within same-named
    duplicates, e.g. several lambdas). Unmatched children are reported with
    a sentinel index of -1.
    """
    results = []
    # NOTE: distinct from the child-loop variable below; keeping them
    # separate avoids shadowing the code object's own name.
    own_name = cp_code["name"]
    full = (path + "/" + own_name) if path else own_name
    diffs = compare_insts(cp_code.get("insts", []), rp_code.get("insts", []))
    if diffs:
        results.append((full, diffs))
    cp_ch = cp_code.get("children", [])
    rp_ch = rp_code.get("children", [])
    cp_by_name = defaultdict(list)
    rp_by_name = defaultdict(list)
    for c in cp_ch:
        cp_by_name[c["name"]].append(c)
    for c in rp_ch:
        rp_by_name[c["name"]].append(c)
    # Preserve CPython child order, then append RustPython-only names.
    all_names = list(dict.fromkeys(c["name"] for c in cp_ch))
    for c in rp_ch:
        if c["name"] not in cp_by_name:
            all_names.append(c["name"])
    for name in all_names:
        cp_list = cp_by_name.get(name, [])
        rp_list = rp_by_name.get(name, [])
        for i in range(max(len(cp_list), len(rp_list))):
            if i < len(cp_list) and i < len(rp_list):
                results.extend(compare_code(cp_list[i], rp_list[i], full))
            elif i < len(cp_list):
                results.append((full + "/" + name, [(-1, "extra in CPython", None)]))
            else:
                results.append((full + "/" + name, [(-1, None, "extra in RustPython")]))
    return results


def compare_code_summary(cp_code, rp_code):
    """Recursively compare code objects and return summary counts.

    Cheaper than compare_code(): counts only, no per-instruction diff
    storage.

    Returns:
        (diff_code_objects, diff_instructions) totals for this subtree.
    """
    diff_code_objects = 0
    diff_instructions = compare_insts_count(
        cp_code.get("insts", []), rp_code.get("insts", [])
    )
    if diff_instructions:
        diff_code_objects += 1
    cp_ch = cp_code.get("children", [])
    rp_ch = rp_code.get("children", [])
    cp_by_name = defaultdict(list)
    rp_by_name = defaultdict(list)
    for child in cp_ch:
        cp_by_name[child["name"]].append(child)
    for child in rp_ch:
        rp_by_name[child["name"]].append(child)
    all_names = list(dict.fromkeys(child["name"] for child in cp_ch))
    for child in rp_ch:
        if child["name"] not in cp_by_name:
            all_names.append(child["name"])
    for name in all_names:
        cp_list = cp_by_name.get(name, [])
        rp_list = rp_by_name.get(name, [])
        for i in range(max(len(cp_list), len(rp_list))):
            if i < len(cp_list) and i < len(rp_list):
                child_objects, child_insts = compare_code_summary(
                    cp_list[i], rp_list[i]
                )
                diff_code_objects += child_objects
                diff_instructions += child_insts
            else:
                # Unmatched child: count it as one differing code object
                # with one instruction-slot difference.
                diff_code_objects += 1
                diff_instructions += 1
    return diff_code_objects, diff_instructions


def compare_insts_count(cp_insts, rp_insts):
    """Count mismatched instruction slots without storing the full diff."""
    diff_count = 0
    for i in range(max(len(cp_insts), len(rp_insts))):
        cp = cp_insts[i] if i < len(cp_insts) else None
        rp = rp_insts[i] if i < len(rp_insts) else None
        if cp != rp:
            diff_count += 1
    return diff_count


def main():
    parser = argparse.ArgumentParser(description="Compare CPython/RustPython bytecode")
    parser.add_argument(
        "--detail", action="store_true", help="Show per-file instruction differences"
    )
    parser.add_argument("--filter", default=None, help="Glob pattern to filter files")
    parser.add_argument(
        "--max-diffs",
        type=int,
        default=5,
        help="Max diffs shown per code object (default: 5)",
    )
    parser.add_argument(
        "--summary-json", default=None, help="Write summary as JSON to file"
    )
    parser.add_argument(
        "--sample",
        type=int,
        default=None,
        help="Compare a random sample of N matching files",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=None,
        help="Random seed used with --sample",
    )
    parser.add_argument(
        "--list-limit",
        type=int,
        default=10,
        help="Max differing files to print in non-detail mode (default: 10)",
    )
    parser.add_argument(
        "--lib-dir",
        default=os.path.join(PROJECT_ROOT, "Lib"),
        help="Library directory to compare",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=None,
        help="Number of parallel workers per interpreter (default: cpu_count)",
    )
    parser.add_argument(
        "-o",
        "--output",
        default=DEFAULT_REPORT,
        help="Report output file (default: compare_bytecode.report)",
    )
    args = parser.parse_args()

    try:
        rp_bin = find_rustpython()
    except ValueError as exc:
        print("Error: %s" % exc, file=sys.stderr)
        sys.exit(1)
    if not rp_bin:
        print("Error: RustPython binary not found.", file=sys.stderr)
        print(" Build with: cargo build --release", file=sys.stderr)
        print(" Or set RUSTPYTHON=/path/to/binary", file=sys.stderr)
        sys.exit(1)
    if not os.path.isfile(DIS_DUMP):
        print("Error: disassembler helper not found: %s" % DIS_DUMP, file=sys.stderr)
        print(
            " Expected scripts/dis_dump.py from origin/bytecode-parity",
            file=sys.stderr,
        )
        sys.exit(1)

    targets = collect_targets(args.lib_dir, args.filter)
    sample_seed = None
    if args.sample is not None:
        if args.sample < 1:
            print("Error: --sample must be >= 1", file=sys.stderr)
            sys.exit(1)
        # Record the seed (even when auto-generated) so a sampled run can be
        # reproduced exactly.
        sample_seed = (
            args.seed
            if args.seed is not None
            else random.SystemRandom().randrange(2**32)
        )
        rng = random.Random(sample_seed)
        sample_size = min(args.sample, len(targets))
        targets = sorted(rng.sample(targets, sample_size), key=lambda item: item[0])
    if not targets:
        print("Error: no Python files matched", file=sys.stderr)
        sys.exit(1)

    report_path = args.output

    def log(*a, **kw):
        # All progress/diagnostics go to stderr; stdout stays clean.
        print(*a, file=sys.stderr, **kw)

    log("Report: %s" % os.path.relpath(report_path, PROJECT_ROOT))
    log("Targets: %d file(s)" % len(targets))
    num_workers = args.jobs if args.jobs else os.cpu_count() or 4
    log("Workers: %d per interpreter" % num_workers)

    # Phase 1: Dump bytecode with both interpreters in parallel.
    sys.stderr.write("Dumping bytecode ")
    sys.stderr.flush()
    cp_procs = start_dump(sys.executable, targets, args.lib_dir, num_workers)
    rp_procs = start_dump(rp_bin, targets, args.lib_dir, num_workers)
    cp_data = finish_dump(cp_procs)
    rp_data = finish_dump(rp_procs)
    sys.stderr.write("\n")
    if not cp_data:
        log("Error: CPython dump produced no data")
        sys.exit(1)

    # Phase 2: Compare
    all_files = sorted(set(cp_data) | set(rp_data))
    match = 0
    differ = 0
    cp_err = 0
    rp_err = 0
    both_err = 0
    rp_miss = 0
    diff_files = []
    rp_error_files = []
    diff_summaries = []
    # Detailed per-instruction diffs are only collected when requested; the
    # summary path is much cheaper on memory.
    need_detailed_diffs = args.detail
    for fp in all_files:
        cp = cp_data.get(fp)
        rp = rp_data.get(fp)
        if rp is None:
            rp_miss += 1
            continue
        cp_ok = cp and cp.get("status") == "ok"
        rp_ok = rp and rp.get("status") == "ok"
        if not cp_ok and not rp_ok:
            both_err += 1
        elif not cp_ok:
            cp_err += 1
        elif not rp_ok:
            rp_err += 1
            rp_error_files.append((fp, rp.get("error", "?")))
        else:
            if need_detailed_diffs:
                code_diffs = compare_code(cp["code"], rp["code"])
                if code_diffs:
                    differ += 1
                    diff_files.append((fp, code_diffs))
                else:
                    match += 1
            else:
                diff_code_objects, diff_instructions = compare_code_summary(
                    cp["code"], rp["code"]
                )
                if diff_code_objects:
                    differ += 1
                    diff_summaries.append(
                        {
                            "path": fp,
                            "diff_code_objects": diff_code_objects,
                            "diff_instructions": diff_instructions,
                        }
                    )
                else:
                    match += 1

    total = match + differ + cp_err + rp_err + both_err + rp_miss

    def pct(n):
        return "%.1f%%" % (100.0 * n / total) if total else "0%"

    # Phase 3: Write report to file
    with open(report_path, "w") as out:
        def p(*a):
            print(*a, file=out)

        p("CPython: %s (%s)" % (sys.executable, sys.version.split()[0]))
        p("RustPython: %s" % rp_bin)
        p("Lib: %s" % args.lib_dir)
        if sample_seed is not None:
            p("Sample: %s file(s), seed=%s" % (len(targets), sample_seed))
        p()
        p("=" * 60)
        p(" Bytecode Comparison Report")
        p("=" * 60)
        p()
        p(" Total files: %6d" % total)
        p(" Match: %6d (%s)" % (match, pct(match)))
        p(" Differ: %6d (%s)" % (differ, pct(differ)))
        p(" RustPython error: %6d (%s)" % (rp_err, pct(rp_err)))
        p(" CPython error: %6d (%s)" % (cp_err, pct(cp_err)))
        p(" Both error: %6d (%s)" % (both_err, pct(both_err)))
        if rp_miss:
            p(" RustPython missing: %6d (%s)" % (rp_miss, pct(rp_miss)))
        p()
        if args.detail:
            if rp_error_files:
                p("-" * 60)
                p(" RustPython Compile Errors")
                p("-" * 60)
                for fp, err in rp_error_files[:50]:
                    p(" %s: %s" % (fp, err))
                if len(rp_error_files) > 50:
                    p(" ... and %d more" % (len(rp_error_files) - 50))
                p()
            if diff_files:
                p("-" * 60)
                p(" Bytecode Differences")
                p("-" * 60)
                for fp, code_diffs in diff_files:
                    p()
                    p(" %s:" % fp)
                    for code_path, diffs in code_diffs:
                        shown = min(len(diffs), args.max_diffs)
                        p(" %s: %d difference(s)" % (code_path, len(diffs)))
                        for idx, cp_inst, rp_inst in diffs[:shown]:
                            if idx == -1:
                                # Sentinel from compare_code: a code object
                                # exists on only one side.
                                p(" %s" % (cp_inst or rp_inst))
                            else:
                                p(" [%3d] CPython: %s" % (idx, cp_inst))
                                p(" RustPython: %s" % rp_inst)
                        if len(diffs) > shown:
                            p(" ... and %d more" % (len(diffs) - shown))
                p()
        else:
            # The inline listing is redundant when a machine-readable summary
            # is requested.
            list_limit = 0 if args.summary_json else max(args.list_limit, 0)
            if diff_summaries and list_limit:
                shown = min(list_limit, len(diff_summaries))
                # Do NOT reuse `total` here: it must keep its file-count
                # value for the summary JSON below.
                diff_total = len(diff_summaries)
                p(f"Top differing files ({shown} shown of {diff_total}):")
                top = sorted(
                    diff_summaries,
                    key=lambda item: (
                        item["diff_instructions"],
                        item["diff_code_objects"],
                        item["path"],
                    ),
                    reverse=True,
                )[:list_limit]
                for item in top:
                    p(
                        " %s (%d code objects, %d instruction diffs)"
                        % (
                            item["path"],
                            item["diff_code_objects"],
                            item["diff_instructions"],
                        )
                    )
                p()
            p("Use --detail to see specific instruction differences.")
            p()

    # Summary JSON output
    if args.summary_json:
        summary = {
            "total": total,
            "sample": args.sample,
            "sample_seed": sample_seed,
            "match": match,
            "differ": differ,
            "rp_error": rp_err,
            "cp_error": cp_err,
            "both_error": both_err,
            "rp_missing": rp_miss,
            "match_pct": round(100.0 * match / total, 2) if total else 0,
            "diff_files": [fp for fp, _ in diff_files]
            if need_detailed_diffs
            else [item["path"] for item in diff_summaries],
            "top_diff_files": sorted(
                diff_summaries,
                key=lambda item: (
                    item["diff_instructions"],
                    item["diff_code_objects"],
                    item["path"],
                ),
                reverse=True,
            )[: min(20, len(diff_summaries))],
            "rp_error_files": [fp for fp, _ in rp_error_files],
        }
        with open(args.summary_json, "w") as f:
            json.dump(summary, f, indent=2)
        log("Summary JSON: %s" % args.summary_json)

    log("Done: %d match, %d differ, %d errors" % (match, differ, rp_err))
    # Exit non-zero when any bytecode differs or RustPython failed to compile
    # a file, so CI can gate on parity.
    sys.exit(0 if differ == 0 and rp_err == 0 else 1)


if __name__ == "__main__":
    main()