Files
autogen/utils/callback_handler.py
2026-03-12 13:27:03 +08:00

187 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
回调处理器 - 用于实时推送对话内容到前端
"""
from typing import Callable, Dict, Any, List, Optional
from datetime import datetime
import threading
import queue
class MessageCallbackHandler:
"""消息回调处理器,支持实时流式输出"""
def __init__(self):
"""初始化回调处理器"""
self.callbacks: List[Callable[[Dict[str, Any]], None]] = []
self.message_queue = queue.Queue()
self.is_running = False
self._lock = threading.Lock()
def register_callback(self, callback: Callable[[Dict[str, Any]], None]):
"""
注册回调函数
Args:
callback: 回调函数,接收字典格式的消息
"""
with self._lock:
self.callbacks.append(callback)
def unregister_callback(self, callback: Callable[[Dict[str, Any]], None]):
"""注销回调函数"""
with self._lock:
if callback in self.callbacks:
self.callbacks.remove(callback)
def on_message(
self,
agent_name: str,
message: str,
role: str = "assistant",
metadata: Optional[Dict] = None
):
"""
处理新消息
Args:
agent_name: Agent 名称
message: 消息内容
role: 角色
metadata: 元数据
"""
msg_data = {
"timestamp": datetime.now().isoformat(),
"agent_name": agent_name,
"role": role,
"message": message,
"metadata": metadata or {}
}
# 放入队列
self.message_queue.put(msg_data)
# 调用所有回调
with self._lock:
for callback in self.callbacks:
try:
callback(msg_data)
except Exception as e:
print(f"回调执行失败:{e}")
def on_thinking(self, agent_name: str, status: str = "thinking"):
"""
发送思考状态
Args:
agent_name: Agent 名称
status: 状态thinking/generating/coding/testing
"""
self.on_message(
agent_name=agent_name,
message=f"_{status}...",
role="system",
metadata={"status": status}
)
def on_file_created(self, agent_name: str, file_path: str, file_type: str):
"""
发送文件创建事件
Args:
agent_name: Agent 名称
file_path: 文件路径
file_type: 文件类型
"""
self.on_message(
agent_name=agent_name,
message=f"📄 创建了文件:{file_path}",
role="system",
metadata={
"event_type": "file_created",
"file_path": file_path,
"file_type": file_type
}
)
def on_test_result(self, agent_name: str, passed: bool, details: str):
"""
发送测试结果
Args:
agent_name: Agent 名称
passed: 是否通过
details: 详细信息
"""
icon = "" if passed else ""
self.on_message(
agent_name=agent_name,
message=f"{icon} 测试结果:{'通过' if passed else '失败'}\n{details}",
role="system",
metadata={
"event_type": "test_result",
"passed": passed,
"details": details
}
)
def on_human_approval_request(
self,
request_id: str,
description: str,
data: Dict[str, Any]
):
"""
发送人工确认请求
Args:
request_id: 请求 ID
description: 请求描述
data: 相关数据
"""
self.on_message(
agent_name="Orchestrator",
message=f"⚠️ 需要人工确认:{description}",
role="system",
metadata={
"event_type": "human_approval",
"request_id": request_id,
"description": description,
"data": data
}
)
def get_message(self, timeout: float = 1.0) -> Optional[Dict[str, Any]]:
"""
从队列获取消息(非阻塞)
Args:
timeout: 超时时间
Returns:
消息字典或 None
"""
try:
return self.message_queue.get(timeout=timeout)
except queue.Empty:
return None
def clear_queue(self):
"""清空消息队列"""
while not self.message_queue.empty():
try:
self.message_queue.get_nowait()
except queue.Empty:
break
# 全局回调实例
_global_callback: Optional[MessageCallbackHandler] = None
def get_callback_handler() -> MessageCallbackHandler:
"""获取或创建全局回调处理器"""
global _global_callback
if _global_callback is None:
_global_callback = MessageCallbackHandler()
return _global_callback