7.3 KiB
7.3 KiB
下面给出一套“把流式放到最后一步”的最小侵入式改造方案,目标是:
- 工具规划阶段一律非流式,让模型能在一次交互内多轮地产生
tool_calls; - 仅当确认没有更多工具要调时,才触发最终流式生成;
- 并让
tool_results在多轮中累加,供最终引用/后处理使用。
1) 让 tool_results 支持累加(可选但强烈建议)
# ✅ 修改:为 tool_results 增加 reducer,使其在多轮工具调用中累加
from typing import Annotated
class AgentState(MessagesState):
session_id: str
tool_results: Annotated[List[Dict[str, Any]], lambda x, y: (x or []) + (y or [])]
final_answer: str
说明:没有 reducer 时,LangGraph 默认是“覆盖”。上面写法会把各轮
run_tools_with_streaming返回的结果累加进 state,方便最终post_process_node正确生成引用。
2) 调整 call_model:规划用非流式,终稿再流式
核心思路:
- 始终先用
ainvoke_with_tools()(非流式)拿到一个AIMessage; - 若含有
tool_calls→ 直接返回,让路由去tools; - 若不含
tool_calls→ 说明进入终稿阶段,这时临时禁用工具并用astream()做流式最终生成;把生成的流式文本作为本轮AIMessage返回。
async def call_model(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, List[BaseMessage]]:
app_config = get_config()
llm_client = LLMClient()
stream_callback = stream_callback_context.get()
# 绑定工具(规划阶段:强制允许工具调用)
tool_schemas = get_tool_schemas()
llm_client.bind_tools(tool_schemas, force_tool_choice=True)
trimmer = create_conversation_trimmer()
messages = state["messages"].copy()
if not messages or not isinstance(messages[0], SystemMessage):
rag_prompts = app_config.get_rag_prompts()
system_prompt = rag_prompts.get("agent_system_prompt", "")
if not system_prompt:
raise ValueError("system_prompt is null")
messages = [SystemMessage(content=system_prompt)] + messages
if trimmer.should_trim(messages):
messages = trimmer.trim_conversation_history(messages)
# ✅ 第一步:非流式规划(可能返回 tool_calls)
draft = await llm_client.ainvoke_with_tools(list(messages))
# 如果需要继续调工具,直接返回(由 should_continue 路由到 tools)
if isinstance(draft, AIMessage) and getattr(draft, "tool_calls", None):
return {"messages": [draft]}
# ✅ 走到这里,说明模型已不再需要工具 → 终稿阶段走“流式”
# 关键:临时禁用工具,避免生成期再次触发函数调用
try:
# ★ 根据你的 LLMClient 能力二选一:
# 方案 A:解绑工具
llm_client.bind_tools([], force_tool_choice=False)
# 方案 B:若支持 tool_choice 参数,可传 "none"
# (示例) llm_client.set_tool_choice("none")
if not stream_callback:
# 无流式回调时,走一次普通非流式生成(确保有终稿)
# 这里如果没有 ainvoke(),可以继续用 ainvoke_with_tools,但工具已解绑
final_msg = await llm_client.ainvoke_with_tools(list(messages))
return {"messages": [final_msg]}
# ✅ 仅此处进行流式:把终稿 token 推给前端
response_content = ""
filtering_html_comment = False
comment_buffer = ""
async for token in llm_client.astream(list(messages)):
response_content += token
# 保留你现有的 HTML 注释过滤逻辑(原样拷贝)
if not filtering_html_comment:
combined = comment_buffer + token
if "<!--" in combined:
pos = combined.find("<!--")
if pos > 0 and stream_callback:
await stream_callback(create_token_event(combined[:pos]))
filtering_html_comment = True
comment_buffer = combined[pos:]
else:
# 其他同你原来逻辑...
if stream_callback:
await stream_callback(create_token_event(token))
comment_buffer = ""
else:
comment_buffer += token
if "-->" in comment_buffer:
filtering_html_comment = False
comment_buffer = ""
if not filtering_html_comment and comment_buffer and stream_callback:
await stream_callback(create_token_event(comment_buffer))
return {"messages": [AIMessage(content=response_content)]}
finally:
# (可选)恢复工具绑定配置到“规划阶段”的默认,以免影响下一轮交互
llm_client.bind_tools(tool_schemas, force_tool_choice=True)
要点回顾
- 移除原先的
if has_tool_messages and stream_callback: astream(...)分支;- 统一先走一次非流式
ainvoke_with_tools()拿到draft;- 只有
draft没有tool_calls时,才临时禁用工具并执行流式astream();- 这样
should_continue()在“规划阶段”永远能看到tool_calls,从而支持多轮并行工具调用;只有到了真正的“终稿阶段”才会有一次流式输出。
3) should_continue() 无需改
现有逻辑已经满足需求:
- 有
tool_calls→ 去tools; - 无 → 去
post_process。
因为我们把“流式”只放在“无 tool_calls”的那一次 agent 返回里,路由自然会把这次当作终稿,然后进入 post_process_node。
4) run_tools_with_streaming() 不变(已支持并行)
你当前工具节点已经用 asyncio.gather(...) 并行执行,并且会发 start/result/error 的 SSE 事件;保留即可。
若启用了第 1 步的 reducer,确保返回值里仍旧是:
return {
"messages": new_messages,
"tool_results": tool_results # <- 将被累加
}
5) 可选的小优化
- 在“终稿流式”前,给模型一个显式的“不要再调用工具、直接给出最终答案”的系统/用户指令(如果你的模型容易犹豫)。
- 若
LLMClient支持tool_choice="none"或 “tools=[]+force_tool_choice=False”,推荐二者都做,以最大化禁止工具调用。 - 若担心“重复计费”,可以不先跑
draft,而是让ainvoke_with_tools()在内部“无工具可调时直接返回空AIMessage”,然后只做一次流式。但这需要改LLMClient,因此此方案保持为“先探测、再流式”,实现最小改动。
预期行为(对比)
-
改造前:
agent(非流式)->tools(并行)->agent(流式无 tool_calls)->post_process→ 只能一轮工具调用。 -
改造后:
agent(非流式有 tool_calls)->tools(并行)->agent(非流式有 tool_calls)->tools(并行)->...->agent(非流式无 tool_calls -> 终稿流式)->post_process- 多轮并行工具调用 ✅;只有最后一次生成才流式 ✅。
这套改造不改变你现有图结构与 SSE 协议,只是把流式移动到“最后一次没有工具调用”的那一步,即可在一次用户交互内稳定支持“多轮并行 tool call”。