1589 lines
52 KiB
Go
1589 lines
52 KiB
Go
package agent
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"runtime"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"laodingbot/internal/knowledge"
|
||
"laodingbot/internal/llm"
|
||
"laodingbot/internal/logger"
|
||
"laodingbot/internal/memory"
|
||
"laodingbot/internal/tools"
|
||
)
|
||
|
||
// StreamEventType 定义流式输出事件类型
|
||
type StreamEventType string
|
||
|
||
const (
|
||
StreamEventTypeThought StreamEventType = "thought" // LLM 思考过程
|
||
StreamEventTypeToolCall StreamEventType = "tool_call" // 工具调用请求
|
||
StreamEventTypeToolResult StreamEventType = "tool_result" // 工具执行结果
|
||
StreamEventTypeFinal StreamEventType = "final" // 最终答案
|
||
StreamEventTypeError StreamEventType = "error" // 错误信息
|
||
)
|
||
|
||
// StreamEvent 代表流式输出中的一个事件
|
||
type StreamEvent struct {
|
||
Type StreamEventType `json:"type"`
|
||
Content string `json:"content"`
|
||
Step int `json:"step,omitempty"`
|
||
ToolName string `json:"tool_name,omitempty"`
|
||
}
|
||
|
||
// StreamEventCallback 是流式事件回调函数类型,用于推送事件到客户端
|
||
type StreamEventCallback func(event StreamEvent) error
|
||
|
||
// Orchestrator 负责协调和组合业务逻辑,包含 LLM 计算、上下文管理、技能匹配计算和工具调用。
|
||
type Orchestrator struct {
|
||
llm llm.Client
|
||
routerLLM llm.Client // 可选:轻量路由模型,用于技能意图路由;为 nil 则仅用关键词匹配
|
||
store *memory.SQLiteStore
|
||
tools *tools.Registry
|
||
soul string
|
||
skills []knowledge.Skill
|
||
skillSummaries []knowledge.SkillSummary
|
||
skillsDir string
|
||
autoSkillDir string
|
||
gapDraftTriggerCount int
|
||
gapLookbackDuration time.Duration
|
||
reactMaxStep int
|
||
enableCapabilityGap bool
|
||
log *logger.Logger
|
||
skillsMu sync.RWMutex
|
||
pendingFilesMu sync.Mutex
|
||
pendingFiles map[string][]pendingFileRef
|
||
}
|
||
|
||
type pendingFileRef struct {
|
||
ID string
|
||
Name string
|
||
MimeType string
|
||
}
|
||
|
||
// NewOrchestrator 创建一个新的编排器对象,初始化关键路径和超时控制等。
|
||
func NewOrchestrator(
|
||
llmClient llm.Client,
|
||
routerLLM llm.Client,
|
||
store *memory.SQLiteStore,
|
||
registry *tools.Registry,
|
||
soul string,
|
||
skills []knowledge.Skill,
|
||
skillSummaries []knowledge.SkillSummary,
|
||
skillsDir string,
|
||
reactMaxStep int,
|
||
enableCapabilityGap bool,
|
||
autoSkillDir string,
|
||
gapDraftTriggerCount int,
|
||
gapLookbackDuration time.Duration,
|
||
log *logger.Logger,
|
||
) *Orchestrator {
|
||
if reactMaxStep <= 0 {
|
||
reactMaxStep = 8 // 默认最大 ReAct 步骤数为 8
|
||
}
|
||
if gapDraftTriggerCount <= 0 {
|
||
gapDraftTriggerCount = 3 // 默认触发技能生成的缺口数量为 3
|
||
}
|
||
if gapLookbackDuration <= 0 {
|
||
gapLookbackDuration = 7 * 24 * time.Hour // 默认回溯时长为 7 天
|
||
}
|
||
if strings.TrimSpace(autoSkillDir) == "" {
|
||
autoSkillDir = skillsDir
|
||
}
|
||
return &Orchestrator{
|
||
llm: llmClient,
|
||
routerLLM: routerLLM,
|
||
store: store,
|
||
tools: registry,
|
||
soul: soul,
|
||
skills: skills,
|
||
skillSummaries: copySkillSummaries(skillSummaries),
|
||
skillsDir: skillsDir,
|
||
autoSkillDir: autoSkillDir,
|
||
gapDraftTriggerCount: gapDraftTriggerCount,
|
||
gapLookbackDuration: gapLookbackDuration,
|
||
reactMaxStep: reactMaxStep,
|
||
enableCapabilityGap: enableCapabilityGap,
|
||
log: log,
|
||
pendingFiles: make(map[string][]pendingFileRef),
|
||
}
|
||
}
|
||
|
||
// HandleMessage 是接受用户消息输入并通过统一 ReAct 循环生成回复的主流程。
|
||
// 不再分"先选 skill 再决策"两步,而是 LLM 第一次调用就同时决定:
|
||
// - 是否可以直接回答(is_final_answer=true)
|
||
// - 是否需要调用工具(action + action_input)
|
||
// 循环持续进行,直到 LLM 返回 is_final_answer=true。
|
||
func (o *Orchestrator) HandleMessage(ctx context.Context, chatID, userID, text string) (string, error) {
|
||
return o.handleMessageInternal(ctx, chatID, userID, text)
|
||
}
|
||
|
||
// HandleMessageWithFiles 接收用户消息和文件,上传文件获取 file_id 并缓存,然后进入普通消息处理流程。
|
||
func (o *Orchestrator) HandleMessageWithFiles(ctx context.Context, chatID, userID, text string, files []llm.InputFile) (string, error) {
|
||
if len(files) > 0 {
|
||
ids, err := o.UploadAndCacheFiles(ctx, chatID, userID, files)
|
||
if err != nil && o.log != nil {
|
||
o.log.Warnf("upload files failed chat_id=%s err=%v", chatID, err)
|
||
}
|
||
_ = ids
|
||
}
|
||
if strings.TrimSpace(text) == "" {
|
||
return "文件已接收。请继续发送你的问题。", nil
|
||
}
|
||
return o.handleMessageInternal(ctx, chatID, userID, text)
|
||
}
|
||
|
||
// HandleMessageStream 接收用户消息并通过流式方式返回回复。
|
||
// 通过 callback 推送实时事件,包括思考过程、工具调用、工具结果和最终答案。
|
||
func (o *Orchestrator) HandleMessageStream(ctx context.Context, chatID, userID, text string, callback StreamEventCallback) (string, error) {
|
||
if callback == nil {
|
||
return "", fmt.Errorf("stream callback is required")
|
||
}
|
||
return o.handleMessageStreamInternal(ctx, chatID, userID, text, callback)
|
||
}
|
||
|
||
// HandleMessageStreamWithFiles 接收用户消息和文件,上传文件后进入流式处理流程。
|
||
func (o *Orchestrator) HandleMessageStreamWithFiles(ctx context.Context, chatID, userID, text string, files []llm.InputFile, callback StreamEventCallback) (string, error) {
|
||
if callback == nil {
|
||
return "", fmt.Errorf("stream callback is required")
|
||
}
|
||
if len(files) > 0 {
|
||
ids, err := o.UploadAndCacheFiles(ctx, chatID, userID, files)
|
||
if err != nil && o.log != nil {
|
||
o.log.Warnf("upload files failed chat_id=%s err=%v", chatID, err)
|
||
}
|
||
_ = ids
|
||
}
|
||
if strings.TrimSpace(text) == "" {
|
||
return "文件已接收。请继续发送你的问题。", nil
|
||
}
|
||
return o.handleMessageStreamInternal(ctx, chatID, userID, text, callback)
|
||
}
|
||
|
||
// UploadAndCacheFiles 上传文件到 LLM 并缓存 file_id,供后续同会话文本问答复用。
|
||
func (o *Orchestrator) UploadAndCacheFiles(ctx context.Context, chatID, userID string, files []llm.InputFile) ([]string, error) {
|
||
if len(files) == 0 {
|
||
return nil, fmt.Errorf("no files provided")
|
||
}
|
||
uploader, ok := o.llm.(llm.FileUploader)
|
||
if !ok {
|
||
return nil, fmt.Errorf("当前 LLM 客户端不支持文件上传接口")
|
||
}
|
||
var ids []string
|
||
var refs []pendingFileRef
|
||
for i, f := range files {
|
||
if strings.TrimSpace(f.FileName) == "" || len(f.Content) == 0 {
|
||
return nil, fmt.Errorf("file[%d] 缺少文件名或内容", i+1)
|
||
}
|
||
fileID, err := uploader.UploadFile(ctx, f, "file-extract")
|
||
if err != nil {
|
||
return nil, fmt.Errorf("file[%d] name=%s 上传失败: %w", i+1, f.FileName, err)
|
||
}
|
||
ids = append(ids, fileID)
|
||
refs = append(refs, pendingFileRef{
|
||
ID: fileID,
|
||
Name: strings.TrimSpace(f.FileName),
|
||
MimeType: defaultIfEmpty(strings.TrimSpace(f.MimeType), "application/octet-stream"),
|
||
})
|
||
}
|
||
o.appendPendingFiles(chatID, userID, refs)
|
||
return ids, nil
|
||
}
|
||
|
||
func (o *Orchestrator) handleMessageInternal(ctx context.Context, chatID, userID, text string) (string, error) {
|
||
// 为链路追踪设置唯一的 TraceID
|
||
traceID := logger.NewTraceID()
|
||
ctx = logger.WithTraceID(ctx, traceID)
|
||
traceLogPrefix := "trace_id=" + traceID
|
||
if o.log != nil {
|
||
o.log.Infof("%s handle message chat_id=%s user_id=%s text_len=%d", traceLogPrefix, chatID, userID, len(text))
|
||
o.log.Debugf("%s handle message text=%q", traceLogPrefix, text)
|
||
}
|
||
|
||
// 处理特殊的重载指令
|
||
if strings.EqualFold(strings.TrimSpace(text), "/reload_skills") {
|
||
if err := o.ReloadSkills(); err != nil {
|
||
return "技能热加载失败: " + err.Error(), nil
|
||
}
|
||
return "技能已热加载完成。", nil
|
||
}
|
||
|
||
// 如果用户请求能力缺口报告,则生成报告格式化输出
|
||
if strings.EqualFold(strings.TrimSpace(text), "/capability_gaps") {
|
||
report, err := o.BuildCapabilityGapReport(10)
|
||
if err != nil {
|
||
return "缺口报告生成失败: " + err.Error(), nil
|
||
}
|
||
return report, nil
|
||
}
|
||
|
||
// 保存用户消息到 SQLite 中
|
||
if err := o.store.SaveMessage(chatID, userID, "user", text); err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s save user message failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
// 读取最近的会话记忆并压缩成 Prompt 上下文
|
||
recent, err := o.store.LoadRecent(chatID, 16)
|
||
if err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s load recent failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
compressed := memory.CompressForPrompt(recent, 6000)
|
||
if o.log != nil {
|
||
o.log.Debugf("%s prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", traceLogPrefix, chatID, len(recent), len(compressed))
|
||
}
|
||
|
||
// 进入统一 ReAct 循环
|
||
response, err := o.runUnifiedReAct(ctx, chatID, userID, compressed, text)
|
||
if err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s message generation failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
// 最终将机器人的回复也加入记忆缓存
|
||
if err := o.store.SaveMessage(chatID, userID, "assistant", response); err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s save assistant response failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Infof("%s message handled chat_id=%s response_len=%d", traceLogPrefix, chatID, len(response))
|
||
}
|
||
return response, nil
|
||
}
|
||
|
||
// handleMessageStreamInternal 处理流式消息的内部逻辑,类似于handleMessageInternal但支持流式回调
|
||
func (o *Orchestrator) handleMessageStreamInternal(ctx context.Context, chatID, userID, text string, callback StreamEventCallback) (string, error) {
|
||
// 为链路追踪设置唯一的 TraceID
|
||
traceID := logger.NewTraceID()
|
||
ctx = logger.WithTraceID(ctx, traceID)
|
||
traceLogPrefix := "trace_id=" + traceID
|
||
if o.log != nil {
|
||
o.log.Infof("%s handle message stream chat_id=%s user_id=%s text_len=%d", traceLogPrefix, chatID, userID, len(text))
|
||
o.log.Debugf("%s handle message stream text=%q", traceLogPrefix, text)
|
||
}
|
||
|
||
// 处理特殊的重载指令
|
||
if strings.EqualFold(strings.TrimSpace(text), "/reload_skills") {
|
||
if err := o.ReloadSkills(); err != nil {
|
||
return "技能热加载失败: " + err.Error(), nil
|
||
}
|
||
return "技能已热加载完成。", nil
|
||
}
|
||
|
||
// 如果用户请求能力缺口报告,则生成报告格式化输出
|
||
if strings.EqualFold(strings.TrimSpace(text), "/capability_gaps") {
|
||
report, err := o.BuildCapabilityGapReport(10)
|
||
if err != nil {
|
||
return "缺口报告生成失败: " + err.Error(), nil
|
||
}
|
||
return report, nil
|
||
}
|
||
|
||
// 保存用户消息到 SQLite 中
|
||
if err := o.store.SaveMessage(chatID, userID, "user", text); err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s save user message failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
// 读取最近的会话记忆并压缩成 Prompt 上下文
|
||
recent, err := o.store.LoadRecent(chatID, 16)
|
||
if err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s load recent failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
compressed := memory.CompressForPrompt(recent, 6000)
|
||
if o.log != nil {
|
||
o.log.Debugf("%s stream prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", traceLogPrefix, chatID, len(recent), len(compressed))
|
||
}
|
||
|
||
// 进入流式统一 ReAct 循环
|
||
response, err := o.runUnifiedReActStream(ctx, chatID, userID, compressed, text, callback)
|
||
if err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s stream message generation failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
// 最终将机器人的回复也加入记忆缓存
|
||
if err := o.store.SaveMessage(chatID, userID, "assistant", response); err != nil {
|
||
if o.log != nil {
|
||
o.log.Errorf("%s save assistant response failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
|
||
}
|
||
return "", err
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Infof("%s stream message handled chat_id=%s response_len=%d", traceLogPrefix, chatID, len(response))
|
||
}
|
||
return response, nil
|
||
}
|
||
|
||
// buildUnifiedSystemPrompt 构建统一 ReAct 循环的 system prompt。
|
||
// 工具定义通过 API 的 tools 字段传递;此处只需包含人格、技能、运行环境和思考指引。
|
||
// routedSkills 为 LLM 路由预选的技能列表;如果为 nil,则回退到关键词匹配。
|
||
func (o *Orchestrator) buildUnifiedSystemPrompt(userInput string, routedSkills []knowledge.Skill) string {
|
||
skillMetaDoc := o.formatSkillSummariesForPrompt()
|
||
var relevantSkillsDoc string
|
||
if routedSkills != nil {
|
||
relevantSkillsDoc = o.formatSelectedSkillsForPrompt(userInput, routedSkills)
|
||
} else {
|
||
relevantSkillsDoc = o.formatSelectedSkillsForPrompt(userInput, nil)
|
||
}
|
||
runtimeDoc := formatRuntimeContextForPrompt()
|
||
|
||
return strings.Join([]string{
|
||
"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
|
||
o.soul,
|
||
"",
|
||
"===== ReAct 思考指引 =====",
|
||
"你采用 ReAct(Reasoning + Acting)模式进行任务处理。",
|
||
"1. 思考优先:在做出任何行动之前,先完成内部推理,但不要把 Thought、trace、step 暴露给用户。",
|
||
"2. 工具调用:如果需要获取信息或执行操作,使用提供的工具函数(function calling)进行调用。",
|
||
"3. 观察反馈:检查工具返回的结果,据此决定下一步行动。",
|
||
"4. 最终回答:当你有足够信息时,只输出面向用户的最终文本回复,不要附带推理轨迹,不要调用工具。",
|
||
"",
|
||
"注意事项:",
|
||
"- 每次要么调用工具,要么给出最终回答,不要两者都做。",
|
||
"- 如果工具调用失败,根据错误信息(Traceback)调整策略后重试或给出替代方案。",
|
||
"- 涉及文件、目录、命令时,优先调用工具获取真实结果,不要猜测。",
|
||
"- 如果本轮需要调用工具,可以在 assistant content 中写简短内部推理,供系统记录日志;这些内容不会直接展示给用户。",
|
||
"- 最终用户可见内容中禁止出现 Thought、Trace、Step、Observation、Action、ActionInput 等字段或标题。",
|
||
"",
|
||
"===== 运行环境 =====",
|
||
runtimeDoc,
|
||
"",
|
||
"===== 可用技能概览 =====",
|
||
skillMetaDoc,
|
||
"",
|
||
"===== 本轮相关技能(按用户问题筛选) =====",
|
||
relevantSkillsDoc,
|
||
"",
|
||
"===== 关键约束 =====",
|
||
}, "\n")
|
||
}
|
||
|
||
// routeSkillsWithLLM 使用轻量 LLM 模型对用户输入进行语义路由,判断是否需要加载技能以及选择哪些技能。
|
||
// 返回匹配到的技能列表(可能为空切片表示不需要技能,nil 表示调用失败应回退)。
|
||
func (o *Orchestrator) routeSkillsWithLLM(ctx context.Context, userInput string) ([]knowledge.Skill, error) {
|
||
traceLogPrefix := "trace_id=" + logger.TraceIDFromContext(ctx)
|
||
|
||
summaries := o.getSkillSummariesSnapshot()
|
||
if len(summaries) == 0 {
|
||
if o.log != nil {
|
||
o.log.Debugf("%s skill router: no skills available, skip", traceLogPrefix)
|
||
}
|
||
return []knowledge.Skill{}, nil
|
||
}
|
||
|
||
// 构建技能池描述
|
||
skillPool := strings.Builder{}
|
||
for _, s := range summaries {
|
||
name := strings.TrimSpace(s.Name)
|
||
desc := strings.TrimSpace(s.Description)
|
||
if name == "" {
|
||
continue
|
||
}
|
||
skillPool.WriteString("- ")
|
||
skillPool.WriteString(name)
|
||
if desc != "" {
|
||
skillPool.WriteString(": ")
|
||
skillPool.WriteString(desc)
|
||
}
|
||
skillPool.WriteString("\n")
|
||
}
|
||
|
||
routerSystemPrompt := strings.Join([]string{
|
||
"你是一个意图路由器。根据用户输入,从技能池中挑选最合适的技能。",
|
||
"",
|
||
"规则:",
|
||
"1. 如果用户的问题可以直接回答(闲聊、简单问答)或只需简单工具调用,设置 need_skills=false,selected_skills 为空数组。",
|
||
"2. 如果用户的问题涉及专业流程、复杂任务或与某个技能高度相关,设置 need_skills=true 并选择最相关的技能名称。",
|
||
"3. 最多选择 3 个技能。",
|
||
"4. 仅返回 JSON,不要附加任何其他文字。",
|
||
"",
|
||
"可用技能池:",
|
||
strings.TrimSpace(skillPool.String()),
|
||
"",
|
||
"输出格式(严格 JSON):",
|
||
`{"need_skills": true, "selected_skills": ["技能名称1"], "reason": "简要说明"}`,
|
||
}, "\n")
|
||
|
||
routerUserPrompt := "用户输入:" + userInput
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s skill router request: skills_count=%d input_len=%d", traceLogPrefix, len(summaries), len(userInput))
|
||
}
|
||
|
||
raw, err := o.routerLLM.Generate(ctx, routerSystemPrompt, routerUserPrompt)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("router llm call failed: %w", err)
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s skill router response: %s", traceLogPrefix, truncateForLog(raw, 500))
|
||
}
|
||
|
||
decision, err := parseCapabilityRoute(raw)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("router response parse failed: %w", err)
|
||
}
|
||
|
||
if !decision.NeedSkills || len(decision.SelectedSkills) == 0 {
|
||
if o.log != nil {
|
||
o.log.Infof("%s skill router: no skills needed, reason=%s", traceLogPrefix, decision.Reason)
|
||
}
|
||
return []knowledge.Skill{}, nil
|
||
}
|
||
|
||
// 根据路由结果匹配完整技能内容
|
||
allSkills := o.getSkillsSnapshot()
|
||
selected := matchSkillsByName(allSkills, decision.SelectedSkills)
|
||
|
||
if o.log != nil {
|
||
o.log.Infof("%s skill router: need_skills=true requested=%v matched=%d reason=%s",
|
||
traceLogPrefix, decision.SelectedSkills, len(selected), decision.Reason)
|
||
}
|
||
|
||
return selected, nil
|
||
}
|
||
|
||
// matchSkillsByName 根据名称列表从全量技能中模糊匹配。
|
||
func matchSkillsByName(allSkills []knowledge.Skill, names []string) []knowledge.Skill {
|
||
if len(names) == 0 {
|
||
return nil
|
||
}
|
||
matched := make([]knowledge.Skill, 0, len(names))
|
||
for _, wantName := range names {
|
||
want := strings.ToLower(strings.TrimSpace(wantName))
|
||
if want == "" {
|
||
continue
|
||
}
|
||
for _, sk := range allSkills {
|
||
skName := strings.ToLower(strings.TrimSpace(sk.Name))
|
||
if skName == want || strings.Contains(skName, want) || strings.Contains(want, skName) {
|
||
matched = append(matched, sk)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
return matched
|
||
}
|
||
|
||
// runUnifiedReAct 执行统一的 ReAct 循环,使用原生 function calling API。
|
||
// messages 数组随交互动态增长:system → history → user → assistant(tool_calls) → tool → ...
|
||
// 循环持续到 LLM 返回无 tool_calls 的纯文本回复(即最终回答)或达到安全上限。
|
||
func (o *Orchestrator) runUnifiedReAct(ctx context.Context, chatID, userID, compressedContext, userInput string) (string, error) {
|
||
traceID := logger.TraceIDFromContext(ctx)
|
||
traceLogPrefix := "trace_id=" + traceID
|
||
|
||
// ===== LLM 意图路由:使用轻量模型判断是否需要加载技能 =====
|
||
var routedSkills []knowledge.Skill
|
||
if o.routerLLM != nil {
|
||
routed, routeErr := o.routeSkillsWithLLM(ctx, userInput)
|
||
if routeErr != nil {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s skill router failed, fallback to keyword matching err=%v", traceLogPrefix, routeErr)
|
||
}
|
||
// 路由失败时 routedSkills 保持 nil,buildUnifiedSystemPrompt 回退到关键词匹配
|
||
} else {
|
||
routedSkills = routed
|
||
if o.log != nil {
|
||
names := make([]string, 0, len(routedSkills))
|
||
for _, sk := range routedSkills {
|
||
names = append(names, sk.Name)
|
||
}
|
||
o.log.Infof("%s skill router selected %d skills: %v", traceLogPrefix, len(routedSkills), names)
|
||
}
|
||
}
|
||
}
|
||
|
||
systemPrompt := o.buildUnifiedSystemPrompt(userInput, routedSkills)
|
||
|
||
if o.log != nil {
|
||
o.log.Infof("%s unified react start", traceLogPrefix)
|
||
o.log.Debugf("%s system_prompt_len=%d", traceLogPrefix, len(systemPrompt))
|
||
}
|
||
|
||
// 检查 LLM 客户端是否支持原生 tool_calls
|
||
toolCallClient, supportsToolCalls := o.llm.(llm.ToolCallChatClient)
|
||
if !supportsToolCalls {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s llm client does not support ToolCallChatClient, falling back to legacy ReAct", traceLogPrefix)
|
||
}
|
||
return o.runLegacyReAct(ctx, chatID, userID, compressedContext, userInput)
|
||
}
|
||
|
||
// 构建初始 messages 数组
|
||
messages := make([]llm.PromptMessage, 0, 32)
|
||
messages = append(messages, llm.PromptMessage{Role: "system", Content: systemPrompt})
|
||
|
||
// 加入历史会话上下文
|
||
messages = append(messages, parseCompressedHistoryMessages(compressedContext)...)
|
||
|
||
// 加入当前用户消息
|
||
messages = append(messages, llm.PromptMessage{Role: "user", Content: userInput})
|
||
|
||
// 构建工具定义列表(通过 API tools 字段传递)
|
||
toolDefs := o.buildToolDefinitions()
|
||
if o.log != nil {
|
||
toolNames := make([]string, 0, len(toolDefs))
|
||
for _, td := range toolDefs {
|
||
toolNames = append(toolNames, td.Function.Name)
|
||
}
|
||
o.log.Debugf("%s tool_defs_count=%d names=%v", traceLogPrefix, len(toolDefs), toolNames)
|
||
}
|
||
|
||
const maxSteps = 20
|
||
for step := 1; step <= maxSteps; step++ {
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react step=%d start messages_count=%d", traceLogPrefix, step, len(messages))
|
||
}
|
||
|
||
// 调用 LLM(传入完整 messages + tools 定义)
|
||
completion, err := toolCallClient.GenerateWithTools(ctx, messages, toolDefs)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react step=%d content_len=%d tool_calls=%d",
|
||
traceLogPrefix, step, len(completion.Content), len(completion.ToolCalls))
|
||
if completion.Content != "" {
|
||
o.log.Debugf("%s react step=%d thought=%q", traceLogPrefix, step, completion.Content)
|
||
}
|
||
}
|
||
|
||
// ========== 无 tool_calls → 最终回答 ==========
|
||
if len(completion.ToolCalls) == 0 {
|
||
finalText := sanitizeUserFacingAnswer(completion.Content)
|
||
if finalText == "" {
|
||
finalText = "已完成处理。"
|
||
}
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react final at step=%d answer_len=%d", traceLogPrefix, step, len(finalText))
|
||
}
|
||
return finalText, nil
|
||
}
|
||
|
||
// ========== 有 tool_calls → 将 assistant 消息加入历史,然后执行工具 ==========
|
||
assistantMsg := llm.PromptMessage{
|
||
Role: "assistant",
|
||
Content: completion.Content,
|
||
ToolCalls: completion.ToolCalls,
|
||
}
|
||
messages = append(messages, assistantMsg)
|
||
|
||
// 逐个执行工具调用,并将结果作为 tool 角色消息加入
|
||
for _, tc := range completion.ToolCalls {
|
||
toolName := strings.ToLower(strings.TrimSpace(tc.Function.Name))
|
||
toolInput := extractToolInput(tc.Function.Arguments)
|
||
|
||
tool, ok := o.tools.Get(toolName)
|
||
if !ok {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s react step=%d tool_not_found=%s", traceLogPrefix, step, toolName)
|
||
}
|
||
messages = append(messages, llm.PromptMessage{
|
||
Role: "tool",
|
||
ToolCallID: tc.ID,
|
||
Name: tc.Function.Name,
|
||
Content: formatToolErrorObservation("TOOL_NOT_FOUND", toolName, "该工具不存在,请检查工具名称后重试"),
|
||
})
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+toolName)
|
||
continue
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react step=%d tool_call tool=%s input_len=%d", traceLogPrefix, step, toolName, len(toolInput))
|
||
o.log.Debugf("%s react step=%d tool=%s input=%q", traceLogPrefix, step, toolName, toolInput)
|
||
}
|
||
|
||
toolOut, toolErr := tool.Call(ctx, toolInput)
|
||
obs := strings.TrimSpace(toolOut)
|
||
if obs == "" {
|
||
obs = "(empty output)"
|
||
}
|
||
if toolErr != nil {
|
||
obs = formatToolErrorObservation("TOOL_EXEC_ERROR", toolName, toolErr.Error()) + "\nOUTPUT:\n" + obs
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+toolName)
|
||
}
|
||
// 限制观察值长度防止超出 LLM 上下文窗口
|
||
if len(obs) > 4000 {
|
||
obs = obs[:4000] + "\n...(truncated)"
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react step=%d tool=%s observation_len=%d", traceLogPrefix, step, toolName, len(obs))
|
||
o.log.Debugf("%s react step=%d tool=%s observation=%q", traceLogPrefix, step, toolName, truncateForLog(obs, 500))
|
||
}
|
||
|
||
messages = append(messages, llm.PromptMessage{
|
||
Role: "tool",
|
||
ToolCallID: tc.ID,
|
||
Name: tc.Function.Name,
|
||
Content: obs,
|
||
})
|
||
}
|
||
}
|
||
|
||
// 达到安全上限仍未得到最终回答
|
||
o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
|
||
return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
|
||
}
|
||
|
||
// runUnifiedReActStream 执行统一的 ReAct 循环并通过回调推送流式事件。
|
||
func (o *Orchestrator) runUnifiedReActStream(ctx context.Context, chatID, userID, compressedContext, userInput string, callback StreamEventCallback) (string, error) {
|
||
traceID := logger.TraceIDFromContext(ctx)
|
||
traceLogPrefix := "trace_id=" + traceID
|
||
|
||
// ===== LLM 意图路由:使用轻量模型判断是否需要加载技能 =====
|
||
var routedSkills []knowledge.Skill
|
||
if o.routerLLM != nil {
|
||
routed, routeErr := o.routeSkillsWithLLM(ctx, userInput)
|
||
if routeErr != nil {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s skill router failed, fallback to keyword matching err=%v", traceLogPrefix, routeErr)
|
||
}
|
||
} else {
|
||
routedSkills = routed
|
||
if o.log != nil {
|
||
names := make([]string, 0, len(routedSkills))
|
||
for _, sk := range routedSkills {
|
||
names = append(names, sk.Name)
|
||
}
|
||
o.log.Infof("%s skill router selected %d skills: %v", traceLogPrefix, len(routedSkills), names)
|
||
}
|
||
}
|
||
}
|
||
|
||
systemPrompt := o.buildUnifiedSystemPrompt(userInput, routedSkills)
|
||
|
||
if o.log != nil {
|
||
o.log.Infof("%s unified react stream start", traceLogPrefix)
|
||
o.log.Debugf("%s system_prompt_len=%d", traceLogPrefix, len(systemPrompt))
|
||
}
|
||
|
||
// 检查 LLM 客户端是否支持原生 tool_calls
|
||
toolCallClient, supportsToolCalls := o.llm.(llm.ToolCallChatClient)
|
||
if !supportsToolCalls {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s llm client does not support ToolCallChatClient, stream mode not available", traceLogPrefix)
|
||
}
|
||
return "", fmt.Errorf("stream mode requires ToolCallChatClient support")
|
||
}
|
||
|
||
// 构建初始 messages 数组
|
||
messages := make([]llm.PromptMessage, 0, 32)
|
||
messages = append(messages, llm.PromptMessage{Role: "system", Content: systemPrompt})
|
||
messages = append(messages, parseCompressedHistoryMessages(compressedContext)...)
|
||
messages = append(messages, llm.PromptMessage{Role: "user", Content: userInput})
|
||
|
||
// 构建工具定义列表
|
||
toolDefs := o.buildToolDefinitions()
|
||
if o.log != nil {
|
||
toolNames := make([]string, 0, len(toolDefs))
|
||
for _, td := range toolDefs {
|
||
toolNames = append(toolNames, td.Function.Name)
|
||
}
|
||
o.log.Debugf("%s tool_defs_count=%d names=%v", traceLogPrefix, len(toolDefs), toolNames)
|
||
}
|
||
|
||
const maxSteps = 20
|
||
for step := 1; step <= maxSteps; step++ {
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react stream step=%d start messages_count=%d", traceLogPrefix, step, len(messages))
|
||
}
|
||
|
||
// 调用 LLM
|
||
completion, err := toolCallClient.GenerateWithTools(ctx, messages, toolDefs)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react stream step=%d content_len=%d tool_calls=%d",
|
||
traceLogPrefix, step, len(completion.Content), len(completion.ToolCalls))
|
||
if completion.Content != "" {
|
||
o.log.Debugf("%s react stream step=%d thought=%q", traceLogPrefix, step, completion.Content)
|
||
}
|
||
}
|
||
|
||
// 推送思考过程事件
|
||
if completion.Content != "" {
|
||
// 分割内容为逐步推送的片段
|
||
segments := splitContentIntoSegments(completion.Content, 50) // 每段50字符
|
||
for _, segment := range segments {
|
||
if err := callback(StreamEvent{
|
||
Type: StreamEventTypeThought,
|
||
Content: segment,
|
||
Step: step,
|
||
}); err != nil {
|
||
return "", fmt.Errorf("callback error: %w", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// ========== 无 tool_calls → 最终回答 ==========
|
||
if len(completion.ToolCalls) == 0 {
|
||
finalText := sanitizeUserFacingAnswer(completion.Content)
|
||
if finalText == "" {
|
||
finalText = "已完成处理。"
|
||
}
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react stream final at step=%d answer_len=%d", traceLogPrefix, step, len(finalText))
|
||
}
|
||
// 推送最终答案事件
|
||
if err := callback(StreamEvent{
|
||
Type: StreamEventTypeFinal,
|
||
Content: finalText,
|
||
Step: step,
|
||
}); err != nil {
|
||
return "", fmt.Errorf("callback error: %w", err)
|
||
}
|
||
return finalText, nil
|
||
}
|
||
|
||
// ========== 有 tool_calls → 执行工具 ==========
|
||
assistantMsg := llm.PromptMessage{
|
||
Role: "assistant",
|
||
Content: completion.Content,
|
||
ToolCalls: completion.ToolCalls,
|
||
}
|
||
messages = append(messages, assistantMsg)
|
||
|
||
// 逐个执行工具调用
|
||
for _, tc := range completion.ToolCalls {
|
||
toolName := strings.ToLower(strings.TrimSpace(tc.Function.Name))
|
||
toolInput := extractToolInput(tc.Function.Arguments)
|
||
|
||
// 推送工具调用事件
|
||
if err := callback(StreamEvent{
|
||
Type: StreamEventTypeToolCall,
|
||
Content: toolInput,
|
||
Step: step,
|
||
ToolName: toolName,
|
||
}); err != nil {
|
||
return "", fmt.Errorf("callback error: %w", err)
|
||
}
|
||
|
||
tool, ok := o.tools.Get(toolName)
|
||
if !ok {
|
||
if o.log != nil {
|
||
o.log.Warnf("%s react stream step=%d tool_not_found=%s", traceLogPrefix, step, toolName)
|
||
}
|
||
// 推送错误事件
|
||
errMsg := "工具不存在:" + toolName
|
||
if err := callback(StreamEvent{
|
||
Type: StreamEventTypeError,
|
||
Content: errMsg,
|
||
Step: step,
|
||
ToolName: toolName,
|
||
}); err != nil {
|
||
return "", fmt.Errorf("callback error: %w", err)
|
||
}
|
||
messages = append(messages, llm.PromptMessage{
|
||
Role: "tool",
|
||
ToolCallID: tc.ID,
|
||
Name: tc.Function.Name,
|
||
Content: formatToolErrorObservation("TOOL_NOT_FOUND", toolName, "该工具不存在,请检查工具名称后重试"),
|
||
})
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+toolName)
|
||
continue
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react stream step=%d tool_call tool=%s input_len=%d", traceLogPrefix, step, toolName, len(toolInput))
|
||
o.log.Debugf("%s react stream step=%d tool=%s input=%q", traceLogPrefix, step, toolName, toolInput)
|
||
}
|
||
|
||
toolOut, toolErr := tool.Call(ctx, toolInput)
|
||
obs := strings.TrimSpace(toolOut)
|
||
if obs == "" {
|
||
obs = "(empty output)"
|
||
}
|
||
if toolErr != nil {
|
||
obs = formatToolErrorObservation("TOOL_EXEC_ERROR", toolName, toolErr.Error()) + "\nOUTPUT:\n" + obs
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+toolName)
|
||
}
|
||
// 限制观察值长度防止超出 LLM 上下文窗口
|
||
if len(obs) > 4000 {
|
||
obs = obs[:4000] + "\n...(truncated)"
|
||
}
|
||
|
||
if o.log != nil {
|
||
o.log.Debugf("%s react stream step=%d tool=%s observation_len=%d", traceLogPrefix, step, toolName, len(obs))
|
||
o.log.Debugf("%s react stream step=%d tool=%s observation=%q", traceLogPrefix, step, toolName, truncateForLog(obs, 500))
|
||
}
|
||
|
||
// 推送工具结果事件
|
||
if err := callback(StreamEvent{
|
||
Type: StreamEventTypeToolResult,
|
||
Content: obs,
|
||
Step: step,
|
||
ToolName: toolName,
|
||
}); err != nil {
|
||
return "", fmt.Errorf("callback error: %w", err)
|
||
}
|
||
|
||
messages = append(messages, llm.PromptMessage{
|
||
Role: "tool",
|
||
ToolCallID: tc.ID,
|
||
Name: tc.Function.Name,
|
||
Content: obs,
|
||
})
|
||
}
|
||
}
|
||
|
||
// 达到安全上限
|
||
o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
|
||
errMsg := "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。"
|
||
_ = callback(StreamEvent{
|
||
Type: StreamEventTypeError,
|
||
Content: errMsg,
|
||
})
|
||
return errMsg, nil
|
||
}
|
||
|
||
// runLegacyReAct 是旧版基于 JSON 决策解析的 ReAct 循环,作为不支持 tool_calls 的 LLM 的降级方案。
|
||
func (o *Orchestrator) runLegacyReAct(ctx context.Context, chatID, userID, compressedContext, userInput string) (string, error) {
|
||
traceID := logger.TraceIDFromContext(ctx)
|
||
traceLogPrefix := "trace_id=" + traceID
|
||
|
||
systemPrompt := o.buildLegacySystemPrompt(userInput)
|
||
|
||
const maxSteps = 20
|
||
scratchpad := ""
|
||
|
||
for step := 1; step <= maxSteps; step++ {
|
||
if o.log != nil {
|
||
o.log.Debugf("%s legacy react step=%d start", traceLogPrefix, step)
|
||
}
|
||
|
||
messages := buildReActMessages(systemPrompt, compressedContext, userInput, scratchpad)
|
||
raw, err := o.generateMessages(ctx, messages)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
decision, err := parseDecision(raw)
|
||
if err != nil {
|
||
o.emitCapabilityGap(chatID, userID, userInput, "react_parse_failed")
|
||
return strings.TrimSpace(raw), nil
|
||
}
|
||
|
||
if decision.IsFinalAnswer {
|
||
finalText := ""
|
||
if decision.FinalAnswer != nil {
|
||
finalText = sanitizeUserFacingAnswer(*decision.FinalAnswer)
|
||
}
|
||
if finalText == "" {
|
||
finalText = sanitizeUserFacingAnswer(decision.Thought)
|
||
}
|
||
if finalText == "" {
|
||
finalText = "已完成处理。"
|
||
}
|
||
return finalText, nil
|
||
}
|
||
|
||
action := strings.ToLower(strings.TrimSpace(decision.Action))
|
||
if action == "" || action == "none" {
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Observation: 你没有指定要调用的工具,请重新决策。\n"
|
||
continue
|
||
}
|
||
|
||
actionInput := decision.GetActionInputString()
|
||
tool, ok := o.tools.Get(action)
|
||
if !ok {
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Action: " + action + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Observation: " + formatToolErrorObservation("TOOL_NOT_FOUND", action, "该工具不存在") + "\n"
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+action)
|
||
continue
|
||
}
|
||
|
||
toolOut, toolErr := tool.Call(ctx, actionInput)
|
||
obs := strings.TrimSpace(toolOut)
|
||
if obs == "" {
|
||
obs = "(empty output)"
|
||
}
|
||
if toolErr != nil {
|
||
obs = formatToolErrorObservation("TOOL_EXEC_ERROR", action, toolErr.Error()) + "\nOUTPUT:\n" + obs
|
||
o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+action)
|
||
}
|
||
if len(obs) > 4000 {
|
||
obs = obs[:4000] + "\n...(truncated)"
|
||
}
|
||
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Thought: " + decision.Thought + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Action: " + action + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " ActionInput: " + actionInput + "\n"
|
||
scratchpad += "Step " + strconv.Itoa(step) + " Observation: " + obs + "\n"
|
||
}
|
||
|
||
o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
|
||
return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
|
||
}
|
||
|
||
// buildLegacySystemPrompt 为不支持 tool_calls 的旧版 ReAct 链路构建 system prompt(含 JSON 输出格式约束)。
|
||
func (o *Orchestrator) buildLegacySystemPrompt(userInput string) string {
|
||
skillMetaDoc := o.formatSkillSummariesForPrompt()
|
||
relevantSkillsDoc := o.formatSelectedSkillsForPrompt(userInput, nil)
|
||
toolDoc := o.formatToolDoc()
|
||
runtimeDoc := formatRuntimeContextForPrompt()
|
||
|
||
return strings.Join([]string{
|
||
"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
|
||
o.soul,
|
||
"",
|
||
"===== 运行环境 =====",
|
||
runtimeDoc,
|
||
"",
|
||
"===== 可用技能概览 =====",
|
||
skillMetaDoc,
|
||
"",
|
||
"===== 本轮相关技能 =====",
|
||
relevantSkillsDoc,
|
||
"",
|
||
"===== 可用工具 =====",
|
||
toolDoc,
|
||
"",
|
||
"===== 输出格式约束 =====",
|
||
"你必须使用 ReAct 模式进行决策。每次回复必须是且仅是一个 JSON 对象:",
|
||
"{",
|
||
" \"thought\": \"你的推理过程(必填)\",",
|
||
" \"action\": \"要调用的工具名称(不调工具时填 none)\",",
|
||
" \"action_input\": \"传给工具的输入\",",
|
||
" \"is_final_answer\": true 或 false,",
|
||
" \"final_answer\": \"当 is_final_answer=true 时填写给用户的最终回复\"",
|
||
"}",
|
||
}, "\n")
|
||
}
|
||
|
||
// buildToolDefinitions 将工具注册表转换为 OpenAI function calling 所需的 ToolDefinition 列表。
|
||
func (o *Orchestrator) buildToolDefinitions() []llm.ToolDefinition {
|
||
list := o.tools.List()
|
||
defs := make([]llm.ToolDefinition, 0, len(list))
|
||
defaultParams := json.RawMessage(`{"type":"object","properties":{"input":{"type":"string","description":"工具的输入命令或查询内容"}},"required":["input"]}`)
|
||
|
||
sort.Slice(list, func(i, j int) bool {
|
||
return list[i].Name() < list[j].Name()
|
||
})
|
||
|
||
for _, t := range list {
|
||
defs = append(defs, llm.ToolDefinition{
|
||
Type: "function",
|
||
Function: llm.ToolFunctionDef{
|
||
Name: t.Name(),
|
||
Description: t.Description(),
|
||
Parameters: defaultParams,
|
||
},
|
||
})
|
||
}
|
||
return defs
|
||
}
|
||
|
||
// extractToolInput 从 LLM 的 function calling arguments JSON 中提取工具输入字符串。
|
||
func extractToolInput(arguments string) string {
|
||
arguments = strings.TrimSpace(arguments)
|
||
if arguments == "" {
|
||
return ""
|
||
}
|
||
var args struct {
|
||
Input string `json:"input"`
|
||
}
|
||
if err := json.Unmarshal([]byte(arguments), &args); err != nil {
|
||
// 降级:直接将 arguments 作为输入
|
||
return arguments
|
||
}
|
||
if args.Input != "" {
|
||
return args.Input
|
||
}
|
||
return arguments
|
||
}
|
||
|
||
func (o *Orchestrator) generateMessages(ctx context.Context, messages []llm.PromptMessage) (string, error) {
|
||
if client, ok := o.llm.(llm.MessageChatClient); ok {
|
||
return client.GenerateMessages(ctx, messages)
|
||
}
|
||
systemPrompt, userPrompt := fallbackPromptsFromMessages(messages)
|
||
return o.llm.Generate(ctx, systemPrompt, userPrompt)
|
||
}
|
||
|
||
func buildReActMessages(systemPrompt, compressedContext, userInput, scratchpad string) []llm.PromptMessage {
|
||
msgs := make([]llm.PromptMessage, 0, 16)
|
||
msgs = append(msgs, llm.PromptMessage{Role: "system", Content: systemPrompt})
|
||
msgs = append(msgs, parseCompressedHistoryMessages(compressedContext)...)
|
||
|
||
if strings.TrimSpace(scratchpad) != "" {
|
||
msgs = append(msgs, llm.PromptMessage{Role: "assistant", Content: "推理记录:\n" + strings.TrimSpace(scratchpad)})
|
||
}
|
||
msgs = append(msgs, llm.PromptMessage{Role: "user", Content: userInput})
|
||
return msgs
|
||
}
|
||
|
||
func parseCompressedHistoryMessages(compressed string) []llm.PromptMessage {
|
||
compressed = strings.TrimSpace(compressed)
|
||
if compressed == "" {
|
||
return nil
|
||
}
|
||
lines := strings.Split(compressed, "\n")
|
||
out := make([]llm.PromptMessage, 0, len(lines))
|
||
for _, line := range lines {
|
||
line = strings.TrimSpace(line)
|
||
if line == "" {
|
||
continue
|
||
}
|
||
idx := strings.Index(line, ":")
|
||
if idx <= 0 {
|
||
out = append(out, llm.PromptMessage{Role: "assistant", Content: line})
|
||
continue
|
||
}
|
||
role := strings.ToLower(strings.TrimSpace(line[:idx]))
|
||
content := strings.TrimSpace(line[idx+1:])
|
||
if role != "system" && role != "user" && role != "assistant" {
|
||
role = "assistant"
|
||
}
|
||
out = append(out, llm.PromptMessage{Role: role, Content: content})
|
||
}
|
||
return out
|
||
}
|
||
|
||
func fallbackPromptsFromMessages(messages []llm.PromptMessage) (string, string) {
|
||
sysParts := make([]string, 0, 2)
|
||
userParts := make([]string, 0, len(messages))
|
||
for _, m := range messages {
|
||
role := strings.ToLower(strings.TrimSpace(m.Role))
|
||
content := strings.TrimSpace(m.Content)
|
||
if content == "" {
|
||
continue
|
||
}
|
||
if role == "system" {
|
||
sysParts = append(sysParts, content)
|
||
continue
|
||
}
|
||
userParts = append(userParts, role+": "+content)
|
||
}
|
||
return strings.Join(sysParts, "\n\n"), strings.Join(userParts, "\n")
|
||
}
|
||
|
||
func nonEmptyIDs(ids []string) []string {
|
||
if len(ids) == 0 {
|
||
return nil
|
||
}
|
||
out := make([]string, 0, len(ids))
|
||
seen := map[string]struct{}{}
|
||
for _, id := range ids {
|
||
id = strings.TrimSpace(id)
|
||
if id == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[id]; ok {
|
||
continue
|
||
}
|
||
seen[id] = struct{}{}
|
||
out = append(out, id)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func (o *Orchestrator) appendPendingFiles(chatID, userID string, refs []pendingFileRef) {
|
||
refs = sanitizePendingRefs(refs)
|
||
if len(refs) == 0 {
|
||
return
|
||
}
|
||
key := pendingFileKey(chatID, userID)
|
||
o.pendingFilesMu.Lock()
|
||
defer o.pendingFilesMu.Unlock()
|
||
merged := append(o.pendingFiles[key], refs...)
|
||
o.pendingFiles[key] = sanitizePendingRefs(merged)
|
||
}
|
||
|
||
func (o *Orchestrator) getPendingFiles(chatID, userID string) []pendingFileRef {
|
||
key := pendingFileKey(chatID, userID)
|
||
o.pendingFilesMu.Lock()
|
||
defer o.pendingFilesMu.Unlock()
|
||
snapshot := o.pendingFiles[key]
|
||
out := make([]pendingFileRef, len(snapshot))
|
||
copy(out, snapshot)
|
||
return out
|
||
}
|
||
|
||
func (o *Orchestrator) clearPendingFiles(chatID, userID string) {
|
||
key := pendingFileKey(chatID, userID)
|
||
o.pendingFilesMu.Lock()
|
||
defer o.pendingFilesMu.Unlock()
|
||
delete(o.pendingFiles, key)
|
||
}
|
||
|
||
func pendingFileKey(chatID, userID string) string {
|
||
return strings.TrimSpace(chatID) + "::" + strings.TrimSpace(userID)
|
||
}
|
||
|
||
func sanitizePendingRefs(refs []pendingFileRef) []pendingFileRef {
|
||
if len(refs) == 0 {
|
||
return nil
|
||
}
|
||
out := make([]pendingFileRef, 0, len(refs))
|
||
seen := map[string]struct{}{}
|
||
for _, r := range refs {
|
||
id := strings.TrimSpace(r.ID)
|
||
if id == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[id]; ok {
|
||
continue
|
||
}
|
||
seen[id] = struct{}{}
|
||
r.ID = id
|
||
r.Name = strings.TrimSpace(r.Name)
|
||
r.MimeType = strings.TrimSpace(r.MimeType)
|
||
out = append(out, r)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func defaultIfEmpty(v, fallback string) string {
|
||
if strings.TrimSpace(v) == "" {
|
||
return fallback
|
||
}
|
||
return v
|
||
}
|
||
|
||
// formatRelevantSkillsForPrompt 返回与当前用户问题最相关的技能内容。
|
||
func (o *Orchestrator) formatSelectedSkillsForPrompt(userInput string, selected []knowledge.Skill) string {
|
||
skills := selected
|
||
if len(skills) == 0 {
|
||
skills = o.selectRelevantSkills(userInput, 4)
|
||
}
|
||
if len(skills) == 0 {
|
||
return "(none matched, tools are still globally available)"
|
||
}
|
||
return formatSkills(skills)
|
||
}
|
||
|
||
func (o *Orchestrator) selectRelevantSkills(userInput string, maxCount int) []knowledge.Skill {
|
||
if maxCount <= 0 {
|
||
maxCount = 4
|
||
}
|
||
query := strings.TrimSpace(strings.ToLower(userInput))
|
||
all := o.getSkillsSnapshot()
|
||
if query == "" || len(all) <= maxCount {
|
||
return all
|
||
}
|
||
|
||
queryTokens := buildQueryTokens(query)
|
||
type item struct {
|
||
skill knowledge.Skill
|
||
score int
|
||
}
|
||
ranked := make([]item, 0, len(all))
|
||
|
||
for _, sk := range all {
|
||
hay := strings.ToLower(sk.Name + "\n" + clipForScoring(sk.Content, 1800))
|
||
score := 0
|
||
if strings.Contains(hay, query) {
|
||
score += 8
|
||
}
|
||
for _, tk := range queryTokens {
|
||
if strings.Contains(hay, tk) {
|
||
score++
|
||
}
|
||
}
|
||
if score == 0 {
|
||
continue
|
||
}
|
||
ranked = append(ranked, item{skill: sk, score: score})
|
||
if o.log != nil {
|
||
o.log.Debugf("selectRelevantSkills skill=%q score=%d", sk.Name, score)
|
||
}
|
||
}
|
||
|
||
if len(ranked) == 0 {
|
||
return nil
|
||
}
|
||
|
||
sort.Slice(ranked, func(i, j int) bool {
|
||
if ranked[i].score == ranked[j].score {
|
||
return strings.ToLower(strings.TrimSpace(ranked[i].skill.Name)) < strings.ToLower(strings.TrimSpace(ranked[j].skill.Name))
|
||
}
|
||
return ranked[i].score > ranked[j].score
|
||
})
|
||
|
||
if len(ranked) > maxCount {
|
||
ranked = ranked[:maxCount]
|
||
}
|
||
out := make([]knowledge.Skill, 0, len(ranked))
|
||
for _, r := range ranked {
|
||
out = append(out, r.skill)
|
||
}
|
||
if o.log != nil {
|
||
selectedNames := make([]string, 0, len(out))
|
||
for _, sk := range out {
|
||
selectedNames = append(selectedNames, sk.Name)
|
||
}
|
||
o.log.Debugf("selectRelevantSkills query=%q matched=%d selected=%v", query, len(ranked), selectedNames)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func buildQueryTokens(query string) []string {
|
||
set := map[string]struct{}{}
|
||
collectToken := func(t string) {
|
||
t = strings.TrimSpace(t)
|
||
if len([]rune(t)) < 2 {
|
||
return
|
||
}
|
||
set[t] = struct{}{}
|
||
}
|
||
|
||
for _, part := range strings.FieldsFunc(query, func(r rune) bool {
|
||
if r >= 'a' && r <= 'z' {
|
||
return false
|
||
}
|
||
if r >= '0' && r <= '9' {
|
||
return false
|
||
}
|
||
if r >= 0x4e00 && r <= 0x9fff {
|
||
return false
|
||
}
|
||
return true
|
||
}) {
|
||
collectToken(part)
|
||
}
|
||
|
||
// 针对中文无空格输入,补充 2-gram 提升匹配命中率。
|
||
runes := []rune(query)
|
||
for i := 0; i+1 < len(runes); i++ {
|
||
r1 := runes[i]
|
||
r2 := runes[i+1]
|
||
if (r1 >= 0x4e00 && r1 <= 0x9fff) && (r2 >= 0x4e00 && r2 <= 0x9fff) {
|
||
collectToken(string([]rune{r1, r2}))
|
||
}
|
||
}
|
||
|
||
out := make([]string, 0, len(set))
|
||
for tk := range set {
|
||
out = append(out, tk)
|
||
}
|
||
sort.Strings(out)
|
||
return out
|
||
}
|
||
|
||
func clipForScoring(s string, maxRunes int) string {
|
||
if maxRunes <= 0 {
|
||
maxRunes = 1800
|
||
}
|
||
r := []rune(s)
|
||
if len(r) <= maxRunes {
|
||
return s
|
||
}
|
||
return string(r[:maxRunes])
|
||
}
|
||
|
||
func formatRuntimeContextForPrompt() string {
|
||
goos := strings.TrimSpace(strings.ToLower(runtime.GOOS))
|
||
if goos == "" {
|
||
goos = "unknown"
|
||
}
|
||
return "当前运行系统 GOOS=" + goos + "。请优先使用与该系统一致的策略。仅当用户明确要求时,才采用其他系统(如 Windows)的专用流程。"
|
||
}
|
||
|
||
// emitCapabilityGap 处理能力缺口信息埋点或者通过 AI 自动创建生成相应缺失技能的逻辑
|
||
func (o *Orchestrator) emitCapabilityGap(chatID, userID, intent, reason string) {
|
||
if !o.enableCapabilityGap {
|
||
return
|
||
}
|
||
intent = strings.TrimSpace(intent)
|
||
reason = strings.TrimSpace(reason)
|
||
if intent == "" || reason == "" {
|
||
return
|
||
}
|
||
if len(intent) > 1000 {
|
||
intent = intent[:1000] // 防止恶意使用超长 payload
|
||
}
|
||
if len(reason) > 240 {
|
||
reason = reason[:240] // 保证状态长度在 DB 内正常可用
|
||
}
|
||
if err := o.store.SaveCapabilityGap(chatID, userID, intent, reason); err != nil && o.log != nil {
|
||
o.log.Warnf("save capability gap failed chat_id=%s user_id=%s err=%v", chatID, userID, err)
|
||
return
|
||
}
|
||
|
||
// 提取出高频率缺口并在超出阈值后进行 draft 生成
|
||
clusters, err := o.store.TopCapabilityGapClusters(20, time.Now().UTC().Add(-o.gapLookbackDuration))
|
||
if err != nil {
|
||
if o.log != nil {
|
||
o.log.Warnf("query capability gap clusters failed err=%v", err)
|
||
}
|
||
return
|
||
}
|
||
for _, c := range clusters {
|
||
if c.Count < o.gapDraftTriggerCount {
|
||
continue
|
||
}
|
||
|
||
path, created, draftErr := knowledge.GenerateSkillDraft(c, o.autoSkillDir)
|
||
if draftErr != nil {
|
||
if o.log != nil {
|
||
o.log.Warnf("generate skill draft failed intent_key=%s reason=%s err=%v", c.IntentKey, c.Reason, draftErr)
|
||
}
|
||
continue
|
||
}
|
||
if created && o.log != nil {
|
||
o.log.Infof("capability gap draft generated path=%s intent_key=%s reason=%s count=%d", path, c.IntentKey, c.Reason, c.Count)
|
||
}
|
||
// 如果生成了新技能则将它们重新加载进环境
|
||
if created {
|
||
if reloadErr := o.ReloadSkills(); reloadErr != nil && o.log != nil {
|
||
o.log.Warnf("auto reload skills failed after generation path=%s err=%v", path, reloadErr)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// ReloadSkills 会从提供的技能目录动态从最新存储位置载入所有技能定义而不重启系统。
|
||
func (o *Orchestrator) ReloadSkills() error {
|
||
skills, err := knowledge.LoadSkillSet(o.skillsDir)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
summaries, err := knowledge.LoadSkillSummaries(o.skillsDir)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 利用 RWMutex 做热更新保护
|
||
o.skillsMu.Lock()
|
||
o.skills = skills
|
||
o.skillSummaries = copySkillSummaries(summaries)
|
||
o.skillsMu.Unlock()
|
||
if o.log != nil {
|
||
o.log.Infof("skills hot reloaded count=%d dir=%s", len(skills), o.skillsDir)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (o *Orchestrator) getSkillsSnapshot() []knowledge.Skill {
|
||
o.skillsMu.RLock()
|
||
defer o.skillsMu.RUnlock()
|
||
out := make([]knowledge.Skill, len(o.skills))
|
||
copy(out, o.skills)
|
||
return out
|
||
}
|
||
|
||
func (o *Orchestrator) getSkillSummariesSnapshot() []knowledge.SkillSummary {
|
||
o.skillsMu.RLock()
|
||
defer o.skillsMu.RUnlock()
|
||
return copySkillSummaries(o.skillSummaries)
|
||
}
|
||
|
||
// BuildCapabilityGapReport 生成指定数量以内的近期高频缺失功能报错并格式化成报表。
|
||
func (o *Orchestrator) BuildCapabilityGapReport(limit int) (string, error) {
|
||
clusters, err := o.store.TopCapabilityGapClusters(limit, time.Now().UTC().Add(-o.gapLookbackDuration))
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
if len(clusters) == 0 {
|
||
return "最近没有采集到能力缺口记录。", nil
|
||
}
|
||
b := strings.Builder{}
|
||
b.WriteString("高频能力缺口清单:\n")
|
||
for i, c := range clusters {
|
||
line := fmt.Sprintf("%d) intent=%s | reason=%s | count=%d | last_seen=%s\n", i+1, c.IntentKey, c.Reason, c.Count, c.LastSeenAt.Format("2006-01-02 15:04:05"))
|
||
b.WriteString(line)
|
||
}
|
||
b.WriteString("\n草稿目录:")
|
||
b.WriteString(o.autoSkillDir)
|
||
b.WriteString("\n系统会在达到阈值后自动生成并热加载技能;你也可以手动发送 /reload_skills。")
|
||
return b.String(), nil
|
||
}
|
||
|
||
func (o *Orchestrator) formatSkillSummariesForPrompt() string {
|
||
summaries := o.getSkillSummariesSnapshot()
|
||
if len(summaries) == 0 {
|
||
return "(none)"
|
||
}
|
||
sort.Slice(summaries, func(i, j int) bool {
|
||
left := strings.ToLower(strings.TrimSpace(summaries[i].DirName))
|
||
right := strings.ToLower(strings.TrimSpace(summaries[j].DirName))
|
||
if left == right {
|
||
return strings.ToLower(strings.TrimSpace(summaries[i].Name)) < strings.ToLower(strings.TrimSpace(summaries[j].Name))
|
||
}
|
||
return left < right
|
||
})
|
||
b := strings.Builder{}
|
||
for _, summary := range summaries {
|
||
dir := strings.TrimSpace(summary.DirName)
|
||
name := strings.TrimSpace(summary.Name)
|
||
desc := strings.TrimSpace(summary.Description)
|
||
if name == "" {
|
||
continue
|
||
}
|
||
if len(desc) > 220 {
|
||
desc = desc[:220]
|
||
}
|
||
b.WriteString("- ")
|
||
if dir != "" {
|
||
b.WriteString("[")
|
||
b.WriteString(dir)
|
||
b.WriteString("] ")
|
||
}
|
||
b.WriteString(name)
|
||
if desc != "" {
|
||
b.WriteString(" => ")
|
||
b.WriteString(desc)
|
||
}
|
||
b.WriteString("\n")
|
||
}
|
||
return strings.TrimSpace(b.String())
|
||
}
|
||
|
||
func copySkillSummaries(in []knowledge.SkillSummary) []knowledge.SkillSummary {
|
||
out := make([]knowledge.SkillSummary, len(in))
|
||
copy(out, in)
|
||
for i := range out {
|
||
out[i].DirName = strings.TrimSpace(out[i].DirName)
|
||
out[i].Name = strings.TrimSpace(out[i].Name)
|
||
out[i].Description = strings.TrimSpace(out[i].Description)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func formatToolErrorObservation(code, action, reason string) string {
|
||
code = strings.TrimSpace(code)
|
||
action = strings.TrimSpace(action)
|
||
reason = strings.TrimSpace(reason)
|
||
if code == "" {
|
||
code = "TOOL_EXEC_ERROR"
|
||
}
|
||
if action == "" {
|
||
action = "unknown"
|
||
}
|
||
if reason == "" {
|
||
reason = "unknown error"
|
||
}
|
||
return "ERROR_CODE=" + code + "; TOOL=" + action + "; REASON=" + reason
|
||
}
|
||
|
||
func formatSkills(skills []knowledge.Skill) string {
|
||
b := strings.Builder{}
|
||
for _, skill := range skills {
|
||
b.WriteString("## ")
|
||
b.WriteString(skill.Name)
|
||
b.WriteString("\n")
|
||
b.WriteString(skill.Content)
|
||
b.WriteString("\n\n")
|
||
}
|
||
return strings.TrimSpace(b.String())
|
||
}
|
||
|
||
func (o *Orchestrator) formatToolDoc() string {
|
||
list := o.tools.List()
|
||
if len(list) == 0 {
|
||
return "(none)"
|
||
}
|
||
sort.Slice(list, func(i, j int) bool {
|
||
return list[i].Name() < list[j].Name()
|
||
})
|
||
b := strings.Builder{}
|
||
for _, t := range list {
|
||
b.WriteString("- ")
|
||
b.WriteString(t.Name())
|
||
b.WriteString(": ")
|
||
b.WriteString(t.Description())
|
||
b.WriteString("\n")
|
||
}
|
||
return strings.TrimSpace(b.String())
|
||
}
|
||
|
||
func truncateForLog(s string, maxLen int) string {
|
||
if len(s) <= maxLen {
|
||
return s
|
||
}
|
||
return s[:maxLen] + "...(truncated)"
|
||
}
|
||
|
||
func sanitizeUserFacingAnswer(raw string) string {
|
||
raw = strings.ReplaceAll(raw, "\r\n", "\n")
|
||
raw = strings.TrimSpace(raw)
|
||
if raw == "" {
|
||
return ""
|
||
}
|
||
|
||
markers := []string{"Final Answer:", "Final Answer:", "最终回答:", "最终回答:", "最终答案:", "最终答案:", "Answer:", "Answer:"}
|
||
for _, marker := range markers {
|
||
idx := strings.LastIndex(raw, marker)
|
||
if idx >= 0 {
|
||
candidate := strings.TrimSpace(raw[idx+len(marker):])
|
||
if candidate != "" {
|
||
return candidate
|
||
}
|
||
}
|
||
}
|
||
|
||
lines := strings.Split(raw, "\n")
|
||
cleaned := make([]string, 0, len(lines))
|
||
for _, line := range lines {
|
||
trimmed := strings.TrimSpace(line)
|
||
if trimmed == "" {
|
||
if len(cleaned) > 0 && cleaned[len(cleaned)-1] != "" {
|
||
cleaned = append(cleaned, "")
|
||
}
|
||
continue
|
||
}
|
||
lower := strings.ToLower(trimmed)
|
||
if strings.HasPrefix(lower, "thought:") || strings.HasPrefix(lower, "trace:") || strings.HasPrefix(lower, "observation:") ||
|
||
strings.HasPrefix(lower, "action:") || strings.HasPrefix(lower, "actioninput:") || strings.HasPrefix(lower, "action input:") ||
|
||
strings.HasPrefix(lower, "step ") || strings.HasPrefix(trimmed, "思考:") || strings.HasPrefix(trimmed, "思考:") ||
|
||
strings.HasPrefix(trimmed, "推理:") || strings.HasPrefix(trimmed, "推理:") || strings.HasPrefix(trimmed, "观察:") ||
|
||
strings.HasPrefix(trimmed, "观察:") || strings.HasPrefix(trimmed, "行动:") || strings.HasPrefix(trimmed, "行动:") ||
|
||
strings.HasPrefix(trimmed, "步骤 ") {
|
||
continue
|
||
}
|
||
cleaned = append(cleaned, trimmed)
|
||
}
|
||
|
||
for len(cleaned) > 0 && cleaned[len(cleaned)-1] == "" {
|
||
cleaned = cleaned[:len(cleaned)-1]
|
||
}
|
||
if len(cleaned) == 0 {
|
||
return ""
|
||
}
|
||
return strings.TrimSpace(strings.Join(cleaned, "\n"))
|
||
}
|
||
|
||
// splitContentIntoSegments splits a string into smaller segments of the specified size (by rune count).
|
||
func splitContentIntoSegments(content string, segmentSize int) []string {
|
||
runes := []rune(content)
|
||
var segments []string
|
||
for start := 0; start < len(runes); start += segmentSize {
|
||
end := start + segmentSize
|
||
if end > len(runes) {
|
||
end = len(runes)
|
||
}
|
||
segments = append(segments, string(runes[start:end]))
|
||
}
|
||
return segments
|
||
}
|