166 lines
7.3 KiB
Markdown
166 lines
7.3 KiB
Markdown
|
|
下面给出一套“**把流式放到最后一步**”的最小侵入式改造方案,目标是:
|
|||
|
|
|
|||
|
|
* 工具规划阶段**一律非流式**,让模型能在一次交互内多轮地产生 `tool_calls`;
|
|||
|
|
* **仅当确认没有更多工具要调**时,才触发**最终流式**生成;
|
|||
|
|
* 并让 `tool_results` 在多轮中**累加**,供最终引用/后处理使用。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 1) 让 `tool_results` 支持累加(可选但强烈建议)
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# ✅ 修改:为 tool_results 增加 reducer,使其在多轮工具调用中累加
|
|||
|
|
from typing import Annotated
|
|||
|
|
|
|||
|
|
class AgentState(MessagesState):
|
|||
|
|
session_id: str
|
|||
|
|
tool_results: Annotated[List[Dict[str, Any]], lambda x, y: (x or []) + (y or [])]
|
|||
|
|
final_answer: str
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
> 说明:没有 reducer 时,LangGraph 默认是“覆盖”。上面写法会把各轮 `run_tools_with_streaming` 返回的结果累加进 state,方便最终 `post_process_node` 正确生成引用。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 2) 调整 `call_model`:**规划用非流式,终稿再流式**
|
|||
|
|
|
|||
|
|
核心思路:
|
|||
|
|
|
|||
|
|
* **始终**先用 `ainvoke_with_tools()`(非流式)拿到一个 `AIMessage`;
|
|||
|
|
* 若含有 `tool_calls` → 直接返回,让路由去 `tools`;
|
|||
|
|
* 若**不**含 `tool_calls` → 说明进入终稿阶段,这时**临时禁用工具**并用 `astream()` 做**流式**最终生成;把生成的流式文本作为本轮 `AIMessage` 返回。
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
async def call_model(state: AgentState, config: Optional[RunnableConfig] = None) -> Dict[str, List[BaseMessage]]:
|
|||
|
|
app_config = get_config()
|
|||
|
|
llm_client = LLMClient()
|
|||
|
|
stream_callback = stream_callback_context.get()
|
|||
|
|
|
|||
|
|
# 绑定工具(规划阶段:强制允许工具调用)
|
|||
|
|
tool_schemas = get_tool_schemas()
|
|||
|
|
llm_client.bind_tools(tool_schemas, force_tool_choice=True)
|
|||
|
|
|
|||
|
|
trimmer = create_conversation_trimmer()
|
|||
|
|
messages = state["messages"].copy()
|
|||
|
|
|
|||
|
|
if not messages or not isinstance(messages[0], SystemMessage):
|
|||
|
|
rag_prompts = app_config.get_rag_prompts()
|
|||
|
|
system_prompt = rag_prompts.get("agent_system_prompt", "")
|
|||
|
|
if not system_prompt:
|
|||
|
|
raise ValueError("system_prompt is null")
|
|||
|
|
messages = [SystemMessage(content=system_prompt)] + messages
|
|||
|
|
|
|||
|
|
if trimmer.should_trim(messages):
|
|||
|
|
messages = trimmer.trim_conversation_history(messages)
|
|||
|
|
|
|||
|
|
# ✅ 第一步:非流式规划(可能返回 tool_calls)
|
|||
|
|
draft = await llm_client.ainvoke_with_tools(list(messages))
|
|||
|
|
|
|||
|
|
# 如果需要继续调工具,直接返回(由 should_continue 路由到 tools)
|
|||
|
|
if isinstance(draft, AIMessage) and getattr(draft, "tool_calls", None):
|
|||
|
|
return {"messages": [draft]}
|
|||
|
|
|
|||
|
|
# ✅ 走到这里,说明模型已不再需要工具 → 终稿阶段走“流式”
|
|||
|
|
# 关键:临时禁用工具,避免生成期再次触发函数调用
|
|||
|
|
try:
|
|||
|
|
# ★ 根据你的 LLMClient 能力二选一:
|
|||
|
|
# 方案 A:解绑工具
|
|||
|
|
llm_client.bind_tools([], force_tool_choice=False)
|
|||
|
|
# 方案 B:若支持 tool_choice 参数,可传 "none"
|
|||
|
|
# (示例) llm_client.set_tool_choice("none")
|
|||
|
|
|
|||
|
|
if not stream_callback:
|
|||
|
|
# 无流式回调时,走一次普通非流式生成(确保有终稿)
|
|||
|
|
# 这里如果没有 ainvoke(),可以继续用 ainvoke_with_tools,但工具已解绑
|
|||
|
|
final_msg = await llm_client.ainvoke_with_tools(list(messages))
|
|||
|
|
return {"messages": [final_msg]}
|
|||
|
|
|
|||
|
|
# ✅ 仅此处进行流式:把终稿 token 推给前端
|
|||
|
|
response_content = ""
|
|||
|
|
filtering_html_comment = False
|
|||
|
|
comment_buffer = ""
|
|||
|
|
|
|||
|
|
async for token in llm_client.astream(list(messages)):
|
|||
|
|
response_content += token
|
|||
|
|
# 保留你现有的 HTML 注释过滤逻辑(原样拷贝)
|
|||
|
|
if not filtering_html_comment:
|
|||
|
|
combined = comment_buffer + token
|
|||
|
|
if "<!--" in combined:
|
|||
|
|
pos = combined.find("<!--")
|
|||
|
|
if pos > 0 and stream_callback:
|
|||
|
|
await stream_callback(create_token_event(combined[:pos]))
|
|||
|
|
filtering_html_comment = True
|
|||
|
|
comment_buffer = combined[pos:]
|
|||
|
|
else:
|
|||
|
|
# 其他同你原来逻辑...
|
|||
|
|
if stream_callback:
|
|||
|
|
await stream_callback(create_token_event(token))
|
|||
|
|
comment_buffer = ""
|
|||
|
|
else:
|
|||
|
|
comment_buffer += token
|
|||
|
|
if "-->" in comment_buffer:
|
|||
|
|
filtering_html_comment = False
|
|||
|
|
comment_buffer = ""
|
|||
|
|
|
|||
|
|
if not filtering_html_comment and comment_buffer and stream_callback:
|
|||
|
|
await stream_callback(create_token_event(comment_buffer))
|
|||
|
|
|
|||
|
|
return {"messages": [AIMessage(content=response_content)]}
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
# (可选)恢复工具绑定配置到“规划阶段”的默认,以免影响下一轮交互
|
|||
|
|
llm_client.bind_tools(tool_schemas, force_tool_choice=True)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
> 要点回顾
|
|||
|
|
>
|
|||
|
|
> * **移除**原先的 `if has_tool_messages and stream_callback: astream(...)` 分支;
|
|||
|
|
> * 统一先走一次**非流式** `ainvoke_with_tools()` 拿到 `draft`;
|
|||
|
|
> * 只有 `draft` **没有** `tool_calls` 时,才临时禁用工具并执行**流式** `astream()`;
|
|||
|
|
> * 这样 `should_continue()` 在“规划阶段”永远能看到 `tool_calls`,从而**支持多轮**并行工具调用;只有到了真正的“终稿阶段”才会有一次流式输出。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 3) `should_continue()` 无需改
|
|||
|
|
|
|||
|
|
现有逻辑已经满足需求:
|
|||
|
|
|
|||
|
|
* 有 `tool_calls` → 去 `tools`;
|
|||
|
|
* 无 → 去 `post_process`。
|
|||
|
|
|
|||
|
|
因为我们把“流式”只放在“无 `tool_calls`”的那一次 `agent` 返回里,路由自然会把这次当作终稿,然后进入 `post_process_node`。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 4) `run_tools_with_streaming()` 不变(已支持并行)
|
|||
|
|
|
|||
|
|
你当前工具节点已经用 `asyncio.gather(...)` 并行执行,并且会发 start/result/error 的 SSE 事件;保留即可。
|
|||
|
|
若启用了第 1 步的 reducer,确保返回值里仍旧是:
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
return {
|
|||
|
|
"messages": new_messages,
|
|||
|
|
"tool_results": tool_results # <- 将被累加
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 5) 可选的小优化
|
|||
|
|
|
|||
|
|
* 在“终稿流式”前,给模型一个显式的“**不要再调用工具**、直接给出最终答案”的系统/用户指令(如果你的模型容易犹豫)。
|
|||
|
|
* 若 `LLMClient` 支持 `tool_choice="none"` 或 “`tools=[]` + `force_tool_choice=False`”,推荐二者都做,以最大化禁止工具调用。
|
|||
|
|
* 若担心“重复计费”,可以不先跑 `draft`,而是让 `ainvoke_with_tools()` 在内部“无工具可调时直接返回空 `AIMessage`”,然后只做一次流式。但这需要改 `LLMClient`,因此此方案保持为“先探测、再流式”,实现最小改动。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 预期行为(对比)
|
|||
|
|
|
|||
|
|
* **改造前**:`agent(非流式)->tools(并行)->agent(流式无 tool_calls)->post_process` → 只能一轮工具调用。
|
|||
|
|
* **改造后**:
|
|||
|
|
|
|||
|
|
* `agent(非流式有 tool_calls)->tools(并行)->agent(非流式有 tool_calls)->tools(并行)->...->agent(非流式无 tool_calls -> 终稿流式)->post_process`
|
|||
|
|
* 多轮并行工具调用 ✅;只有最后一次生成才流式 ✅。
|
|||
|
|
|
|||
|
|
这套改造不改变你现有图结构与 SSE 协议,只是**把流式移动到“最后一次没有工具调用”的那一步**,即可在一次用户交互内稳定支持“多轮并行 tool call”。
|