73 lines
2.1 KiB
Python
73 lines
2.1 KiB
Python
import asyncio
import json
from datetime import datetime, timezone
from typing import AsyncGenerator, Dict, Any
|
|
|
|
|
|
def format_sse_event(event: str, data: Dict[str, Any]) -> str:
    """Serialize one Server-Sent Events frame.

    Args:
        event: Event name written to the ``event:`` field. Must not
            contain a carriage return or line feed.
        data: Payload that is JSON-encoded into the ``data:`` field.

    Returns:
        The frame text: an ``event:`` line, a ``data:`` line holding the
        JSON-encoded payload, and a terminating blank line.

    Raises:
        ValueError: If the event name contains ``\\r`` or ``\\n``.
    """
    # Render first so the check sees exactly the text that is emitted.
    name = f"{event}"
    # SSE fields are line-delimited: a line break inside the name would end
    # the field early and let the remainder be parsed as extra fields/frames.
    if "\n" in name or "\r" in name:
        raise ValueError(f"SSE event name must not contain line breaks: {name!r}")
    # json.dumps with default settings escapes newlines inside strings and
    # emits no raw line breaks, so the payload always fits one data line.
    return f"event: {name}\ndata: {json.dumps(data)}\n\n"
|
|
|
|
|
|
async def send_heartbeat(interval: float | None = None) -> AsyncGenerator[str, None]:
    """Yield heartbeat SSE frames to keep a connection alive.

    Args:
        interval: Seconds to wait between heartbeats. When ``None`` (the
            default) a single heartbeat is yielded and the generator ends.
            When set, heartbeats repeat until the consumer stops iterating
            or closes the generator.

    Yields:
        A ``heartbeat`` frame built by ``format_sse_event`` whose payload
        holds the current UTC time as an ISO-8601 string under
        ``"timestamp"``.
    """
    while True:
        # Emit a real timestamp so clients can tell heartbeats apart.
        stamp = datetime.now(timezone.utc).isoformat()
        yield format_sse_event("heartbeat", {"timestamp": stamp})
        if interval is None:
            # One-shot mode: exactly one frame, then stop.
            return
        await asyncio.sleep(interval)
|
|
|
|
|
|
def create_token_event(delta: str, tool_call_id: str | None = None) -> str:
    """Build a ``tokens`` SSE frame for one streamed text delta.

    Args:
        delta: The text fragment to send.
        tool_call_id: Optional tool-call identifier; sent as ``null`` in
            the JSON payload when not provided.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    payload = {"delta": delta, "tool_call_id": tool_call_id}
    return format_sse_event("tokens", payload)
|
|
|
|
|
|
def create_tool_start_event(tool_id: str, name: str, args: Dict[str, Any]) -> str:
    """Build a ``tool_start`` SSE frame.

    Args:
        tool_id: Identifier sent under the ``"id"`` key.
        name: Tool name sent under the ``"name"`` key.
        args: Tool arguments sent under the ``"args"`` key.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    payload: Dict[str, Any] = {"id": tool_id, "name": name, "args": args}
    return format_sse_event("tool_start", payload)
|
|
|
|
|
|
def create_tool_progress_event(tool_id: str, message: str) -> str:
    """Build a ``tool_progress`` SSE frame.

    Args:
        tool_id: Identifier sent under the ``"id"`` key.
        message: Progress text sent under the ``"message"`` key.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    payload = {"id": tool_id, "message": message}
    return format_sse_event("tool_progress", payload)
|
|
|
|
|
|
def create_tool_result_event(tool_id: str, name: str, results: list, took_ms: int) -> str:
    """Build a ``tool_result`` SSE frame.

    Args:
        tool_id: Identifier sent under the ``"id"`` key.
        name: Tool name sent under the ``"name"`` key.
        results: Result list sent under the ``"results"`` key.
        took_ms: Value sent under the ``"took_ms"`` key.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    payload = {
        "id": tool_id,
        "name": name,
        "results": results,
        "took_ms": took_ms,
    }
    return format_sse_event("tool_result", payload)
|
|
|
|
|
|
def create_tool_error_event(tool_id: str, name: str, error: str) -> str:
    """Build a ``tool_error`` SSE frame.

    Args:
        tool_id: Identifier sent under the ``"id"`` key.
        name: Tool name sent under the ``"name"`` key.
        error: Error text sent under the ``"error"`` key.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    payload = {"id": tool_id, "name": name, "error": error}
    return format_sse_event("tool_error", payload)
|
|
|
|
|
|
# def create_agent_done_event() -> str:
#     """Create agent completion event"""
#     return format_sse_event("agent_done", {"answer_done": True})
|
|
|
|
|
|
def create_error_event(error: str, details: Dict[str, Any] | None = None) -> str:
    """Build an ``error`` SSE frame.

    Args:
        error: Error text sent under the ``"error"`` key.
        details: Optional extra data sent under the ``"details"`` key. The
            key is left out when ``details`` is ``None`` or empty.

    Returns:
        The frame produced by ``format_sse_event``.
    """
    # Falsy details (None or {}) contribute no key at all.
    extra = {"details": details} if details else {}
    return format_sse_event("error", {"error": error, **extra})
|