""" User Manual Agent node for the Agentic RAG system. This module contains the autonomous user manual agent that can use tools and generate responses. """ import logging from typing import Dict, Any, List, Optional, Callable, Literal from contextvars import ContextVar from langchain_core.messages import AIMessage, SystemMessage, BaseMessage, ToolMessage, HumanMessage from langchain_core.runnables import RunnableConfig from .state import AgentState from .user_manual_tools import get_user_manual_tool_schemas, get_user_manual_tools_by_name from .message_trimmer import create_conversation_trimmer from ..llm_client import LLMClient from ..config import get_config from ..sse import ( create_tool_start_event, create_tool_result_event, create_tool_error_event, create_token_event, create_error_event ) from ..utils.error_handler import ( StructuredLogger, ErrorCategory, ErrorCode, handle_async_errors, get_user_message ) logger = StructuredLogger(__name__) # Cache configuration at module level to avoid repeated get_config() calls _cached_config = None def get_cached_config(): """Get cached configuration, loading it if not already cached""" global _cached_config if _cached_config is None: _cached_config = get_config() return _cached_config # User Manual Agent node (autonomous function calling agent) async def user_manual_agent_node(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, Any]: """ User Manual Agent node that autonomously uses user manual tools and generates final answer. Implements "detect-first-then-stream" strategy for optimal multi-round behavior: 1. Always start with non-streaming detection to check for tool needs 2. If tool_calls exist → return immediately for routing to tools 3. If no tool_calls → temporarily disable tools and perform streaming final synthesis """ app_config = get_cached_config() llm_client = LLMClient() # Get stream callback from context variable from .graph import stream_callback_context stream_callback = stream_callback_context.get() # Get user manual tool schemas and bind tools for planning phase tool_schemas = get_user_manual_tool_schemas() llm_client.bind_tools(tool_schemas, force_tool_choice=True) # Create conversation trimmer for managing context length trimmer = create_conversation_trimmer() # Prepare messages with user manual system prompt messages = state["messages"].copy() if not messages or not isinstance(messages[0], SystemMessage): rag_prompts = app_config.get_rag_prompts() user_manual_prompt = rag_prompts.get("user_manual_prompt", "") if not user_manual_prompt: raise ValueError("user_manual_prompt is null") # For user manual agent, we need to format the prompt with placeholders # Extract current query and conversation history current_query = "" for message in reversed(messages): if isinstance(message, HumanMessage): current_query = message.content break conversation_history = "" if len(messages) > 1: conversation_history = render_conversation_history(messages[:-1]) # Exclude current query # Format system prompt (initially with empty context, tools will provide it) formatted_system_prompt = user_manual_prompt.format( conversation_history=conversation_history, context_content="", # Will be filled by tools current_query=current_query ) messages = [SystemMessage(content=formatted_system_prompt)] + messages # Track tool rounds current_round = state.get("tool_rounds", 0) # Get max_tool_rounds_user_manual from state, fallback to config if not set max_rounds = state.get("max_tool_rounds_user_manual", None) if max_rounds is None: max_rounds = app_config.app.max_tool_rounds_user_manual # Only apply trimming at the start of a new conversation turn (when tool_rounds = 0) # This prevents trimming current turn's tool results during multi-round tool calling if current_round == 0: # Trim conversation history to manage context length (only for previous conversation turns) if trimmer.should_trim(messages): messages = trimmer.trim_conversation_history(messages) logger.info("Applied conversation history trimming for context management (new conversation turn)") else: logger.info(f"Skipping trimming during tool round {current_round} to preserve current turn's context") logger.info(f"User Manual Agent node: tool_rounds={current_round}, max_tool_rounds={max_rounds}") # Check if this should be final synthesis (max rounds reached) has_tool_messages = any(isinstance(msg, ToolMessage) for msg in messages) is_final_synthesis = has_tool_messages and current_round >= max_rounds if is_final_synthesis: logger.info("Starting final synthesis phase - no more tool calls allowed") # ✅ STEP 1: Final synthesis with tools disabled from the start # Disable tools to prevent any tool calling during synthesis try: original_tools = llm_client.bind_tools([], force_tool_choice=False) # Disable tools if not stream_callback: # No streaming callback, generate final response without tools draft = await llm_client.ainvoke(list(messages)) return {"messages": [draft]} # ✅ STEP 2: Streaming final synthesis with improved HTML comment filtering response_content = "" accumulated_content = "" async for token in llm_client.astream(list(messages)): accumulated_content += token response_content += token # Check for complete HTML comments in accumulated content while "" in accumulated_content: comment_start = accumulated_content.find("", comment_start) if comment_start >= 0 and comment_end >= 0: # Send content before comment before_comment = accumulated_content[:comment_start] if stream_callback and before_comment: await stream_callback(create_token_event(before_comment)) # Skip the comment and continue with content after accumulated_content = accumulated_content[comment_end + 3:] else: break # Send accumulated content if no pending comment if "" in accumulated_content: comment_start = accumulated_content.find("", comment_start) if comment_start >= 0 and comment_end >= 0: # Send content before comment before_comment = accumulated_content[:comment_start] if stream_callback and before_comment: await stream_callback(create_token_event(before_comment)) # Skip the comment and continue with content after accumulated_content = accumulated_content[comment_end + 3:] else: break # Send accumulated content if no pending comment if "