add web
This commit is contained in:
100
gitea_client.py
100
gitea_client.py
@@ -6,7 +6,7 @@ Gitea API 客户端
|
||||
"""
|
||||
import logging
|
||||
import requests
|
||||
from typing import Dict, Any, Optional
|
||||
from typing import Dict, Any, Optional, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -167,3 +167,101 @@ class GiteaClient:
|
||||
if pr_info:
|
||||
return pr_info.get('mergeable', False) and pr_info.get('state') == 'open'
|
||||
return False
|
||||
|
||||
def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> Optional[str]:
|
||||
"""
|
||||
获取 Pull Request 的代码差异
|
||||
|
||||
Args:
|
||||
owner: 仓库所有者
|
||||
repo: 仓库名称
|
||||
pr_number: PR 编号
|
||||
|
||||
Returns:
|
||||
diff 文本,失败返回 None
|
||||
"""
|
||||
url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}.diff"
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
url,
|
||||
headers=self._get_headers(),
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(f"成功获取 PR #{pr_number} 的 diff")
|
||||
return response.text
|
||||
else:
|
||||
logger.error(f"获取 PR #{pr_number} diff 失败: {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取 PR #{pr_number} diff 异常: {str(e)}")
|
||||
return None
|
||||
|
||||
def get_pull_request_files(self, owner: str, repo: str, pr_number: int) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
获取 PR 中修改的文件列表
|
||||
|
||||
Args:
|
||||
owner: 仓库所有者
|
||||
repo: 仓库名称
|
||||
pr_number: PR 编号
|
||||
|
||||
Returns:
|
||||
文件列表,失败返回 None
|
||||
"""
|
||||
url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/files"
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
url,
|
||||
headers=self._get_headers(),
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(f"成功获取 PR #{pr_number} 的文件列表")
|
||||
return response.json()
|
||||
else:
|
||||
logger.error(f"获取 PR #{pr_number} 文件列表失败: {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取 PR #{pr_number} 文件列表异常: {str(e)}")
|
||||
return None
|
||||
|
||||
def get_file_contents(self, owner: str, repo: str, filepath: str, ref: str) -> Optional[str]:
|
||||
"""
|
||||
获取仓库中指定文件在给定 ref(分支/commit)下的内容
|
||||
|
||||
Args:
|
||||
owner: 仓库所有者
|
||||
repo: 仓库名称
|
||||
filepath: 文件路径
|
||||
ref: 分支名或 commit SHA
|
||||
|
||||
Returns:
|
||||
文件内容文本,失败返回 None
|
||||
"""
|
||||
import base64
|
||||
import urllib.parse
|
||||
encoded_path = urllib.parse.quote(filepath, safe='')
|
||||
url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{encoded_path}?ref={urllib.parse.quote(ref)}"
|
||||
try:
|
||||
response = requests.get(
|
||||
url,
|
||||
headers=self._get_headers(),
|
||||
timeout=30
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get('encoding') == 'base64' and data.get('content'):
|
||||
return base64.b64decode(data['content']).decode('utf-8', errors='replace')
|
||||
return None
|
||||
logger.error(f"获取文件 {filepath} 失败: {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"获取文件内容异常: {str(e)}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user