#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Gitea Webhook 处理器 验证签名并解析 Webhook 事件 """ import hmac import hashlib import logging from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class GiteaWebhookHandler: """Gitea Webhook 处理器""" def __init__(self, config: Dict[str, Any]): """ 初始化 Webhook 处理器 Args: config: Gitea 配置 """ self.config = config self.base_url = config.get('base_url', 'http://localhost:3000') self.webhook_secret = config.get('webhook_secret', '') def verify_signature(self, payload: bytes, signature: str, secret: str) -> bool: """ 验证 Webhook 签名 Args: payload: 请求体 signature: 请求头中的签名 secret: 密钥 Returns: 签名是否有效 """ if not secret: logger.warning('未配置 Webhook 密钥,跳过验证') return True try: # Gitea 使用 SHA256 HMAC expected_signature = hmac.new( secret.encode('utf-8'), payload, hashlib.sha256 ).hexdigest() # 比较签名(使用 constant time 比较防止时序攻击) return hmac.compare_digest(f'sha256={expected_signature}', signature) except Exception as e: logger.error(f'签名验证失败: {str(e)}') return False def parse_push_event(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ 解析 Push 事件 Args: payload: Webhook payload Returns: 解析后的提交信息 """ repo = payload.get('repository', {}) commits = payload.get('commits', []) ref = payload.get('ref', '') return { 'repo_name': repo.get('full_name', ''), 'repo_url': repo.get('clone_url', ''), 'web_url': repo.get('web_url', ''), 'branch': ref.replace('refs/heads/', ''), 'commits': [ { 'id': commit.get('id', '')[:8], 'message': commit.get('message', ''), 'author': commit.get('author', {}).get('name', ''), 'email': commit.get('author', {}).get('email', ''), 'timestamp': commit.get('timestamp', ''), 'added': commit.get('added', []), 'modified': commit.get('modified', []), 'removed': commit.get('removed', []), } for commit in commits ], 'pusher': payload.get('pusher', {}).get('name', ''), 'before': payload.get('before', ''), 'after': payload.get('after', ''), } def get_changed_files(self, commit: Dict[str, Any]) -> list: """ 获取提交中变更的文件列表 Args: commit: 提交信息 Returns: 变更的文件列表 """ files = [] files.extend(commit.get('added', [])) files.extend(commit.get('modified', [])) files.extend(commit.get('removed', [])) return files def filter_by_extension(self, files: list, extensions: list) -> list: """ 按文件扩展名过滤文件 Args: files: 文件列表 extensions: 扩展名列表(如 ['.py', '.js']) Returns: 过滤后的文件列表 """ return [ f for f in files if any(f.endswith(ext) for ext in extensions) ] def parse_pull_request_event(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ 解析 Pull Request 事件 Args: payload: Webhook payload Returns: 解析后的 PR 信息 """ action = payload.get('action', '') pr = payload.get('pull_request', {}) repo = payload.get('repository', {}) # 只处理 PR 创建和更新事件 if action not in ['opened', 'reopened', 'synchronize', 'ready_for_review']: return None # 获取 PR 的源分支和目标分支 head = pr.get('head', {}) base = pr.get('base', {}) return { 'action': action, 'repo_name': repo.get('full_name', ''), 'repo_url': repo.get('clone_url', ''), 'web_url': repo.get('web_url', ''), 'pr_number': pr.get('number', 0), 'pr_title': pr.get('title', ''), 'pr_body': pr.get('body', ''), 'pr_url': pr.get('html_url', ''), 'source_branch': head.get('ref', ''), 'source_sha': head.get('sha', '')[:8], 'target_branch': base.get('ref', ''), 'target_sha': base.get('sha', '')[:8], 'author': pr.get('user', {}).get('login', ''), 'author_email': pr.get('user', {}).get('email', ''), 'state': pr.get('state', ''), 'merged': pr.get('merged', False), } def get_pr_diff_files(self, payload: Dict[str, Any]) -> list: """ 从 Pull Request 事件中获取变更的文件列表 Args: payload: Webhook payload Returns: 变更的文件列表 """ pr = payload.get('pull_request', {}) files = pr.get('changed_files', []) # 如果没有 changed_files 字段,尝试从 commits 中获取 if not files: commits = pr.get('commits', []) files = [] for commit in commits: files.extend(self.get_changed_files(commit)) return files