太好了,Python 版 LangGraph + FastAPI 完全可以和 AI SDK Elements 的 Chatbot 做**原生流式对接**,而且不需要 Node/Next.js 后端:只要你的 FastAPI 按 **AI SDK v5 的 UI Message Stream 协议**发 **SSE** 就能被 `useChat()`/Elements 直接吃下。下面给你一套**最小可跑模板**(含工具调用输出)。 > 要点(来自官方协议):用 **SSE**,响应头加 `x-vercel-ai-ui-message-stream: v1`,依次发 `start → text-start → text-delta* → text-end → finish → [DONE]`;如要展示工具,发 `tool-output-available` 等分片。([AI SDK][1]) --- # 服务器(FastAPI + LangGraph,SSE 输出 UI Message Stream) ```python # app.py # pip install fastapi sse-starlette langgraph langchain-openai "langchain>=0.2" uvicorn from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from sse_starlette.sse import EventSourceResponse from uuid import uuid4 import json from typing import AsyncGenerator, List from langgraph.graph import StateGraph, START, END from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, BaseMessage from langchain_core.tools import tool from langgraph.prebuilt import ToolNode # --- 1) 定义 LLM + 工具,并做一个最小的“LLM->工具->LLM”循环 --- llm = init_chat_model(model="openai:gpt-4o-mini") # 自行替换模型/供应商 @tool def get_weather(city: str) -> str: """Demo 工具:返回城市天气""" return f"It is sunny in {city}" tools = [get_weather] model_with_tools = llm.bind_tools(tools) tool_node = ToolNode(tools) class GraphState(dict): # 仅需 messages,用 LangChain BaseMessage 列表承载对话与工具来回 messages: List[BaseMessage] def call_model(state: GraphState): resp = model_with_tools.invoke(state["messages"]) return {"messages": [resp]} def call_tools(state: GraphState): last = state["messages"][-1] if isinstance(last, AIMessage) and last.tool_calls: # ToolNode 会根据 AIMessage.tool_calls 并行执行工具并返回 ToolMessage return tool_node.invoke({"messages": [last]}) return {"messages": []} builder = StateGraph(GraphState) builder.add_node("llm", call_model) builder.add_node("tools", call_tools) builder.add_edge(START, "llm") # 如果 llm 触发了工具,则进 tools;否则结束 builder.add_conditional_edges( "llm", lambda s: "tools" if isinstance(s["messages"][-1], AIMessage) and s["messages"][-1].tool_calls else END, {"tools": "tools", END: END}, ) builder.add_edge("tools", "llm") graph = builder.compile() # --- 2) FastAPI 基础 + CORS --- app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], # 生产建议收紧 allow_methods=["*"], allow_headers=["*"], ) def sse_json(obj: dict) -> str: # AI SDK UI Message Stream: 每条 SSE 用 data: \n\n return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n" # --- 3) /chat:按 UI Message Stream 协议发 SSE --- @app.post("/chat") async def chat(req: Request): payload = await req.json() ui_messages = payload.get("messages", []) # 将 UIMessage[] 转成 LangChain BaseMessage 列表(最简:只拼 text 部分) history: List[BaseMessage] = [] for m in ui_messages: role = m["role"] text = "".join(p.get("text", "") for p in m.get("parts", []) if p["type"] == "text") if role == "user": history.append(HumanMessage(text)) elif role == "assistant": history.append(AIMessage(text)) message_id = f"msg_{uuid4().hex}" text_id = f"txt_{uuid4().hex}" async def event_stream() -> AsyncGenerator[str, None]: # 必备:start → text-start yield sse_json({"type": "start", "messageId": message_id}) yield sse_json({"type": "text-start", "id": text_id}) try: # 同时订阅 token 与 step 更新:messages / updates 两种 stream mode # messages: token-by-token;updates: 每步状态(含 ToolMessage) async for mode, chunk in graph.astream( {"messages": history}, stream_mode=["messages", "updates"], # 关键参数 ): if await req.is_disconnected(): break if mode == "messages": message_chunk, meta = chunk # (token/message_piece, metadata) # LangGraph 的 messages 模式会不断给出 LLM token 或段落 if getattr(message_chunk, "content", None): yield sse_json({"type": "text-delta", "id": text_id, "delta": message_chunk.content}) elif mode == "updates": # updates 是 { node_name: { "messages": [...] } } 这样的增量 for _node, delta in chunk.items(): msgs = delta.get("messages") or [] for m in msgs: if isinstance(m, ToolMessage): # 把工具结果作为 UI 的 tool 输出分片 yield sse_json({ "type": "tool-output-available", "toolCallId": m.tool_call_id or f"tool_{uuid4().hex}", "output": m.content, }) # 收尾:text-end → finish → [DONE] yield sse_json({"type": "text-end", "id": text_id}) yield sse_json({"type": "finish"}) except Exception as e: # 可选:错误分片 yield sse_json({"type": "error", "errorText": str(e)}) yield "data: [DONE]\n\n" # 关键响应头:让 AI SDK 按 UI Message Stream 协议解析 headers = {"x-vercel-ai-ui-message-stream": "v1"} return EventSourceResponse(event_stream(), headers=headers) ``` **为什么可行?** * LangGraph Python 的 `stream_mode` 支持 `messages`(token 流)、`updates`(每步增量)、`values/custom/debug` 等;你可以在一次 `astream` 中订多种模式,并据此映射为前端可渲染的“分片”。([LangChain AI][2]) * AI SDK v5 的前端默认吃 **UI Message Stream(SSE)**,只要你用上面这些分片类型(`text-*`、`tool-output-available`、`finish`、`[DONE]`)并加 `x-vercel-ai-ui-message-stream: v1` 头,就能被 `useChat()` / Elements 的 `` 实时渲染。([AI SDK][1]) --- # 前端(Elements/`useChat` 指到你的 FastAPI) 在你的 Elements/Next.js 页面里,把 `useChat` 的传输 `api` 指到 FastAPI 的 `/chat`: ```tsx // app/page.tsx 'use client'; import { useChat, DefaultChatTransport } from 'ai'; export default function Chat() { const { messages, sendMessage, addToolResult } = useChat({ transport: new DefaultChatTransport({ api: 'http://localhost:8000/chat', // 直连 FastAPI }), }); // ... 渲染 messages.parts(text / tool-xxx 等) } ``` > `useChat` 默认就是 UI Message Stream 协议;你可以像官方“工具用法”示例那样渲染 `parts`,包含 `tool-*` 类型与不同 `state`。([AI SDK][3]) --- ## 可选进阶(按需添加) * **流式展示“思考/理由”**:从后端发 `reasoning-start/delta/end` 分片即可。([AI SDK][1]) * **显示检索/来源**:用 `source-url` / `source-document` 分片附上链接或文件元信息。([AI SDK][1]) * **多步边界**:在每次 LLM 调用复用/衔接时添加 `start-step` / `finish-step`,前端就能画分隔线。([AI SDK][3]) * **自定义进度/指标**:任意结构都可以用 `data-*`(如 `data-agent-step`),前端自定义解析。([AI SDK][1]) --- ## 调试与提示 * **CORS**:不同域名访问 FastAPI 请开启 CORS(示例已放开,生产请白名单)。 * **只做文本最小闭环**:如果暂时不展示工具,在后端只发 `text-*` & `finish` 也能跑通。([AI SDK][1]) * **LangGraph 事件丰富**:需要更细的“工具入参流”(`tool-input-*`)或更完整的节点/子图进度,用 `messages` + `updates`/`custom` 模式组合拿到足够上下文,再映射到对应分片。([LangChain AI][2]) --- [1]: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol "AI SDK UI: Stream Protocols" [2]: https://langchain-ai.github.io/langgraph/how-tos/streaming/ "Stream outputs" [3]: https://ai-sdk.dev/docs/ai-sdk-ui/chatbot-tool-usage "AI SDK UI: Chatbot Tool Usage"