Files
code_scan/webhook/handler.py
Dang Zerong 8594cf4d77 init
2026-03-10 11:18:39 +08:00

189 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gitea webhook handler.

Verifies webhook signatures and parses webhook events.
"""
import hmac
import hashlib
import logging
from typing import Dict, Any, Optional
# Module-level logger named after this module; used by the handler class below.
logger = logging.getLogger(__name__)
class GiteaWebhookHandler:
    """Verify Gitea webhook signatures and flatten webhook payloads.

    The parsing methods take the already-decoded JSON payload (a dict) and
    return plain dicts/lists containing only the fields this project uses.
    JSON ``null`` values for nested objects and lists are tolerated and
    treated like missing keys.
    """

    # Pull-request actions that trigger processing. Both the GitHub-style
    # 'synchronize' and the 'synchronized' spelling are accepted.
    # NOTE(review): Gitea is understood to emit 'synchronized' for PR
    # updates -- confirm against the deployed Gitea version.
    # A tuple (not a set) keeps membership tests safe for unhashable input.
    _HANDLED_PR_ACTIONS = (
        'opened',
        'reopened',
        'synchronize',
        'synchronized',
        'ready_for_review',
    )
    _BRANCH_REF_PREFIX = 'refs/heads/'
    _SIGNATURE_PREFIX = 'sha256='
    _SHORT_SHA_LENGTH = 8

    def __init__(self, config: Dict[str, Any]):
        """Store the Gitea configuration.

        Args:
            config: Gitea settings. Recognised keys are ``base_url``
                (default ``'http://localhost:3000'``) and
                ``webhook_secret`` (default ``''``). ``None`` is treated
                as an empty configuration.
        """
        # Only replace a missing config; a caller-supplied dict (even an
        # empty one) is stored as-is so later mutations stay visible.
        self.config = config if config is not None else {}
        self.base_url = self.config.get('base_url', 'http://localhost:3000')
        self.webhook_secret = self.config.get('webhook_secret', '')

    @staticmethod
    def _short_sha(value: Any) -> str:
        """Return the abbreviated form of a commit hash ('' for None)."""
        return (value or '')[:GiteaWebhookHandler._SHORT_SHA_LENGTH]

    @staticmethod
    def _branch_from_ref(ref: str) -> str:
        """Strip a leading 'refs/heads/' from *ref*; other refs pass through."""
        prefix = GiteaWebhookHandler._BRANCH_REF_PREFIX
        if ref.startswith(prefix):
            return ref[len(prefix):]
        return ref

    def _summarize_commit(self, commit: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one commit object from a push payload to the fields we keep."""
        author = commit.get('author') or {}
        return {
            'id': self._short_sha(commit.get('id')),
            'message': commit.get('message', ''),
            'author': author.get('name', ''),
            'email': author.get('email', ''),
            'timestamp': commit.get('timestamp', ''),
            # File lists may arrive as JSON null; normalise to lists.
            'added': commit.get('added') or [],
            'modified': commit.get('modified') or [],
            'removed': commit.get('removed') or [],
        }

    def verify_signature(self, payload: bytes, signature: str, secret: str) -> bool:
        """Check the HMAC-SHA256 signature of a webhook request body.

        Both the bare hex digest and the ``sha256=<hex>`` form are accepted,
        so the caller may pass either style of signature header. Hex case
        and surrounding whitespace are ignored.

        Args:
            payload: Raw request body bytes, exactly as received.
            signature: Signature value taken from the request header.
            secret: Shared webhook secret.

        Returns:
            True when the signature matches, or when *secret* is empty
            (verification is skipped). False on mismatch or on any error.
        """
        if not secret:
            # NOTE(security): this fails open -- with no secret configured,
            # every request is accepted. Kept for backward compatibility.
            logger.warning('未配置 Webhook 密钥,跳过验证')
            return True
        try:
            expected = hmac.new(
                secret.encode('utf-8'),
                payload,
                hashlib.sha256,
            ).hexdigest()
            received = signature.strip().lower()
            if received.startswith(self._SIGNATURE_PREFIX):
                received = received[len(self._SIGNATURE_PREFIX):]
            # Constant-time comparison guards against timing attacks.
            # Comparing bytes avoids the TypeError that compare_digest
            # raises for non-ASCII str input.
            return hmac.compare_digest(
                expected.encode('ascii'),
                received.encode('utf-8'),
            )
        except Exception as e:
            # Fail closed: any malformed input counts as an invalid signature.
            logger.error(f'签名验证失败: {str(e)}')
            return False

    def parse_push_event(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Extract repository, branch and commit details from a push event.

        Args:
            payload: Decoded webhook payload.

        Returns:
            Dict with keys ``repo_name``, ``repo_url``, ``web_url``,
            ``branch``, ``commits`` (list of per-commit dicts with an
            8-character ``id``), ``pusher``, ``before`` and ``after``.
        """
        repo = payload.get('repository') or {}
        pusher = payload.get('pusher') or {}
        ref = payload.get('ref') or ''
        return {
            'repo_name': repo.get('full_name', ''),
            'repo_url': repo.get('clone_url', ''),
            # Fall back to 'html_url' when 'web_url' is absent.
            'web_url': repo.get('web_url') or repo.get('html_url') or '',
            # Only a leading 'refs/heads/' is removed; tag refs and branch
            # names that merely contain that text are left intact.
            'branch': self._branch_from_ref(ref),
            'commits': [
                self._summarize_commit(commit)
                for commit in (payload.get('commits') or [])
            ],
            # Fall back to 'login'/'username' when 'name' is absent.
            'pusher': (
                pusher.get('name')
                or pusher.get('login')
                or pusher.get('username')
                or ''
            ),
            'before': payload.get('before', ''),
            'after': payload.get('after', ''),
        }

    def get_changed_files(self, commit: Dict[str, Any]) -> list:
        """List every file touched by a commit.

        Args:
            commit: Commit dict with optional ``added``, ``modified`` and
                ``removed`` lists (missing or ``None`` counts as empty).

        Returns:
            Added, then modified, then removed paths, in that order.
        """
        files = []
        for key in ('added', 'modified', 'removed'):
            files.extend(commit.get(key) or [])
        return files

    def filter_by_extension(self, files: list, extensions: list) -> list:
        """Keep only the files whose name ends with one of *extensions*.

        Args:
            files: File paths to filter.
            extensions: Suffixes to keep, e.g. ``['.py', '.js']``.

        Returns:
            The matching paths in their original order; an empty list when
            *extensions* is empty.
        """
        # Materialise once so a one-shot iterable is not exhausted after the
        # first file; str.endswith accepts a tuple of suffixes.
        suffixes = tuple(extensions or ())
        return [f for f in (files or []) if f.endswith(suffixes)]

    def parse_pull_request_event(self, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract pull-request details from a pull-request event.

        Args:
            payload: Decoded webhook payload.

        Returns:
            Dict describing the PR (action, repository, number, title, body,
            URL, source/target branch and 8-character SHAs, author, state,
            merged flag), or None when the action is not a PR open/update
            event that this handler processes.
        """
        action = payload.get('action', '')
        # Only PR creation and update events are processed.
        if action not in self._HANDLED_PR_ACTIONS:
            return None
        pr = payload.get('pull_request') or {}
        repo = payload.get('repository') or {}
        # Source (head) and target (base) branch descriptors.
        head = pr.get('head') or {}
        base = pr.get('base') or {}
        user = pr.get('user') or {}
        return {
            'action': action,
            'repo_name': repo.get('full_name', ''),
            'repo_url': repo.get('clone_url', ''),
            # Fall back to 'html_url' when 'web_url' is absent.
            'web_url': repo.get('web_url') or repo.get('html_url') or '',
            'pr_number': pr.get('number', 0),
            'pr_title': pr.get('title', ''),
            'pr_body': pr.get('body', ''),
            'pr_url': pr.get('html_url', ''),
            'source_branch': head.get('ref', ''),
            'source_sha': self._short_sha(head.get('sha')),
            'target_branch': base.get('ref', ''),
            'target_sha': self._short_sha(base.get('sha')),
            'author': user.get('login', ''),
            'author_email': user.get('email', ''),
            'state': pr.get('state', ''),
            'merged': pr.get('merged', False),
        }

    def get_pr_diff_files(self, payload: Dict[str, Any]) -> list:
        """Collect the changed files reported in a pull-request event.

        Args:
            payload: Decoded webhook payload.

        Returns:
            ``pull_request.changed_files`` when it is a non-empty list;
            otherwise the files gathered from ``pull_request.commits`` when
            that is a list of commit dicts; otherwise an empty list.
        """
        pr = payload.get('pull_request') or {}
        listed = pr.get('changed_files')
        # 'changed_files' may be a numeric count rather than a list of
        # paths; only a real list is usable here.
        if isinstance(listed, list) and listed:
            return listed
        commits = pr.get('commits')
        if not isinstance(commits, list):
            return []
        files = []
        for commit in commits:
            if isinstance(commit, dict):
                files.extend(self.get_changed_files(commit))
        return files