Files

197 lines
8.4 KiB
Markdown
Raw Permalink Normal View History

2025-09-26 17:15:54 +08:00
太好了Python 版 LangGraph + FastAPI 完全可以和 AI SDK Elements 的 Chatbot 做**原生流式对接**,而且不需要 Node/Next.js 后端:只要你的 FastAPI 按 **AI SDK v5 的 UI Message Stream 协议**发 **SSE** 就能被 `useChat()`/Elements 直接吃下。下面给你一套**最小可跑模板**(含工具调用输出)。
> 要点(来自官方协议):用 **SSE**,响应头加 `x-vercel-ai-ui-message-stream: v1`,依次发 `start → text-start → text-delta* → text-end → finish → [DONE]`;如要展示工具,发 `tool-output-available` 等分片。([AI SDK][1])
---
# 服务器FastAPI + LangGraphSSE 输出 UI Message Stream
```python
# app.py
# pip install fastapi sse-starlette langgraph langchain-openai "langchain>=0.2" uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from uuid import uuid4
import json
from typing import AsyncGenerator, List
from langgraph.graph import StateGraph, START, END
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, BaseMessage
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
# --- 1) 定义 LLM + 工具并做一个最小的“LLM->工具->LLM”循环 ---
llm = init_chat_model(model="openai:gpt-4o-mini") # 自行替换模型/供应商
@tool
def get_weather(city: str) -> str:
"""Demo 工具:返回城市天气"""
return f"It is sunny in {city}"
tools = [get_weather]
model_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
class GraphState(dict):
# 仅需 messages用 LangChain BaseMessage 列表承载对话与工具来回
messages: List[BaseMessage]
def call_model(state: GraphState):
resp = model_with_tools.invoke(state["messages"])
return {"messages": [resp]}
def call_tools(state: GraphState):
last = state["messages"][-1]
if isinstance(last, AIMessage) and last.tool_calls:
# ToolNode 会根据 AIMessage.tool_calls 并行执行工具并返回 ToolMessage
return tool_node.invoke({"messages": [last]})
return {"messages": []}
builder = StateGraph(GraphState)
builder.add_node("llm", call_model)
builder.add_node("tools", call_tools)
builder.add_edge(START, "llm")
# 如果 llm 触发了工具,则进 tools否则结束
builder.add_conditional_edges(
"llm",
lambda s: "tools" if isinstance(s["messages"][-1], AIMessage) and s["messages"][-1].tool_calls else END,
{"tools": "tools", END: END},
)
builder.add_edge("tools", "llm")
graph = builder.compile()
# --- 2) FastAPI 基础 + CORS ---
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产建议收紧
allow_methods=["*"],
allow_headers=["*"],
)
def sse_json(obj: dict) -> str:
# AI SDK UI Message Stream: 每条 SSE 用 data: <json>\n\n
return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"
# --- 3) /chat按 UI Message Stream 协议发 SSE ---
@app.post("/chat")
async def chat(req: Request):
payload = await req.json()
ui_messages = payload.get("messages", [])
# 将 UIMessage[] 转成 LangChain BaseMessage 列表(最简:只拼 text 部分)
history: List[BaseMessage] = []
for m in ui_messages:
role = m["role"]
text = "".join(p.get("text", "") for p in m.get("parts", []) if p["type"] == "text")
if role == "user":
history.append(HumanMessage(text))
elif role == "assistant":
history.append(AIMessage(text))
message_id = f"msg_{uuid4().hex}"
text_id = f"txt_{uuid4().hex}"
async def event_stream() -> AsyncGenerator[str, None]:
# 必备start → text-start
yield sse_json({"type": "start", "messageId": message_id})
yield sse_json({"type": "text-start", "id": text_id})
try:
# 同时订阅 token 与 step 更新messages / updates 两种 stream mode
# messages: token-by-tokenupdates: 每步状态(含 ToolMessage
async for mode, chunk in graph.astream(
{"messages": history},
stream_mode=["messages", "updates"], # 关键参数
):
if await req.is_disconnected():
break
if mode == "messages":
message_chunk, meta = chunk # (token/message_piece, metadata)
# LangGraph 的 messages 模式会不断给出 LLM token 或段落
if getattr(message_chunk, "content", None):
yield sse_json({"type": "text-delta", "id": text_id, "delta": message_chunk.content})
elif mode == "updates":
# updates 是 { node_name: { "messages": [...] } } 这样的增量
for _node, delta in chunk.items():
msgs = delta.get("messages") or []
for m in msgs:
if isinstance(m, ToolMessage):
# 把工具结果作为 UI 的 tool 输出分片
yield sse_json({
"type": "tool-output-available",
"toolCallId": m.tool_call_id or f"tool_{uuid4().hex}",
"output": m.content,
})
# 收尾text-end → finish → [DONE]
yield sse_json({"type": "text-end", "id": text_id})
yield sse_json({"type": "finish"})
except Exception as e:
# 可选:错误分片
yield sse_json({"type": "error", "errorText": str(e)})
yield "data: [DONE]\n\n"
# 关键响应头:让 AI SDK 按 UI Message Stream 协议解析
headers = {"x-vercel-ai-ui-message-stream": "v1"}
return EventSourceResponse(event_stream(), headers=headers)
```
**为什么可行?**
* LangGraph Python 的 `stream_mode` 支持 `messages`token 流)、`updates`(每步增量)、`values/custom/debug` 等;你可以在一次 `astream` 中订多种模式,并据此映射为前端可渲染的“分片”。([LangChain AI][2])
* AI SDK v5 的前端默认吃 **UI Message StreamSSE**,只要你用上面这些分片类型(`text-*``tool-output-available``finish``[DONE]`)并加 `x-vercel-ai-ui-message-stream: v1` 头,就能被 `useChat()` / Elements 的 `<Conversation/>` 实时渲染。([AI SDK][1])
---
# 前端Elements/`useChat` 指到你的 FastAPI
在你的 Elements/Next.js 页面里,把 `useChat` 的传输 `api` 指到 FastAPI 的 `/chat`
```tsx
// app/page.tsx
'use client';
import { useChat, DefaultChatTransport } from 'ai';
export default function Chat() {
const { messages, sendMessage, addToolResult } = useChat({
transport: new DefaultChatTransport({
api: 'http://localhost:8000/chat', // 直连 FastAPI
}),
});
// ... 渲染 messages.partstext / tool-xxx 等)
}
```
> `useChat` 默认就是 UI Message Stream 协议;你可以像官方“工具用法”示例那样渲染 `parts`,包含 `tool-*` 类型与不同 `state`。([AI SDK][3])
---
## 可选进阶(按需添加)
* **流式展示“思考/理由”**:从后端发 `reasoning-start/delta/end` 分片即可。([AI SDK][1])
* **显示检索/来源**:用 `source-url` / `source-document` 分片附上链接或文件元信息。([AI SDK][1])
* **多步边界**:在每次 LLM 调用复用/衔接时添加 `start-step` / `finish-step`,前端就能画分隔线。([AI SDK][3])
* **自定义进度/指标**:任意结构都可以用 `data-*`(如 `data-agent-step`),前端自定义解析。([AI SDK][1])
---
## 调试与提示
* **CORS**:不同域名访问 FastAPI 请开启 CORS示例已放开生产请白名单
* **只做文本最小闭环**:如果暂时不展示工具,在后端只发 `text-*` & `finish` 也能跑通。([AI SDK][1])
* **LangGraph 事件丰富**:需要更细的“工具入参流”(`tool-input-*`)或更完整的节点/子图进度,用 `messages` + `updates`/`custom` 模式组合拿到足够上下文,再映射到对应分片。([LangChain AI][2])
---
[1]: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol "AI SDK UI: Stream Protocols"
[2]: https://langchain-ai.github.io/langgraph/how-tos/streaming/ "Stream outputs"
[3]: https://ai-sdk.dev/docs/ai-sdk-ui/chatbot-tool-usage "AI SDK UI: Chatbot Tool Usage"