diff --git a/.pre-commit-hooks.yaml b/.pre-commit-hooks.yaml index 15456a6..5018259 100644 --- a/.pre-commit-hooks.yaml +++ b/.pre-commit-hooks.yaml @@ -27,3 +27,12 @@ args: [--author-email] pass_filenames: false language: python +- id: check-no-force-push + name: check no force push + description: prevents force pushes to remote branches + entry: commit-check + args: [--no-force-push] + stages: [pre-push] + pass_filenames: false + always_run: true + language: python diff --git a/README.rst b/README.rst index 74b2489..c94ac62 100644 --- a/README.rst +++ b/README.rst @@ -187,6 +187,32 @@ For one-off checks or CI/CD pipelines, you can configure via CLI arguments or en See the `Configuration documentation `_ for all available options. +Check Push Safety +~~~~~~~~~~~~~~~~~ + +Use ``--no-force-push`` in a ``pre-push`` hook to inspect the ref updates Git +provides on stdin, or run it directly to compare ``HEAD`` with the current +branch's configured upstream: + +.. code-block:: bash + + # Standalone preflight check against the current branch's upstream + commit-check --no-force-push + +.. code-block:: yaml + + # In pre-commit hooks (.pre-commit-config.yaml) + repos: + - repo: https://github.com/commit-check/commit-check + rev: v2.6.0 + hooks: + - id: check-no-force-push + stages: [pre-push] + +Piping ``git push`` into ``commit-check`` is not a prevention mechanism. The +push has already been started, and standard ``git push`` output does not carry +the pre-push ref metadata that ``commit-check`` uses. + AI-Native Usage --------------- @@ -415,33 +441,45 @@ reflects a DIY approach rather than built-in product features. .. list-table:: :header-rows: 1 - :widths: 36 18 18 28 + :widths: 26 16 16 16 26 * - Feature - - Commit Check ✅ + - Commit Check - commitlint + - YACC [#f2]_ - Custom hooks * - Conventional Commits enforcement - ✅ - ✅ + - Partial - DIY * - Branch naming validation - ✅ - ❌ + - ✅ + - DIY + * - Force push blocking + - ✅ + - ❌ + - ❌ - DIY * - Author name / email validation - ✅ - ❌ + - ✅ - DIY * - Signed-off-by trailer enforcement - ✅ - - ✅ + - Partial [#f1]_ + - ❌ - DIY * - Co-author ignore list - ✅ - ❌ + - Partial [#f3]_ - DIY * - Organization-level shared config + - ✅ - ✅ - ✅ - DIY @@ -449,28 +487,71 @@ reflects a DIY approach rather than built-in product features. - ✅ - ❌ - ❌ + - ❌ * - Works without Node.js - ✅ - ❌ + - ✅ - Depends * - Native TOML configuration - ✅ - ❌ + - ❌ - Depends * - Git hook / pre-commit integration - ✅ - Partial + - ❌ - ✅ * - CI/CD-friendly configuration - ✅ - Partial + - ❌ - DIY + * - Open source & free + - ✅ + - ✅ + - ❌ + - ✅ + * - Client-side (pre-commit) enforcement + - ✅ + - ✅ + - ❌ + - ✅ + * - AI-native (JSON API + Python SDK) + - ✅ + - ❌ + - ❌ + - ❌ -For ``commitlint``, organization-level shared config is typically delivered via -shareable config packages or local files. ``DIY`` means you can implement a +*For* ``commitlint``, organization-level shared config is typically delivered via +shareable config packages or local files. + +*For* ``YACC`` (Yet Another Commit Checker), conventional commit enforcement +is regex-based rather than Conventional Commits-aware; author validation +verifies committer name/email against Bitbucket user accounts or custom regex; +the plugin supports global → project → repository config inheritance; +it is a server-side pre-receive hook and merge check (no client-side +pre-commit), is paid (per-user licensing), and runs on Java (no Node.js needed). + +``DIY`` means you can implement a capability with custom Git hooks or ``pre-commit`` scripts, but it is not provided as a turnkey policy layer. +.. [#f1] ``commitlint`` provides a community ``signed-off-by`` rule + (``@commitlint/rule-signed-off-by``) that must be installed and configured + separately; it is not part of the default + ``@commitlint/config-conventional`` preset. + +.. [#f2] `Yet Another Commit Checker + `_ + is a paid Bitbucket Server / Data Center plugin (server-side pre-receive hook + and merge check). + +.. [#f3] YACC can exclude commits from specific Bitbucket users, user groups, + or service users (bots), but does not parse ``Co-authored-by:`` trailers + in commit messages. + Versioning ---------- diff --git a/commit_check/__init__.py b/commit_check/__init__.py index 1e16960..aeecd10 100644 --- a/commit_check/__init__.py +++ b/commit_check/__init__.py @@ -44,6 +44,11 @@ # Additional allowed branch names (e.g., develop, staging) DEFAULT_BRANCH_NAMES: list[str] = [] +# Push-related defaults +DEFAULT_PUSH_RULES = { + "allow_force_push": True, +} + # Handle different default values for different rules DEFAULT_BOOLEAN_RULES = { "subject_capitalized": False, diff --git a/commit_check/api.py b/commit_check/api.py index b579b9b..dac92bc 100644 --- a/commit_check/api.py +++ b/commit_check/api.py @@ -166,6 +166,38 @@ def validate_branch( return _run_checks(["branch", "merge_base"], context, cfg) +def validate_push( + push_refs: Optional[str] = None, + *, + config: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Validate that a push is not a force push. + + :param push_refs: Push ref information in the format produced by git's + pre-push hook: `` ``, + one entry per line. If *None*, the check is skipped (returns pass). + :param config: Optional configuration override dict. The push check is + always enabled when calling this function; force pushes detected here + will always return ``"fail"``. + :returns: A dict with ``"status"`` (``"pass"``/``"fail"``) and ``"checks"``. + + Example:: + + >>> from commit_check.api import validate_push + >>> zero = "0000000000000000000000000000000000000000" + >>> result = validate_push(f"refs/heads/main abc123 refs/heads/main {zero}") + >>> result["status"] + 'pass' + """ + cfg = _merge_config(config) + # Enable force push blocking in the config so the rule is built + if "push" not in cfg: + cfg["push"] = {} + cfg["push"]["allow_force_push"] = False + context = ValidationContext(stdin_text=push_refs, config=cfg) + return _run_checks(["no_force_push"], context, cfg) + + def validate_author( name: Optional[str] = None, email: Optional[str] = None, diff --git a/commit_check/config_merger.py b/commit_check/config_merger.py index 0cbeff1..d8b4abd 100644 --- a/commit_check/config_merger.py +++ b/commit_check/config_merger.py @@ -11,6 +11,7 @@ DEFAULT_BRANCH_TYPES, DEFAULT_BRANCH_NAMES, DEFAULT_BOOLEAN_RULES, + DEFAULT_PUSH_RULES, ) @@ -81,6 +82,9 @@ def get_default_config() -> Dict[str, Any]: "require_rebase_target": "", "ignore_authors": [], }, + "push": { + "allow_force_push": DEFAULT_PUSH_RULES["allow_force_push"], + }, } @@ -119,6 +123,8 @@ class ConfigMerger: "CCHK_ALLOW_BRANCH_NAMES": ("branch", "allow_branch_names", parse_list), "CCHK_REQUIRE_REBASE_TARGET": ("branch", "require_rebase_target", str), "CCHK_BRANCH_IGNORE_AUTHORS": ("branch", "ignore_authors", parse_list), + # Push section + "CCHK_ALLOW_FORCE_PUSH": ("push", "allow_force_push", parse_bool), } # Mapping of CLI argument names to config keys @@ -144,12 +150,14 @@ class ConfigMerger: "allow_branch_names": ("branch", "allow_branch_names"), "require_rebase_target": ("branch", "require_rebase_target"), "branch_ignore_authors": ("branch", "ignore_authors"), + # Push section + "allow_force_push": ("push", "allow_force_push"), } @staticmethod def parse_env_vars() -> Dict[str, Any]: """Parse environment variables with CCHK_ prefix into config dict.""" - config: Dict[str, Any] = {"commit": {}, "branch": {}} + config: Dict[str, Any] = {"commit": {}, "branch": {}, "push": {}} for env_var, (section, key, parser) in ConfigMerger.ENV_VAR_MAPPING.items(): value = os.environ.get(env_var) @@ -168,7 +176,7 @@ def parse_env_vars() -> Dict[str, Any]: @staticmethod def parse_cli_args(args: argparse.Namespace) -> Dict[str, Any]: """Parse CLI arguments into config dict.""" - config: Dict[str, Any] = {"commit": {}, "branch": {}} + config: Dict[str, Any] = {"commit": {}, "branch": {}, "push": {}} for arg_name, (section, key) in ConfigMerger.CLI_ARG_MAPPING.items(): if hasattr(args, arg_name): diff --git a/commit_check/engine.py b/commit_check/engine.py index e148cd4..369b897 100644 --- a/commit_check/engine.py +++ b/commit_check/engine.py @@ -8,9 +8,14 @@ from commit_check.rule_builder import ValidationRule from commit_check.util import ( + fetch_remote_ref, + fetch_upstream_ref, get_commit_info, get_git_config_value, get_branch_name, + get_git_remotes, + get_upstream_branch, + get_upstream_remote_sha, has_commits, git_merge_base, ) @@ -33,6 +38,7 @@ class ValidationContext: config: Dict = field(default_factory=dict) no_banner: bool = False compact: bool = False + push_upstream_fallback: bool = False @dataclass @@ -529,6 +535,107 @@ def _get_commit_message(self, context: ValidationContext) -> str: return f"{subject}\n\n{body}".strip() +class ForcePushValidator(BaseValidator): + """Validates that no force push is being performed. + + Reads pushed ref information from stdin (provided by git's pre-push hook) + in the format:: + + + + A force push is detected when the remote SHA is not an ancestor of the + local SHA, meaning local history would overwrite the remote. + """ + + ZERO_SHA = "0000000000000000000000000000000000000000" + + def validate(self, context: ValidationContext) -> ValidationResult: + if not context.stdin_text: + if context.push_upstream_fallback: + return self._check_current_branch_against_upstream() + return ValidationResult.PASS + + for line in context.stdin_text.splitlines(): + result = self._check_push_line(line.strip()) + if result == ValidationResult.FAIL: + return ValidationResult.FAIL + + return ValidationResult.PASS + + def _check_current_branch_against_upstream(self) -> ValidationResult: + """Check whether pushing HEAD to its upstream would require force.""" + upstream_ref = get_upstream_branch() + if not upstream_ref: + return ValidationResult.PASS + + target_ref = get_upstream_remote_sha(upstream_ref) or upstream_ref + returncode = git_merge_base(target_ref, "HEAD") + if ( + returncode == 128 + and target_ref != upstream_ref + and fetch_upstream_ref(upstream_ref) + ): + returncode = git_merge_base(target_ref, "HEAD") + if returncode == 1: + self._print_failure(f"{get_branch_name()} -> {upstream_ref}") + return ValidationResult.FAIL + + return ValidationResult.PASS + + def _check_push_line(self, line: str) -> ValidationResult: + """Check a single pushed ref line for force push.""" + if not line: + return ValidationResult.PASS + + parts = line.split() + if len(parts) < 4: + return ValidationResult.PASS + + local_ref, local_sha, remote_ref, remote_sha = ( + parts[0], + parts[1], + parts[2], + parts[3], + ) + + # Zero SHA for remote means a new branch push (not a force push) + if remote_sha == self.ZERO_SHA: + return ValidationResult.PASS + + # Check if the remote SHA is an ancestor of the local SHA. + # returncode 0 -> remote is ancestor of local (fast-forward push, OK) + # returncode 1 -> not an ancestor (force push detected) + # returncode 128 -> SHA may be unknown locally; fetch remote ref and retry + returncode = git_merge_base(remote_sha, local_sha) + if returncode == 128: + for remote in self._remote_candidates_for_push(remote_ref): + if not fetch_remote_ref(remote, remote_ref): + continue + returncode = git_merge_base(remote_sha, local_sha) + if returncode != 128: + break + if returncode == 1: + self._print_failure(f"{local_ref} -> {remote_ref}") + return ValidationResult.FAIL + + return ValidationResult.PASS + + def _remote_candidates_for_push(self, remote_ref: str) -> List[str]: + """Return remotes worth fetching for a pushed branch ref.""" + if not remote_ref.startswith("refs/heads/"): + return [] + + remotes: List[str] = [] + upstream_ref = get_upstream_branch() + upstream_parts = upstream_ref.split("/", 1) + remote_branch = remote_ref.removeprefix("refs/heads/") + if len(upstream_parts) == 2 and upstream_parts[1] == remote_branch: + remotes.append(upstream_parts[0]) + + remotes.extend(remote for remote in get_git_remotes() if remote not in remotes) + return remotes + + class CommitTypeValidator(BaseValidator): """Base validator for special commit types (merge, revert, fixup, WIP, empty).""" @@ -631,6 +738,7 @@ class ValidationEngine: "allow_fixup_commits": CommitTypeValidator, "allow_wip_commits": CommitTypeValidator, "ignore_authors": CommitTypeValidator, + "no_force_push": ForcePushValidator, } def __init__(self, rules: List[ValidationRule]): diff --git a/commit_check/imperatives.py b/commit_check/imperatives.py index 6149beb..63cbfbc 100644 --- a/commit_check/imperatives.py +++ b/commit_check/imperatives.py @@ -6,6 +6,8 @@ # nouns, but blacklisting them for this may cause false positives. IMPERATIVES = { + "abort", + "absorb", "accept", "access", "add", @@ -18,10 +20,15 @@ "archive", "assert", "assign", + "attach", "attempt", "authenticate", "authorize", + "batch", + "bind", + "block", "break", + "broadcast", "build", "bump", "cache", @@ -38,11 +45,14 @@ "combine", "commit", "compare", + "compose", + "compress", "compute", "configure", "confirm", "connect", "construct", + "consume", "control", "convert", "copy", @@ -50,9 +60,12 @@ "count", "create", "customize", + "debug", "declare", "decode", + "decompress", "decorate", + "decrypt", "decrease", "define", "delegate", @@ -60,32 +73,46 @@ "deprecate", "derive", "describe", + "detach", "detect", "determine", "disable", + "discard", + "disconnect", + "dispatch", "display", + "dispose", + "distribute", + "document", "download", "downgrade", "drop", + "duplicate", "dump", + "embed", "emit", "empty", "enable", "encapsulate", "encode", + "encrypt", "end", "enforce", "enhance", "ensure", "enumerate", + "escape", "establish", "evaluate", + "evolve", "examine", + "exclude", "execute", "exit", "expand", "expect", "export", + "expose", "extend", "extract", "feed", @@ -97,6 +124,8 @@ "fire", "fix", "flag", + "flush", + "fold", "force", "format", "forward", @@ -105,33 +134,50 @@ "get", "give", "go", + "grant", "group", + "halt", "handle", + "hash", "help", + "hide", + "highlight", "hold", "identify", + "ignore", "implement", "import", + "improve", "include", "increase", + "index", "indicate", + "infer", + "inform", "init", "initialise", "initialize", "initiate", + "inject", "input", "insert", "instantiate", + "integrate", "intercept", "introduce", + "invalidate", "invoke", + "isolate", "iterate", "join", "keep", "launch", + "link", "list", "listen", "load", + "locate", + "lock", "log", "look", "lower", @@ -140,17 +186,31 @@ "manipulate", "map", "mark", + "marshal", + "mask", "match", + "maximize", + "measure", "merge", + "migrate", + "minimize", + "mirror", "mock", "modify", "monitor", + "mount", "move", + "name", + "navigate", "normalize", "note", + "notify", "obtain", + "offset", + "omit", "open", "optimize", + "outline", "output", "organize", "orchestrate", @@ -158,40 +218,69 @@ "overwrite", "package", "pad", + "parameterize", "parse", "partial", "pass", + "pause", "perform", "persist", "pick", + "ping", + "pipe", "plot", "poll", "populate", "post", + "prefix", "prepare", "prevent", "print", "process", "produce", + "prune", "provide", "publish", "pull", + "purge", + "push", "put", "query", "raise", + "randomize", + "rank", "read", + "reassign", + "rebase", + "rebuild", + "recall", + "receive", + "recommend", + "reconcile", + "reconnect", "record", + "recover", "redesign", + "redirect", + "reduce", "refactor", "refer", + "reformat", "refresh", "register", + "reject", + "relate", + "release", "reload", + "relocate", "remove", "rename", "render", + "reorder", "reorganize", + "repeat", "replace", + "replay", "reply", "report", "represent", @@ -199,30 +288,49 @@ "require", "reset", "resolve", + "restrict", + "resume", "retrieve", + "retry", "return", + "reuse", "revert", + "revoke", "rework", + "rewrite", "roll", "rollback", "round", + "route", "run", "sample", + "sanitize", "save", "scan", + "schedule", "search", "select", "send", + "separate", "serialise", "serialize", "serve", "set", + "setup", "show", + "shuffle", "simplify", "simulate", + "skip", + "slice", + "sort", "source", + "spawn", "specify", "split", + "spread", + "squash", + "standardize", "start", "step", "stop", @@ -230,24 +338,47 @@ "strip", "submit", "subscribe", + "substitute", + "suggest", "sum", + "suppress", "support", + "suspend", "swap", + "switch", "sync", "synchronise", "synchronize", + "terminate", "take", "tear", "test", + "throw", + "throttle", "time", + "toggle", + "trace", + "track", + "transfer", "transform", "translate", "transmit", + "trigger", + "trim", "truncate", "try", + "tune", "turn", "tweak", + "unbind", + "unescape", "unify", + "unlock", + "unmarshal", + "unpack", + "unsubscribe", + "unwind", + "unwrap", "update", "upgrade", "upload", @@ -257,7 +388,12 @@ "view", "wait", "walk", + "warm", + "warn", + "wire", + "withdraw", "wrap", "write", "yield", + "zip", } diff --git a/commit_check/main.py b/commit_check/main.py index 8a77b86..bd3c6e7 100644 --- a/commit_check/main.py +++ b/commit_check/main.py @@ -2,6 +2,7 @@ from __future__ import annotations import json +import os import sys import argparse from typing import Optional @@ -32,6 +33,47 @@ def read_piped_input() -> Optional[str]: return None +def _normalize_pre_commit_branch_ref(branch: Optional[str]) -> str: + """Return a full branch ref from a pre-commit branch environment value.""" + if not branch: + return "" + if branch.startswith("refs/"): + return branch + return f"refs/heads/{branch}" + + +def _build_pre_commit_push_input() -> Optional[str]: + """Build pre-push ref data from pre-commit's pre-push environment. + + pre-commit consumes git's native pre-push stdin and exposes the active push + target through PRE_COMMIT_* variables. Convert that environment back into + the same four-field format used by git's native pre-push hook. + """ + remote_name = os.getenv("PRE_COMMIT_REMOTE_NAME") or os.getenv( + "PRE_COMMIT_REMOTE_URL" + ) + local_ref = _normalize_pre_commit_branch_ref(os.getenv("PRE_COMMIT_LOCAL_BRANCH")) + remote_ref = _normalize_pre_commit_branch_ref(os.getenv("PRE_COMMIT_REMOTE_BRANCH")) + local_sha = os.getenv("PRE_COMMIT_TO_REF") or "" + remote_sha = "" + + if not (local_ref and remote_ref and local_sha): + return None + + if remote_name: + from commit_check.util import get_remote_branch_sha + + remote_branch = remote_ref.removeprefix("refs/heads/") + remote_sha = get_remote_branch_sha(remote_name, remote_branch) + + remote_sha = remote_sha or os.getenv("PRE_COMMIT_FROM_REF") or "" + + if not remote_sha: + return None + + return f"{local_ref} {local_sha} {remote_ref} {remote_sha}" + + def _get_parser() -> argparse.ArgumentParser: """Get parser to interpret CLI args.""" parser = argparse.ArgumentParser( @@ -104,6 +146,13 @@ def _get_parser() -> argparse.ArgumentParser: required=False, ) + check_group.add_argument( + "--no-force-push", + help="check that no force push is being performed (uses pre-push hook stdin when available, otherwise checks the current branch against its upstream)", + action="store_true", + required=False, + ) + check_group.add_argument( "--format", choices=["text", "json"], @@ -342,6 +391,11 @@ def main() -> int: # Load and merge configuration from all sources: CLI > Env > TOML > Defaults config_data = ConfigMerger.from_all_sources(args, args.config) + # When --no-force-push is explicitly passed, override allow_force_push to + # False so the rule is built even if the TOML config defaults to True. + if args.no_force_push: + config_data.setdefault("push", {})["allow_force_push"] = False + # Build validation rules from config rule_builder = RuleBuilder(config_data) all_rules = rule_builder.build_all_rules() @@ -380,6 +434,8 @@ def main() -> int: requested_checks.append("author_name") if args.author_email: requested_checks.append("author_email") + if args.no_force_push: + requested_checks.append("no_force_push") # If no specific checks requested, show help if not requested_checks: @@ -406,12 +462,16 @@ def main() -> int: if not stdin_content: # No stdin and no file - let validators get data from git themselves stdin_content = None - elif not any([args.branch, args.author_name, args.author_email]): + elif not any( + [args.branch, args.author_name, args.author_email, args.no_force_push] + ): # If no specific validation type is requested, don't read stdin pass else: - # For non-message validations (branch, author), check for stdin input + # For non-message validations (branch, author, push), check for stdin input stdin_content = stdin_reader.read_piped_input() + if args.no_force_push and stdin_content is None: + stdin_content = _build_pre_commit_push_input() # Reset banner state for this run so that multiple main() calls # in the same process (e.g. tests) don't share banner state. @@ -425,6 +485,7 @@ def main() -> int: config=config_data, no_banner=getattr(args, "no_banner", False), compact=getattr(args, "compact", False), + push_upstream_fallback=args.no_force_push and stdin_content is None, ) # Run validation – choose output mode based on --format diff --git a/commit_check/rule_builder.py b/commit_check/rule_builder.py index 8850517..b072f18 100644 --- a/commit_check/rule_builder.py +++ b/commit_check/rule_builder.py @@ -2,12 +2,18 @@ from typing import Dict, Any, List, Optional from dataclasses import dataclass -from commit_check.rules_catalog import COMMIT_RULES, BRANCH_RULES, RuleCatalogEntry +from commit_check.rules_catalog import ( + COMMIT_RULES, + BRANCH_RULES, + PUSH_RULES, + RuleCatalogEntry, +) from commit_check import ( DEFAULT_COMMIT_TYPES, DEFAULT_BRANCH_TYPES, DEFAULT_BRANCH_NAMES, DEFAULT_BOOLEAN_RULES, + DEFAULT_PUSH_RULES, ) @@ -48,12 +54,14 @@ def __init__(self, config: Dict[str, Any]): self.config = config self.commit_config = config.get("commit", {}) self.branch_config = config.get("branch", {}) + self.push_config = config.get("push", {}) def build_all_rules(self) -> List[ValidationRule]: """Build all validation rules from config.""" rules = [] rules.extend(self._build_commit_rules()) rules.extend(self._build_branch_rules()) + rules.extend(self._build_push_rules()) return rules def _build_commit_rules(self) -> List[ValidationRule]: @@ -78,6 +86,40 @@ def _build_branch_rules(self) -> List[ValidationRule]: return rules + def _build_push_rules(self) -> List[ValidationRule]: + """Build push-related validation rules.""" + rules = [] + + for catalog_entry in PUSH_RULES: + rule = self._build_push_rule(catalog_entry) + if rule: + rules.append(rule) + + return rules + + def _build_push_rule( + self, catalog_entry: RuleCatalogEntry + ) -> Optional[ValidationRule]: + """Build a single push validation rule from catalog entry and config.""" + check = catalog_entry.check + + if check == "no_force_push": + allow = self.push_config.get( + "allow_force_push", DEFAULT_PUSH_RULES["allow_force_push"] + ) + # When allow_force_push is True (default), force pushes are permitted + # so no blocking rule is needed. Only build the rule when it's False. + if allow: + return None + return ValidationRule( + check=catalog_entry.check, + error=catalog_entry.error, + suggest=catalog_entry.suggest, + value=False, + ) + + return None + def _build_single_rule( self, catalog_entry: RuleCatalogEntry, section_config: Dict[str, Any] ) -> Optional[ValidationRule]: diff --git a/commit_check/rules_catalog.py b/commit_check/rules_catalog.py index 1c80682..af8eb79 100644 --- a/commit_check/rules_catalog.py +++ b/commit_check/rules_catalog.py @@ -106,6 +106,16 @@ class RuleCatalogEntry: ), ] +# Push rules +PUSH_RULES = [ + RuleCatalogEntry( + check="no_force_push", + regex=None, + error="Force push is not allowed", + suggest="Use a normal push instead of --force or --force-with-lease", + ), +] + # Branch rules BRANCH_RULES = [ RuleCatalogEntry( diff --git a/commit_check/util.py b/commit_check/util.py index b35d8e0..b5f7e69 100644 --- a/commit_check/util.py +++ b/commit_check/util.py @@ -74,6 +74,111 @@ def get_branch_name() -> str: return branch_name.strip() +def get_upstream_branch() -> str: + """Return the configured upstream ref for the current branch. + + :returns: The upstream tracking ref (e.g. ``origin/main``), or "" if none + is configured. + """ + result = subprocess.run( + ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{upstream}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + if result.returncode == 0 and result.stdout: + return result.stdout.strip() + return "" + + +def get_upstream_remote_sha(upstream_ref: str) -> str: + """Return the current remote SHA for an upstream ref when available. + + :param upstream_ref: An upstream tracking ref (e.g. ``origin/main``). + :returns: The 40-character remote SHA, or "" if not available. + """ + parts = upstream_ref.split("/", 1) + if len(parts) != 2: + return "" + + remote_name, branch_name = parts + return get_remote_branch_sha(remote_name, branch_name) + + +def get_remote_branch_sha(remote_name: str, branch_name: str) -> str: + """Return the current remote SHA for a branch when available. + + :param remote_name: Git remote name, e.g. ``origin``. + :param branch_name: Branch name on the remote, e.g. ``main``. + :returns: The 40-character remote SHA, or "" if not available. + """ + if not remote_name or not branch_name: + return "" + + result = subprocess.run( + ["git", "ls-remote", "--exit-code", remote_name, f"refs/heads/{branch_name}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + if result.returncode != 0 or not result.stdout: + return "" + + return result.stdout.split()[0].strip() + + +def fetch_upstream_ref(upstream_ref: str) -> bool: + """Fetch an upstream branch so its tip commit is available locally. + + :param upstream_ref: An upstream tracking ref (e.g. ``origin/main``). + :returns: ``True`` if the fetch succeeded, ``False`` otherwise. + """ + parts = upstream_ref.split("/", 1) + if len(parts) != 2: + return False + + remote_name, branch_name = parts + result = subprocess.run( + ["git", "fetch", "--quiet", "--no-tags", remote_name, branch_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + return result.returncode == 0 + + +def get_git_remotes() -> list[str]: + """Return configured git remote names.""" + result = subprocess.run( + ["git", "remote"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + if result.returncode != 0 or not result.stdout: + return [] + return [line.strip() for line in result.stdout.splitlines() if line.strip()] + + +def fetch_remote_ref(remote_name: str, remote_ref: str) -> bool: + """Fetch a remote ref so its objects are available locally. + + :param remote_name: The git remote name, e.g. ``origin``. + :param remote_ref: The full ref name, e.g. ``refs/heads/main``. + :returns: ``True`` if the fetch succeeded, ``False`` otherwise. + """ + if not remote_name or not remote_ref: + return False + + result = subprocess.run( + ["git", "fetch", "--quiet", "--no-tags", remote_name, remote_ref], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + return result.returncode == 0 + + def has_commits() -> bool: """Check if there are any commits in the current branch. :returns: `True` if there are commits, `False` otherwise. diff --git a/docs/changelog.rst b/docs/changelog.rst index 71c7cce..398bae5 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -5,6 +5,29 @@ All **notable changes** to this project will be documented in this file. Full changelog available at `GitHub releases `_. +v2.7.0 (unreleased) +------------------- + +New Features +~~~~~~~~~~~~ + +* **Force push detection and blocking** — Added ``--no-force-push`` CLI flag and + ``check-no-force-push`` pre-push hook that inspect pushed ref ancestry via + ``git merge-base --is-ancestor`` to detect and block ``git push --force`` and + ``git push -f``. A new ``[push]`` TOML config section with + ``allow_force_push`` (default ``true``) controls the behavior. Environment + variable ``CCHK_ALLOW_FORCE_PUSH`` is also supported. + +* **``validate_push()`` API** — New ``commit_check.api.validate_push()`` + function for programmatic push safety checks, matching the ``--no-force-push`` + CLI behavior without spawning a subprocess. + +* **Standalone mode** — When ``--no-force-push`` is run outside a pre-push hook + (no stdin), it checks whether pushing ``HEAD`` to its configured upstream + would require force, using ``git ls-remote`` and optional ``git fetch`` to + resolve the remote commit. + + v2.6.0 (2026-04-20) ------------------- diff --git a/docs/configuration.rst b/docs/configuration.rst index c6a95b1..f746daf 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -118,6 +118,10 @@ Example Configuration # required_signoff_name = "Your Name" # Optional # required_signoff_email = "your.email@example.com" # Optional + [push] + # Block force pushes when used as a pre-push hook or with --no-force-push + allow_force_push = true # Set to false to block force pushes + [branch] # https://conventional-branch.github.io/ conventional_branch = true @@ -269,6 +273,9 @@ Configuration can also be set via environment variables with the ``CCHK_`` prefi * - ``require_rebase_target = "main"`` - ``CCHK_REQUIRE_REBASE_TARGET=main`` - ``--require-rebase-target=main`` + * - ``allow_force_push = true`` + - ``CCHK_ALLOW_FORCE_PUSH=false`` + - ``--no-force-push`` (enable via ``--no-force-push`` flag) * - ``ignore_authors = ["bot"]`` (in branch section) - ``CCHK_BRANCH_IGNORE_AUTHORS=bot,user`` - ``--branch-ignore-authors=bot,user`` @@ -394,6 +401,11 @@ Options Table Description - str - None (no requirement) - Target branch for rebase requirement. If not set, no rebase validation is performed. + * - push + - allow_force_push + - bool + - true + - Allow force pushes. Set to ``false`` to block force pushes when used as a pre-push hook or with ``--no-force-push``. * - branch - ignore_authors - list[str] diff --git a/docs/example.rst b/docs/example.rst index 8040f9f..72c7de5 100644 --- a/docs/example.rst +++ b/docs/example.rst @@ -153,6 +153,27 @@ Branch Validation Examples # - hotfix/security-patch # - release/v1.2.0 +Push Validation Examples +~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: bash + + # Check whether pushing HEAD to its configured upstream would require force + commit-check --no-force-push + +.. code-block:: yaml + + # Configure the dedicated pre-push hook + - repo: https://github.com/commit-check/commit-check + rev: the tag or revision + hooks: + - id: check-no-force-push + stages: [pre-push] + +``git push | commit-check --no-force-push`` is not a prevention mechanism. The +push has already started, and normal ``git push`` output does not include the +pre-push ref lines that Git provides to hooks. + Author Validation Examples ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/what-is-new.rst b/docs/what-is-new.rst index cb89995..a94421b 100644 --- a/docs/what-is-new.rst +++ b/docs/what-is-new.rst @@ -3,6 +3,64 @@ What's New This document highlights the major changes and improvements in each version of commit-check. +Version 2.7.0 — Force Push Blocking +----------------------------------- + +Force Push Detection and Prevention +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +commit-check now includes a **force push detection** feature that blocks +accidental ``git push --force`` / ``git push -f`` by inspecting pushed ref +ancestry via ``git merge-base --is-ancestor``. + +**How it works:** + +* Runs inside a Git ``pre-push`` hook — receives pushed ref metadata on stdin + and inspects the ancestry relationship. +* New branch pushes (remote SHA is all zeros) always pass. +* Fast-forward pushes (remote is ancestor of local) pass. +* When the remote commit is **not** an ancestor of the local commit, a force + push is detected and **blocked**. +* Git errors (e.g., unknown SHA) result in a safe pass. + +**Usage:** + +.. code-block:: bash + + # Standalone: check whether pushing HEAD to its upstream requires force + commit-check --no-force-push + +.. code-block:: yaml + + # As a pre-commit pre-push hook + repos: + - repo: https://github.com/commit-check/commit-check + rev: v2.7.0 + hooks: + - id: check-no-force-push + stages: [pre-push] + +.. code-block:: toml + + # Configurable in cchk.toml + [push] + allow_force_push = false # default: true (force pushes allowed) + +**New Python API:** + +.. code-block:: python + + from commit_check.api import validate_push + + zero = "0000000000000000000000000000000000000000" + result = validate_push(f"refs/heads/main abc123 refs/heads/main {zero}") + print(result["status"]) # "pass" + +See the `Push Safety section in README `_ +and `Push Validation Examples `_ +for more details. + + Version 2.6.0 — Output Controls for CLI Workflows ------------------------------------------------- diff --git a/tests/api_test.py b/tests/api_test.py index 4d7692e..9d8a374 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -7,6 +7,7 @@ validate_branch, validate_author, validate_all, + validate_push, ) @@ -266,3 +267,70 @@ def test_author_validation_included(self): check_names = {c["check"] for c in result["checks"]} assert "author_name" in check_names assert "author_email" in check_names + + +class TestValidatePush: + """Tests for validate_push() – the programmatic push safety API.""" + + ZERO_SHA = "0000000000000000000000000000000000000000" + + @pytest.mark.benchmark + def test_new_branch_push_passes(self): + """Push to a new (zero-SHA) remote ref returns status='pass'.""" + push_info = f"refs/heads/feature/x abc1 refs/heads/feature/x {self.ZERO_SHA}" + result = validate_push(push_info) + assert result["status"] == "pass" + checks = result["checks"] + assert any(c["check"] == "no_force_push" for c in checks) + + @pytest.mark.benchmark + def test_fast_forward_push_passes(self): + """A fast-forward push (ancestor check returns 0) passes.""" + push_info = "refs/heads/main abc123 refs/heads/main def456" + with patch("commit_check.engine.git_merge_base", return_value=0): + result = validate_push(push_info) + assert result["status"] == "pass" + + @pytest.mark.benchmark + def test_force_push_fails(self): + """A force push (ancestor check returns 1) returns status='fail'.""" + push_info = "refs/heads/main abc1 refs/heads/main def2" + with patch("commit_check.engine.git_merge_base", return_value=1): + result = validate_push(push_info) + assert result["status"] == "fail" + failed = [c for c in result["checks"] if c["status"] == "fail"] + assert len(failed) >= 1 + assert failed[0]["check"] == "no_force_push" + assert "Force push" in failed[0]["error"] + + @pytest.mark.benchmark + def test_none_push_refs_passes(self): + """Calling with push_refs=None (no stdin) returns pass.""" + result = validate_push(None) + assert result["status"] == "pass" + + @pytest.mark.benchmark + def test_custom_config_is_merged(self): + """Custom config overrides are applied.""" + push_info = f"refs/heads/main abc1 refs/heads/main {self.ZERO_SHA}" + # Even with allow_force_push=True in user config, validate_push + # always forces it to False so blocking is always active. + result = validate_push( + push_info, + config={"push": {"allow_force_push": True}}, + ) + assert result["status"] == "pass" + + @pytest.mark.benchmark + def test_result_has_expected_structure(self): + """Result dict has 'status' and 'checks' with correct keys.""" + push_info = f"refs/heads/main abc1 refs/heads/main {self.ZERO_SHA}" + result = validate_push(push_info) + assert "status" in result + assert "checks" in result + for c in result["checks"]: + assert "check" in c + assert "status" in c + assert "value" in c + assert "error" in c + assert "suggest" in c diff --git a/tests/engine_test.py b/tests/engine_test.py index 9e6ce21..b3e3121 100644 --- a/tests/engine_test.py +++ b/tests/engine_test.py @@ -19,6 +19,7 @@ SubjectCapitalizationValidator, BodyValidator, MergeBaseValidator, + ForcePushValidator, ) from commit_check.rule_builder import ValidationRule @@ -1102,3 +1103,330 @@ def test_author_email_uses_git_config_when_available(self): ): result = validator.validate(context) assert result == ValidationResult.PASS + + +class TestForcePushValidator: + """Tests for the ForcePushValidator class.""" + + ZERO_SHA = "0000000000000000000000000000000000000000" + + def _make_rule(self): + return ValidationRule( + check="no_force_push", + error="Force push is not allowed", + suggest="Use a normal push instead of --force or --force-with-lease", + value=False, + ) + + @pytest.mark.benchmark + def test_no_stdin_skips_validation(self): + """Validator passes when no stdin is provided (not a pre-push context).""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext() # stdin_text=None + + result = validator.validate(context) + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_no_stdin_with_upstream_fallback_passes_without_upstream(self): + """Standalone mode passes when the current branch has no upstream.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext(push_upstream_fallback=True) + + with patch("commit_check.engine.get_upstream_branch", return_value=""): + result = validator.validate(context) + + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_no_stdin_with_upstream_fallback_passes_fast_forward(self): + """Standalone mode passes when upstream is an ancestor of HEAD.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext(push_upstream_fallback=True) + + with patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ): + with patch( + "commit_check.engine.get_upstream_remote_sha", return_value="abc123" + ): + with patch("commit_check.engine.git_merge_base", return_value=0): + result = validator.validate(context) + + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_no_stdin_with_upstream_fallback_uses_tracking_ref_when_remote_sha_missing( + self, + ): + """Standalone mode uses local tracking ref if ls-remote lookup fails.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext(push_upstream_fallback=True) + + with patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ): + with patch("commit_check.engine.get_upstream_remote_sha", return_value=""): + with patch( + "commit_check.engine.git_merge_base", return_value=0 + ) as mock_merge: + result = validator.validate(context) + + mock_merge.assert_called_once_with("origin/main", "HEAD") + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_no_stdin_with_upstream_fallback_blocks_force_push(self): + """Standalone mode fails when pushing HEAD to upstream requires force.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext(push_upstream_fallback=True) + + with patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ): + with patch( + "commit_check.engine.get_upstream_remote_sha", return_value="deadbeef" + ): + with patch("commit_check.engine.get_branch_name", return_value="main"): + with patch( + "commit_check.engine.git_merge_base", return_value=1 + ) as mock_merge: + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + mock_merge.assert_called_once_with("deadbeef", "HEAD") + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_no_stdin_with_upstream_fallback_fetches_remote_commit_when_needed(self): + """Standalone mode fetches the upstream commit if not local yet.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + context = ValidationContext(push_upstream_fallback=True) + + with patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ): + with patch( + "commit_check.engine.get_upstream_remote_sha", return_value="deadbeef" + ): + with patch("commit_check.engine.get_branch_name", return_value="main"): + with patch( + "commit_check.engine.git_merge_base", side_effect=[128, 1] + ) as mock_merge: + with patch( + "commit_check.engine.fetch_upstream_ref", return_value=True + ) as mock_fetch: + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + mock_fetch.assert_called_once_with("origin/main") + assert mock_merge.call_count == 2 + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_new_branch_push_is_allowed(self): + """A push to a new (non-existent) remote branch is not a force push.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = ( + f"refs/heads/feature/new abc123 refs/heads/feature/new {self.ZERO_SHA}" + ) + context = ValidationContext(stdin_text=push_info) + + result = validator.validate(context) + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_fast_forward_push_is_allowed(self): + """A normal fast-forward push (remote is ancestor of local) is allowed.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch("commit_check.engine.git_merge_base", return_value=0): + result = validator.validate(context) + + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_force_push_is_blocked(self): + """A force push (remote is NOT ancestor of local) is blocked.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch("commit_check.engine.git_merge_base", return_value=1): + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_git_error_allows_push(self): + """When git cannot determine ancestry after fetch failure, push is allowed.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch("commit_check.engine.git_merge_base", return_value=128): + with patch( + "commit_check.engine.fetch_remote_ref", return_value=False + ) as mock_fetch: + with patch( + "commit_check.engine.get_git_remotes", return_value=["origin"] + ): + with patch( + "commit_check.engine.get_upstream_branch", return_value="" + ): + result = validator.validate(context) + + mock_fetch.assert_called_once_with("origin", "refs/heads/main") + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_missing_remote_sha_is_fetched_then_force_push_is_blocked(self): + """A missing remote SHA is fetched before deciding the push is safe.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch( + "commit_check.engine.git_merge_base", side_effect=[128, 1] + ) as mock_merge: + with patch("commit_check.engine.get_upstream_branch", return_value=""): + with patch( + "commit_check.engine.get_git_remotes", return_value=["origin"] + ): + with patch( + "commit_check.engine.fetch_remote_ref", return_value=True + ) as mock_fetch: + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + assert mock_merge.call_count == 2 + mock_fetch.assert_called_once_with("origin", "refs/heads/main") + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_missing_remote_sha_fetch_prefers_matching_upstream_remote(self): + """The matching upstream remote is fetched before other remotes.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch("commit_check.engine.git_merge_base", side_effect=[128, 0]): + with patch( + "commit_check.engine.get_upstream_branch", return_value="upstream/main" + ): + with patch( + "commit_check.engine.get_git_remotes", + return_value=["origin", "upstream"], + ): + with patch( + "commit_check.engine.fetch_remote_ref", return_value=True + ) as mock_fetch: + result = validator.validate(context) + + mock_fetch.assert_called_once_with("upstream", "refs/heads/main") + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_missing_remote_sha_tries_next_remote_until_resolved(self): + """Fetching one remote is not enough if it did not contain the SHA.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "refs/heads/main abc123 refs/heads/main def456" + context = ValidationContext(stdin_text=push_info) + + with patch( + "commit_check.engine.git_merge_base", side_effect=[128, 128, 1] + ) as mock_merge: + with patch("commit_check.engine.get_upstream_branch", return_value=""): + with patch( + "commit_check.engine.get_git_remotes", + return_value=["origin", "upstream"], + ): + with patch( + "commit_check.engine.fetch_remote_ref", return_value=True + ) as mock_fetch: + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + assert mock_merge.call_count == 3 + assert [call.args for call in mock_fetch.call_args_list] == [ + ("origin", "refs/heads/main"), + ("upstream", "refs/heads/main"), + ] + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_empty_lines_in_stdin_are_skipped(self): + """Empty lines in push info do not cause errors.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "\n\nrefs/heads/main abc123 refs/heads/main def456\n\n" + context = ValidationContext(stdin_text=push_info) + + with patch("commit_check.engine.git_merge_base", return_value=0): + result = validator.validate(context) + + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_malformed_push_line_is_skipped(self): + """Lines that do not have 4 fields are silently skipped.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = "only two fields" + context = ValidationContext(stdin_text=push_info) + + result = validator.validate(context) + assert result == ValidationResult.PASS + + @pytest.mark.benchmark + def test_multiple_refs_one_force_push_blocks(self): + """If any pushed ref is a force push, the whole check fails.""" + rule = self._make_rule() + validator = ForcePushValidator(rule) + push_info = ( + f"refs/heads/feature/ok abc1 refs/heads/feature/ok {self.ZERO_SHA}\n" + "refs/heads/main abc2 refs/heads/main def2" + ) + context = ValidationContext(stdin_text=push_info) + + # Allow new branch, but force push on second line + def side_effect(remote_sha, local_sha): + if remote_sha == self.ZERO_SHA: + return 0 + return 1 + + with patch("commit_check.engine.git_merge_base", side_effect=side_effect): + with patch("commit_check.util._print_failure"): + result = validator.validate(context) + + assert result == ValidationResult.FAIL + + @pytest.mark.benchmark + def test_validation_engine_includes_force_push_validator(self): + """ValidationEngine maps 'no_force_push' to ForcePushValidator.""" + assert "no_force_push" in ValidationEngine.VALIDATOR_MAP + assert ValidationEngine.VALIDATOR_MAP["no_force_push"] is ForcePushValidator + + @pytest.mark.benchmark + def test_validation_context_push_upstream_fallback(self): + """ValidationContext supports push_upstream_fallback field.""" + ctx = ValidationContext(push_upstream_fallback=True) + assert ctx.push_upstream_fallback is True + ctx2 = ValidationContext() + assert ctx2.push_upstream_fallback is False diff --git a/tests/main_test.py b/tests/main_test.py index c379e36..7c3872d 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -1,9 +1,15 @@ import json +import subprocess import sys import pytest import tempfile import os -from commit_check.main import StdinReader, _get_message_content, main +from commit_check.main import ( + StdinReader, + _build_pre_commit_push_input, + _get_message_content, + main, +) CMD = "commit-check" @@ -761,3 +767,194 @@ def test_compact_passes_valid_commit(self, mocker): sys.argv = [CMD, "-m", "--compact"] assert main() == 0 + + +class TestNoForcePushFlag: + """Tests for the --no-force-push CLI flag.""" + + ZERO_SHA = "0000000000000000000000000000000000000000" + + @pytest.mark.benchmark + def test_no_force_push_new_branch_passes(self, mocker): + """Push to a new remote branch (zero SHA) always passes.""" + push_info = ( + f"refs/heads/feature/new abc123 refs/heads/feature/new {self.ZERO_SHA}" + ) + mocker.patch("sys.stdin.isatty", return_value=False) + mocker.patch("sys.stdin.read", return_value=push_info) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 0 + + @pytest.mark.benchmark + def test_no_force_push_fast_forward_passes(self, mocker): + """Fast-forward push (remote is ancestor of local) passes.""" + push_info = "refs/heads/main abc123 refs/heads/main def456" + mocker.patch("sys.stdin.isatty", return_value=False) + mocker.patch("sys.stdin.read", return_value=push_info) + mocker.patch("commit_check.engine.git_merge_base", return_value=0) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 0 + + @pytest.mark.benchmark + def test_no_force_push_force_push_fails(self, mocker): + """Force push (remote is not ancestor of local) fails.""" + push_info = "refs/heads/main abc123 refs/heads/main def456" + mocker.patch("sys.stdin.isatty", return_value=False) + mocker.patch("sys.stdin.read", return_value=push_info) + mocker.patch("commit_check.engine.git_merge_base", return_value=1) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 1 + + @pytest.mark.benchmark + def test_no_force_push_no_stdin_passes(self, mocker): + """When no stdin and no upstream are available, the check is skipped.""" + mocker.patch("sys.stdin.isatty", return_value=True) + mocker.patch("commit_check.engine.get_upstream_branch", return_value="") + + sys.argv = [CMD, "--no-force-push"] + assert main() == 0 + + @pytest.mark.benchmark + def test_no_force_push_no_stdin_uses_upstream_fallback(self, mocker): + """Without stdin, the CLI falls back to checking the current upstream.""" + mocker.patch("sys.stdin.isatty", return_value=True) + mocker.patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ) + mocker.patch("commit_check.engine.git_merge_base", return_value=0) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 0 + + @pytest.mark.benchmark + def test_no_force_push_no_stdin_blocks_non_fast_forward_upstream(self, mocker): + """Without stdin, a non-fast-forward upstream relationship fails.""" + mocker.patch("sys.stdin.isatty", return_value=True) + mocker.patch( + "commit_check.engine.get_upstream_branch", return_value="origin/main" + ) + mocker.patch("commit_check.engine.get_branch_name", return_value="main") + mocker.patch("commit_check.engine.git_merge_base", return_value=1) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 1 + + @pytest.mark.benchmark + def test_no_force_push_uses_pre_commit_env_before_upstream(self, mocker): + """pre-commit pre-push metadata drives the check when stdin is unavailable.""" + mocker.patch("sys.stdin.isatty", return_value=True) + mocker.patch.dict( + os.environ, + { + "PRE_COMMIT_LOCAL_BRANCH": "feature/topic", + "PRE_COMMIT_REMOTE_BRANCH": "main", + "PRE_COMMIT_TO_REF": "local-sha", + "PRE_COMMIT_FROM_REF": "remote-sha", + }, + clear=True, + ) + mock_merge = mocker.patch("commit_check.engine.git_merge_base", return_value=1) + mock_upstream = mocker.patch("commit_check.engine.get_upstream_branch") + + sys.argv = [CMD, "--no-force-push"] + assert main() == 1 + + mock_merge.assert_called_once_with("remote-sha", "local-sha") + mock_upstream.assert_not_called() + + @pytest.mark.benchmark + def test_no_force_push_pre_commit_env_fetches_remote_sha(self, mocker): + """pre-commit metadata can resolve the remote tip when FROM_REF is absent.""" + mocker.patch("sys.stdin.isatty", return_value=True) + mocker.patch.dict( + os.environ, + { + "PRE_COMMIT_REMOTE_NAME": "upstream", + "PRE_COMMIT_LOCAL_BRANCH": "feature/topic", + "PRE_COMMIT_REMOTE_BRANCH": "main", + "PRE_COMMIT_TO_REF": "local-sha", + }, + clear=True, + ) + mock_run = mocker.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + returncode=0, + stdout="remote-sha\trefs/heads/main\n", + stderr="", + ), + ) + mock_merge = mocker.patch("commit_check.engine.git_merge_base", return_value=0) + + sys.argv = [CMD, "--no-force-push"] + assert main() == 0 + + mock_run.assert_called_once_with( + ["git", "ls-remote", "--exit-code", "upstream", "refs/heads/main"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + mock_merge.assert_called_once_with("remote-sha", "local-sha") + + @pytest.mark.benchmark + def test_build_pre_commit_push_input_normalizes_branch_names(self, mocker): + """pre-commit branch names are converted to git pre-push ref rows.""" + mocker.patch.dict( + os.environ, + { + "PRE_COMMIT_LOCAL_BRANCH": "feature/topic", + "PRE_COMMIT_REMOTE_BRANCH": "main", + "PRE_COMMIT_TO_REF": "local-sha", + "PRE_COMMIT_FROM_REF": "remote-sha", + }, + clear=True, + ) + + assert ( + _build_pre_commit_push_input() + == "refs/heads/feature/topic local-sha refs/heads/main remote-sha" + ) + + @pytest.mark.benchmark + def test_build_pre_commit_push_input_prefers_remote_sha(self, mocker): + """The real remote tip is preferred over pre-commit's FROM_REF range.""" + mocker.patch.dict( + os.environ, + { + "PRE_COMMIT_REMOTE_NAME": "upstream", + "PRE_COMMIT_LOCAL_BRANCH": "feature/topic", + "PRE_COMMIT_REMOTE_BRANCH": "main", + "PRE_COMMIT_TO_REF": "local-sha", + "PRE_COMMIT_FROM_REF": "range-base-sha", + }, + clear=True, + ) + mocker.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + returncode=0, + stdout="remote-sha\trefs/heads/main\n", + stderr="", + ), + ) + + assert ( + _build_pre_commit_push_input() + == "refs/heads/feature/topic local-sha refs/heads/main remote-sha" + ) + + @pytest.mark.benchmark + def test_no_force_push_flag_in_help(self, capfd): + """The --no-force-push flag appears in help output.""" + sys.argv = [CMD, "--help"] + with pytest.raises(SystemExit): + main() + out, _ = capfd.readouterr() + assert "--no-force-push" in out + assert "current branch against its upstream" in out diff --git a/tests/rule_builder_test.py b/tests/rule_builder_test.py index 460e471..97df8bb 100644 --- a/tests/rule_builder_test.py +++ b/tests/rule_builder_test.py @@ -226,3 +226,43 @@ def test_rule_builder_allow_branch_names_with_duplicates(self): allowed_names = builder._get_allowed_branch_names() # Should deduplicate while preserving order assert allowed_names == ["develop", "staging"] + + +class TestPushRuleBuilder: + """Tests for push rule building.""" + + @pytest.mark.benchmark + def test_push_rule_not_built_when_force_push_allowed(self): + """No rule is built when allow_force_push is True (default).""" + config = {"push": {"allow_force_push": True}} + builder = RuleBuilder(config) + rules = builder.build_all_rules() + push_rules = [r for r in rules if r.check == "no_force_push"] + assert len(push_rules) == 0 + + @pytest.mark.benchmark + def test_push_rule_built_when_force_push_disabled(self): + """A rule is built when allow_force_push is False.""" + config = {"push": {"allow_force_push": False}} + builder = RuleBuilder(config) + rules = builder.build_all_rules() + push_rules = [r for r in rules if r.check == "no_force_push"] + assert len(push_rules) == 1 + assert push_rules[0].error == "Force push is not allowed" + assert push_rules[0].suggest is not None + + @pytest.mark.benchmark + def test_push_rule_not_built_by_default(self): + """No rule is built with empty config (default: allow force push).""" + builder = RuleBuilder({}) + rules = builder.build_all_rules() + push_rules = [r for r in rules if r.check == "no_force_push"] + assert len(push_rules) == 0 + + @pytest.mark.benchmark + def test_push_rule_unknown_check_returns_none(self): + """A push rule with an unrecognized check name returns None.""" + builder = RuleBuilder({}) + unknown_entry = RuleCatalogEntry(check="unknown_push_check") + rule = builder._build_push_rule(unknown_entry) + assert rule is None diff --git a/tests/util_test.py b/tests/util_test.py index cfa4201..ecab6f7 100644 --- a/tests/util_test.py +++ b/tests/util_test.py @@ -4,7 +4,13 @@ import os from pathlib import Path, PurePath from commit_check.util import ( + fetch_remote_ref, + fetch_upstream_ref, get_branch_name, + get_git_remotes, + get_remote_branch_sha, + get_upstream_branch, + get_upstream_remote_sha, has_commits, git_merge_base, get_commit_info, @@ -129,6 +135,270 @@ def test_has_commits_false(self, mocker): } assert retval is False + class TestGetUpstreamBranch: + @pytest.mark.benchmark + def test_get_upstream_branch(self, mocker): + mock_run = mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "origin/main\n", "stderr": "", "returncode": 0}, + )(), + ) + + result = get_upstream_branch() + + mock_run.assert_called_once_with( + [ + "git", + "rev-parse", + "--abbrev-ref", + "--symbolic-full-name", + "@{upstream}", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + assert result == "origin/main" + + @pytest.mark.benchmark + def test_get_upstream_branch_missing(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + { + "stdout": "", + "stderr": "fatal: no upstream", + "returncode": 128, + }, + )(), + ) + + assert get_upstream_branch() == "" + + class TestGetUpstreamRemoteSha: + @pytest.mark.benchmark + def test_get_upstream_remote_sha(self, mocker): + mock_run = mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + { + "stdout": "abc123\trefs/heads/main\n", + "stderr": "", + "returncode": 0, + }, + )(), + ) + + result = get_upstream_remote_sha("origin/main") + + mock_run.assert_called_once_with( + ["git", "ls-remote", "--exit-code", "origin", "refs/heads/main"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + assert result == "abc123" + + @pytest.mark.benchmark + def test_get_upstream_remote_sha_with_nested_branch(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + { + "stdout": "def456\trefs/heads/feature/topic\n", + "stderr": "", + "returncode": 0, + }, + )(), + ) + + assert get_upstream_remote_sha("origin/feature/topic") == "def456" + + @pytest.mark.benchmark + def test_get_upstream_remote_sha_missing(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "fatal", "returncode": 2}, + )(), + ) + + assert get_upstream_remote_sha("origin/main") == "" + + @pytest.mark.benchmark + def test_get_upstream_remote_sha_invalid_ref(self, mocker): + mock_run = mocker.patch("subprocess.run") + + assert get_upstream_remote_sha("main") == "" + mock_run.assert_not_called() + + class TestGetRemoteBranchSha: + @pytest.mark.benchmark + def test_get_remote_branch_sha(self, mocker): + mock_run = mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + { + "stdout": "abc123\trefs/heads/main\n", + "stderr": "", + "returncode": 0, + }, + )(), + ) + + result = get_remote_branch_sha("origin", "main") + + mock_run.assert_called_once_with( + ["git", "ls-remote", "--exit-code", "origin", "refs/heads/main"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + assert result == "abc123" + + @pytest.mark.benchmark + @pytest.mark.parametrize( + "remote_name,branch_name", + [ + ("", "main"), + ("origin", ""), + ], + ) + def test_get_remote_branch_sha_invalid_args( + self, mocker, remote_name, branch_name + ): + mock_run = mocker.patch("subprocess.run") + + assert get_remote_branch_sha(remote_name, branch_name) == "" + mock_run.assert_not_called() + + class TestFetchUpstreamRef: + @pytest.mark.benchmark + def test_fetch_upstream_ref(self, mocker): + mock_run = mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "", "returncode": 0}, + )(), + ) + + assert fetch_upstream_ref("origin/main") is True + mock_run.assert_called_once_with( + ["git", "fetch", "--quiet", "--no-tags", "origin", "main"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + + @pytest.mark.benchmark + def test_fetch_upstream_ref_failure(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "fatal", "returncode": 1}, + )(), + ) + + assert fetch_upstream_ref("origin/main") is False + + @pytest.mark.benchmark + def test_fetch_upstream_ref_invalid_ref(self, mocker): + mock_run = mocker.patch("subprocess.run") + + assert fetch_upstream_ref("main") is False + mock_run.assert_not_called() + + class TestGetGitRemotes: + @pytest.mark.benchmark + def test_get_git_remotes(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "origin\nupstream\n", "stderr": "", "returncode": 0}, + )(), + ) + + assert get_git_remotes() == ["origin", "upstream"] + + @pytest.mark.benchmark + def test_get_git_remotes_failure(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "fatal", "returncode": 128}, + )(), + ) + + assert get_git_remotes() == [] + + class TestFetchRemoteRef: + @pytest.mark.benchmark + def test_fetch_remote_ref(self, mocker): + mock_run = mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "", "returncode": 0}, + )(), + ) + + assert fetch_remote_ref("origin", "refs/heads/main") is True + mock_run.assert_called_once_with( + ["git", "fetch", "--quiet", "--no-tags", "origin", "refs/heads/main"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + ) + + @pytest.mark.benchmark + def test_fetch_remote_ref_failure(self, mocker): + mocker.patch( + "subprocess.run", + return_value=type( + "MockResult", + (), + {"stdout": "", "stderr": "fatal", "returncode": 1}, + )(), + ) + + assert fetch_remote_ref("origin", "refs/heads/main") is False + + @pytest.mark.benchmark + @pytest.mark.parametrize( + "remote_name,remote_ref", + [ + ("", "refs/heads/main"), + ("origin", ""), + ], + ) + def test_fetch_remote_ref_invalid_args(self, mocker, remote_name, remote_ref): + mock_run = mocker.patch("subprocess.run") + + assert fetch_remote_ref(remote_name, remote_ref) is False + mock_run.assert_not_called() + class TestGitMergeBase: @pytest.mark.benchmark @pytest.mark.parametrize(