import os
import subprocess
import traceback
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Tuple

from PyQt5.QtCore import pyqtSignal, QThreadPool, QObject, QRunnable

from src.callback import CallbackScriptBuilder


def _remove_quietly(path: str) -> None:
    """Delete *path* if it exists, without letting a cleanup error escape.

    Used from ``finally`` clauses: an exception raised there would replace
    the command's real (success, output) result.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"清理临时文件失败: {e}")


class Command(ABC):
    """Base class for an operation executed inside a git repository.

    :param repo_path: path of the repository; used as the working directory
        of every subprocess started through :meth:`_run`.
    """

    def __init__(self, repo_path: str):
        super().__init__()
        self.repo_path = repo_path

    def _run(self, args):
        """Run *args* in the repository and return the CompletedProcess.

        Output is captured and decoded as UTF-8 with undecodable bytes
        replaced, so unexpected output encodings cannot raise.
        """
        return subprocess.run(
            args,
            cwd=self.repo_path,
            encoding='utf-8',
            errors='replace',
            capture_output=True,
            text=True,
        )

    @abstractmethod
    def execute(self, ):
        """Run the command and return a ``(success, output)`` pair."""
        pass


class WorkerSignals(QObject):
    """Signals emitted by :class:`CommandWorker`."""

    # (success, output). The payload is declared as ``object`` because
    # commands do not all return text: GetAllBranchesCommand returns a list,
    # which a ``str``-typed signal refuses to emit (TypeError).
    finished = pyqtSignal(bool, object)


class CommandWorker(QRunnable):
    """Runnable that executes a command on a pool thread.

    :param command: object exposing ``execute()`` returning (success, output)
    :param callback: optional callable connected to the ``finished`` signal
    """

    def __init__(self, command, callback=None):
        super().__init__()
        self.command = command
        self.callback = callback
        self.signals = WorkerSignals()
        if callback:
            self.signals.finished.connect(callback)

    def run(self):
        """Execute the command and emit ``finished`` with its result.

        Any exception is caught here and reported through the signal as a
        failure together with the traceback text.
        """
        try:
            print(f"执行命令: {self.command.__class__.__name__}")
            success, output = self.command.execute()
            self.signals.finished.emit(success, output)
        except Exception as e:
            error_msg = f"命令执行异常: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            self.signals.finished.emit(False, error_msg)


class GetBranchCommitsCommand(Command):
    """Fetch the commit log of a given branch."""

    def __init__(self, repo_path: str, branch_name: str):
        """
        :param repo_path: repository path
        :param branch_name: branch name
        """
        super().__init__(repo_path)
        self.branch_name = branch_name

    def execute(self):
        """Return ``(True, log_text)`` on success, ``(False, error_text)`` otherwise."""
        try:
            print("获取指定分支的提交信息")
            result = self._run([
                "git", "log",
                "--pretty=format:%h %an <%ae> %ad %s %d",
                "--date=iso",
                self.branch_name,
                # Terminates the revision list so a path sharing the branch's
                # name cannot make the argument ambiguous.
                "--",
            ])
            if result.returncode != 0:
                return False, result.stderr
            return True, result.stdout
        except Exception as e:
            return False, str(e)


class _CommitCallbackCommand(Command):
    """Shared flow for commands that rewrite history via git-filter-repo."""

    def _rewrite(self, file_name, build_script, success_message):
        """Generate a callback script, run git-filter-repo with it, clean up.

        :param file_name: script name, created directly inside the repository
        :param build_script: callable taking the script's full path and
            returning a truthy value when the script was written
        :param success_message: output returned when the rewrite succeeds
        :return: ``(success, output)``
        """
        target_file_path = os.path.join(self.repo_path, file_name)
        try:
            if not build_script(target_file_path):
                return False, "生成文件失败"

            # The relative name resolves because the process runs with the
            # repository as its working directory.
            result = self._run([
                "git-filter-repo", "--commit-callback", file_name, "--force"
            ])
            if result.returncode != 0:
                return False, result.stderr
            return True, success_message
        except Exception as e:
            return False, str(e)
        finally:
            _remove_quietly(target_file_path)


class EditSingleCommitCommand(_CommitCallbackCommand):
    """Rewrite the author, e-mail, message and time of a single commit."""

    def __init__(self, repo_path: str, commit_id: str, new_author: str, new_email: str, new_commit_message: str,
                 new_commit_time: datetime):
        """
        :param repo_path: repository path
        :param commit_id: id of the commit to edit
        :param new_author: new author name
        :param new_email: new author e-mail
        :param new_commit_message: new commit message
        :param new_commit_time: new commit time
        """
        super().__init__(repo_path)
        self.commit_id = commit_id
        self.new_commit_message = new_commit_message
        self.new_commit_time = new_commit_time
        self.new_author = new_author
        self.new_email = new_email

    def execute(self):
        """Return ``(True, message)`` on success, ``(False, error_text)`` otherwise."""
        def build_script(path):
            return CallbackScriptBuilder.build_single_commit_callback(
                filepath=path,
                target_hash=self.commit_id,
                author_name=self.new_author,
                author_email=self.new_email,
                commit_message=self.new_commit_message,
                # NOTE(review): the original comment gives the format
                # 2024-01-01T10:00:00 — confirm what the builder expects.
                commit_time=self.new_commit_time,
            )

        return self._rewrite("edit_commit_callback.py", build_script, "提交信息修改成功")


class EditBulkCommitCommand(_CommitCallbackCommand):
    """Rewrite several commits in a single git-filter-repo pass."""

    def __init__(self, repo_path: str, commit_changes: dict):
        """
        :param repo_path: repository path
        :param commit_changes: changes to apply, passed unchanged to
            ``CallbackScriptBuilder.build_bulk_commit_callback``
        """
        super().__init__(repo_path)
        self.commit_changes = commit_changes

    def execute(self):
        """Return ``(True, message)`` on success, ``(False, error_text)`` otherwise."""
        def build_script(path):
            return CallbackScriptBuilder.build_bulk_commit_callback(path, self.commit_changes)

        return self._rewrite("rewrite_callback.py", build_script, "批量修改作者、邮箱及时间信息成功")


class CheckoutCommand(Command):
    """Check out a branch."""

    def __init__(self, repo_path: str, branch_name: str):
        """
        :param repo_path: repository path
        :param branch_name: branch name
        """
        super().__init__(repo_path)
        self.branch_name = branch_name

    def execute(self):
        """Return ``(True, message)`` on success, ``(False, error_text)`` otherwise."""
        try:
            result = self._run(["git", "checkout", self.branch_name])
            if result.returncode != 0:
                return False, result.stderr
            return True, "切换分支成功"
        except Exception as e:
            return False, str(e)


class GetAllBranchesCommand(Command):
    """List local and remote branches (``git branch -a``)."""

    def execute(self):
        """Return ``(True, branches)`` or ``(False, error_text)``.

        ``branches`` is a list with one stripped, non-empty line of git's
        output per entry.
        """
        try:
            result = self._run(["git", "branch", "-a"])
            if result.returncode != 0:
                return False, result.stderr
            branches = [line.strip() for line in result.stdout.split("\n") if line.strip() != '']
            return True, branches
        except Exception as e:
            return False, str(e)


class GetRemoteRepoUrlCommand(Command):
    """Read the URL of the ``origin`` remote."""

    def execute(self):
        """Return ``(True, url)`` on success, ``(False, error_text)`` otherwise."""
        try:
            result = self._run(["git", "remote", "get-url", "origin"])
            if result.returncode != 0:
                return False, result.stderr
            return True, result.stdout.strip()
        except Exception as e:
            return False, str(e)


class SetRemoteUrlCommand(Command):
    """Point the ``origin`` remote at a URL, creating the remote if needed."""

    def __init__(self, repo_path: str, remote_url: str):
        """
        :param repo_path: repository path
        :param remote_url: URL to assign to ``origin``
        """
        super().__init__(repo_path)
        self.remote_url = remote_url

    def execute(self) -> Tuple[bool, str]:
        """Return ``(True, stdout)`` on success, ``(False, error_text)`` otherwise."""
        try:
            # "git remote add" fails when origin already exists, so pick the
            # sub-command that matches the repository's current state.
            origin_exists = self._run(["git", "remote", "get-url", "origin"]).returncode == 0
            action = "set-url" if origin_exists else "add"
            result = self._run(["git", "remote", action, "origin", self.remote_url])
            if result.returncode != 0:
                return False, result.stderr
            return True, result.stdout.strip()
        except Exception as e:
            return False, str(e)


class Executor:
    """Entry points for running commands synchronously or on the thread pool."""

    @staticmethod
    def executeAsync(command: Command, callback=None):
        """Run *command* on the global thread pool.

        :param command: command to execute
        :param callback: optional callable invoked with ``(success, output)``;
            also called directly with a failure if the task cannot be queued
        """
        try:
            worker = CommandWorker(command, callback)
            # Let the pool dispose of the runnable once it has run.
            worker.setAutoDelete(True)
            QThreadPool.globalInstance().start(worker)
        except Exception as e:
            print(f"提交异步任务失败: {e}")
            if callback:
                callback(False, str(e))

    @staticmethod
    def execute(command: Command):
        """Run *command* in the calling thread and return its result."""
        return command.execute()