#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Gitea API 客户端 用于操作 PR:合并、关闭等 """ import logging import requests from typing import Dict, Any, Optional, List logger = logging.getLogger(__name__) class GiteaClient: """Gitea API 客户端""" def __init__(self, config: Dict[str, Any]): """ 初始化 Gitea 客户端 Args: config: Gitea 配置,包含 base_url 和 api_token """ self.base_url = config.get('base_url', '').rstrip('/') self.api_token = config.get('api_token', '') if not self.base_url: raise ValueError("Gitea base_url 未配置") if not self.api_token: raise ValueError("Gitea api_token 未配置") def _get_headers(self) -> Dict[str, str]: """获取 API 请求头""" return { 'Authorization': f'token {self.api_token}', 'Content-Type': 'application/json', 'Accept': 'application/json' } def merge_pull_request(self, owner: str, repo: str, pr_number: int, merge_message: str = "", merge_commit_id: str = None) -> bool: """ 合并 Pull Request """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/merge" logger.info(f"合并 PR URL: {url}") # Gitea API 需要 do 参数:merge, rebase, squash payload = { "do": "merge", "merge_commit_message": merge_message or f"Merge PR #{pr_number}" } if merge_commit_id: payload["merge_commit_id"] = merge_commit_id try: response = requests.post( url, headers=self._get_headers(), json=payload, timeout=30 ) logger.info(f"合并响应状态码: {response.status_code}") logger.info(f"合并响应内容: {response.text[:500]}") if response.status_code == 200: logger.info(f"成功合并 PR #{pr_number}") return True elif response.status_code == 405: logger.error(f"PR #{pr_number} 无法合并: {response.json().get('message', '未知原因')}") return False elif response.status_code == 422: logger.error(f"PR #{pr_number} 合并失败: {response.json().get('message', '参数错误')}") return False else: logger.error(f"合并 PR #{pr_number} 失败: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"合并 PR #{pr_number} 异常: {str(e)}") return False def close_pull_request(self, owner: str, repo: str, pr_number: int) -> bool: """ 关闭 Pull Request Args: owner: 仓库所有者 repo: 仓库名称 pr_number: PR 编号 Returns: 是否关闭成功 """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}" payload = { "state": "closed" } try: response = requests.patch( url, headers=self._get_headers(), json=payload, timeout=30 ) if response.status_code in (200, 201): logger.info(f"成功关闭 PR #{pr_number}") return True else: logger.error(f"关闭 PR #{pr_number} 失败: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"关闭 PR #{pr_number} 异常: {str(e)}") return False def get_pull_request(self, owner: str, repo: str, pr_number: int) -> Optional[Dict[str, Any]]: """ 获取 Pull Request 信息 Args: owner: 仓库所有者 repo: 仓库名称 pr_number: PR 编号 Returns: PR 信息字典,失败返回 None """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}" try: response = requests.get( url, headers=self._get_headers(), timeout=30 ) if response.status_code == 200: return response.json() else: logger.error(f"获取 PR #{pr_number} 失败: {response.status_code}") return None except Exception as e: logger.error(f"获取 PR #{pr_number} 异常: {str(e)}") return None def can_merge(self, owner: str, repo: str, pr_number: int) -> bool: """ 检查 PR 是否可以合并 Args: owner: 仓库所有者 repo: 仓库名称 pr_number: PR 编号 Returns: 是否可以合并 """ pr_info = self.get_pull_request(owner, repo, pr_number) if pr_info: return pr_info.get('mergeable', False) and pr_info.get('state') == 'open' return False def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> Optional[str]: """ 获取 Pull Request 的代码差异 Args: owner: 仓库所有者 repo: 仓库名称 pr_number: PR 编号 Returns: diff 文本,失败返回 None """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}.diff" try: response = requests.get( url, headers=self._get_headers(), timeout=30 ) if response.status_code == 200: logger.info(f"成功获取 PR #{pr_number} 的 diff") return response.text else: logger.error(f"获取 PR #{pr_number} diff 失败: {response.status_code}") return None except Exception as e: logger.error(f"获取 PR #{pr_number} diff 异常: {str(e)}") return None def get_pull_request_files(self, owner: str, repo: str, pr_number: int) -> Optional[List[Dict[str, Any]]]: """ 获取 PR 中修改的文件列表 Args: owner: 仓库所有者 repo: 仓库名称 pr_number: PR 编号 Returns: 文件列表,失败返回 None """ url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/files" try: response = requests.get( url, headers=self._get_headers(), timeout=30 ) if response.status_code == 200: logger.info(f"成功获取 PR #{pr_number} 的文件列表") return response.json() else: logger.error(f"获取 PR #{pr_number} 文件列表失败: {response.status_code}") return None except Exception as e: logger.error(f"获取 PR #{pr_number} 文件列表异常: {str(e)}") return None def get_file_contents(self, owner: str, repo: str, filepath: str, ref: str) -> Optional[str]: """ 获取仓库中指定文件在给定 ref(分支/commit)下的内容 Args: owner: 仓库所有者 repo: 仓库名称 filepath: 文件路径 ref: 分支名或 commit SHA Returns: 文件内容文本,失败返回 None """ import base64 import urllib.parse encoded_path = urllib.parse.quote(filepath, safe='') url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{encoded_path}?ref={urllib.parse.quote(ref)}" try: response = requests.get( url, headers=self._get_headers(), timeout=30 ) if response.status_code == 200: data = response.json() if data.get('encoding') == 'base64' and data.get('content'): return base64.b64decode(data['content']).decode('utf-8', errors='replace') return None logger.error(f"获取文件 {filepath} 失败: {response.status_code}") return None except Exception as e: logger.error(f"获取文件内容异常: {str(e)}") return None