package agent import ( "context" "fmt" "sort" "strconv" "strings" "sync" "time" "laodingbot/internal/knowledge" "laodingbot/internal/llm" "laodingbot/internal/logger" "laodingbot/internal/memory" "laodingbot/internal/tools" ) // Orchestrator 负责协调和组合业务逻辑,包含 LLM 计算、上下文管理、技能匹配计算和工具调用。 type Orchestrator struct { llm llm.Client store *memory.SQLiteStore tools *tools.Registry soul string skills []knowledge.Skill skillSummaries []knowledge.SkillSummary skillsDir string autoSkillDir string gapDraftTriggerCount int gapLookbackDuration time.Duration reactMaxStep int enableCapabilityGap bool log *logger.Logger skillsMu sync.RWMutex } // NewOrchestrator 创建一个新的编排器对象,初始化关键路径和超时控制等。 func NewOrchestrator( llmClient llm.Client, store *memory.SQLiteStore, registry *tools.Registry, soul string, skills []knowledge.Skill, skillSummaries []knowledge.SkillSummary, skillsDir string, reactMaxStep int, enableCapabilityGap bool, autoSkillDir string, gapDraftTriggerCount int, gapLookbackDuration time.Duration, log *logger.Logger, ) *Orchestrator { if reactMaxStep <= 0 { reactMaxStep = 8 // 默认最大 ReAct 步骤数为 8 } if gapDraftTriggerCount <= 0 { gapDraftTriggerCount = 3 // 默认触发技能生成的缺口数量为 3 } if gapLookbackDuration <= 0 { gapLookbackDuration = 7 * 24 * time.Hour // 默认回溯时长为 7 天 } if strings.TrimSpace(autoSkillDir) == "" { autoSkillDir = skillsDir } return &Orchestrator{ llm: llmClient, store: store, tools: registry, soul: soul, skills: skills, skillSummaries: copySkillSummaries(skillSummaries), skillsDir: skillsDir, autoSkillDir: autoSkillDir, gapDraftTriggerCount: gapDraftTriggerCount, gapLookbackDuration: gapLookbackDuration, reactMaxStep: reactMaxStep, enableCapabilityGap: enableCapabilityGap, log: log, } } // HandleMessage 是接受用户消息输入并通过统一 ReAct 循环生成回复的主流程。 // 不再分"先选 skill 再决策"两步,而是 LLM 第一次调用就同时决定: // - 是否可以直接回答(is_final_answer=true) // - 是否需要调用工具(action + action_input) // 循环持续进行,直到 LLM 返回 is_final_answer=true。 func (o *Orchestrator) HandleMessage(ctx context.Context, chatID, userID, text string) (string, error) { // 为链路追踪设置唯一的 TraceID traceID := logger.NewTraceID() ctx = logger.WithTraceID(ctx, traceID) traceLogPrefix := "trace_id=" + traceID if o.log != nil { o.log.Infof("%s handle message chat_id=%s user_id=%s text_len=%d", traceLogPrefix, chatID, userID, len(text)) o.log.Debugf("%s handle message text=%q", traceLogPrefix, text) } // 处理特殊的重载指令 if strings.EqualFold(strings.TrimSpace(text), "/reload_skills") { if err := o.ReloadSkills(); err != nil { return "技能热加载失败: " + err.Error(), nil } return "技能已热加载完成。", nil } // 如果用户请求能力缺口报告,则生成报告格式化输出 if strings.EqualFold(strings.TrimSpace(text), "/capability_gaps") { report, err := o.BuildCapabilityGapReport(10) if err != nil { return "缺口报告生成失败: " + err.Error(), nil } return report, nil } // 保存用户消息到 SQLite 中 if err := o.store.SaveMessage(chatID, userID, "user", text); err != nil { if o.log != nil { o.log.Errorf("%s save user message failed chat_id=%s err=%v", traceLogPrefix, chatID, err) } return "", err } // 读取最近的会话记忆并压缩成 Prompt 上下文 recent, err := o.store.LoadRecent(chatID, 16) if err != nil { if o.log != nil { o.log.Errorf("%s load recent failed chat_id=%s err=%v", traceLogPrefix, chatID, err) } return "", err } compressed := memory.CompressForPrompt(recent, 6000) if o.log != nil { o.log.Debugf("%s prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", traceLogPrefix, chatID, len(recent), len(compressed)) } // 进入统一 ReAct 循环 response, err := o.runUnifiedReAct(ctx, chatID, userID, compressed, text) if err != nil { if o.log != nil { o.log.Errorf("%s message generation failed chat_id=%s err=%v", traceLogPrefix, chatID, err) } return "", err } // 最终将机器人的回复也加入记忆缓存 if err := o.store.SaveMessage(chatID, userID, "assistant", response); err != nil { if o.log != nil { o.log.Errorf("%s save assistant response failed chat_id=%s err=%v", traceLogPrefix, chatID, err) } return "", err } if o.log != nil { o.log.Infof("%s message handled chat_id=%s response_len=%d", traceLogPrefix, chatID, len(response)) } return response, nil } // buildUnifiedSystemPrompt 构建统一 ReAct 循环的 system prompt。 // 包含人格设定、所有可用技能(含完整内容)、所有可用工具、以及 JSON 输出格式约束。 func (o *Orchestrator) buildUnifiedSystemPrompt() string { skillMetaDoc := o.formatSkillSummariesForPrompt() allSkillsDoc := o.formatAllSkillsContent() toolDoc := o.formatToolDoc() return strings.Join([]string{ "你是一个个人自动化助手,必须遵循如下人格设定并保持一致:", o.soul, "", "===== 可用技能概览 =====", skillMetaDoc, "", "===== 技能详细说明 =====", allSkillsDoc, "", "===== 可用工具 =====", toolDoc, "", "===== 输出格式约束 =====", "你必须使用 ReAct(Reasoning + Acting)模式进行决策。", "每次回复必须是且仅是一个 JSON 对象,字段如下:", "", "{", " \"thought\": \"你的推理过程(必填)\",", " \"action\": \"要调用的工具名称,如 file/shell/web_search(不调工具时填 none)\",", " \"action_input\": \"传给工具的输入(字符串或对象),不调工具时填空字符串或 null\",", " \"is_final_answer\": true 或 false,", " \"final_answer\": \"当 is_final_answer=true 时填写给用户的最终回复,否则填 null\"", "}", "", "决策规则:", "1) 如果你可以直接回答用户问题(不需要任何工具):", " 设 is_final_answer=true,action=\"none\",final_answer 填写完整回复。", "2) 如果你需要调用工具获取信息后才能回答:", " 设 is_final_answer=false,action 填工具名,action_input 填工具所需输入,final_answer=null。", "3) 不要在 JSON 之外输出任何内容。", "4) 根据技能说明中的指引决定何时以及如何使用工具。", "5) 每轮工具调用结果会以 Observation 的形式追加到推理记录中,供你下一轮决策参考。", }, "\n") } // runUnifiedReAct 执行统一的 ReAct 循环。 // LLM 每次都看到完整的技能集+工具集,自行决定是否调用工具或直接回答。 // 循环持续到 is_final_answer=true 或达到安全上限。 func (o *Orchestrator) runUnifiedReAct(ctx context.Context, chatID, userID, compressedContext, userInput string) (string, error) { traceID := logger.TraceIDFromContext(ctx) traceLogPrefix := "trace_id=" + traceID systemPrompt := o.buildUnifiedSystemPrompt() if o.log != nil { o.log.Infof("%s unified react start", traceLogPrefix) } // 安全上限:防止无限循环(当前暂不使用 reactMaxStep 配置约束,使用固定硬上限) const maxSteps = 20 scratchpad := "" for step := 1; step <= maxSteps; step++ { if o.log != nil { o.log.Infof("%s react step=%d start", traceLogPrefix, step) o.log.Debugf("%s react step=%d scratchpad=%q", traceLogPrefix, step, scratchpad) } // 构造本轮 user prompt:历史上下文 + 用户问题 + 推理记录 prompt := strings.Join([]string{ "历史上下文:", compressedContext, "", "用户问题:", userInput, "", "当前推理记录(按时间顺序):", scratchpad, "", "请输出你的 JSON 决策。", }, "\n") raw, err := o.llm.Generate(ctx, systemPrompt, prompt) if err != nil { return "", err } if o.log != nil { o.log.Infof("%s react step=%d llm_raw=%q", traceLogPrefix, step, raw) } // 解析 LLM 返回的 JSON 决策 decision, err := parseDecision(raw) if err != nil { if o.log != nil { o.log.Warnf("%s react step=%d parse failed err=%v, using raw as final answer", traceLogPrefix, step, err) } // 解析失败时,尝试将原始输出当作直接回答返回 o.emitCapabilityGap(chatID, userID, userInput, "react_parse_failed") return strings.TrimSpace(raw), nil } if o.log != nil { o.log.Infof("%s react step=%d thought=%q action=%q is_final=%v", traceLogPrefix, step, decision.Thought, decision.Action, decision.IsFinalAnswer) } // ========== 判定:是否为最终回答 ========== if decision.IsFinalAnswer { finalText := "" if decision.FinalAnswer != nil { finalText = strings.TrimSpace(*decision.FinalAnswer) } if finalText == "" { finalText = strings.TrimSpace(decision.Thought) } if finalText == "" { finalText = "已完成处理。" } if o.log != nil { o.log.Infof("%s react final at step=%d answer=%q", traceLogPrefix, step, finalText) } return finalText, nil } // ========== 非最终回答:执行工具调用 ========== action := strings.ToLower(strings.TrimSpace(decision.Action)) if action == "" || action == "none" { // LLM 说不是最终回答但也不指定工具,记录后让它再想一轮 scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n" scratchpad += "Step " + strconv.Itoa(step) + " Observation: 你没有指定要调用的工具,请重新决策:要么调用工具,要么给出最终回答。\n" continue } actionInput := decision.GetActionInputString() // 检查工具是否存在 tool, ok := o.tools.Get(action) if !ok { if o.log != nil { o.log.Warnf("%s react step=%d tool_not_found=%s", traceLogPrefix, step, action) } scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n" scratchpad += "Step " + strconv.Itoa(step) + " Action: " + action + "\n" scratchpad += "Step " + strconv.Itoa(step) + " Observation: " + formatToolErrorObservation("TOOL_NOT_FOUND", action, "该工具不存在,可用工具请参阅 system prompt") + "\n" o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+action) continue } // 调用工具 if o.log != nil { o.log.Infof("%s react step=%d tool_call tool=%s input=%q", traceLogPrefix, step, action, actionInput) } toolOut, toolErr := tool.Call(ctx, actionInput) obs := strings.TrimSpace(toolOut) if obs == "" { obs = "(empty output)" } if toolErr != nil { obs = formatToolErrorObservation("TOOL_EXEC_ERROR", action, toolErr.Error()) + "\nOUTPUT:\n" + obs o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+action) } // 限制观察值长度防止超出 LLM 上下文窗口 if len(obs) > 4000 { obs = obs[:4000] + "\n...(truncated)" } if o.log != nil { o.log.Infof("%s react step=%d observation_len=%d", traceLogPrefix, step, len(obs)) } // 将本轮的思考、行动、观察追加到 scratchpad scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n" scratchpad += "Step " + strconv.Itoa(step) + " Action: " + action + "\n" scratchpad += "Step " + strconv.Itoa(step) + " ActionInput: " + actionInput + "\n" scratchpad += "Step " + strconv.Itoa(step) + " Observation: " + obs + "\n" } // 达到安全上限仍未得到最终回答 o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted") return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil } // formatAllSkillsContent 返回所有技能的完整内容,用于注入到 system prompt 中。 func (o *Orchestrator) formatAllSkillsContent() string { skills := o.getSkillsSnapshot() if len(skills) == 0 { return "(none)" } return formatSkills(skills) } // emitCapabilityGap 处理能力缺口信息埋点或者通过 AI 自动创建生成相应缺失技能的逻辑 func (o *Orchestrator) emitCapabilityGap(chatID, userID, intent, reason string) { if !o.enableCapabilityGap { return } intent = strings.TrimSpace(intent) reason = strings.TrimSpace(reason) if intent == "" || reason == "" { return } if len(intent) > 1000 { intent = intent[:1000] // 防止恶意使用超长 payload } if len(reason) > 240 { reason = reason[:240] // 保证状态长度在 DB 内正常可用 } if err := o.store.SaveCapabilityGap(chatID, userID, intent, reason); err != nil && o.log != nil { o.log.Warnf("save capability gap failed chat_id=%s user_id=%s err=%v", chatID, userID, err) return } // 提取出高频率缺口并在超出阈值后进行 draft 生成 clusters, err := o.store.TopCapabilityGapClusters(20, time.Now().UTC().Add(-o.gapLookbackDuration)) if err != nil { if o.log != nil { o.log.Warnf("query capability gap clusters failed err=%v", err) } return } for _, c := range clusters { if c.Count < o.gapDraftTriggerCount { continue } path, created, draftErr := knowledge.GenerateSkillDraft(c, o.autoSkillDir) if draftErr != nil { if o.log != nil { o.log.Warnf("generate skill draft failed intent_key=%s reason=%s err=%v", c.IntentKey, c.Reason, draftErr) } continue } if created && o.log != nil { o.log.Infof("capability gap draft generated path=%s intent_key=%s reason=%s count=%d", path, c.IntentKey, c.Reason, c.Count) } // 如果生成了新技能则将它们重新加载进环境 if created { if reloadErr := o.ReloadSkills(); reloadErr != nil && o.log != nil { o.log.Warnf("auto reload skills failed after generation path=%s err=%v", path, reloadErr) } } } } // ReloadSkills 会从提供的技能目录动态从最新存储位置载入所有技能定义而不重启系统。 func (o *Orchestrator) ReloadSkills() error { skills, err := knowledge.LoadSkillSet(o.skillsDir) if err != nil { return err } summaries, err := knowledge.LoadSkillSummaries(o.skillsDir) if err != nil { return err } // 利用 RWMutex 做热更新保护 o.skillsMu.Lock() o.skills = skills o.skillSummaries = copySkillSummaries(summaries) o.skillsMu.Unlock() if o.log != nil { o.log.Infof("skills hot reloaded count=%d dir=%s", len(skills), o.skillsDir) } return nil } func (o *Orchestrator) getSkillsSnapshot() []knowledge.Skill { o.skillsMu.RLock() defer o.skillsMu.RUnlock() out := make([]knowledge.Skill, len(o.skills)) copy(out, o.skills) return out } func (o *Orchestrator) getSkillSummariesSnapshot() []knowledge.SkillSummary { o.skillsMu.RLock() defer o.skillsMu.RUnlock() return copySkillSummaries(o.skillSummaries) } // BuildCapabilityGapReport 生成指定数量以内的近期高频缺失功能报错并格式化成报表。 func (o *Orchestrator) BuildCapabilityGapReport(limit int) (string, error) { clusters, err := o.store.TopCapabilityGapClusters(limit, time.Now().UTC().Add(-o.gapLookbackDuration)) if err != nil { return "", err } if len(clusters) == 0 { return "最近没有采集到能力缺口记录。", nil } b := strings.Builder{} b.WriteString("高频能力缺口清单:\n") for i, c := range clusters { line := fmt.Sprintf("%d) intent=%s | reason=%s | count=%d | last_seen=%s\n", i+1, c.IntentKey, c.Reason, c.Count, c.LastSeenAt.Format("2006-01-02 15:04:05")) b.WriteString(line) } b.WriteString("\n草稿目录:") b.WriteString(o.autoSkillDir) b.WriteString("\n系统会在达到阈值后自动生成并热加载技能;你也可以手动发送 /reload_skills。") return b.String(), nil } func (o *Orchestrator) formatSkillSummariesForPrompt() string { summaries := o.getSkillSummariesSnapshot() if len(summaries) == 0 { return "(none)" } sort.Slice(summaries, func(i, j int) bool { left := strings.ToLower(strings.TrimSpace(summaries[i].DirName)) right := strings.ToLower(strings.TrimSpace(summaries[j].DirName)) if left == right { return strings.ToLower(strings.TrimSpace(summaries[i].Name)) < strings.ToLower(strings.TrimSpace(summaries[j].Name)) } return left < right }) b := strings.Builder{} for _, summary := range summaries { dir := strings.TrimSpace(summary.DirName) name := strings.TrimSpace(summary.Name) desc := strings.TrimSpace(summary.Description) if name == "" { continue } if len(desc) > 220 { desc = desc[:220] } b.WriteString("- ") if dir != "" { b.WriteString("[") b.WriteString(dir) b.WriteString("] ") } b.WriteString(name) if desc != "" { b.WriteString(" => ") b.WriteString(desc) } b.WriteString("\n") } return strings.TrimSpace(b.String()) } func copySkillSummaries(in []knowledge.SkillSummary) []knowledge.SkillSummary { out := make([]knowledge.SkillSummary, len(in)) copy(out, in) for i := range out { out[i].DirName = strings.TrimSpace(out[i].DirName) out[i].Name = strings.TrimSpace(out[i].Name) out[i].Description = strings.TrimSpace(out[i].Description) } return out } func formatToolErrorObservation(code, action, reason string) string { code = strings.TrimSpace(code) action = strings.TrimSpace(action) reason = strings.TrimSpace(reason) if code == "" { code = "TOOL_EXEC_ERROR" } if action == "" { action = "unknown" } if reason == "" { reason = "unknown error" } return "ERROR_CODE=" + code + "; TOOL=" + action + "; REASON=" + reason } func formatSkills(skills []knowledge.Skill) string { b := strings.Builder{} for _, skill := range skills { b.WriteString("## ") b.WriteString(skill.Name) b.WriteString("\n") b.WriteString(skill.Content) b.WriteString("\n\n") } return strings.TrimSpace(b.String()) } func (o *Orchestrator) formatToolDoc() string { list := o.tools.List() if len(list) == 0 { return "(none)" } sort.Slice(list, func(i, j int) bool { return list[i].Name() < list[j].Name() }) b := strings.Builder{} for _, t := range list { b.WriteString("- ") b.WriteString(t.Name()) b.WriteString(": ") b.WriteString(t.Description()) b.WriteString("\n") } return strings.TrimSpace(b.String()) }