下面给出一套“**把流式放到最后一步**”的最小侵入式改造方案,目标是: * 工具规划阶段**一律非流式**,让模型能在一次交互内多轮地产生 `tool_calls`; * **仅当确认没有更多工具要调**时,才触发**最终流式**生成; * 并让 `tool_results` 在多轮中**累加**,供最终引用/后处理使用。 --- # 1) 让 `tool_results` 支持累加(可选但强烈建议) ```python # ✅ 修改:为 tool_results 增加 reducer,使其在多轮工具调用中累加 from typing import Annotated class AgentState(MessagesState): session_id: str tool_results: Annotated[List[Dict[str, Any]], lambda x, y: (x or []) + (y or [])] final_answer: str ``` > 说明:没有 reducer 时,LangGraph 默认是“覆盖”。上面写法会把各轮 `run_tools_with_streaming` 返回的结果累加进 state,方便最终 `post_process_node` 正确生成引用。 --- # 2) 调整 `call_model`:**规划用非流式,终稿再流式** 核心思路: * **始终**先用 `ainvoke_with_tools()`(非流式)拿到一个 `AIMessage`; * 若含有 `tool_calls` → 直接返回,让路由去 `tools`; * 若**不**含 `tool_calls` → 说明进入终稿阶段,这时**临时禁用工具**并用 `astream()` 做**流式**最终生成;把生成的流式文本作为本轮 `AIMessage` 返回。 ```python async def call_model(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, List[BaseMessage]]: app_config = get_config() llm_client = LLMClient() stream_callback = stream_callback_context.get() # 绑定工具(规划阶段:强制允许工具调用) tool_schemas = get_tool_schemas() llm_client.bind_tools(tool_schemas, force_tool_choice=True) trimmer = create_conversation_trimmer() messages = state["messages"].copy() if not messages or not isinstance(messages[0], SystemMessage): rag_prompts = app_config.get_rag_prompts() system_prompt = rag_prompts.get("agent_system_prompt", "") if not system_prompt: raise ValueError("system_prompt is null") messages = [SystemMessage(content=system_prompt)] + messages if trimmer.should_trim(messages): messages = trimmer.trim_conversation_history(messages) # ✅ 第一步:非流式规划(可能返回 tool_calls) draft = await llm_client.ainvoke_with_tools(list(messages)) # 如果需要继续调工具,直接返回(由 should_continue 路由到 tools) if isinstance(draft, AIMessage) and getattr(draft, "tool_calls", None): return {"messages": [draft]} # ✅ 走到这里,说明模型已不再需要工具 → 终稿阶段走“流式” # 关键:临时禁用工具,避免生成期再次触发函数调用 try: # ★ 根据你的 LLMClient 能力二选一: # 方案 A:解绑工具 llm_client.bind_tools([], force_tool_choice=False) # 方案 B:若支持 tool_choice 参数,可传 "none" # (示例) llm_client.set_tool_choice("none") if not stream_callback: # 无流式回调时,走一次普通非流式生成(确保有终稿) # 这里如果没有 ainvoke(),可以继续用 ainvoke_with_tools,但工具已解绑 final_msg = await llm_client.ainvoke_with_tools(list(messages)) return {"messages": [final_msg]} # ✅ 仅此处进行流式:把终稿 token 推给前端 response_content = "" filtering_html_comment = False comment_buffer = "" async for token in llm_client.astream(list(messages)): response_content += token # 保留你现有的 HTML 注释过滤逻辑(原样拷贝) if not filtering_html_comment: combined = comment_buffer + token if "" in comment_buffer: filtering_html_comment = False comment_buffer = "" if not filtering_html_comment and comment_buffer and stream_callback: await stream_callback(create_token_event(comment_buffer)) return {"messages": [AIMessage(content=response_content)]} finally: # (可选)恢复工具绑定配置到“规划阶段”的默认,以免影响下一轮交互 llm_client.bind_tools(tool_schemas, force_tool_choice=True) ``` > 要点回顾 > > * **移除**原先的 `if has_tool_messages and stream_callback: astream(...)` 分支; > * 统一先走一次**非流式** `ainvoke_with_tools()` 拿到 `draft`; > * 只有 `draft` **没有** `tool_calls` 时,才临时禁用工具并执行**流式** `astream()`; > * 这样 `should_continue()` 在“规划阶段”永远能看到 `tool_calls`,从而**支持多轮**并行工具调用;只有到了真正的“终稿阶段”才会有一次流式输出。 --- # 3) `should_continue()` 无需改 现有逻辑已经满足需求: * 有 `tool_calls` → 去 `tools`; * 无 → 去 `post_process`。 因为我们把“流式”只放在“无 `tool_calls`”的那一次 `agent` 返回里,路由自然会把这次当作终稿,然后进入 `post_process_node`。 --- # 4) `run_tools_with_streaming()` 不变(已支持并行) 你当前工具节点已经用 `asyncio.gather(...)` 并行执行,并且会发 start/result/error 的 SSE 事件;保留即可。 若启用了第 1 步的 reducer,确保返回值里仍旧是: ```python return { "messages": new_messages, "tool_results": tool_results # <- 将被累加 } ``` --- # 5) 可选的小优化 * 在“终稿流式”前,给模型一个显式的“**不要再调用工具**、直接给出最终答案”的系统/用户指令(如果你的模型容易犹豫)。 * 若 `LLMClient` 支持 `tool_choice="none"` 或 “`tools=[]` + `force_tool_choice=False`”,推荐二者都做,以最大化禁止工具调用。 * 若担心“重复计费”,可以不先跑 `draft`,而是让 `ainvoke_with_tools()` 在内部“无工具可调时直接返回空 `AIMessage`”,然后只做一次流式。但这需要改 `LLMClient`,因此此方案保持为“先探测、再流式”,实现最小改动。 --- ## 预期行为(对比) * **改造前**:`agent(非流式)->tools(并行)->agent(流式无 tool_calls)->post_process` → 只能一轮工具调用。 * **改造后**: * `agent(非流式有 tool_calls)->tools(并行)->agent(非流式有 tool_calls)->tools(并行)->...->agent(非流式无 tool_calls -> 终稿流式)->post_process` * 多轮并行工具调用 ✅;只有最后一次生成才流式 ✅。 这套改造不改变你现有图结构与 SSE 协议,只是**把流式移动到“最后一次没有工具调用”的那一步**,即可在一次用户交互内稳定支持“多轮并行 tool call”。