Files
LaodingBot/internal/agent/orchestrator.go

2002 lines
66 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agent
import (
"context"
"encoding/json"
"fmt"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"laodingbot/internal/knowledge"
"laodingbot/internal/llm"
"laodingbot/internal/logger"
"laodingbot/internal/memory"
"laodingbot/internal/tools"
)
// StreamEventType names the kind of event emitted while a reply is being streamed.
type StreamEventType string
// Event kinds carried in StreamEvent.Type. The string values are serialized
// as the "type" field of StreamEvent.
const (
StreamEventTypeThought StreamEventType = "thought" // LLM reasoning text
StreamEventTypeToolCall StreamEventType = "tool_call" // tool invocation request
StreamEventTypeToolResult StreamEventType = "tool_result" // tool execution result
StreamEventTypeFinal StreamEventType = "final" // final answer
StreamEventTypeError StreamEventType = "error" // error message
StreamEventTypeWorkspaceStart StreamEventType = "workspace_start" // workspace rendering begins
StreamEventTypeWorkspaceDelta StreamEventType = "workspace_delta" // incremental workspace content
StreamEventTypeWorkspaceEnd StreamEventType = "workspace_end" // workspace rendering ends
)
// StreamEvent is a single event in the streamed output.
// Only Type and Content are always serialized; the remaining fields are
// omitted from JSON when they hold their zero value.
type StreamEvent struct {
Type StreamEventType `json:"type"`
Content string `json:"content"`
// Step is the 1-based ReAct loop iteration that produced the event.
Step int `json:"step,omitempty"`
// ToolName is set on tool-related events.
ToolName string `json:"tool_name,omitempty"`
// WorkspaceTitle is set on workspace_start events.
WorkspaceTitle string `json:"workspace_title,omitempty"`
}
// StreamEventCallback is the function type used to push stream events to the client.
// In the streaming ReAct loop a non-nil error returned by the callback aborts
// the request and is reported to the caller wrapped as "callback error".
type StreamEventCallback func(event StreamEvent) error
// Orchestrator coordinates the business logic of a conversation turn:
// LLM calls, context management, skill matching and tool invocation.
type Orchestrator struct {
// llm is the main model client used for the ReAct loop and file uploads.
llm llm.Client
routerLLM llm.Client // optional lightweight routing model for skill intent routing; nil means keyword matching only
// store persists chat messages (user and assistant turns).
store *memory.SQLiteStore
// tools is the registry the ReAct loop looks tool names up in.
tools *tools.Registry
// soul is the persona text placed at the top of the system prompt.
soul string
skills []knowledge.Skill
skillSummaries []knowledge.SkillSummary
skillsDir string
// autoSkillDir defaults to skillsDir when left blank in NewOrchestrator.
autoSkillDir string
gapDraftTriggerCount int
gapLookbackDuration time.Duration
// reactMaxStep defaults to 8 in NewOrchestrator.
// NOTE(review): the unified ReAct loops use a fixed limit of 20 and do not
// read this field — confirm where it is consumed.
reactMaxStep int
enableCapabilityGap bool
autoSkillDraftEnabled bool
// log may be nil; every call site nil-checks it before logging.
log *logger.Logger
// NOTE(review): presumably guards skills/skillSummaries — confirm against the accessors.
skillsMu sync.RWMutex
// NOTE(review): presumably guards pendingFiles — confirm against appendPendingFiles.
pendingFilesMu sync.Mutex
pendingFiles map[string][]pendingFileRef
// NOTE(review): presumably guards planningSessions — confirm against the session helpers.
planningSessionsMu sync.Mutex
planningSessions map[string]planningSessionState
}
// pendingFileRef describes one uploaded file that has been cached for a chat.
type pendingFileRef struct {
// ID is the file_id returned by the LLM file-upload call.
ID string
// Name is the whitespace-trimmed original file name.
Name string
// MimeType falls back to "application/octet-stream" when the upload carried none.
MimeType string
}
// planningSessionState is the per-chat state of the PI planning editing mode.
type planningSessionState struct {
// Active reports whether the chat is currently in planning mode.
Active bool
// NOTE(review): presumably the most recent planning document content — confirm against activatePlanningSession.
LastArtifact string
// NOTE(review): presumably true while waiting for the user to confirm the plan — confirm.
AwaitingConfirm bool
UpdatedAt time.Time
}
// planningSessionTTL is two hours.
// NOTE(review): presumably the idle lifetime of a planningSessions entry — confirm at the point of use.
const planningSessionTTL = 2 * time.Hour
// NewOrchestrator builds an Orchestrator from its collaborators and tuning
// parameters, substituting defaults for values that were left unset:
//   - reactMaxStep <= 0         -> 8
//   - gapDraftTriggerCount <= 0 -> 3
//   - gapLookbackDuration <= 0  -> 7 days
//   - blank autoSkillDir        -> skillsDir
//
// skillSummaries is copied; skills is stored as passed in.
func NewOrchestrator(
	llmClient llm.Client,
	routerLLM llm.Client,
	store *memory.SQLiteStore,
	registry *tools.Registry,
	soul string,
	skills []knowledge.Skill,
	skillSummaries []knowledge.SkillSummary,
	skillsDir string,
	reactMaxStep int,
	enableCapabilityGap bool,
	autoSkillDir string,
	gapDraftTriggerCount int,
	gapLookbackDuration time.Duration,
	log *logger.Logger,
) *Orchestrator {
	const (
		defaultReactMaxStep    = 8
		defaultGapTriggerCount = 3
		defaultGapLookback     = 7 * 24 * time.Hour
	)

	if reactMaxStep < 1 {
		reactMaxStep = defaultReactMaxStep
	}
	if gapDraftTriggerCount < 1 {
		gapDraftTriggerCount = defaultGapTriggerCount
	}
	if gapLookbackDuration <= 0 {
		gapLookbackDuration = defaultGapLookback
	}
	if strings.TrimSpace(autoSkillDir) == "" {
		autoSkillDir = skillsDir
	}

	orch := new(Orchestrator)

	// Collaborators.
	orch.llm = llmClient
	orch.routerLLM = routerLLM
	orch.store = store
	orch.tools = registry
	orch.log = log

	// Persona and skills.
	orch.soul = soul
	orch.skills = skills
	orch.skillSummaries = copySkillSummaries(skillSummaries)
	orch.skillsDir = skillsDir
	orch.autoSkillDir = autoSkillDir

	// Tuning. autoSkillDraftEnabled keeps its zero value (false).
	orch.gapDraftTriggerCount = gapDraftTriggerCount
	orch.gapLookbackDuration = gapLookbackDuration
	orch.reactMaxStep = reactMaxStep
	orch.enableCapabilityGap = enableCapabilityGap

	// Per-chat state.
	orch.pendingFiles = map[string][]pendingFileRef{}
	orch.planningSessions = map[string]planningSessionState{}

	return orch
}
// HandleMessage is the main entry point for a plain-text user message. The
// reply is produced by the unified ReAct loop: rather than first picking a
// skill and then deciding, each LLM call decides whether to answer directly
// or to call a tool, and the loop runs until a final answer is produced.
func (o *Orchestrator) HandleMessage(ctx context.Context, chatID, userID, text string) (string, error) {
	reply, err := o.handleMessageInternal(ctx, chatID, userID, text)
	return reply, err
}
// HandleMessageWithFiles uploads the given files (caching their file_ids for
// the chat) and then handles text like an ordinary message.
//
// When text is non-blank, uploading is best-effort: a failure is logged and
// the message is still processed. When text is blank the call is only about
// the files, so an upload failure is returned as an error instead of the
// "files received" acknowledgement.
func (o *Orchestrator) HandleMessageWithFiles(ctx context.Context, chatID, userID, text string, files []llm.InputFile) (string, error) {
	var uploadErr error
	if len(files) > 0 {
		_, uploadErr = o.UploadAndCacheFiles(ctx, chatID, userID, files)
	}

	if strings.TrimSpace(text) == "" {
		if uploadErr != nil {
			// Do not claim the files were received when they were not.
			return "", fmt.Errorf("upload files: %w", uploadErr)
		}
		return "文件已接收。请继续发送你的问题。", nil
	}

	if uploadErr != nil && o.log != nil {
		o.log.Warnf("upload files failed chat_id=%s err=%v", chatID, uploadErr)
	}
	return o.handleMessageInternal(ctx, chatID, userID, text)
}
// HandleMessageStream handles a user message and reports progress through
// callback as it goes: reasoning text, tool calls, tool results and the final
// answer. A nil callback is rejected with an error.
func (o *Orchestrator) HandleMessageStream(ctx context.Context, chatID, userID, text string, callback StreamEventCallback) (string, error) {
	if callback != nil {
		return o.handleMessageStreamInternal(ctx, chatID, userID, text, callback)
	}
	return "", fmt.Errorf("stream callback is required")
}
// HandleMessageStreamWithFiles uploads the given files (caching their
// file_ids for the chat) and then handles text through the streaming flow.
// A nil callback is rejected with an error.
//
// When text is non-blank, uploading is best-effort: a failure is logged and
// the message is still processed. When text is blank the call is only about
// the files, so an upload failure is returned as an error instead of the
// "files received" acknowledgement.
//
// NOTE(review): the blank-text acknowledgement is returned without emitting
// any stream event — confirm callers surface the returned string.
func (o *Orchestrator) HandleMessageStreamWithFiles(ctx context.Context, chatID, userID, text string, files []llm.InputFile, callback StreamEventCallback) (string, error) {
	if callback == nil {
		return "", fmt.Errorf("stream callback is required")
	}

	var uploadErr error
	if len(files) > 0 {
		_, uploadErr = o.UploadAndCacheFiles(ctx, chatID, userID, files)
	}

	if strings.TrimSpace(text) == "" {
		if uploadErr != nil {
			// Do not claim the files were received when they were not.
			return "", fmt.Errorf("upload files: %w", uploadErr)
		}
		return "文件已接收。请继续发送你的问题。", nil
	}

	if uploadErr != nil && o.log != nil {
		o.log.Warnf("upload files failed chat_id=%s err=%v", chatID, uploadErr)
	}
	return o.handleMessageStreamInternal(ctx, chatID, userID, text, callback)
}
// GetHistory returns up to limit of the most recent stored messages for chatID.
func (o *Orchestrator) GetHistory(chatID string, limit int) ([]memory.Message, error) {
	history, err := o.store.LoadRecent(chatID, limit)
	return history, err
}
// UploadAndCacheFiles uploads files to the LLM and caches the resulting
// file_ids so later text questions in the same chat can reuse them.
//
// It returns the file_ids in input order. An error is returned when files is
// empty, when the LLM client does not implement llm.FileUploader, when any
// file lacks a name or content, or when an upload fails. Nothing is cached
// on error.
func (o *Orchestrator) UploadAndCacheFiles(ctx context.Context, chatID, userID string, files []llm.InputFile) ([]string, error) {
	if len(files) == 0 {
		return nil, fmt.Errorf("no files provided")
	}
	uploader, ok := o.llm.(llm.FileUploader)
	if !ok {
		return nil, fmt.Errorf("当前 LLM 客户端不支持文件上传接口")
	}

	// Validate every file before the first upload, so an invalid entry late
	// in the list cannot leave earlier files uploaded but never cached.
	for i, f := range files {
		if strings.TrimSpace(f.FileName) == "" || len(f.Content) == 0 {
			return nil, fmt.Errorf("file[%d] 缺少文件名或内容", i+1)
		}
	}

	ids := make([]string, 0, len(files))
	refs := make([]pendingFileRef, 0, len(files))
	for i, f := range files {
		fileID, err := uploader.UploadFile(ctx, f, "file-extract")
		if err != nil {
			return nil, fmt.Errorf("file[%d] name=%s 上传失败: %w", i+1, f.FileName, err)
		}
		ids = append(ids, fileID)
		refs = append(refs, pendingFileRef{
			ID:       fileID,
			Name:     strings.TrimSpace(f.FileName),
			MimeType: defaultIfEmpty(strings.TrimSpace(f.MimeType), "application/octet-stream"),
		})
	}

	o.appendPendingFiles(chatID, userID, refs)
	return ids, nil
}
// handleMessageInternal runs one non-streaming conversation turn:
//  1. attach a fresh trace ID to ctx for log correlation;
//  2. answer the /reload_skills and /capability_gaps commands directly
//     (their failures are reported as reply text, not as errors);
//  3. persist the user message, load the 16 most recent messages and
//     compress them to a prompt context bounded by 6000;
//  4. run the unified ReAct loop and persist the assistant reply.
//
// Any storage or generation failure is logged and returned as an error.
func (o *Orchestrator) handleMessageInternal(ctx context.Context, chatID, userID, text string) (string, error) {
	tid := logger.NewTraceID()
	ctx = logger.WithTraceID(ctx, tid)
	prefix := "trace_id=" + tid

	// fail logs err with the given format (when a logger is configured) and
	// produces the error return value.
	fail := func(format string, err error) (string, error) {
		if o.log != nil {
			o.log.Errorf(format, prefix, chatID, err)
		}
		return "", err
	}

	if o.log != nil {
		o.log.Infof("%s handle message chat_id=%s user_id=%s text_len=%d", prefix, chatID, userID, len(text))
		o.log.Debugf("%s handle message text=%q", prefix, text)
	}

	// Built-in commands short-circuit before anything is stored.
	command := strings.TrimSpace(text)
	switch {
	case strings.EqualFold(command, "/reload_skills"):
		if reloadErr := o.ReloadSkills(); reloadErr != nil {
			return "技能热加载失败: " + reloadErr.Error(), nil
		}
		return "技能已热加载完成。", nil
	case strings.EqualFold(command, "/capability_gaps"):
		report, reportErr := o.BuildCapabilityGapReport(10)
		if reportErr != nil {
			return "缺口报告生成失败: " + reportErr.Error(), nil
		}
		return report, nil
	}

	// Persist the user turn.
	if saveErr := o.store.SaveMessage(chatID, userID, "user", text); saveErr != nil {
		return fail("%s save user message failed chat_id=%s err=%v", saveErr)
	}

	// Load recent history and compress it into prompt context.
	history, loadErr := o.store.LoadRecent(chatID, 16)
	if loadErr != nil {
		return fail("%s load recent failed chat_id=%s err=%v", loadErr)
	}
	promptCtx := memory.CompressForPrompt(history, 6000)
	if o.log != nil {
		o.log.Debugf("%s prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", prefix, chatID, len(history), len(promptCtx))
	}

	// Generate the reply through the unified ReAct loop.
	reply, genErr := o.runUnifiedReAct(ctx, chatID, userID, promptCtx, text)
	if genErr != nil {
		return fail("%s message generation failed chat_id=%s err=%v", genErr)
	}

	// Persist the assistant turn as well.
	if saveErr := o.store.SaveMessage(chatID, userID, "assistant", reply); saveErr != nil {
		return fail("%s save assistant response failed chat_id=%s err=%v", saveErr)
	}

	if o.log != nil {
		o.log.Infof("%s message handled chat_id=%s response_len=%d", prefix, chatID, len(reply))
	}
	return reply, nil
}
// handleMessageStreamInternal is the streaming counterpart of
// handleMessageInternal. It performs the same steps — trace ID, built-in
// commands, persisting the user message, loading and compressing the 16 most
// recent messages, persisting the reply — but generates the reply with the
// streaming ReAct loop, which reports progress through callback.
//
// The built-in commands return their text directly; this function does not
// emit a stream event for them.
func (o *Orchestrator) handleMessageStreamInternal(ctx context.Context, chatID, userID, text string, callback StreamEventCallback) (string, error) {
	tid := logger.NewTraceID()
	ctx = logger.WithTraceID(ctx, tid)
	prefix := "trace_id=" + tid

	// fail logs err with the given format (when a logger is configured) and
	// produces the error return value.
	fail := func(format string, err error) (string, error) {
		if o.log != nil {
			o.log.Errorf(format, prefix, chatID, err)
		}
		return "", err
	}

	if o.log != nil {
		o.log.Infof("%s handle message stream chat_id=%s user_id=%s text_len=%d", prefix, chatID, userID, len(text))
		o.log.Debugf("%s handle message stream text=%q", prefix, text)
	}

	// Built-in commands short-circuit before anything is stored.
	command := strings.TrimSpace(text)
	switch {
	case strings.EqualFold(command, "/reload_skills"):
		if reloadErr := o.ReloadSkills(); reloadErr != nil {
			return "技能热加载失败: " + reloadErr.Error(), nil
		}
		return "技能已热加载完成。", nil
	case strings.EqualFold(command, "/capability_gaps"):
		report, reportErr := o.BuildCapabilityGapReport(10)
		if reportErr != nil {
			return "缺口报告生成失败: " + reportErr.Error(), nil
		}
		return report, nil
	}

	// Persist the user turn.
	if saveErr := o.store.SaveMessage(chatID, userID, "user", text); saveErr != nil {
		return fail("%s save user message failed chat_id=%s err=%v", saveErr)
	}

	// Load recent history and compress it into prompt context.
	history, loadErr := o.store.LoadRecent(chatID, 16)
	if loadErr != nil {
		return fail("%s load recent failed chat_id=%s err=%v", loadErr)
	}
	promptCtx := memory.CompressForPrompt(history, 6000)
	if o.log != nil {
		o.log.Debugf("%s stream prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", prefix, chatID, len(history), len(promptCtx))
	}

	// Generate the reply through the streaming ReAct loop.
	reply, genErr := o.runUnifiedReActStream(ctx, chatID, userID, promptCtx, text, callback)
	if genErr != nil {
		return fail("%s stream message generation failed chat_id=%s err=%v", genErr)
	}

	// Persist the assistant turn as well.
	if saveErr := o.store.SaveMessage(chatID, userID, "assistant", reply); saveErr != nil {
		return fail("%s save assistant response failed chat_id=%s err=%v", saveErr)
	}

	if o.log != nil {
		o.log.Infof("%s stream message handled chat_id=%s response_len=%d", prefix, chatID, len(reply))
	}
	return reply, nil
}
// buildUnifiedSystemPrompt assembles the system prompt for the unified ReAct
// loop. Tool definitions travel through the API's tools field, so the prompt
// only carries the persona, the ReAct guidance, the runtime environment, the
// latest artifact found in recentMessages, the planning-mode notice and the
// skill sections.
//
// routedSkills is the skill list pre-selected by the LLM router and is handed
// to formatSelectedSkillsForPrompt unchanged; per the original contract a nil
// value makes that helper fall back to keyword matching.
func (o *Orchestrator) buildUnifiedSystemPrompt(userInput string, recentMessages []memory.Message, routedSkills []knowledge.Skill, planningMode bool) string {
	skillOverview := o.formatSkillSummariesForPrompt()
	selectedSkills := o.formatSelectedSkillsForPrompt(userInput, routedSkills)
	runtimeInfo := formatRuntimeContextForPrompt()
	artifactInfo := extractLastArtifact(recentMessages)

	var planningInfo string
	if planningMode {
		planningInfo = strings.Join([]string{
			"当前处于 PI 规划编辑模式。",
			"- 用户的修订意见必须基于现有 Artifact 继续迭代。",
			"- 需要继续调用 safe_pi_planning / publish_pi_plan 相关流程生成更新版本。",
			"- 不要仅给普通文本答复替代蓝图更新。",
		}, "\n")
	} else {
		planningInfo = "当前未处于 PI 规划编辑模式。"
	}

	// Persona and ReAct guidance.
	sections := []string{
		"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
		o.soul,
		"",
		"===== ReAct 思考指引 =====",
		"你采用 ReActReasoning + Acting模式进行任务处理。",
		"1. 思考优先:在做出任何行动之前,先完成内部推理,但不要把 Thought、trace、step 暴露给用户。",
		"2. 工具调用如果需要获取信息或执行操作使用提供的工具函数function calling进行调用。",
		"3. 观察反馈:检查工具返回的结果,据此决定下一步行动。",
		"4. 最终回答:当你有足够信息时,只输出面向用户的最终文本回复,不要附带推理轨迹,不要调用工具。",
		"",
		"注意事项:",
		"- 每次要么调用工具,要么给出最终回答,不要两者都做。",
		"- 如果工具调用失败根据错误信息Traceback调整策略后重试或给出替代方案。",
		"- 涉及文件、目录、命令时,优先调用工具获取真实结果,不要猜测。",
		"- 如果本轮需要调用工具,可以在 assistant content 中写简短内部推理,供系统记录日志;这些内容不会直接展示给用户。",
		"- 最终用户可见内容中禁止出现 Thought、Trace、Step、Observation、Action、ActionInput 等字段或标题。",
		"",
	}

	// Dynamic sections, each introduced by its banner line.
	sections = append(sections,
		"===== 运行环境 =====",
		runtimeInfo,
		"",
		"===== 已有结果 (Artifact) =====",
		artifactInfo,
		"",
		"===== PI 规划模式 =====",
		planningInfo,
		"",
		"===== 可用技能概览 =====",
		skillOverview,
		"",
		"===== 本轮相关技能(按用户问题筛选) =====",
		selectedSkills,
		"",
		"===== 关键约束 =====",
	)

	return strings.Join(sections, "\n")
}
// extractLastArtifact scans messages from newest to oldest and returns a
// prompt fragment quoting the artifact of the first message that contains
// both an "<artifact" opening and a later "</artifact>" closing tag. The
// quoted span runs from the first opening tag to the last closing tag of that
// message. When no message qualifies, a "no artifact" notice is returned.
func extractLastArtifact(messages []memory.Message) string {
	const (
		openTag  = "<artifact"
		closeTag = "</artifact>"
	)

	for idx := len(messages); idx > 0; idx-- {
		body := messages[idx-1].Content

		from := strings.Index(body, openTag)
		if from < 0 {
			continue
		}
		to := strings.LastIndex(body, closeTag)
		if to <= from {
			// Unclosed or malformed artifact: keep looking in older messages.
			continue
		}

		artifact := body[from : to+len(closeTag)]
		return "\n当前正在处理的 Artifact 内容如下:\n" + artifact + "\n如果你需要更新此 Artifact请输出完整的更新后版本并确保包裹在 <artifact> 标签中。"
	}
	return "当前没有正在处理的 Artifact。"
}
// routeSkillsWithLLM asks the lightweight router model whether the user input
// needs any skills and, if so, which ones.
//
// Returns:
//   - a non-nil, possibly empty slice and a nil error when routing succeeded
//     (empty means "no skills needed" or "no skills available");
//   - a nil slice and a non-nil error when the router is not configured, the
//     LLM call failed or its response could not be parsed; callers treat this
//     as "fall back to keyword matching".
func (o *Orchestrator) routeSkillsWithLLM(ctx context.Context, userInput string) ([]knowledge.Skill, error) {
	traceLogPrefix := "trace_id=" + logger.TraceIDFromContext(ctx)

	// Guard against direct calls without a router instead of panicking on a
	// nil interface.
	if o.routerLLM == nil {
		return nil, fmt.Errorf("router llm not configured")
	}

	summaries := o.getSkillSummariesSnapshot()
	if len(summaries) == 0 {
		if o.log != nil {
			o.log.Debugf("%s skill router: no skills available, skip", traceLogPrefix)
		}
		return []knowledge.Skill{}, nil
	}

	// Describe the skill pool as one "- name: description" line per skill.
	var skillPool strings.Builder
	for _, s := range summaries {
		name := strings.TrimSpace(s.Name)
		desc := strings.TrimSpace(s.Description)
		if name == "" {
			continue
		}
		skillPool.WriteString("- ")
		skillPool.WriteString(name)
		if desc != "" {
			skillPool.WriteString(": ")
			skillPool.WriteString(desc)
		}
		skillPool.WriteString("\n")
	}
	if skillPool.Len() == 0 {
		// Every summary had a blank name: there is nothing the router could
		// select, so skip the LLM round-trip.
		if o.log != nil {
			o.log.Debugf("%s skill router: no skills available, skip", traceLogPrefix)
		}
		return []knowledge.Skill{}, nil
	}

	routerSystemPrompt := strings.Join([]string{
		"你是一个意图路由器。根据用户输入,从技能池中挑选最合适的技能。",
		"",
		"规则:",
		"1. 如果用户的问题可以直接回答(闲聊、简单问答)或只需简单工具调用,设置 need_skills=falseselected_skills 为空数组。",
		"2. 如果用户的问题涉及专业流程、复杂任务或与某个技能高度相关,设置 need_skills=true 并选择最相关的技能名称。",
		"3. 最多选择 3 个技能。",
		"4. 仅返回 JSON不要附加任何其他文字。",
		"",
		"可用技能池:",
		strings.TrimSpace(skillPool.String()),
		"",
		"输出格式(严格 JSON",
		`{"need_skills": true, "selected_skills": ["技能名称1"], "reason": "简要说明"}`,
	}, "\n")
	routerUserPrompt := "用户输入:" + userInput

	if o.log != nil {
		o.log.Debugf("%s skill router request: skills_count=%d input_len=%d", traceLogPrefix, len(summaries), len(userInput))
	}
	raw, err := o.routerLLM.Generate(ctx, routerSystemPrompt, routerUserPrompt)
	if err != nil {
		return nil, fmt.Errorf("router llm call failed: %w", err)
	}
	if o.log != nil {
		o.log.Debugf("%s skill router response: %s", traceLogPrefix, truncateForLog(raw, 500))
	}

	decision, err := parseCapabilityRoute(raw)
	if err != nil {
		return nil, fmt.Errorf("router response parse failed: %w", err)
	}
	if !decision.NeedSkills || len(decision.SelectedSkills) == 0 {
		if o.log != nil {
			o.log.Infof("%s skill router: no skills needed, reason=%s", traceLogPrefix, decision.Reason)
		}
		return []knowledge.Skill{}, nil
	}

	// Resolve the selected names to full skill definitions.
	allSkills := o.getSkillsSnapshot()
	selected := matchSkillsByName(allSkills, decision.SelectedSkills)
	if o.log != nil {
		o.log.Infof("%s skill router: need_skills=true requested=%v matched=%d reason=%s",
			traceLogPrefix, decision.SelectedSkills, len(selected), decision.Reason)
	}
	return selected, nil
}
// matchSkillsByName resolves each requested name to at most one skill from
// allSkills, comparing names case-insensitively after trimming whitespace.
//
// For every requested name an exact match wins; otherwise the first skill
// whose name contains the request, or is contained in it, is used. Blank
// requested names and skills with blank names are ignored — an empty skill
// name would otherwise be a substring of every request. A skill is returned
// at most once even if several requested names resolve to it.
//
// Results follow the order of names. Returns nil when names is empty.
func matchSkillsByName(allSkills []knowledge.Skill, names []string) []knowledge.Skill {
	if len(names) == 0 {
		return nil
	}

	// Normalize skill names once instead of once per requested name.
	normalized := make([]string, len(allSkills))
	for i := range allSkills {
		normalized[i] = strings.ToLower(strings.TrimSpace(allSkills[i].Name))
	}

	matched := make([]knowledge.Skill, 0, len(names))
	taken := make(map[int]struct{}, len(names))
	for _, wantName := range names {
		want := strings.ToLower(strings.TrimSpace(wantName))
		if want == "" {
			continue
		}

		best := -1
		for i, skName := range normalized {
			if skName == "" {
				continue
			}
			if skName == want {
				best = i
				break
			}
			// Remember only the first fuzzy candidate; keep scanning in case
			// an exact match follows.
			if best < 0 && (strings.Contains(skName, want) || strings.Contains(want, skName)) {
				best = i
			}
		}
		if best < 0 {
			continue
		}
		if _, dup := taken[best]; dup {
			continue
		}
		taken[best] = struct{}{}
		matched = append(matched, allSkills[best])
	}
	return matched
}
// runUnifiedReAct runs the unified ReAct loop using the native function
// calling API.
//
// The messages array grows as the exchange proceeds:
// system → history → user → assistant(tool_calls) → tool → ...
// The loop ends when the LLM answers with plain text and no tool calls (the
// final answer) or when the safety limit of 20 steps is reached, in which
// case a capability gap is recorded and a fixed apology text is returned.
//
// If the LLM client does not implement llm.ToolCallChatClient the call is
// delegated to runLegacyReAct. An error is returned only when the LLM call
// itself fails; tool failures are fed back to the model as observations.
func (o *Orchestrator) runUnifiedReAct(ctx context.Context, chatID, userID, compressedContext, userInput string) (string, error) {
	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID

	// Resume planning mode if the chat has an active planning session, unless
	// the user asked to leave it.
	planningMode := false
	if session, ok := o.getPlanningSession(chatID, userID); ok && session.Active {
		if shouldExitPlanningMode(userInput) {
			o.clearPlanningSession(chatID, userID)
			if o.log != nil {
				o.log.Infof("%s planning mode exited chat_id=%s", traceLogPrefix, chatID)
			}
		} else {
			planningMode = true
		}
	}

	// Skill routing: planning mode pins the planning skills; otherwise the
	// lightweight router model decides when one is configured.
	var routedSkills []knowledge.Skill
	if planningMode {
		routedSkills = o.getSafePIPlanningSkills()
		if o.log != nil {
			o.log.Infof("%s planning mode sticky skill activated chat_id=%s matched=%d", traceLogPrefix, chatID, len(routedSkills))
		}
	} else if o.routerLLM != nil {
		routed, routeErr := o.routeSkillsWithLLM(ctx, userInput)
		if routeErr != nil {
			// routedSkills stays nil so the system prompt falls back to
			// keyword matching.
			if o.log != nil {
				o.log.Warnf("%s skill router failed, fallback to keyword matching err=%v", traceLogPrefix, routeErr)
			}
		} else {
			routedSkills = routed
			if o.log != nil {
				names := make([]string, 0, len(routedSkills))
				for _, sk := range routedSkills {
					names = append(names, sk.Name)
				}
				o.log.Infof("%s skill router selected %d skills: %v", traceLogPrefix, len(routedSkills), names)
			}
		}
	}
	if containsSafePIPlanningSkill(routedSkills) {
		planningMode = true
		o.activatePlanningSession(chatID, userID, "", false)
	}

	if o.log != nil {
		o.log.Infof("%s unified react start", traceLogPrefix)
	}

	// Fall back to the legacy loop when native tool calls are unavailable.
	toolCallClient, supportsToolCalls := o.llm.(llm.ToolCallChatClient)
	if !supportsToolCalls {
		if o.log != nil {
			o.log.Warnf("%s llm client does not support ToolCallChatClient, falling back to legacy ReAct", traceLogPrefix)
		}
		return o.runLegacyReAct(ctx, chatID, userID, compressedContext, userInput)
	}

	// Recent messages are only needed to surface the latest artifact in the
	// system prompt, so a load failure is logged and the turn continues.
	recent, recentErr := o.store.LoadRecent(chatID, 16)
	if recentErr != nil && o.log != nil {
		o.log.Warnf("%s load recent for artifact failed chat_id=%s err=%v", traceLogPrefix, chatID, recentErr)
	}
	systemPrompt := o.buildUnifiedSystemPrompt(userInput, recent, routedSkills, planningMode)
	if o.log != nil {
		o.log.Debugf("%s system_prompt_len=%d", traceLogPrefix, len(systemPrompt))
	}

	// Initial messages: system prompt, compressed history, current user input.
	messages := make([]llm.PromptMessage, 0, 32)
	messages = append(messages, llm.PromptMessage{Role: "system", Content: systemPrompt})
	messages = append(messages, parseCompressedHistoryMessages(compressedContext)...)
	messages = append(messages, llm.PromptMessage{Role: "user", Content: userInput})

	// Tool definitions are passed through the API's tools field.
	toolDefs := o.buildToolDefinitions()
	if o.log != nil {
		toolNames := make([]string, 0, len(toolDefs))
		for _, td := range toolDefs {
			toolNames = append(toolNames, td.Function.Name)
		}
		o.log.Debugf("%s tool_defs_count=%d names=%v", traceLogPrefix, len(toolDefs), toolNames)
	}

	const maxSteps = 20
	for step := 1; step <= maxSteps; step++ {
		if o.log != nil {
			o.log.Debugf("%s react step=%d start messages_count=%d", traceLogPrefix, step, len(messages))
		}

		completion, err := toolCallClient.GenerateWithTools(ctx, messages, toolDefs)
		if err != nil {
			return "", err
		}
		if o.log != nil {
			o.log.Debugf("%s react step=%d content_len=%d tool_calls=%d",
				traceLogPrefix, step, len(completion.Content), len(completion.ToolCalls))
			if completion.Content != "" {
				o.log.Debugf("%s react step=%d thought=%q", traceLogPrefix, step, completion.Content)
			}
		}

		// No tool calls: this is the final answer.
		if len(completion.ToolCalls) == 0 {
			finalText := sanitizeUserFacingAnswer(completion.Content)
			if finalText == "" {
				finalText = "已完成处理。"
			}
			if o.log != nil {
				o.log.Debugf("%s react final at step=%d answer_len=%d", traceLogPrefix, step, len(finalText))
			}
			return finalText, nil
		}

		// Tool calls: record the assistant message, then answer every call
		// with exactly one tool message.
		messages = append(messages, llm.PromptMessage{
			Role:      "assistant",
			Content:   completion.Content,
			ToolCalls: completion.ToolCalls,
		})

		for _, tc := range completion.ToolCalls {
			toolName := strings.ToLower(strings.TrimSpace(tc.Function.Name))
			toolInput := extractToolInput(tc.Function.Arguments)

			// In planning mode tickets may only be created after the user
			// has explicitly confirmed.
			if planningMode && toolName == "create_gitea_ticket" && !isPlanningConfirmation(userInput) {
				obs := formatToolErrorObservation("WAIT_USER_CONFIRM", toolName, "需用户确认后才能创建工单,请先征求用户确认")
				messages = append(messages, llm.PromptMessage{
					Role:       "tool",
					ToolCallID: tc.ID,
					Name:       tc.Function.Name,
					Content:    obs,
				})
				continue
			}

			tool, ok := o.tools.Get(toolName)
			if !ok {
				if o.log != nil {
					o.log.Warnf("%s react step=%d tool_not_found=%s", traceLogPrefix, step, toolName)
				}
				messages = append(messages, llm.PromptMessage{
					Role:       "tool",
					ToolCallID: tc.ID,
					Name:       tc.Function.Name,
					Content:    formatToolErrorObservation("TOOL_NOT_FOUND", toolName, "该工具不存在,请检查工具名称后重试"),
				})
				o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+toolName)
				continue
			}

			if o.log != nil {
				o.log.Debugf("%s react step=%d tool_call tool=%s input_len=%d", traceLogPrefix, step, toolName, len(toolInput))
				o.log.Debugf("%s react step=%d tool=%s input=%q", traceLogPrefix, step, toolName, toolInput)
			}

			toolOut, toolErr := tool.Call(ctx, toolInput)
			obs := strings.TrimSpace(toolOut)
			hasOutput := obs != ""
			if !hasOutput {
				obs = "(empty output)"
			}

			// Persist a planning artifact only for real output. The check
			// must use the raw output: obs has already been replaced by the
			// "(empty output)" placeholder and is never blank here.
			if isPIPlanningToolName(toolName) && toolErr == nil && hasOutput {
				o.activatePlanningSession(chatID, userID, obs, true)
				if err := o.store.SaveMessage(chatID, userID, "assistant", wrapPIArtifact(obs)); err != nil {
					if o.log != nil {
						o.log.Warnf("%s save planning artifact failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
					}
				}
			}

			if toolErr != nil {
				obs = formatToolErrorObservation("TOOL_EXEC_ERROR", toolName, toolErr.Error()) + "\nOUTPUT:\n" + obs
				o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+toolName)
			}

			// Cap the observation so it cannot overflow the LLM context
			// window. The cut is by bytes; ToValidUTF8 drops a multi-byte
			// rune split by the cut (and any other invalid bytes).
			if len(obs) > 4000 {
				obs = strings.ToValidUTF8(obs[:4000], "") + "\n...(truncated)"
			}

			if o.log != nil {
				o.log.Debugf("%s react step=%d tool=%s observation_len=%d", traceLogPrefix, step, toolName, len(obs))
				o.log.Debugf("%s react step=%d tool=%s observation=%q", traceLogPrefix, step, toolName, truncateForLog(obs, 500))
			}

			messages = append(messages, llm.PromptMessage{
				Role:       "tool",
				ToolCallID: tc.ID,
				Name:       tc.Function.Name,
				Content:    obs,
			})
		}
	}

	// Safety limit reached without a final answer.
	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
}
// runUnifiedReActStream runs the unified ReAct loop and reports progress
// through callback: thought segments, tool calls, tool results, workspace
// start/delta/end events for planning documents, and the final answer.
//
// The loop ends when the LLM answers without tool calls or when the safety
// limit of 20 steps is reached; in the latter case a capability gap is
// recorded, an error event is emitted and a fixed apology text is returned.
//
// Errors are returned when the LLM client does not implement
// llm.ToolCallChatClient, when the LLM call fails, or when callback returns
// an error (wrapped as "callback error"). Tool failures are fed back to the
// model as observations rather than returned.
func (o *Orchestrator) runUnifiedReActStream(ctx context.Context, chatID, userID, compressedContext, userInput string, callback StreamEventCallback) (string, error) {
	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID

	// Resume planning mode if the chat has an active planning session, unless
	// the user asked to leave it.
	planningMode := false
	if session, ok := o.getPlanningSession(chatID, userID); ok && session.Active {
		if shouldExitPlanningMode(userInput) {
			o.clearPlanningSession(chatID, userID)
			if o.log != nil {
				o.log.Infof("%s planning mode exited chat_id=%s", traceLogPrefix, chatID)
			}
		} else {
			planningMode = true
		}
	}

	// Skill routing: planning mode pins the planning skills; otherwise the
	// lightweight router model decides when one is configured.
	var routedSkills []knowledge.Skill
	if planningMode {
		routedSkills = o.getSafePIPlanningSkills()
		if o.log != nil {
			o.log.Infof("%s planning mode sticky skill activated chat_id=%s matched=%d", traceLogPrefix, chatID, len(routedSkills))
		}
	} else if o.routerLLM != nil {
		routed, routeErr := o.routeSkillsWithLLM(ctx, userInput)
		if routeErr != nil {
			// routedSkills stays nil so the system prompt falls back to
			// keyword matching.
			if o.log != nil {
				o.log.Warnf("%s skill router failed, fallback to keyword matching err=%v", traceLogPrefix, routeErr)
			}
		} else {
			routedSkills = routed
			if o.log != nil {
				names := make([]string, 0, len(routedSkills))
				for _, sk := range routedSkills {
					names = append(names, sk.Name)
				}
				o.log.Infof("%s skill router selected %d skills: %v", traceLogPrefix, len(routedSkills), names)
			}
		}
	}
	if containsSafePIPlanningSkill(routedSkills) {
		planningMode = true
		o.activatePlanningSession(chatID, userID, "", false)
	}

	// Name-based check for whether routing landed on a SAFe PI planning skill.
	routedToSafePIPlanning := false
	for _, sk := range routedSkills {
		name := strings.ToLower(strings.TrimSpace(sk.Name))
		if strings.Contains(name, "safe") || strings.Contains(name, "pi 规划") || strings.Contains(name, "pi planning") {
			routedToSafePIPlanning = true
			break
		}
	}

	if o.log != nil {
		o.log.Infof("%s unified react stream start", traceLogPrefix)
	}

	// Streaming has no legacy fallback: native tool calls are required.
	toolCallClient, supportsToolCalls := o.llm.(llm.ToolCallChatClient)
	if !supportsToolCalls {
		if o.log != nil {
			o.log.Warnf("%s llm client does not support ToolCallChatClient, stream mode not available", traceLogPrefix)
		}
		return "", fmt.Errorf("stream mode requires ToolCallChatClient support")
	}

	// Recent messages are only needed to surface the latest artifact in the
	// system prompt, so a load failure is logged and the turn continues.
	recent, recentErr := o.store.LoadRecent(chatID, 16)
	if recentErr != nil && o.log != nil {
		o.log.Warnf("%s load recent for artifact failed chat_id=%s err=%v", traceLogPrefix, chatID, recentErr)
	}
	systemPrompt := o.buildUnifiedSystemPrompt(userInput, recent, routedSkills, planningMode)
	if o.log != nil {
		o.log.Debugf("%s system_prompt_len=%d", traceLogPrefix, len(systemPrompt))
	}

	// Initial messages: system prompt, compressed history, current user input.
	messages := make([]llm.PromptMessage, 0, 32)
	messages = append(messages, llm.PromptMessage{Role: "system", Content: systemPrompt})
	messages = append(messages, parseCompressedHistoryMessages(compressedContext)...)
	messages = append(messages, llm.PromptMessage{Role: "user", Content: userInput})

	toolDefs := o.buildToolDefinitions()
	if o.log != nil {
		toolNames := make([]string, 0, len(toolDefs))
		for _, td := range toolDefs {
			toolNames = append(toolNames, td.Function.Name)
		}
		o.log.Debugf("%s tool_defs_count=%d names=%v", traceLogPrefix, len(toolDefs), toolNames)
	}

	// workspaceSentThisTurn records that an artifact tool already drove the
	// workspace during this turn, so the final answer must not render it again.
	workspaceSentThisTurn := false
	const maxSteps = 20
	for step := 1; step <= maxSteps; step++ {
		if o.log != nil {
			o.log.Debugf("%s react stream step=%d start messages_count=%d", traceLogPrefix, step, len(messages))
		}

		completion, err := toolCallClient.GenerateWithTools(ctx, messages, toolDefs)
		if err != nil {
			return "", err
		}
		if o.log != nil {
			o.log.Debugf("%s react stream step=%d content_len=%d tool_calls=%d",
				traceLogPrefix, step, len(completion.Content), len(completion.ToolCalls))
			if completion.Content != "" {
				o.log.Debugf("%s react stream step=%d thought=%q", traceLogPrefix, step, completion.Content)
			}
		}

		// Push the assistant content as thought events. In planning mode the
		// artifact tags are stripped so the frontend does not render the
		// workspace twice.
		if completion.Content != "" {
			displayContent := completion.Content
			if planningMode {
				displayContent = stripArtifactTags(displayContent)
			}
			if displayContent != "" {
				// Deliver the text in segments sized by the 50 passed here.
				segments := splitContentIntoSegments(displayContent, 50)
				for _, segment := range segments {
					if err := callback(StreamEvent{
						Type:    StreamEventTypeThought,
						Content: segment,
						Step:    step,
					}); err != nil {
						return "", fmt.Errorf("callback error: %w", err)
					}
				}
			}
		}

		// No tool calls: this is the final answer.
		if len(completion.ToolCalls) == 0 {
			finalText := sanitizeUserFacingAnswer(completion.Content)
			if finalText == "" {
				finalText = "已完成处理。"
			}
			if planningMode && !workspaceSentThisTurn {
				// Use the artifact body as workspace content, falling back
				// to the tag-stripped text and finally to the raw answer.
				workspaceContent := extractArtifactContent(finalText)
				if workspaceContent == "" {
					workspaceContent = stripArtifactTags(finalText)
				}
				if workspaceContent == "" {
					workspaceContent = finalText
				}
				o.activatePlanningSession(chatID, userID, workspaceContent, true)
				if err := o.store.SaveMessage(chatID, userID, "assistant", wrapPIArtifact(workspaceContent)); err != nil {
					if o.log != nil {
						o.log.Warnf("%s save planning artifact failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
					}
				}
				if err := callback(StreamEvent{
					Type:           StreamEventTypeWorkspaceStart,
					WorkspaceTitle: "PI Planning Document",
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				if err := callback(StreamEvent{
					Type:    StreamEventTypeWorkspaceDelta,
					Content: workspaceContent,
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				if err := callback(StreamEvent{Type: StreamEventTypeWorkspaceEnd}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				workspaceSentThisTurn = true
				finalText = "我已根据你的意见更新 PI 规划,请查看右侧工作区;如确认无误,请回复“确认”进入工单创建。"
			} else if planningMode {
				// The workspace was already driven by a tool call this turn;
				// strip artifact tags from the final text.
				finalText = stripArtifactTags(finalText)
				if finalText == "" {
					finalText = "已完成处理。"
				}
			}
			if o.log != nil {
				o.log.Debugf("%s react stream final at step=%d answer_len=%d", traceLogPrefix, step, len(finalText))
			}
			if err := callback(StreamEvent{
				Type:    StreamEventTypeFinal,
				Content: finalText,
				Step:    step,
			}); err != nil {
				return "", fmt.Errorf("callback error: %w", err)
			}
			return finalText, nil
		}

		// Tool calls: record the assistant message, then answer every call
		// with exactly one tool message.
		messages = append(messages, llm.PromptMessage{
			Role:      "assistant",
			Content:   completion.Content,
			ToolCalls: completion.ToolCalls,
		})

		for _, tc := range completion.ToolCalls {
			toolName := strings.ToLower(strings.TrimSpace(tc.Function.Name))
			toolInput := extractToolInput(tc.Function.Arguments)

			if err := callback(StreamEvent{
				Type:     StreamEventTypeToolCall,
				Content:  toolInput,
				Step:     step,
				ToolName: toolName,
			}); err != nil {
				return "", fmt.Errorf("callback error: %w", err)
			}

			tool, ok := o.tools.Get(toolName)
			if !ok {
				if o.log != nil {
					o.log.Warnf("%s react stream step=%d tool_not_found=%s", traceLogPrefix, step, toolName)
				}
				errMsg := "工具不存在:" + toolName
				if err := callback(StreamEvent{
					Type:     StreamEventTypeError,
					Content:  errMsg,
					Step:     step,
					ToolName: toolName,
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				messages = append(messages, llm.PromptMessage{
					Role:       "tool",
					ToolCallID: tc.ID,
					Name:       tc.Function.Name,
					Content:    formatToolErrorObservation("TOOL_NOT_FOUND", toolName, "该工具不存在,请检查工具名称后重试"),
				})
				o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+toolName)
				continue
			}

			if o.log != nil {
				o.log.Debugf("%s react stream step=%d tool_call tool=%s input_len=%d", traceLogPrefix, step, toolName, len(toolInput))
				o.log.Debugf("%s react stream step=%d tool=%s input=%q", traceLogPrefix, step, toolName, toolInput)
			}

			// In planning mode tickets may only be created after the user
			// has explicitly confirmed.
			if planningMode && toolName == "create_gitea_ticket" && !isPlanningConfirmation(userInput) {
				obs := formatToolErrorObservation("WAIT_USER_CONFIRM", toolName, "需用户确认后才能创建工单,请先征求用户确认")
				if err := callback(StreamEvent{
					Type:     StreamEventTypeToolResult,
					Content:  obs,
					Step:     step,
					ToolName: toolName,
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				messages = append(messages, llm.PromptMessage{
					Role:       "tool",
					ToolCallID: tc.ID,
					Name:       tc.Function.Name,
					Content:    obs,
				})
				continue
			}

			// Planning tools render into the workspace: open it before the
			// call so the frontend can show progress.
			isArtifactTool := strings.Contains(toolName, "safe_pi_planning") || toolName == "publish_pi_plan" || (routedToSafePIPlanning && strings.Contains(toolName, "publish_pi_plan"))
			if isArtifactTool {
				if err := callback(StreamEvent{
					Type:           StreamEventTypeWorkspaceStart,
					WorkspaceTitle: "PI Planning Document",
					ToolName:       toolName,
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
				workspaceSentThisTurn = true
			}

			toolOut, toolErr := tool.Call(ctx, toolInput)
			obs := strings.TrimSpace(toolOut)
			hasOutput := obs != ""
			if !hasOutput {
				obs = "(empty output)"
			}

			if isArtifactTool {
				// Persist and stream the artifact only for a successful call
				// with real output, so neither an error nor the
				// "(empty output)" placeholder is stored as a planning document.
				if toolErr == nil && hasOutput {
					o.activatePlanningSession(chatID, userID, obs, true)
					if err := o.store.SaveMessage(chatID, userID, "assistant", wrapPIArtifact(obs)); err != nil {
						if o.log != nil {
							o.log.Warnf("%s save planning artifact failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
						}
					}
					if err := callback(StreamEvent{
						Type:    StreamEventTypeWorkspaceDelta,
						Content: obs,
					}); err != nil {
						return "", fmt.Errorf("callback error: %w", err)
					}
				}
				// Always close the workspace opened above, including when the
				// tool failed; otherwise the client is left with an unbalanced
				// workspace_start.
				if err := callback(StreamEvent{
					Type: StreamEventTypeWorkspaceEnd,
				}); err != nil {
					return "", fmt.Errorf("callback error: %w", err)
				}
			}

			if toolErr != nil {
				obs = formatToolErrorObservation("TOOL_EXEC_ERROR", toolName, toolErr.Error()) + "\nOUTPUT:\n" + obs
				o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+toolName)
			}

			// Cap the observation so it cannot overflow the LLM context
			// window. The cut is by bytes; ToValidUTF8 drops a multi-byte
			// rune split by the cut (and any other invalid bytes).
			if len(obs) > 4000 {
				obs = strings.ToValidUTF8(obs[:4000], "") + "\n...(truncated)"
			}

			if o.log != nil {
				o.log.Debugf("%s react stream step=%d tool=%s observation_len=%d", traceLogPrefix, step, toolName, len(obs))
				o.log.Debugf("%s react stream step=%d tool=%s observation=%q", traceLogPrefix, step, toolName, truncateForLog(obs, 500))
			}

			if err := callback(StreamEvent{
				Type:     StreamEventTypeToolResult,
				Content:  obs,
				Step:     step,
				ToolName: toolName,
			}); err != nil {
				return "", fmt.Errorf("callback error: %w", err)
			}
			messages = append(messages, llm.PromptMessage{
				Role:       "tool",
				ToolCallID: tc.ID,
				Name:       tc.Function.Name,
				Content:    obs,
			})
		}
	}

	// Safety limit reached without a final answer.
	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	errMsg := "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。"
	// Best-effort notification: the same text is returned to the caller, so a
	// callback failure here is deliberately ignored.
	_ = callback(StreamEvent{
		Type:    StreamEventTypeError,
		Content: errMsg,
	})
	return errMsg, nil
}
// runLegacyReAct runs the legacy ReAct loop that parses a JSON decision out of every
// model reply. It is the fallback path for LLMs that do not support tool_calls.
//
// Each step rebuilds the prompt from the system prompt, the compressed history, the
// user input and the scratchpad accumulated so far, then either returns the final
// answer or executes the requested tool and appends the observation to the scratchpad.
//
// Returns:
//   - the user-facing answer; when the reply cannot be parsed as a decision the raw
//     (trimmed) model output is returned instead, and when every step is used up a
//     fixed fallback message is returned;
//   - a non-nil error only when the LLM call itself fails.
func (o *Orchestrator) runLegacyReAct(ctx context.Context, chatID, userID, compressedContext, userInput string) (string, error) {
	const (
		maxSteps = 20
		// maxObservationBytes caps a single observation so the scratchpad cannot
		// outgrow the LLM context window.
		maxObservationBytes = 4000
	)

	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID
	systemPrompt := o.buildLegacySystemPrompt(userInput)

	var scratchpad strings.Builder
	for step := 1; step <= maxSteps; step++ {
		if o.log != nil {
			o.log.Debugf("%s legacy react step=%d start", traceLogPrefix, step)
		}

		messages := buildReActMessages(systemPrompt, compressedContext, userInput, scratchpad.String())
		raw, err := o.generateMessages(ctx, messages)
		if err != nil {
			return "", err
		}
		decision, err := parseDecision(raw)
		if err != nil {
			// An unparseable reply is handed to the user as-is instead of failing the turn.
			o.emitCapabilityGap(chatID, userID, userInput, "react_parse_failed")
			return strings.TrimSpace(raw), nil
		}

		if decision.IsFinalAnswer {
			finalText := ""
			if decision.FinalAnswer != nil {
				finalText = sanitizeUserFacingAnswer(*decision.FinalAnswer)
			}
			if finalText == "" {
				finalText = sanitizeUserFacingAnswer(decision.Thought)
			}
			if finalText == "" {
				finalText = "已完成处理。"
			}
			return finalText, nil
		}

		// record appends one "Step N <field>: <text>" line to the scratchpad.
		stepLabel := "Step " + strconv.Itoa(step) + " "
		record := func(field, text string) {
			scratchpad.WriteString(stepLabel + field + ": " + text + "\n")
		}

		action := strings.ToLower(strings.TrimSpace(decision.Action))
		if action == "" || action == "none" {
			record("Thought", decision.Thought)
			record("Observation", "你没有指定要调用的工具,请重新决策。")
			continue
		}

		actionInput := decision.GetActionInputString()
		tool, ok := o.tools.Get(action)
		if !ok {
			record("Thought", decision.Thought)
			record("Action", action)
			record("Observation", formatToolErrorObservation("TOOL_NOT_FOUND", action, "该工具不存在"))
			o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+action)
			continue
		}

		toolOut, toolErr := tool.Call(ctx, actionInput)
		obs := strings.TrimSpace(toolOut)
		if obs == "" {
			obs = "(empty output)"
		}
		if toolErr != nil {
			obs = formatToolErrorObservation("TOOL_EXEC_ERROR", action, toolErr.Error()) + "\nOUTPUT:\n" + obs
			o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+action)
		}
		if len(obs) > maxObservationBytes {
			// Back up to the start of a UTF-8 sequence (at most 3 continuation bytes)
			// so the cut never leaves half of a multi-byte character in the prompt.
			cut := maxObservationBytes
			for back := 0; back < 3 && obs[cut]&0xC0 == 0x80; back++ {
				cut--
			}
			obs = obs[:cut] + "\n...(truncated)"
		}

		record("Thought", decision.Thought)
		record("Action", action)
		record("ActionInput", actionInput)
		record("Observation", obs)
	}

	// Safety limit reached without a final answer.
	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
}
// buildLegacySystemPrompt assembles the system prompt for the legacy ReAct path used
// with LLMs that lack tool_calls support. The prompt contains the persona, the runtime
// description, the skill overview, the skills selected for userInput, the tool list
// and the JSON output-format contract, joined with newlines.
func (o *Orchestrator) buildLegacySystemPrompt(userInput string) string {
	skillOverview := o.formatSkillSummariesForPrompt()
	matchedSkills := o.formatSelectedSkillsForPrompt(userInput, nil)
	toolCatalog := o.formatToolDoc()
	runtimeInfo := formatRuntimeContextForPrompt()

	lines := make([]string, 0, 24)
	lines = append(lines,
		"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
		o.soul,
	)

	// Every section is rendered as: blank line, banner, body.
	sections := [...]struct{ banner, body string }{
		{"===== 运行环境 =====", runtimeInfo},
		{"===== 可用技能概览 =====", skillOverview},
		{"===== 本轮相关技能 =====", matchedSkills},
		{"===== 可用工具 =====", toolCatalog},
	}
	for _, sec := range sections {
		lines = append(lines, "", sec.banner, sec.body)
	}

	// The output contract: the model must answer with exactly one JSON object.
	lines = append(lines,
		"",
		"===== 输出格式约束 =====",
		"你必须使用 ReAct 模式进行决策。每次回复必须是且仅是一个 JSON 对象:",
		"{",
		" \"thought\": \"你的推理过程(必填)\",",
		" \"action\": \"要调用的工具名称(不调工具时填 none\",",
		" \"action_input\": \"传给工具的输入\",",
		" \"is_final_answer\": true 或 false,",
		" \"final_answer\": \"当 is_final_answer=true 时填写给用户的最终回复\"",
		"}",
	)
	return strings.Join(lines, "\n")
}
// buildToolDefinitions converts the tool registry into the ToolDefinition list used for
// OpenAI-style function calling. Tools are ordered by name, and every tool is given the
// same parameter schema: an object with one required string field named "input".
func (o *Orchestrator) buildToolDefinitions() []llm.ToolDefinition {
	registered := o.tools.List()
	sort.Slice(registered, func(a, b int) bool {
		return registered[a].Name() < registered[b].Name()
	})

	sharedSchema := json.RawMessage(`{"type":"object","properties":{"input":{"type":"string","description":"工具的输入命令或查询内容"}},"required":["input"]}`)

	defs := make([]llm.ToolDefinition, len(registered))
	for i, entry := range registered {
		defs[i] = llm.ToolDefinition{
			Type: "function",
			Function: llm.ToolFunctionDef{
				Name:        entry.Name(),
				Description: entry.Description(),
				Parameters:  sharedSchema,
			},
		}
	}
	return defs
}
// extractToolInput pulls the tool input string out of the function-calling arguments
// JSON produced by the LLM.
//
// It returns the "input" field when arguments is a JSON object carrying a non-empty
// string there. In every other case (invalid JSON, missing or empty "input") the
// whitespace-trimmed arguments are returned unchanged; blank arguments yield "".
func extractToolInput(arguments string) string {
	raw := strings.TrimSpace(arguments)
	if raw == "" {
		return ""
	}

	var payload struct {
		Input string `json:"input"`
	}
	if json.Unmarshal([]byte(raw), &payload) == nil && payload.Input != "" {
		return payload.Input
	}
	// Fallback: hand the raw arguments to the tool as its input.
	return raw
}
// generateMessages sends the conversation to the primary LLM. When the configured
// client implements llm.MessageChatClient the structured messages are passed through;
// otherwise they are flattened into one system prompt and one user prompt and sent via
// Generate.
func (o *Orchestrator) generateMessages(ctx context.Context, messages []llm.PromptMessage) (string, error) {
	chatClient, supportsMessages := o.llm.(llm.MessageChatClient)
	if !supportsMessages {
		systemPrompt, userPrompt := fallbackPromptsFromMessages(messages)
		return o.llm.Generate(ctx, systemPrompt, userPrompt)
	}
	return chatClient.GenerateMessages(ctx, messages)
}
// buildReActMessages builds the message list for one legacy ReAct step, in this order:
// the system prompt, the messages parsed from compressedContext, an assistant message
// carrying the scratchpad (only when it is not blank), and finally the user input.
func buildReActMessages(systemPrompt, compressedContext, userInput, scratchpad string) []llm.PromptMessage {
	history := parseCompressedHistoryMessages(compressedContext)

	conversation := make([]llm.PromptMessage, 0, 16)
	conversation = append(conversation, llm.PromptMessage{Role: "system", Content: systemPrompt})
	conversation = append(conversation, history...)
	if notes := strings.TrimSpace(scratchpad); notes != "" {
		conversation = append(conversation, llm.PromptMessage{
			Role:    "assistant",
			Content: "推理记录:\n" + notes,
		})
	}
	return append(conversation, llm.PromptMessage{Role: "user", Content: userInput})
}
// parseCompressedHistoryMessages turns a compressed history, one "role: content" entry
// per line, back into prompt messages. Blank lines are dropped. A line without a colon,
// or starting with one, becomes an assistant message holding the whole line. Any role
// other than system, user or assistant is mapped to assistant. Blank input yields nil.
func parseCompressedHistoryMessages(compressed string) []llm.PromptMessage {
	body := strings.TrimSpace(compressed)
	if body == "" {
		return nil
	}

	rows := strings.Split(body, "\n")
	history := make([]llm.PromptMessage, 0, len(rows))
	for _, row := range rows {
		row = strings.TrimSpace(row)
		if row == "" {
			continue
		}

		// Default: the whole line is assistant content.
		msg := llm.PromptMessage{Role: "assistant", Content: row}
		if colon := strings.Index(row, ":"); colon > 0 {
			msg.Content = strings.TrimSpace(row[colon+1:])
			switch speaker := strings.ToLower(strings.TrimSpace(row[:colon])); speaker {
			case "system", "user", "assistant":
				msg.Role = speaker
			}
		}
		history = append(history, msg)
	}
	return history
}
// fallbackPromptsFromMessages flattens a message list into a (system prompt, user prompt)
// pair for clients that only accept two strings. System messages are joined with blank
// lines; every other message becomes a "role: content" line in the user prompt.
// Messages whose content is blank are skipped.
func fallbackPromptsFromMessages(messages []llm.PromptMessage) (string, string) {
	var systemBlocks []string
	dialogue := make([]string, 0, len(messages))

	for i := range messages {
		text := strings.TrimSpace(messages[i].Content)
		if text == "" {
			continue
		}
		switch speaker := strings.ToLower(strings.TrimSpace(messages[i].Role)); speaker {
		case "system":
			systemBlocks = append(systemBlocks, text)
		default:
			dialogue = append(dialogue, speaker+": "+text)
		}
	}
	return strings.Join(systemBlocks, "\n\n"), strings.Join(dialogue, "\n")
}
// nonEmptyIDs trims every id, drops blank ones and removes duplicates while keeping the
// order of first occurrence. An empty input yields nil; otherwise the result is a
// non-nil slice, possibly of length zero.
func nonEmptyIDs(ids []string) []string {
	if len(ids) == 0 {
		return nil
	}

	unique := make([]string, 0, len(ids))
	known := make(map[string]struct{}, len(ids))
	for _, candidate := range ids {
		trimmed := strings.TrimSpace(candidate)
		_, duplicate := known[trimmed]
		if trimmed == "" || duplicate {
			continue
		}
		known[trimmed] = struct{}{}
		unique = append(unique, trimmed)
	}
	return unique
}
// appendPendingFiles adds refs to the pending-file list kept for the (chatID, userID)
// pair. The incoming refs and the merged list are both passed through
// sanitizePendingRefs, so blank and duplicate IDs never end up in the stored list.
// Nothing is stored when no usable ref remains after sanitizing.
func (o *Orchestrator) appendPendingFiles(chatID, userID string, refs []pendingFileRef) {
	incoming := sanitizePendingRefs(refs)
	if len(incoming) == 0 {
		return
	}
	slot := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	defer o.pendingFilesMu.Unlock()
	o.pendingFiles[slot] = sanitizePendingRefs(append(o.pendingFiles[slot], incoming...))
}
// getPendingFiles returns a copy of the pending-file list stored for the
// (chatID, userID) pair. The result is never nil and does not alias the stored slice.
func (o *Orchestrator) getPendingFiles(chatID, userID string) []pendingFileRef {
	slot := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	defer o.pendingFilesMu.Unlock()

	stored := o.pendingFiles[slot]
	return append(make([]pendingFileRef, 0, len(stored)), stored...)
}
// clearPendingFiles removes the pending-file list stored for the (chatID, userID) pair.
func (o *Orchestrator) clearPendingFiles(chatID, userID string) {
	slot := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	delete(o.pendingFiles, slot)
	o.pendingFilesMu.Unlock()
}
// pendingFileKey builds the map key for per-conversation state: the trimmed chat ID and
// the trimmed user ID joined by "::".
func pendingFileKey(chatID, userID string) string {
	parts := []string{strings.TrimSpace(chatID), strings.TrimSpace(userID)}
	return strings.Join(parts, "::")
}
// sanitizePendingRefs returns a cleaned copy of refs: ID, Name and MimeType are trimmed,
// entries with a blank ID are dropped, and only the first entry for each ID is kept.
// An empty input yields nil.
func sanitizePendingRefs(refs []pendingFileRef) []pendingFileRef {
	if len(refs) == 0 {
		return nil
	}

	kept := make([]pendingFileRef, 0, len(refs))
	knownIDs := map[string]struct{}{}
	for i := range refs {
		ref := refs[i] // local copy; the trimmed fields are written to it, not to refs
		ref.ID = strings.TrimSpace(ref.ID)
		if ref.ID == "" {
			continue
		}
		if _, duplicate := knownIDs[ref.ID]; duplicate {
			continue
		}
		knownIDs[ref.ID] = struct{}{}

		ref.Name = strings.TrimSpace(ref.Name)
		ref.MimeType = strings.TrimSpace(ref.MimeType)
		kept = append(kept, ref)
	}
	return kept
}
// defaultIfEmpty returns fallback when v is empty or whitespace-only; otherwise it
// returns v exactly as given (untrimmed).
func defaultIfEmpty(v, fallback string) string {
	if strings.TrimSpace(v) != "" {
		return v
	}
	return fallback
}
// shouldExitPlanningMode reports whether userInput contains one of the phrases that ask
// to leave planning mode. Matching is a case-insensitive substring check; blank input
// never matches.
func shouldExitPlanningMode(userInput string) bool {
	normalized := strings.ToLower(strings.TrimSpace(userInput))
	if normalized == "" {
		return false
	}

	exitPhrases := [...]string{
		"退出规划", "结束规划", "关闭分屏", "结束分屏", "停止规划",
		"cancel planning", "exit planning",
	}
	for i := range exitPhrases {
		if strings.Contains(normalized, exitPhrases[i]) {
			return true
		}
	}
	return false
}
// isPlanningConfirmation reports whether userInput contains one of the phrases treated
// as the user's approval to continue. Matching is a case-insensitive substring check;
// blank input never matches.
func isPlanningConfirmation(userInput string) bool {
	normalized := strings.ToLower(strings.TrimSpace(userInput))
	if normalized == "" {
		return false
	}

	approvalPhrases := [...]string{
		"确认", "同意", "可以创建", "开始创建", "继续创建", "执行下一步", "没问题,继续",
		"confirm", "approved", "go ahead", "proceed",
	}
	for i := range approvalPhrases {
		if strings.Contains(normalized, approvalPhrases[i]) {
			return true
		}
	}
	return false
}
// containsSafePIPlanningSkill reports whether at least one of the given skills is
// recognised by isSafePIPlanningSkill.
func containsSafePIPlanningSkill(skills []knowledge.Skill) bool {
	for i := range skills {
		if isSafePIPlanningSkill(skills[i]) {
			return true
		}
	}
	return false
}
// isSafePIPlanningSkill reports whether sk looks like the SAFe PI planning skill: its
// lower-cased name contains "safe", "pi 规划" or "pi planning", or its lower-cased
// source contains "safe_pi_planning".
func isSafePIPlanningSkill(sk knowledge.Skill) bool {
	title := strings.ToLower(strings.TrimSpace(sk.Name))
	for _, hint := range [...]string{"safe", "pi 规划", "pi planning"} {
		if strings.Contains(title, hint) {
			return true
		}
	}

	origin := strings.ToLower(strings.TrimSpace(sk.Source))
	return strings.Contains(origin, "safe_pi_planning")
}
// getSafePIPlanningSkills returns every currently loaded skill that
// isSafePIPlanningSkill accepts. The result is never nil.
func (o *Orchestrator) getSafePIPlanningSkills() []knowledge.Skill {
	matches := make([]knowledge.Skill, 0, 1)
	for _, candidate := range o.getSkillsSnapshot() {
		if !isSafePIPlanningSkill(candidate) {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// isPIPlanningToolName reports whether toolName, after trimming and lower-casing, is
// exactly "publish_pi_plan" or contains "safe_pi_planning".
func isPIPlanningToolName(toolName string) bool {
	normalized := strings.ToLower(strings.TrimSpace(toolName))
	if normalized == "publish_pi_plan" {
		return true
	}
	return strings.Contains(normalized, "safe_pi_planning")
}
// wrapPIArtifact wraps the trimmed markdown in an <artifact> element typed
// "safe_pi_planning" and titled "PI Planning Document", with the content on its own
// lines. Blank markdown yields "".
func wrapPIArtifact(markdown string) string {
	const (
		openingTag = `<artifact type="safe_pi_planning" title="PI Planning Document">`
		closingTag = `</artifact>`
	)

	body := strings.TrimSpace(markdown)
	if body == "" {
		return ""
	}
	return strings.Join([]string{openingTag, body, closingTag}, "\n")
}
// activatePlanningSession marks the planning session of the (chatID, userID) pair as
// active, records whether it is waiting for user confirmation and refreshes its
// timestamp. The stored artifact is replaced only when artifact is not blank.
func (o *Orchestrator) activatePlanningSession(chatID, userID, artifact string, awaitingConfirm bool) {
	slot := pendingFileKey(chatID, userID)
	latestArtifact := strings.TrimSpace(artifact)

	o.planningSessionsMu.Lock()
	defer o.planningSessionsMu.Unlock()

	session := o.planningSessions[slot]
	session.Active = true
	session.AwaitingConfirm = awaitingConfirm
	if latestArtifact != "" {
		session.LastArtifact = latestArtifact
	}
	session.UpdatedAt = time.Now().UTC()
	o.planningSessions[slot] = session
}
// clearPlanningSession removes the planning session stored for the (chatID, userID) pair.
func (o *Orchestrator) clearPlanningSession(chatID, userID string) {
	slot := pendingFileKey(chatID, userID)

	o.planningSessionsMu.Lock()
	delete(o.planningSessions, slot)
	o.planningSessionsMu.Unlock()
}
// getPlanningSession looks up the planning session of the (chatID, userID) pair. A
// session whose last update is older than planningSessionTTL is deleted and reported as
// missing. The boolean result is true only when a live session is returned.
func (o *Orchestrator) getPlanningSession(chatID, userID string) (planningSessionState, bool) {
	slot := pendingFileKey(chatID, userID)

	o.planningSessionsMu.Lock()
	defer o.planningSessionsMu.Unlock()

	session, found := o.planningSessions[slot]
	switch {
	case !found:
		return planningSessionState{}, false
	case time.Since(session.UpdatedAt) > planningSessionTTL:
		// Expired: evict the entry so it is not found again.
		delete(o.planningSessions, slot)
		return planningSessionState{}, false
	default:
		return session, true
	}
}
// formatSelectedSkillsForPrompt renders the skill section of the prompt. It uses the
// skills in selected when there are any; otherwise it picks up to four skills relevant
// to userInput. A fixed placeholder line is returned when no skill is available.
func (o *Orchestrator) formatSelectedSkillsForPrompt(userInput string, selected []knowledge.Skill) string {
	candidates := selected
	if len(candidates) == 0 {
		candidates = o.selectRelevantSkills(userInput, 4)
	}
	if len(candidates) > 0 {
		return formatSkills(candidates)
	}
	return "(none matched, tools are still globally available)"
}
// selectRelevantSkills picks at most maxCount skills (4 when maxCount <= 0) that match
// userInput.
//
// When the query is blank, or there are no more than maxCount skills loaded, every
// loaded skill is returned. Otherwise each skill is scored against its name plus the
// first 1800 runes of its content: 8 points when the whole query appears, plus 1 point
// per query token that appears. Skills scoring zero are dropped. The rest are ordered by
// score (descending), then by normalised name. Returns nil when nothing matched.
func (o *Orchestrator) selectRelevantSkills(userInput string, maxCount int) []knowledge.Skill {
	if maxCount <= 0 {
		maxCount = 4
	}
	query := strings.TrimSpace(strings.ToLower(userInput))
	catalog := o.getSkillsSnapshot()
	if query == "" || len(catalog) <= maxCount {
		return catalog
	}

	type scoredSkill struct {
		skill  knowledge.Skill
		points int
	}

	tokens := buildQueryTokens(query)
	hits := make([]scoredSkill, 0, len(catalog))
	for _, candidate := range catalog {
		haystack := strings.ToLower(candidate.Name + "\n" + clipForScoring(candidate.Content, 1800))

		points := 0
		if strings.Contains(haystack, query) {
			points = 8
		}
		for _, token := range tokens {
			if strings.Contains(haystack, token) {
				points++
			}
		}
		if points > 0 {
			hits = append(hits, scoredSkill{skill: candidate, points: points})
			if o.log != nil {
				o.log.Debugf("selectRelevantSkills skill=%q score=%d", candidate.Name, points)
			}
		}
	}
	if len(hits) == 0 {
		return nil
	}

	nameKey := func(s knowledge.Skill) string {
		return strings.ToLower(strings.TrimSpace(s.Name))
	}
	sort.Slice(hits, func(a, b int) bool {
		if hits[a].points != hits[b].points {
			return hits[a].points > hits[b].points
		}
		return nameKey(hits[a].skill) < nameKey(hits[b].skill)
	})
	if len(hits) > maxCount {
		hits = hits[:maxCount]
	}

	chosen := make([]knowledge.Skill, 0, len(hits))
	for i := range hits {
		chosen = append(chosen, hits[i].skill)
	}
	if o.log != nil {
		chosenNames := make([]string, 0, len(chosen))
		for i := range chosen {
			chosenNames = append(chosenNames, chosen[i].Name)
		}
		o.log.Debugf("selectRelevantSkills query=%q matched=%d selected=%v", query, len(hits), chosenNames)
	}
	return chosen
}
// buildQueryTokens splits query into search tokens and returns them sorted and
// de-duplicated.
//
// A token is a maximal run of lowercase ASCII letters, digits and characters in
// U+4E00..U+9FFF; every other rune (uppercase letters included) is a separator. Tokens
// shorter than two runes are discarded. In addition, every pair of adjacent characters
// from U+4E00..U+9FFF is added as a 2-gram, which improves hit rates for Chinese text
// written without spaces. The result is never nil.
func buildQueryTokens(query string) []string {
	isHan := func(r rune) bool { return r >= 0x4e00 && r <= 0x9fff }
	isSeparator := func(r rune) bool {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', isHan(r):
			return false
		}
		return true
	}

	seen := map[string]struct{}{}
	for _, word := range strings.FieldsFunc(query, isSeparator) {
		if len([]rune(word)) >= 2 {
			seen[word] = struct{}{}
		}
	}

	// Sliding window of two runes over the whole query for the 2-grams.
	chars := []rune(query)
	for i := 1; i < len(chars); i++ {
		if isHan(chars[i-1]) && isHan(chars[i]) {
			seen[string(chars[i-1:i+1])] = struct{}{}
		}
	}

	tokens := make([]string, 0, len(seen))
	for token := range seen {
		tokens = append(tokens, token)
	}
	sort.Strings(tokens)
	return tokens
}
// clipForScoring returns at most the first maxRunes runes of s. A non-positive maxRunes
// is replaced by 1800. Strings within the limit are returned unchanged.
func clipForScoring(s string, maxRunes int) string {
	limit := maxRunes
	if limit <= 0 {
		limit = 1800
	}
	if chars := []rune(s); len(chars) > limit {
		return string(chars[:limit])
	}
	return s
}
// formatRuntimeContextForPrompt returns the prompt sentence that tells the model which
// operating system (runtime.GOOS, lower-cased) the process runs on, falling back to
// "unknown" when that value is blank.
func formatRuntimeContextForPrompt() string {
	const (
		lead = "当前运行系统 GOOS="
		tail = "。请优先使用与该系统一致的策略。仅当用户明确要求时,才采用其他系统(如 Windows的专用流程。"
	)

	platform := strings.TrimSpace(strings.ToLower(runtime.GOOS))
	if platform == "" {
		platform = "unknown"
	}
	return lead + platform + tail
}
// emitCapabilityGap records a capability gap (something the assistant could not do)
// and, when automatic drafting is enabled, generates skill drafts for gaps that occur
// often enough.
//
// The call is a no-op when capability-gap tracking is disabled or when intent or reason
// is blank. intent is capped at 1000 bytes and reason at 240 bytes before being stored.
// Failures are logged (when a logger is configured) and never returned to the caller.
func (o *Orchestrator) emitCapabilityGap(chatID, userID, intent, reason string) {
	if !o.enableCapabilityGap {
		return
	}
	intent = strings.TrimSpace(intent)
	reason = strings.TrimSpace(reason)
	if intent == "" || reason == "" {
		return
	}

	// clip shortens s to at most limit bytes. It backs up to the start of a UTF-8
	// sequence (at most 3 continuation bytes) so a multi-byte character is never cut
	// in half and stored as invalid text.
	clip := func(s string, limit int) string {
		if len(s) <= limit {
			return s
		}
		cut := limit
		for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		return s[:cut]
	}
	intent = clip(intent, 1000) // guards against oversized payloads
	reason = clip(reason, 240)  // keeps the reason within the length expected by storage

	if err := o.store.SaveCapabilityGap(chatID, userID, intent, reason); err != nil {
		if o.log != nil {
			o.log.Warnf("save capability gap failed chat_id=%s user_id=%s err=%v", chatID, userID, err)
		}
		// Stop on a failed save whether or not a logger is configured; draft
		// generation must not run for a gap that was never recorded.
		return
	}

	// Automatic skill-draft generation from historical gaps is currently switched off
	// unless explicitly enabled.
	if !o.autoSkillDraftEnabled {
		if o.log != nil {
			o.log.Debugf("auto skill draft generation disabled temporarily")
		}
		return
	}

	// Look up the most frequent gaps in the look-back window and draft a skill for each
	// one that reached the trigger count.
	clusters, err := o.store.TopCapabilityGapClusters(20, time.Now().UTC().Add(-o.gapLookbackDuration))
	if err != nil {
		if o.log != nil {
			o.log.Warnf("query capability gap clusters failed err=%v", err)
		}
		return
	}
	for _, c := range clusters {
		if c.Count < o.gapDraftTriggerCount {
			continue
		}
		path, created, draftErr := knowledge.GenerateSkillDraft(c, o.autoSkillDir)
		if draftErr != nil {
			if o.log != nil {
				o.log.Warnf("generate skill draft failed intent_key=%s reason=%s err=%v", c.IntentKey, c.Reason, draftErr)
			}
			continue
		}
		if !created {
			continue
		}
		if o.log != nil {
			o.log.Infof("capability gap draft generated path=%s intent_key=%s reason=%s count=%d", path, c.IntentKey, c.Reason, c.Count)
		}
		// A new draft was written: reload the skills so it becomes available.
		if reloadErr := o.ReloadSkills(); reloadErr != nil && o.log != nil {
			o.log.Warnf("auto reload skills failed after generation path=%s err=%v", path, reloadErr)
		}
	}
}
// ReloadSkills reloads the skill set and the skill summaries from the configured skills
// directory and swaps them in without a restart. When either load fails the error is
// returned and the skills currently in memory are left unchanged.
func (o *Orchestrator) ReloadSkills() error {
	loaded, err := knowledge.LoadSkillSet(o.skillsDir)
	if err != nil {
		return err
	}
	overview, err := knowledge.LoadSkillSummaries(o.skillsDir)
	if err != nil {
		return err
	}
	overview = copySkillSummaries(overview)

	// Swap both views under the write lock so readers never see a mixed state.
	o.skillsMu.Lock()
	o.skills, o.skillSummaries = loaded, overview
	o.skillsMu.Unlock()

	if o.log != nil {
		o.log.Infof("skills hot reloaded count=%d dir=%s", len(loaded), o.skillsDir)
	}
	return nil
}
// getSkillsSnapshot returns a copy of the loaded skills taken under the read lock. The
// result is never nil and does not alias the internal slice.
func (o *Orchestrator) getSkillsSnapshot() []knowledge.Skill {
	o.skillsMu.RLock()
	defer o.skillsMu.RUnlock()

	return append(make([]knowledge.Skill, 0, len(o.skills)), o.skills...)
}
// getSkillSummariesSnapshot returns a copy of the loaded skill summaries taken under
// the read lock.
func (o *Orchestrator) getSkillSummariesSnapshot() []knowledge.SkillSummary {
	o.skillsMu.RLock()
	defer o.skillsMu.RUnlock()

	snapshot := copySkillSummaries(o.skillSummaries)
	return snapshot
}
// BuildCapabilityGapReport renders a text report of the most frequent capability gaps
// seen within the look-back window, limited to limit clusters. It returns a short notice
// when nothing was recorded, and the store error when the query fails.
func (o *Orchestrator) BuildCapabilityGapReport(limit int) (string, error) {
	since := time.Now().UTC().Add(-o.gapLookbackDuration)
	clusters, err := o.store.TopCapabilityGapClusters(limit, since)
	if err != nil {
		return "", err
	}
	if len(clusters) == 0 {
		return "最近没有采集到能力缺口记录。", nil
	}

	var report strings.Builder
	report.WriteString("高频能力缺口清单:\n")
	for idx, cluster := range clusters {
		fmt.Fprintf(&report, "%d) intent=%s | reason=%s | count=%d | last_seen=%s\n",
			idx+1, cluster.IntentKey, cluster.Reason, cluster.Count,
			cluster.LastSeenAt.Format("2006-01-02 15:04:05"))
	}
	report.WriteString("\n草稿目录" + o.autoSkillDir)
	report.WriteString("\n系统会在达到阈值后自动生成并热加载技能你也可以手动发送 /reload_skills。")
	return report.String(), nil
}
// formatSkillSummariesForPrompt renders the skill overview for the prompt, one
// "- [dir] name => description" line per skill, ordered by directory name and then by
// skill name (both compared case-insensitively).
//
// Summaries without a name are skipped and the "[dir]" and "=> description" parts are
// omitted when blank. Descriptions are capped at 220 bytes. Returns "(none)" when no
// summaries are loaded.
func (o *Orchestrator) formatSkillSummariesForPrompt() string {
	const maxDescBytes = 220

	summaries := o.getSkillSummariesSnapshot()
	if len(summaries) == 0 {
		return "(none)"
	}
	sort.Slice(summaries, func(i, j int) bool {
		left := strings.ToLower(strings.TrimSpace(summaries[i].DirName))
		right := strings.ToLower(strings.TrimSpace(summaries[j].DirName))
		if left == right {
			return strings.ToLower(strings.TrimSpace(summaries[i].Name)) < strings.ToLower(strings.TrimSpace(summaries[j].Name))
		}
		return left < right
	})

	b := strings.Builder{}
	for _, summary := range summaries {
		dir := strings.TrimSpace(summary.DirName)
		name := strings.TrimSpace(summary.Name)
		desc := strings.TrimSpace(summary.Description)
		if name == "" {
			continue
		}
		if len(desc) > maxDescBytes {
			// Back up to the start of a UTF-8 sequence (at most 3 continuation bytes)
			// so the prompt never receives half of a multi-byte character.
			cut := maxDescBytes
			for back := 0; back < 3 && desc[cut]&0xC0 == 0x80; back++ {
				cut--
			}
			desc = desc[:cut]
		}

		b.WriteString("- ")
		if dir != "" {
			b.WriteString("[")
			b.WriteString(dir)
			b.WriteString("] ")
		}
		b.WriteString(name)
		if desc != "" {
			b.WriteString(" => ")
			b.WriteString(desc)
		}
		b.WriteString("\n")
	}
	return strings.TrimSpace(b.String())
}
// copySkillSummaries returns a new slice with the same summaries as in, with DirName,
// Name and Description trimmed of surrounding whitespace. The result is never nil.
func copySkillSummaries(in []knowledge.SkillSummary) []knowledge.SkillSummary {
	trimmed := make([]knowledge.SkillSummary, len(in))
	for i, summary := range in {
		summary.DirName = strings.TrimSpace(summary.DirName)
		summary.Name = strings.TrimSpace(summary.Name)
		summary.Description = strings.TrimSpace(summary.Description)
		trimmed[i] = summary
	}
	return trimmed
}
// formatToolErrorObservation renders a tool failure as the single-line observation
// "ERROR_CODE=<code>; TOOL=<action>; REASON=<reason>". Each argument is trimmed; blank
// values fall back to "TOOL_EXEC_ERROR", "unknown" and "unknown error" respectively.
func formatToolErrorObservation(code, action, reason string) string {
	orDefault := func(value, fallback string) string {
		if trimmed := strings.TrimSpace(value); trimmed != "" {
			return trimmed
		}
		return fallback
	}

	return "ERROR_CODE=" + orDefault(code, "TOOL_EXEC_ERROR") +
		"; TOOL=" + orDefault(action, "unknown") +
		"; REASON=" + orDefault(reason, "unknown error")
}
// formatSkills renders each skill as a "## <name>" heading followed by its content,
// separates the skills with a blank line, and trims the overall result.
func formatSkills(skills []knowledge.Skill) string {
	sections := make([]string, 0, len(skills))
	for i := range skills {
		sections = append(sections, "## "+skills[i].Name+"\n"+skills[i].Content)
	}
	return strings.TrimSpace(strings.Join(sections, "\n\n"))
}
// formatToolDoc renders the registered tools as "- name: description" lines ordered by
// tool name. Returns "(none)" when the registry is empty.
func (o *Orchestrator) formatToolDoc() string {
	registered := o.tools.List()
	if len(registered) == 0 {
		return "(none)"
	}
	sort.Slice(registered, func(a, b int) bool {
		return registered[a].Name() < registered[b].Name()
	})

	entries := make([]string, len(registered))
	for i, entry := range registered {
		entries[i] = "- " + entry.Name() + ": " + entry.Description()
	}
	return strings.TrimSpace(strings.Join(entries, "\n"))
}
// truncateForLog shortens s for logging. Strings of at most maxLen bytes are returned
// unchanged; longer ones are cut to at most maxLen bytes and suffixed with
// "...(truncated)".
//
// The cut backs up to the start of a UTF-8 sequence so a multi-byte character is never
// split, which means the kept prefix may be up to 3 bytes shorter than maxLen. A
// negative maxLen is treated as 0.
func truncateForLog(s string, maxLen int) string {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(s) <= maxLen {
		return s
	}

	cut := maxLen
	// A UTF-8 sequence has at most 3 continuation bytes (10xxxxxx) after its lead byte.
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return s[:cut] + "...(truncated)"
}
// sanitizeUserFacingAnswer strips ReAct scaffolding from a model reply so that only the
// text meant for the user remains.
//
// After normalising CRLF to LF and trimming, the function first looks for an answer
// marker (for example "Final Answer:"). Markers are tried in list order, and for the
// first one whose last occurrence is followed by non-blank text, that text is returned.
// When no marker yields text, lines that start with a reasoning prefix (Thought:,
// Action:, Observation:, "Step ", and their Chinese counterparts) are removed, runs of
// blank lines are collapsed to one, and the remaining lines are joined. Returns "" when
// nothing is left.
func sanitizeUserFacingAnswer(raw string) string {
	text := strings.TrimSpace(strings.ReplaceAll(raw, "\r\n", "\n"))
	if text == "" {
		return ""
	}

	answerMarkers := [...]string{"Final Answer:", "Final Answer", "最终回答:", "最终回答:", "最终答案:", "最终答案:", "Answer:", "Answer"}
	for _, marker := range answerMarkers {
		at := strings.LastIndex(text, marker)
		if at < 0 {
			continue
		}
		if answer := strings.TrimSpace(text[at+len(marker):]); answer != "" {
			return answer
		}
	}

	// Prefixes compared against the lower-cased line.
	foldedPrefixes := [...]string{"thought:", "trace:", "observation:", "action:", "actioninput:", "action input:", "step "}
	// Prefixes compared against the line as written.
	literalPrefixes := [...]string{"思考:", "思考:", "推理:", "推理:", "观察:", "观察:", "行动:", "行动:", "步骤 "}
	isScaffolding := func(line string) bool {
		folded := strings.ToLower(line)
		for _, prefix := range foldedPrefixes {
			if strings.HasPrefix(folded, prefix) {
				return true
			}
		}
		for _, prefix := range literalPrefixes {
			if strings.HasPrefix(line, prefix) {
				return true
			}
		}
		return false
	}

	var kept []string
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimSpace(line)
		switch {
		case line == "":
			// Keep at most one blank line, and never a leading one.
			if n := len(kept); n > 0 && kept[n-1] != "" {
				kept = append(kept, "")
			}
		case isScaffolding(line):
			// Reasoning trace: dropped.
		default:
			kept = append(kept, line)
		}
	}
	// Trailing blank entries disappear with the final trim.
	return strings.TrimSpace(strings.Join(kept, "\n"))
}
// stripArtifactTags removes every <artifact ...>...</artifact> block from s and returns
// the remaining text, trimmed. Text before and after a removed block is joined with a
// blank line when both sides are non-empty. An opening tag without a closing tag
// discards everything from that tag onward.
func stripArtifactTags(s string) string {
	const (
		openMarker  = "<artifact"
		closeMarker = "</artifact>"
	)

	rest := s
	for {
		open := strings.Index(rest, openMarker)
		if open < 0 {
			return strings.TrimSpace(rest)
		}
		head := strings.TrimSpace(rest[:open])

		closeAt := strings.Index(rest[open:], closeMarker)
		if closeAt < 0 {
			// Unterminated block: keep only what precedes it.
			return head
		}
		tail := strings.TrimSpace(rest[open+closeAt+len(closeMarker):])

		switch {
		case head == "" || tail == "":
			rest = head + tail
		default:
			rest = head + "\n\n" + tail
		}
	}
}
// extractArtifactContent returns the trimmed text inside the first
// <artifact ...>...</artifact> block of s.
//
// It returns "" when s has no opening tag or the opening tag is never terminated by
// ">". When the block has no closing tag, everything after the opening tag is returned.
// The closing tag is searched only after the opening tag, so a stray "</artifact>"
// earlier in s is ignored instead of producing an invalid slice range.
func extractArtifactContent(s string) string {
	const (
		openMarker  = "<artifact"
		closeMarker = "</artifact>"
	)

	open := strings.Index(s, openMarker)
	if open < 0 {
		return ""
	}
	tagEnd := strings.Index(s[open:], ">")
	if tagEnd < 0 {
		return ""
	}

	body := s[open+tagEnd+1:]
	if end := strings.Index(body, closeMarker); end >= 0 {
		body = body[:end]
	}
	return strings.TrimSpace(body)
}
// splitContentIntoSegments splits content into consecutive segments of at most
// segmentSize runes each; only the last segment may be shorter.
//
// Empty content yields nil. A non-positive segmentSize cannot advance through the
// content, so in that case the whole content is returned as a single segment.
func splitContentIntoSegments(content string, segmentSize int) []string {
	runes := []rune(content)
	if len(runes) == 0 {
		return nil
	}
	if segmentSize <= 0 {
		return []string{content}
	}

	segments := make([]string, 0, (len(runes)+segmentSize-1)/segmentSize)
	for start := 0; start < len(runes); start += segmentSize {
		end := start + segmentSize
		if end > len(runes) {
			end = len(runes)
		}
		segments = append(segments, string(runes[start:end]))
	}
	return segments
}