Great news: a Python LangGraph + FastAPI backend can stream natively into the AI SDK Elements Chatbot, with no Node/Next.js backend required. As long as your FastAPI endpoint emits **SSE** following the **AI SDK v5 UI Message Stream protocol**, `useChat()`/Elements will consume it directly. Below is a minimal runnable template, including tool-call output.

> Key points (from the official protocol): use **SSE**, add the `x-vercel-ai-ui-message-stream: v1` response header, and emit `start → text-start → text-delta* → text-end → finish → [DONE]` in order; to surface tool calls, also emit parts such as `tool-output-available`. ([AI SDK][1])

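On the wire, a minimal text-only reply then looks like this (illustrative frames; the ids are arbitrary):

```
data: {"type":"start","messageId":"msg_123"}

data: {"type":"text-start","id":"txt_1"}

data: {"type":"text-delta","id":"txt_1","delta":"Hello"}

data: {"type":"text-delta","id":"txt_1","delta":" world"}

data: {"type":"text-end","id":"txt_1"}

data: {"type":"finish"}

data: [DONE]
```
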
---

# Server (FastAPI + LangGraph, emitting the UI Message Stream over SSE)

```python
# app.py
# pip install fastapi langgraph langchain-openai "langchain>=0.2" uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from uuid import uuid4
import json
from typing import Annotated, AsyncGenerator, List

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain.chat_models import init_chat_model
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    HumanMessage,
    ToolMessage,
)
from langchain_core.tools import tool

# --- 1) Define the LLM + tools, plus a minimal LLM -> tools -> LLM loop ---
llm = init_chat_model(model="openai:gpt-4o-mini")  # swap in your own model/provider


@tool
def get_weather(city: str) -> str:
    """Demo tool: return the weather for a city."""
    return f"It is sunny in {city}"


tools = [get_weather]
model_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)

class GraphState(TypedDict):
    # Only `messages` is needed. The add_messages reducer appends each node's
    # returned messages to the history; without it, every node return would
    # overwrite `messages` instead of extending it.
    messages: Annotated[List[BaseMessage], add_messages]


def call_model(state: GraphState):
    resp = model_with_tools.invoke(state["messages"])
    return {"messages": [resp]}

builder = StateGraph(GraphState)
builder.add_node("llm", call_model)
# ToolNode reads tool_calls off the last AIMessage, runs the tools (in
# parallel where possible) and returns the resulting ToolMessages.
builder.add_node("tools", tool_node)
builder.add_edge(START, "llm")
# If the LLM requested tools, go to the tools node; otherwise finish.
builder.add_conditional_edges(
    "llm",
    lambda s: "tools"
    if isinstance(s["messages"][-1], AIMessage) and s["messages"][-1].tool_calls
    else END,
    {"tools": "tools", END: END},
)
builder.add_edge("tools", "llm")
graph = builder.compile()

# --- 2) FastAPI app + CORS ---
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten to a whitelist in production
    allow_methods=["*"],
    allow_headers=["*"],
)


def sse_json(obj: dict) -> str:
    # AI SDK UI Message Stream: every SSE event is `data: <json>\n\n`
    return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"

# --- 3) /chat: emit SSE following the UI Message Stream protocol ---
@app.post("/chat")
async def chat(req: Request):
    payload = await req.json()
    ui_messages = payload.get("messages", [])

    # Convert UIMessage[] into LangChain BaseMessages (minimal: text parts only)
    history: List[BaseMessage] = []
    for m in ui_messages:
        role = m["role"]
        text = "".join(p.get("text", "") for p in m.get("parts", []) if p["type"] == "text")
        if role == "user":
            history.append(HumanMessage(text))
        elif role == "assistant":
            history.append(AIMessage(text))

    message_id = f"msg_{uuid4().hex}"
    text_id = f"txt_{uuid4().hex}"

    async def event_stream() -> AsyncGenerator[str, None]:
        # Required preamble: start -> text-start
        yield sse_json({"type": "start", "messageId": message_id})
        yield sse_json({"type": "text-start", "id": text_id})

        try:
            # Subscribe to tokens and step updates in one pass:
            # "messages" streams token by token, "updates" streams each
            # step's state delta (including ToolMessages).
            async for mode, chunk in graph.astream(
                {"messages": history},
                stream_mode=["messages", "updates"],  # the key parameter
            ):
                if await req.is_disconnected():
                    break

                if mode == "messages":
                    message_chunk, _meta = chunk  # (token/message piece, metadata)
                    # Forward only LLM token chunks as text. Other message
                    # types can also travel through this mode, so filtering on
                    # AIMessageChunk keeps tool output out of the text part.
                    if isinstance(message_chunk, AIMessageChunk) and message_chunk.content:
                        yield sse_json({"type": "text-delta", "id": text_id, "delta": message_chunk.content})

                elif mode == "updates":
                    # updates chunks look like { node_name: { "messages": [...] } }
                    for _node, delta in chunk.items():
                        msgs = (delta or {}).get("messages") or []
                        for m in msgs:
                            if isinstance(m, AIMessage) and m.tool_calls:
                                # Announce each tool call's input first, so the
                                # UI creates a tool part that the output part
                                # below can complete by toolCallId.
                                for tc in m.tool_calls:
                                    yield sse_json({
                                        "type": "tool-input-available",
                                        "toolCallId": tc["id"],
                                        "toolName": tc["name"],
                                        "input": tc["args"],
                                    })
                            elif isinstance(m, ToolMessage):
                                # Surface the tool result as a tool-output part
                                yield sse_json({
                                    "type": "tool-output-available",
                                    "toolCallId": m.tool_call_id or f"tool_{uuid4().hex}",
                                    "output": m.content,
                                })

            # Epilogue: text-end -> finish -> [DONE]
            yield sse_json({"type": "text-end", "id": text_id})
            yield sse_json({"type": "finish"})
        except Exception as e:
            # Optional: error part
            yield sse_json({"type": "error", "errorText": str(e)})

        yield "data: [DONE]\n\n"

    # The key response header: tells the AI SDK to parse the body as a UI Message Stream
    headers = {"x-vercel-ai-ui-message-stream": "v1"}
    # StreamingResponse, because the events above are already framed as
    # `data: ...\n\n`; sse-starlette's EventSourceResponse would wrap each
    # yielded string in a second `data:` prefix.
    return StreamingResponse(event_stream(), media_type="text/event-stream", headers=headers)
```

**Why does this work?**

* LangGraph Python's `stream_mode` supports `messages` (a token stream), `updates` (per-step deltas), plus `values`/`custom`/`debug`; you can subscribe to several modes in a single `astream` call and map each one onto parts the frontend can render (see the inspection sketch after this list). ([LangChain AI][2])
* The AI SDK v5 frontend consumes the **UI Message Stream (SSE)** by default: emit the part types above (`text-*`, `tool-output-available`, `finish`, `[DONE]`), add the `x-vercel-ai-ui-message-stream: v1` header, and `useChat()` / Elements' `<Conversation/>` renders everything live. ([AI SDK][1])

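To see what each mode emits before wiring it to SSE, here is a small inspection sketch (it assumes the `graph` compiled above; `langgraph_node` is the metadata key LangGraph uses to label the emitting node, so treat the printed fields as indicative):

```python
# Sketch: print what each stream mode yields from the graph compiled above.
import asyncio
from langchain_core.messages import HumanMessage

async def inspect_stream():
    async for mode, chunk in graph.astream(
        {"messages": [HumanMessage("What's the weather in Paris?")]},
        stream_mode=["messages", "updates"],
    ):
        if mode == "messages":
            piece, meta = chunk  # (message chunk, metadata about the emitting node)
            print("token:", repr(piece.content), "node:", meta.get("langgraph_node"))
        else:  # "updates"
            print("update from:", list(chunk.keys()))  # {node_name: state delta}

asyncio.run(inspect_stream())
```
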
---

# Frontend (point Elements/`useChat` at your FastAPI)

In your Elements/Next.js page, point the `useChat` transport's `api` at FastAPI's `/chat`:

```tsx
// app/page.tsx
'use client';
import { useChat } from '@ai-sdk/react'; // v5: the React hook lives in @ai-sdk/react
import { DefaultChatTransport } from 'ai';

export default function Chat() {
  const { messages, sendMessage, addToolResult } = useChat({
    transport: new DefaultChatTransport({
      api: 'http://localhost:8000/chat', // talk to FastAPI directly
    }),
  });

  // ... render messages[i].parts (text / tool-* etc.)
}
```

> `useChat` speaks the UI Message Stream protocol out of the box; render `parts` the way the official tool-usage example does, covering the `tool-*` part types and their `state` values. ([AI SDK][3])

---

## Optional extras (add as needed)

* **Stream "thinking"/reasoning**: emit `reasoning-start/delta/end` parts from the backend. ([AI SDK][1])
* **Show retrieval/sources**: attach links or file metadata with `source-url` / `source-document` parts. ([AI SDK][1])
* **Multi-step boundaries**: emit `start-step` / `finish-step` around each chained LLM call so the frontend can draw separators. ([AI SDK][3])
* **Custom progress/metrics**: send any structure as a `data-*` part (e.g. `data-agent-step`) and parse it however you like on the frontend; all four are sketched after this list. ([AI SDK][1])

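A sketch of what emitting a few of these could look like, reusing `sse_json` and `uuid4` from `app.py` (the part shapes follow the stream-protocol docs; `data-agent-step` and the URL are made-up examples):

```python
# Sketch: optional UI Message Stream parts, to be interleaved into event_stream().
async def extra_parts():
    # Streamed reasoning, rendered by the UI as a "thinking" block
    rsn_id = f"rsn_{uuid4().hex}"
    yield sse_json({"type": "reasoning-start", "id": rsn_id})
    yield sse_json({"type": "reasoning-delta", "id": rsn_id, "delta": "Need the weather tool..."})
    yield sse_json({"type": "reasoning-end", "id": rsn_id})

    # A source link shown alongside the answer
    yield sse_json({"type": "source-url", "sourceId": f"src_{uuid4().hex}",
                    "url": "https://example.com/forecast"})

    # Step boundaries around each LLM call of a multi-step run
    yield sse_json({"type": "start-step"})
    # ... text-* parts for this step ...
    yield sse_json({"type": "finish-step"})

    # Arbitrary custom payload under a data-* type, parsed client-side
    yield sse_json({"type": "data-agent-step", "data": {"node": "tools", "status": "running"}})
```
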
---

## Debugging and tips

* **CORS**: enable CORS when the frontend calls FastAPI cross-origin (the example above allows everything; use a whitelist in production).
* **Text-only minimal loop**: if you are not surfacing tools yet, emitting just the `text-*` parts and `finish` is enough to get going (see the sketch after this list). ([AI SDK][1])
* **LangGraph is event-rich**: for finer-grained tool-argument streaming (`tool-input-*`) or fuller node/subgraph progress, combine the `messages` and `updates`/`custom` stream modes to gather enough context, then map it onto the matching parts. ([LangChain AI][2])

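For that text-only minimal loop, the whole generator shrinks to something like this (a sketch reusing `sse_json` and `uuid4` from `app.py`; the hardcoded pieces stand in for real model tokens):

```python
# Sketch: the smallest stream useChat() accepts: one text part, then finish.
async def minimal_text_stream():
    msg_id, txt_id = f"msg_{uuid4().hex}", f"txt_{uuid4().hex}"
    yield sse_json({"type": "start", "messageId": msg_id})
    yield sse_json({"type": "text-start", "id": txt_id})
    for piece in ["Hello", ", ", "world!"]:  # stand-in for streamed LLM tokens
        yield sse_json({"type": "text-delta", "id": txt_id, "delta": piece})
    yield sse_json({"type": "text-end", "id": txt_id})
    yield sse_json({"type": "finish"})
    yield "data: [DONE]\n\n"
```
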
---

[1]: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol "AI SDK UI: Stream Protocols"
[2]: https://langchain-ai.github.io/langgraph/how-tos/streaming/ "Stream outputs"
[3]: https://ai-sdk.dev/docs/ai-sdk-ui/chatbot-tool-usage "AI SDK UI: Chatbot Tool Usage"