Files
LaodingBot/internal/agent/orchestrator.go

1184 lines
38 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agent
import (
"context"
"encoding/json"
"fmt"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"laodingbot/internal/knowledge"
"laodingbot/internal/llm"
"laodingbot/internal/logger"
"laodingbot/internal/memory"
"laodingbot/internal/tools"
)
// Orchestrator coordinates message handling: LLM calls, conversation context,
// skill selection and tool invocation.
type Orchestrator struct {
// llm is the chat client; optional capabilities (tool calls, file upload,
// message-based chat) are discovered through type assertions at call time.
llm llm.Client
// store persists chat messages and capability-gap records.
store *memory.SQLiteStore
// tools is the registry used to look up and list callable tools.
tools *tools.Registry
// soul is the persona text embedded in every system prompt.
soul string
// skills and skillSummaries are replaced by ReloadSkills under skillsMu.
skills []knowledge.Skill
skillSummaries []knowledge.SkillSummary
// skillsDir is the directory ReloadSkills loads skills and summaries from.
skillsDir string
// autoSkillDir is where generated skill drafts are written.
autoSkillDir string
// gapDraftTriggerCount is the minimum cluster count before a draft is generated.
gapDraftTriggerCount int
// gapLookbackDuration bounds how far back capability-gap clusters are queried.
gapLookbackDuration time.Duration
// reactMaxStep is set by NewOrchestrator.
// NOTE(review): the ReAct loops visible in this file use a local maxSteps
// constant and do not read this field; confirm whether that is intended.
reactMaxStep int
// enableCapabilityGap gates emitCapabilityGap.
enableCapabilityGap bool
// log may be nil; call sites check before logging.
log *logger.Logger
// skillsMu guards skills and skillSummaries.
skillsMu sync.RWMutex
// pendingFilesMu guards pendingFiles.
pendingFilesMu sync.Mutex
// pendingFiles maps a pendingFileKey(chatID, userID) key to cached file references.
pendingFiles map[string][]pendingFileRef
}
// pendingFileRef describes one file whose file_id is cached for a conversation.
type pendingFileRef struct {
// ID is the file_id; entries with a blank ID are dropped by sanitizePendingRefs.
ID string
// Name is the file name; it may be empty for externally supplied file_ids.
Name string
// MimeType is the MIME type; it may be empty for externally supplied file_ids.
MimeType string
}
// filePromptContext is the result of prepareFilePromptContext for one request.
type filePromptContext struct {
// Summary is the human-readable file listing produced by buildFileSummary.
Summary string
// FatalReason is non-empty when the files could not be uploaded; callers
// treat a non-blank value as "stop and report".
FatalReason string
// FileIDs holds the pending file_ids followed by the newly uploaded ones.
FileIDs []string
// Uploaded holds references for the files uploaded during this call only.
Uploaded []pendingFileRef
}
// NewOrchestrator builds an Orchestrator from its dependencies and tuning values.
//
// Non-positive reactMaxStep, gapDraftTriggerCount and gapLookbackDuration fall
// back to 8, 3 and seven days respectively. A blank autoSkillDir falls back to
// skillsDir. skillSummaries is copied (with whitespace-trimmed fields); skills
// is stored as passed. The pending-file cache starts empty.
func NewOrchestrator(
	llmClient llm.Client,
	store *memory.SQLiteStore,
	registry *tools.Registry,
	soul string,
	skills []knowledge.Skill,
	skillSummaries []knowledge.SkillSummary,
	skillsDir string,
	reactMaxStep int,
	enableCapabilityGap bool,
	autoSkillDir string,
	gapDraftTriggerCount int,
	gapLookbackDuration time.Duration,
	log *logger.Logger,
) *Orchestrator {
	orch := &Orchestrator{
		llm:                  llmClient,
		store:                store,
		tools:                registry,
		soul:                 soul,
		skills:               skills,
		skillSummaries:       copySkillSummaries(skillSummaries),
		skillsDir:            skillsDir,
		autoSkillDir:         autoSkillDir,
		gapDraftTriggerCount: gapDraftTriggerCount,
		gapLookbackDuration:  gapLookbackDuration,
		reactMaxStep:         reactMaxStep,
		enableCapabilityGap:  enableCapabilityGap,
		log:                  log,
		pendingFiles:         make(map[string][]pendingFileRef),
	}

	// Apply defaults after construction so every fallback sits in one place.
	if orch.reactMaxStep <= 0 {
		orch.reactMaxStep = 8
	}
	if orch.gapDraftTriggerCount <= 0 {
		orch.gapDraftTriggerCount = 3
	}
	if orch.gapLookbackDuration <= 0 {
		orch.gapLookbackDuration = 7 * 24 * time.Hour
	}
	if strings.TrimSpace(orch.autoSkillDir) == "" {
		orch.autoSkillDir = skillsDir
	}
	return orch
}
// HandleMessage handles a plain-text user message and returns the reply text.
// It delegates to handleMessageInternal with no files and appendFileIDText=false.
func (o *Orchestrator) HandleMessage(ctx context.Context, chatID, userID, text string) (string, error) {
return o.handleMessageInternal(ctx, chatID, userID, text, nil, false)
}
// HandleMessageWithFiles handles a user message that carries file attachments.
// It delegates to handleMessageInternal with the given files and
// appendFileIDText=false.
func (o *Orchestrator) HandleMessageWithFiles(ctx context.Context, chatID, userID, text string, files []llm.InputFile) (string, error) {
return o.handleMessageInternal(ctx, chatID, userID, text, files, false)
}
// HandleMessageWithFileIDs handles a user message together with externally
// obtained file_ids. The de-duplicated, non-blank ids are added to the
// conversation's pending-file cache, then the message goes through
// handleMessageInternal with no new uploads and appendFileIDText=true.
func (o *Orchestrator) HandleMessageWithFileIDs(ctx context.Context, chatID, userID, text string, fileIDs []string) (string, error) {
	if cleaned := nonEmptyIDs(fileIDs); len(cleaned) > 0 {
		pending := make([]pendingFileRef, len(cleaned))
		for i, fileID := range cleaned {
			pending[i] = pendingFileRef{ID: fileID}
		}
		o.appendPendingFiles(chatID, userID, pending)
	}
	return o.handleMessageInternal(ctx, chatID, userID, text, nil, true)
}
// UploadAndCacheFiles uploads files through the LLM client and caches the
// resulting file_ids for the (chatID, userID) conversation so later text
// messages can reuse them. It only updates the in-memory pending-file cache.
//
// It returns the de-duplicated file_ids, or an error when no files are given,
// when the upload step reports a fatal reason, or when no usable file_id came
// back.
func (o *Orchestrator) UploadAndCacheFiles(ctx context.Context, chatID, userID string, files []llm.InputFile) ([]string, error) {
	if len(files) == 0 {
		return nil, fmt.Errorf("no files provided")
	}
	uploadCtx := o.prepareFilePromptContext(ctx, files, nil)
	if strings.TrimSpace(uploadCtx.FatalReason) != "" {
		// FatalReason embeds file names and upstream error text; pass it as an
		// argument so a '%' in it is not interpreted as a formatting verb.
		return nil, fmt.Errorf("%s", uploadCtx.FatalReason)
	}
	ids := nonEmptyIDs(uploadCtx.FileIDs)
	if len(ids) == 0 {
		return nil, fmt.Errorf("file upload completed but no valid file_id returned")
	}
	o.appendPendingFiles(chatID, userID, uploadCtx.toPendingRefs())
	return ids, nil
}
// handleMessageInternal is the shared implementation behind the HandleMessage*
// entry points.
//
// Flow, in order:
//  1. Attach a fresh trace ID to ctx for log correlation.
//  2. Answer the "/reload_skills" and "/capability_gaps" commands directly
//     (case-insensitive, surrounding whitespace ignored); their failures are
//     returned as reply text, not as errors.
//  3. File-only message (files present, blank text): store a "[FILE_UPLOAD]"
//     marker, upload the files, cache their references and reply with an
//     acknowledgement. No LLM generation happens on this path.
//  4. Otherwise store the user message, load and compress recent history,
//     prepare the file context (pending plus newly supplied files), run the
//     ReAct loop, clear the pending-file cache if it was non-empty, and store
//     the assistant reply.
//
// Upload failures are reported to the user as reply text with a nil error.
// Store and LLM failures are returned as errors with an empty reply.
func (o *Orchestrator) handleMessageInternal(ctx context.Context, chatID, userID, text string, files []llm.InputFile, appendFileIDText bool) (string, error) {
// Give this request a unique trace ID.
traceID := logger.NewTraceID()
ctx = logger.WithTraceID(ctx, traceID)
traceLogPrefix := "trace_id=" + traceID
if o.log != nil {
o.log.Infof("%s handle message chat_id=%s user_id=%s text_len=%d files=%d", traceLogPrefix, chatID, userID, len(text), len(files))
o.log.Debugf("%s handle message text=%q", traceLogPrefix, text)
}
// Special command: hot-reload skills.
if strings.EqualFold(strings.TrimSpace(text), "/reload_skills") {
if err := o.ReloadSkills(); err != nil {
return "技能热加载失败: " + err.Error(), nil
}
return "技能已热加载完成。", nil
}
// Special command: return the capability-gap report.
if strings.EqualFold(strings.TrimSpace(text), "/capability_gaps") {
report, err := o.BuildCapabilityGapReport(10)
if err != nil {
return "缺口报告生成失败: " + err.Error(), nil
}
return report, nil
}
trimmedText := strings.TrimSpace(text)
isFileOnly := len(files) > 0 && trimmedText == ""
if isFileOnly {
// Record a marker in place of the (empty) user text.
if err := o.store.SaveMessage(chatID, userID, "user", "[FILE_UPLOAD]"); err != nil {
if o.log != nil {
o.log.Errorf("%s save file-only user marker failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
uploadCtx := o.prepareFilePromptContext(ctx, files, nil)
if strings.TrimSpace(uploadCtx.FatalReason) != "" {
finalText := "文件上传失败,无法建立文档上下文。" + "\n" + uploadCtx.FatalReason
// Best effort: a failure to persist the failure notice is only logged.
if err := o.store.SaveMessage(chatID, userID, "assistant", finalText); err != nil && o.log != nil {
o.log.Warnf("%s save upload failure message failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return finalText, nil
}
// Cache the uploaded references for the next text message in this conversation.
o.appendPendingFiles(chatID, userID, uploadCtx.toPendingRefs())
finalText := o.buildFileUploadAck(uploadCtx)
if err := o.store.SaveMessage(chatID, userID, "assistant", finalText); err != nil {
if o.log != nil {
o.log.Errorf("%s save file upload ack failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
if o.log != nil {
o.log.Infof("%s file-only message handled chat_id=%s cached_files=%d", traceLogPrefix, chatID, len(uploadCtx.FileIDs))
}
return finalText, nil
}
// Persist the user message.
if err := o.store.SaveMessage(chatID, userID, "user", text); err != nil {
if o.log != nil {
o.log.Errorf("%s save user message failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
// Load recent conversation history and compress it into prompt context.
recent, err := o.store.LoadRecent(chatID, 16)
if err != nil {
if o.log != nil {
o.log.Errorf("%s load recent failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
compressed := memory.CompressForPrompt(recent, 6000)
if o.log != nil {
o.log.Debugf("%s prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", traceLogPrefix, chatID, len(recent), len(compressed))
}
// Combine cached file references with any newly supplied files, then run the ReAct loop.
pendingRefs := o.getPendingFiles(chatID, userID)
fileCtx := o.prepareFilePromptContext(ctx, files, pendingRefs)
if strings.TrimSpace(fileCtx.FatalReason) != "" {
finalText := "文件上传失败,无法继续进行文档解析。" + "\n" + fileCtx.FatalReason
if err := o.store.SaveMessage(chatID, userID, "assistant", finalText); err != nil && o.log != nil {
o.log.Warnf("%s save assistant failure message failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
if o.log != nil {
o.log.Warnf("%s stop before react due to file upload failure reason=%s", traceLogPrefix, fileCtx.FatalReason)
}
return finalText, nil
}
response, err := o.runUnifiedReAct(ctx, chatID, userID, compressed, text, fileCtx, appendFileIDText)
if err != nil {
if o.log != nil {
o.log.Errorf("%s message generation failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
// The cached files were offered to this round; drop them only after a successful generation.
if len(pendingRefs) > 0 {
o.clearPendingFiles(chatID, userID)
}
// Persist the assistant reply.
if err := o.store.SaveMessage(chatID, userID, "assistant", response); err != nil {
if o.log != nil {
o.log.Errorf("%s save assistant response failed chat_id=%s err=%v", traceLogPrefix, chatID, err)
}
return "", err
}
if o.log != nil {
o.log.Infof("%s message handled chat_id=%s response_len=%d", traceLogPrefix, chatID, len(response))
}
return response, nil
}
// buildUnifiedSystemPrompt assembles the system prompt for the function-calling
// ReAct loop: persona, ReAct guidance, runtime environment, the skill overview
// and the skills selected for userInput. Tool definitions are not embedded
// here; runUnifiedReAct passes them to the client separately.
func (o *Orchestrator) buildUnifiedSystemPrompt(userInput string) string {
	sections := []string{
		"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
		o.soul,
		"",
		"===== ReAct 思考指引 =====",
		"你采用 ReActReasoning + Acting模式进行任务处理。",
		"1. 思考优先在做出任何行动之前先在回复中阐述你的推理过程Thought。",
		"2. 工具调用如果需要获取信息或执行操作使用提供的工具函数function calling进行调用。",
		"3. 观察反馈:检查工具返回的结果,据此决定下一步行动。",
		"4. 最终回答:当你有足够信息时,直接给出面向用户的最终文本回复,不要调用工具。",
		"",
		"注意事项:",
		"- 每次要么调用工具,要么给出最终回答,不要两者都做。",
		"- 如果工具调用失败根据错误信息Traceback调整策略后重试或给出替代方案。",
		"- 涉及文件、目录、命令时,优先调用工具获取真实结果,不要猜测。",
		"- 你的思考过程Thought应写在回复内容中帮助追踪推理逻辑。",
		"",
		"===== 运行环境 =====",
		formatRuntimeContextForPrompt(),
		"",
		"===== 可用技能概览 =====",
		o.formatSkillSummariesForPrompt(),
		"",
		"===== 本轮相关技能(按用户问题筛选) =====",
		o.formatSelectedSkillsForPrompt(userInput, nil),
	}
	return strings.Join(sections, "\n")
}
// runUnifiedReAct drives the ReAct loop through the LLM's native
// function-calling API.
//
// The message list starts as system prompt + current user input and grows each
// round with the assistant message (carrying tool_calls) followed by one tool
// message per call. The loop ends when the model replies without tool calls
// (that reply, trimmed, is the final answer) or after maxSteps rounds, in which
// case a capability gap is recorded and a fixed fallback reply is returned.
// If the client does not implement llm.ToolCallChatClient the call is
// delegated to runLegacyReAct. An error is returned only when the LLM call
// itself fails.
//
// NOTE(review): compressedContext is only forwarded to the legacy path (the
// history append below is commented out), and the step limit is the local
// constant rather than o.reactMaxStep; confirm both are intentional.
func (o *Orchestrator) runUnifiedReAct(ctx context.Context, chatID, userID, compressedContext, userInput string, fileCtx filePromptContext, appendFileIDText bool) (string, error) {
	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID
	systemPrompt := o.buildUnifiedSystemPrompt(userInput)
	if o.log != nil {
		o.log.Infof("%s unified react start", traceLogPrefix)
	}

	// Fall back to the JSON-decision loop when native tool calls are unavailable.
	toolCallClient, supportsToolCalls := o.llm.(llm.ToolCallChatClient)
	if !supportsToolCalls {
		if o.log != nil {
			o.log.Warnf("%s llm client does not support ToolCallChatClient, falling back to legacy ReAct", traceLogPrefix)
		}
		return o.runLegacyReAct(ctx, chatID, userID, compressedContext, userInput, fileCtx, appendFileIDText)
	}

	// Initial conversation: system prompt, then the current user message.
	messages := make([]llm.PromptMessage, 0, 32)
	messages = append(messages, llm.PromptMessage{Role: "system", Content: systemPrompt})
	// History injection is currently disabled:
	//messages = append(messages, parseCompressedHistoryMessages(compressedContext)...)
	messages = append(messages, llm.PromptMessage{Role: "user", Content: userInput})

	// Tool definitions are passed through the API's tools field.
	toolDefs := o.buildToolDefinitions()

	const maxSteps = 20
	const maxObservationBytes = 4000
	for step := 1; step <= maxSteps; step++ {
		if o.log != nil {
			o.log.Infof("%s react step=%d start messages_count=%d", traceLogPrefix, step, len(messages))
		}
		completion, err := toolCallClient.GenerateWithTools(ctx, messages, toolDefs, fileCtx.FileIDs, appendFileIDText)
		if err != nil {
			return "", err
		}
		if o.log != nil {
			o.log.Infof("%s react step=%d content_len=%d tool_calls=%d",
				traceLogPrefix, step, len(completion.Content), len(completion.ToolCalls))
			if completion.Content != "" {
				o.log.Debugf("%s react step=%d thought=%q", traceLogPrefix, step, completion.Content)
			}
		}

		// No tool calls: the content is the final answer.
		if len(completion.ToolCalls) == 0 {
			finalText := strings.TrimSpace(completion.Content)
			if finalText == "" {
				finalText = "已完成处理。"
			}
			if o.log != nil {
				o.log.Infof("%s react final at step=%d answer_len=%d", traceLogPrefix, step, len(finalText))
			}
			return finalText, nil
		}

		// Record the assistant turn before appending the tool results that answer it.
		messages = append(messages, llm.PromptMessage{
			Role:      "assistant",
			Content:   completion.Content,
			ToolCalls: completion.ToolCalls,
		})

		// Execute each tool call in order; every call gets exactly one tool message.
		for _, tc := range completion.ToolCalls {
			toolName := strings.ToLower(strings.TrimSpace(tc.Function.Name))
			toolInput := extractToolInput(tc.Function.Arguments)
			tool, ok := o.tools.Get(toolName)
			if !ok {
				if o.log != nil {
					o.log.Warnf("%s react step=%d tool_not_found=%s", traceLogPrefix, step, toolName)
				}
				messages = append(messages, llm.PromptMessage{
					Role:       "tool",
					ToolCallID: tc.ID,
					Name:       tc.Function.Name,
					Content:    formatToolErrorObservation("TOOL_NOT_FOUND", toolName, "该工具不存在,请检查工具名称后重试"),
				})
				o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+toolName)
				continue
			}
			if o.log != nil {
				o.log.Infof("%s react step=%d tool_call tool=%s input=%q", traceLogPrefix, step, toolName, toolInput)
			}
			toolOut, toolErr := tool.Call(ctx, toolInput)
			obs := strings.TrimSpace(toolOut)
			if obs == "" {
				obs = "(empty output)"
			}
			if toolErr != nil {
				obs = formatToolErrorObservation("TOOL_EXEC_ERROR", toolName, toolErr.Error()) + "\nOUTPUT:\n" + obs
				o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+toolName)
			}
			// Cap the observation so it cannot overflow the model's context window.
			if len(obs) > maxObservationBytes {
				cut := maxObservationBytes
				// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut
				// never lands inside a multi-byte character.
				for cut > 0 && obs[cut]&0xC0 == 0x80 {
					cut--
				}
				obs = obs[:cut] + "\n...(truncated)"
			}
			if o.log != nil {
				o.log.Infof("%s react step=%d tool=%s observation_len=%d", traceLogPrefix, step, toolName, len(obs))
			}
			messages = append(messages, llm.PromptMessage{
				Role:       "tool",
				ToolCallID: tc.ID,
				Name:       tc.Function.Name,
				Content:    obs,
			})
		}
	}

	// Step limit reached without a final answer.
	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
}
// runLegacyReAct is the fallback ReAct loop for LLM clients without native tool
// calls. Each round the model must answer with one JSON decision object
// (parsed by parseDecision); earlier rounds are fed back as a text scratchpad.
//
// The loop ends when the decision is marked final, when the reply cannot be
// parsed (the raw reply, trimmed, is returned and a capability gap recorded),
// or after maxSteps rounds (a fixed fallback reply is returned). An error is
// returned only when the LLM call itself fails.
func (o *Orchestrator) runLegacyReAct(ctx context.Context, chatID, userID, compressedContext, userInput string, fileCtx filePromptContext, appendFileIDText bool) (string, error) {
	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID
	systemPrompt := o.buildLegacySystemPrompt(userInput)

	const maxSteps = 20
	const maxObservationBytes = 4000
	scratchpad := ""
	for step := 1; step <= maxSteps; step++ {
		if o.log != nil {
			o.log.Infof("%s legacy react step=%d start", traceLogPrefix, step)
		}
		messages := buildReActMessages(systemPrompt, compressedContext, userInput, fileCtx.Summary, scratchpad)
		raw, err := o.generateWithOptionalFilesMessages(ctx, messages, fileCtx.FileIDs, appendFileIDText)
		if err != nil {
			return "", err
		}
		decision, err := parseDecision(raw)
		if err != nil {
			// Unparseable reply: surface it verbatim rather than failing the request.
			o.emitCapabilityGap(chatID, userID, userInput, "react_parse_failed")
			return strings.TrimSpace(raw), nil
		}
		if decision.IsFinalAnswer {
			// Preference order: explicit final answer, then the thought, then a stock reply.
			finalText := ""
			if decision.FinalAnswer != nil {
				finalText = strings.TrimSpace(*decision.FinalAnswer)
			}
			if finalText == "" {
				finalText = strings.TrimSpace(decision.Thought)
			}
			if finalText == "" {
				finalText = "已完成处理。"
			}
			return finalText, nil
		}

		stepTag := "Step " + strconv.Itoa(step)
		action := strings.ToLower(strings.TrimSpace(decision.Action))
		if action == "" || action == "none" {
			scratchpad += stepTag + " Thought: " + decision.Thought + "\n"
			scratchpad += stepTag + " Observation: 你没有指定要调用的工具,请重新决策。\n"
			continue
		}
		actionInput := decision.GetActionInputString()
		tool, ok := o.tools.Get(action)
		if !ok {
			scratchpad += stepTag + " Thought: " + decision.Thought + "\n"
			scratchpad += stepTag + " Action: " + action + "\n"
			scratchpad += stepTag + " Observation: " + formatToolErrorObservation("TOOL_NOT_FOUND", action, "该工具不存在") + "\n"
			o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+action)
			continue
		}
		toolOut, toolErr := tool.Call(ctx, actionInput)
		obs := strings.TrimSpace(toolOut)
		if obs == "" {
			obs = "(empty output)"
		}
		if toolErr != nil {
			obs = formatToolErrorObservation("TOOL_EXEC_ERROR", action, toolErr.Error()) + "\nOUTPUT:\n" + obs
			o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+action)
		}
		// Cap the observation size.
		if len(obs) > maxObservationBytes {
			cut := maxObservationBytes
			// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut
			// never lands inside a multi-byte character.
			for cut > 0 && obs[cut]&0xC0 == 0x80 {
				cut--
			}
			obs = obs[:cut] + "\n...(truncated)"
		}
		scratchpad += stepTag + " Thought: " + decision.Thought + "\n"
		scratchpad += stepTag + " Action: " + action + "\n"
		scratchpad += stepTag + " ActionInput: " + actionInput + "\n"
		scratchpad += stepTag + " Observation: " + obs + "\n"
	}

	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	return "我尝试了多轮推理与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
}
// buildLegacySystemPrompt assembles the system prompt for the legacy ReAct
// loop. Unlike the function-calling prompt it lists the available tools inline
// and ends with the JSON output-format contract the model must follow.
func (o *Orchestrator) buildLegacySystemPrompt(userInput string) string {
	sections := []string{
		"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
		o.soul,
		"",
		"===== 运行环境 =====",
		formatRuntimeContextForPrompt(),
		"",
		"===== 可用技能概览 =====",
		o.formatSkillSummariesForPrompt(),
		"",
		"===== 本轮相关技能 =====",
		o.formatSelectedSkillsForPrompt(userInput, nil),
		"",
		"===== 可用工具 =====",
		o.formatToolDoc(),
		"",
		"===== 输出格式约束 =====",
		"你必须使用 ReAct 模式进行决策。每次回复必须是且仅是一个 JSON 对象:",
		"{",
		" \"thought\": \"你的推理过程(必填)\",",
		" \"action\": \"要调用的工具名称(不调工具时填 none\",",
		" \"action_input\": \"传给工具的输入\",",
		" \"is_final_answer\": true 或 false,",
		" \"final_answer\": \"当 is_final_answer=true 时填写给用户的最终回复\"",
		"}",
	}
	return strings.Join(sections, "\n")
}
// buildToolDefinitions converts the tool registry into the ToolDefinition list
// sent with function-calling requests. Tools are ordered by name, and every
// tool is advertised with the same schema: an object with one required string
// field "input".
func (o *Orchestrator) buildToolDefinitions() []llm.ToolDefinition {
	registered := o.tools.List()
	sort.Slice(registered, func(a, b int) bool {
		return registered[a].Name() < registered[b].Name()
	})

	schema := json.RawMessage(`{"type":"object","properties":{"input":{"type":"string","description":"工具的输入命令或查询内容"}},"required":["input"]}`)
	defs := make([]llm.ToolDefinition, len(registered))
	for i, tool := range registered {
		defs[i] = llm.ToolDefinition{
			Type: "function",
			Function: llm.ToolFunctionDef{
				Name:        tool.Name(),
				Description: tool.Description(),
				Parameters:  schema,
			},
		}
	}
	return defs
}
// extractToolInput pulls the tool input string out of a function-call
// arguments payload.
//
// A blank payload yields "". When the payload is a JSON object whose "input"
// field is a non-empty string, that string is returned. In every other case
// (invalid JSON, missing, empty or non-string "input") the trimmed payload
// itself is returned so the tool still receives something to work with.
func extractToolInput(arguments string) string {
	raw := strings.TrimSpace(arguments)
	if raw == "" {
		return ""
	}
	var payload struct {
		Input string `json:"input"`
	}
	if json.Unmarshal([]byte(raw), &payload) == nil && payload.Input != "" {
		return payload.Input
	}
	return raw
}
// prepareFilePromptContext builds the file context for one request.
//
// The non-blank IDs from pending are collected first. With no new files, the
// result carries those IDs and a summary of the pending files. Otherwise each
// file is uploaded through the client's llm.FileUploader implementation with
// purpose "file-extract". Any problem (client cannot upload, a file lacks a
// name or content, an upload fails) aborts immediately and returns a context
// holding only FatalReason.
func (o *Orchestrator) prepareFilePromptContext(ctx context.Context, files []llm.InputFile, pending []pendingFileRef) filePromptContext {
	var result filePromptContext
	for _, ref := range pending {
		if id := strings.TrimSpace(ref.ID); id != "" {
			result.FileIDs = append(result.FileIDs, id)
		}
	}
	if len(files) == 0 {
		result.Summary = buildFileSummary(pending, nil)
		return result
	}

	uploader, canUpload := o.llm.(llm.FileUploader)
	if !canUpload {
		return filePromptContext{FatalReason: "检测到文件输入,但当前 LLM 客户端不支持文件上传接口。"}
	}

	fresh := make([]pendingFileRef, 0, len(files))
	for idx, file := range files {
		ordinal := idx + 1 // 1-based position used in user-facing messages
		if strings.TrimSpace(file.FileName) == "" || len(file.Content) == 0 {
			return filePromptContext{FatalReason: fmt.Sprintf("file[%d] 缺少文件名或内容,无法上传。", ordinal)}
		}
		fileID, err := uploader.UploadFile(ctx, file, "file-extract")
		if err != nil {
			return filePromptContext{FatalReason: fmt.Sprintf("file[%d] name=%s 上传失败: %v", ordinal, file.FileName, err)}
		}
		result.FileIDs = append(result.FileIDs, fileID)
		fresh = append(fresh, pendingFileRef{
			ID:       fileID,
			Name:     strings.TrimSpace(file.FileName),
			MimeType: defaultIfEmpty(strings.TrimSpace(file.MimeType), "application/octet-stream"),
		})
	}
	result.Uploaded = fresh
	result.Summary = buildFileSummary(pending, fresh)
	return result
}
// buildFileSummary renders a listing of the file_ids available for this
// round: cached (pending) files first, then newly uploaded ones, numbered
// continuously from 1. Entries with a blank ID are skipped; a missing name or
// MIME type is shown as "(unknown)" / "application/octet-stream". It returns ""
// when nothing is listed.
func buildFileSummary(pending, uploaded []pendingFileRef) string {
	if len(pending) == 0 && len(uploaded) == 0 {
		return ""
	}
	lines := make([]string, 0, len(pending)+len(uploaded)+2)
	lines = append(lines, "以下文件 file_id 可用于本轮问答:")

	next := 1
	// addAll appends one line per usable ref using the given line layout.
	addAll := func(layout string, refs []pendingFileRef) {
		for _, ref := range refs {
			id := strings.TrimSpace(ref.ID)
			if id == "" {
				continue
			}
			name := defaultIfEmpty(strings.TrimSpace(ref.Name), "(unknown)")
			mime := defaultIfEmpty(strings.TrimSpace(ref.MimeType), "application/octet-stream")
			lines = append(lines, fmt.Sprintf(layout, next, name, mime, id))
			next++
		}
	}
	addAll("- cached_file[%d] name=%s mime=%s file_id=%s", pending)
	addAll("- uploaded_file[%d] name=%s mime=%s file_id=%s", uploaded)

	// Only the header is present: every ref had a blank ID.
	if len(lines) == 1 {
		return ""
	}
	return strings.Join(lines, "\n")
}
// generateWithOptionalFilesMessages sends messages to the LLM using the
// richest interface the client implements.
//
// Without file IDs: llm.MessageChatClient if available, else the plain
// Generate call with messages flattened into a system and a user prompt.
// With file IDs: llm.FileMessageChatClient, else llm.FileChatClient (flattened
// prompts), else the plain Generate call, in which case the file IDs are not
// passed on.
func (o *Orchestrator) generateWithOptionalFilesMessages(ctx context.Context, messages []llm.PromptMessage, fileIDs []string, appendFileIDText bool) (string, error) {
	// plain is the lowest common denominator: two flattened prompts, no files.
	plain := func() (string, error) {
		sys, usr := fallbackPromptsFromMessages(messages)
		return o.llm.Generate(ctx, sys, usr)
	}

	ids := nonEmptyIDs(fileIDs)
	if len(ids) == 0 {
		if chat, ok := o.llm.(llm.MessageChatClient); ok {
			return chat.GenerateMessages(ctx, messages)
		}
		return plain()
	}

	switch client := o.llm.(type) {
	case llm.FileMessageChatClient:
		return client.GenerateMessagesWithFiles(ctx, messages, ids, appendFileIDText)
	case llm.FileChatClient:
		sys, usr := fallbackPromptsFromMessages(messages)
		return client.GenerateWithFiles(ctx, sys, usr, ids, appendFileIDText)
	default:
		return plain()
	}
}
// buildReActMessages assembles the message list for one legacy ReAct round:
// system prompt, parsed history, then (when non-blank) the file summary and the
// reasoning scratchpad as assistant messages, and finally the user input.
func buildReActMessages(systemPrompt, compressedContext, userInput, fileSummary, scratchpad string) []llm.PromptMessage {
	out := make([]llm.PromptMessage, 0, 16)
	out = append(out, llm.PromptMessage{Role: "system", Content: systemPrompt})
	out = append(out, parseCompressedHistoryMessages(compressedContext)...)

	if summary := strings.TrimSpace(fileSummary); summary != "" {
		out = append(out, llm.PromptMessage{Role: "assistant", Content: "文件上下文摘要:\n" + summary})
	}
	if notes := strings.TrimSpace(scratchpad); notes != "" {
		out = append(out, llm.PromptMessage{Role: "assistant", Content: "推理记录:\n" + notes})
	}
	return append(out, llm.PromptMessage{Role: "user", Content: userInput})
}
// parseCompressedHistoryMessages turns the line-oriented compressed history
// into prompt messages, one per non-blank line.
//
// A line of the form "<role>: <content>" with role system, user or assistant
// (case-insensitive) becomes a message with that role. Any other line is kept
// whole as an assistant message. This includes lines whose first colon is not
// preceded by a known role (for example a URL or "Step 1: ..."), so no text in
// front of the colon is lost.
//
// It returns nil for blank input.
func parseCompressedHistoryMessages(compressed string) []llm.PromptMessage {
	compressed = strings.TrimSpace(compressed)
	if compressed == "" {
		return nil
	}
	lines := strings.Split(compressed, "\n")
	out := make([]llm.PromptMessage, 0, len(lines))
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		// Default: treat the whole line as assistant content.
		role, content := "assistant", line
		if idx := strings.Index(line, ":"); idx > 0 {
			switch prefix := strings.ToLower(strings.TrimSpace(line[:idx])); prefix {
			case "system", "user", "assistant":
				// Only strip the prefix when it really is a role marker.
				role, content = prefix, strings.TrimSpace(line[idx+1:])
			}
		}
		out = append(out, llm.PromptMessage{Role: role, Content: content})
	}
	return out
}
// fallbackPromptsFromMessages flattens a message list into the two strings
// expected by the plain Generate API. System messages are joined with blank
// lines into the first result; every other non-empty message becomes a
// "<role>: <content>" line in the second.
func fallbackPromptsFromMessages(messages []llm.PromptMessage) (string, string) {
	systemChunks := make([]string, 0, 2)
	dialogue := make([]string, 0, len(messages))
	for i := range messages {
		body := strings.TrimSpace(messages[i].Content)
		if body == "" {
			continue
		}
		switch role := strings.ToLower(strings.TrimSpace(messages[i].Role)); role {
		case "system":
			systemChunks = append(systemChunks, body)
		default:
			dialogue = append(dialogue, role+": "+body)
		}
	}
	return strings.Join(systemChunks, "\n\n"), strings.Join(dialogue, "\n")
}
// buildFileUploadAck renders the reply sent after a file-only message: either
// a request to re-upload (no file_id obtained) or a confirmation with the
// number of cached file_ids, followed by the file summary when there is one.
func (o *Orchestrator) buildFileUploadAck(ctx filePromptContext) string {
	count := len(ctx.FileIDs)
	if count == 0 {
		return "文件已接收,但未拿到有效 file_id。请重新上传一次。"
	}
	ack := fmt.Sprintf("文件上传完成,已缓存 %d 个 file_id。", count) +
		"\n" + "请继续发送你的问题,我会结合这些文件内容和历史对话一起回答。"
	if strings.TrimSpace(ctx.Summary) != "" {
		// A blank line separates the confirmation from the listing.
		ack += "\n\n" + ctx.Summary
	}
	return ack
}
// nonEmptyIDs returns the trimmed, non-blank entries of ids with duplicates
// removed, keeping first-seen order. It returns nil for empty input and an
// empty (non-nil) slice when every entry is blank.
func nonEmptyIDs(ids []string) []string {
	if len(ids) == 0 {
		return nil
	}
	known := make(map[string]bool, len(ids))
	result := make([]string, 0, len(ids))
	for i := range ids {
		trimmed := strings.TrimSpace(ids[i])
		if trimmed == "" || known[trimmed] {
			continue
		}
		known[trimmed] = true
		result = append(result, trimmed)
	}
	return result
}
// toPendingRefs converts the context into references suitable for the
// pending-file cache. Freshly uploaded files (which carry name and MIME type)
// take precedence; otherwise bare references are built from the unique,
// non-blank FileIDs.
func (c filePromptContext) toPendingRefs() []pendingFileRef {
	if len(c.Uploaded) == 0 {
		uniqueIDs := nonEmptyIDs(c.FileIDs)
		refs := make([]pendingFileRef, 0, len(uniqueIDs))
		for _, fileID := range uniqueIDs {
			refs = append(refs, pendingFileRef{ID: fileID})
		}
		return refs
	}
	// Work on a copy so the context's own slice is never shared with the cache.
	clone := make([]pendingFileRef, len(c.Uploaded))
	copy(clone, c.Uploaded)
	return sanitizePendingRefs(clone)
}
// appendPendingFiles adds refs to the pending-file cache of the
// (chatID, userID) conversation. Blank and duplicate IDs are removed both from
// the input and from the merged result. The call is a no-op when no usable ref
// remains.
func (o *Orchestrator) appendPendingFiles(chatID, userID string, refs []pendingFileRef) {
	cleaned := sanitizePendingRefs(refs)
	if len(cleaned) == 0 {
		return
	}
	cacheKey := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	defer o.pendingFilesMu.Unlock()
	o.pendingFiles[cacheKey] = sanitizePendingRefs(append(o.pendingFiles[cacheKey], cleaned...))
}
// getPendingFiles returns a copy of the cached file references for the
// (chatID, userID) conversation. The result is never nil; it is empty when
// nothing is cached.
func (o *Orchestrator) getPendingFiles(chatID, userID string) []pendingFileRef {
	cacheKey := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	cached := o.pendingFiles[cacheKey]
	result := make([]pendingFileRef, len(cached))
	copy(result, cached)
	o.pendingFilesMu.Unlock()

	return result
}
// clearPendingFiles drops every cached file reference for the
// (chatID, userID) conversation.
func (o *Orchestrator) clearPendingFiles(chatID, userID string) {
	cacheKey := pendingFileKey(chatID, userID)

	o.pendingFilesMu.Lock()
	delete(o.pendingFiles, cacheKey)
	o.pendingFilesMu.Unlock()
}
// pendingFileKey builds the pending-file cache key for a conversation: the
// trimmed chat ID and user ID joined by "::".
func pendingFileKey(chatID, userID string) string {
	parts := []string{strings.TrimSpace(chatID), strings.TrimSpace(userID)}
	return strings.Join(parts, "::")
}
// sanitizePendingRefs returns refs with all string fields trimmed, entries
// with a blank ID removed and duplicate IDs collapsed to their first
// occurrence. It returns nil for empty input.
func sanitizePendingRefs(refs []pendingFileRef) []pendingFileRef {
	if len(refs) == 0 {
		return nil
	}
	known := make(map[string]bool, len(refs))
	cleaned := make([]pendingFileRef, 0, len(refs))
	for i := range refs {
		fileID := strings.TrimSpace(refs[i].ID)
		if fileID == "" || known[fileID] {
			continue
		}
		known[fileID] = true
		cleaned = append(cleaned, pendingFileRef{
			ID:       fileID,
			Name:     strings.TrimSpace(refs[i].Name),
			MimeType: strings.TrimSpace(refs[i].MimeType),
		})
	}
	return cleaned
}
// defaultIfEmpty returns v unchanged unless it is empty or whitespace-only, in
// which case fallback is returned.
func defaultIfEmpty(v, fallback string) string {
	if strings.TrimSpace(v) != "" {
		return v
	}
	return fallback
}
// formatSelectedSkillsForPrompt renders the skills to embed in the prompt for
// this round. When selected is empty, up to four skills are chosen for
// userInput via selectRelevantSkills. A placeholder line is returned when no
// skill applies.
func (o *Orchestrator) formatSelectedSkillsForPrompt(userInput string, selected []knowledge.Skill) string {
	chosen := selected
	if len(chosen) == 0 {
		chosen = o.selectRelevantSkills(userInput, 4)
	}
	if len(chosen) > 0 {
		return formatSkills(chosen)
	}
	return "(none matched, tools are still globally available)"
}
// selectRelevantSkills picks at most maxCount skills (4 when maxCount is not
// positive) that best match userInput.
//
// If the query is blank or there are no more than maxCount skills, all skills
// are returned unranked. Otherwise each skill is scored against its name plus
// the first 1800 runes of its content, compared in lower case: 8 points when
// the whole query appears, plus 1 point per matching query token. Skills
// scoring zero are dropped; the rest are ordered by score descending, then by
// name. It returns nil when nothing matches.
func (o *Orchestrator) selectRelevantSkills(userInput string, maxCount int) []knowledge.Skill {
	limit := maxCount
	if limit <= 0 {
		limit = 4
	}
	needle := strings.TrimSpace(strings.ToLower(userInput))
	candidates := o.getSkillsSnapshot()
	if needle == "" || len(candidates) <= limit {
		return candidates
	}

	tokens := buildQueryTokens(needle)
	type scored struct {
		skill  knowledge.Skill
		points int
	}
	hits := make([]scored, 0, len(candidates))
	for _, candidate := range candidates {
		haystack := strings.ToLower(candidate.Name + "\n" + clipForScoring(candidate.Content, 1800))
		points := 0
		if strings.Contains(haystack, needle) {
			points = 8
		}
		for _, token := range tokens {
			if strings.Contains(haystack, token) {
				points++
			}
		}
		if points > 0 {
			hits = append(hits, scored{skill: candidate, points: points})
		}
	}
	if len(hits) == 0 {
		return nil
	}

	sort.Slice(hits, func(a, b int) bool {
		if hits[a].points != hits[b].points {
			return hits[a].points > hits[b].points
		}
		// Tie-break on normalized name for a stable, readable order.
		return strings.ToLower(strings.TrimSpace(hits[a].skill.Name)) < strings.ToLower(strings.TrimSpace(hits[b].skill.Name))
	})
	if len(hits) > limit {
		hits = hits[:limit]
	}

	picked := make([]knowledge.Skill, len(hits))
	for i := range hits {
		picked[i] = hits[i].skill
	}
	return picked
}
// buildQueryTokens splits query into the sorted, de-duplicated set of tokens
// used for skill scoring.
//
// Tokens are maximal runs of lower-case ASCII letters, digits and characters
// in U+4E00..U+9FFF; every other rune acts as a separator. Because Chinese
// text has no spaces, every pair of adjacent characters in that range is added
// as a 2-gram as well. Tokens shorter than two runes are discarded.
func buildQueryTokens(query string) []string {
	isHan := func(r rune) bool { return r >= 0x4e00 && r <= 0x9fff }
	isTokenRune := func(r rune) bool {
		return (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || isHan(r)
	}

	seen := make(map[string]struct{})
	add := func(tok string) {
		tok = strings.TrimSpace(tok)
		if len([]rune(tok)) >= 2 {
			seen[tok] = struct{}{}
		}
	}

	for _, word := range strings.FieldsFunc(query, func(r rune) bool { return !isTokenRune(r) }) {
		add(word)
	}

	// Adjacent-character 2-grams improve recall for unsegmented Chinese input.
	chars := []rune(query)
	for i := 1; i < len(chars); i++ {
		if isHan(chars[i-1]) && isHan(chars[i]) {
			add(string(chars[i-1 : i+1]))
		}
	}

	tokens := make([]string, 0, len(seen))
	for tok := range seen {
		tokens = append(tokens, tok)
	}
	sort.Strings(tokens)
	return tokens
}
// clipForScoring returns s limited to its first maxRunes runes. A non-positive
// maxRunes is treated as 1800. The string is returned unchanged when it is
// already short enough.
func clipForScoring(s string, maxRunes int) string {
	limit := maxRunes
	if limit <= 0 {
		limit = 1800
	}
	if chars := []rune(s); len(chars) > limit {
		return string(chars[:limit])
	}
	return s
}
// formatRuntimeContextForPrompt returns the prompt sentence that tells the
// model which operating system (runtime.GOOS, lower-cased; "unknown" if blank)
// the process runs on.
func formatRuntimeContextForPrompt() string {
	platform := strings.ToLower(runtime.GOOS)
	platform = strings.TrimSpace(platform)
	if platform == "" {
		platform = "unknown"
	}
	const lead = "当前运行系统 GOOS="
	const tail = "。请优先使用与该系统一致的策略。仅当用户明确要求时,才采用其他系统(如 Windows的专用流程。"
	return lead + platform + tail
}
// emitCapabilityGap records a capability gap (something the assistant could
// not do) and, for gap clusters seen often enough, generates a skill draft and
// hot-reloads skills.
//
// It does nothing when capability-gap tracking is disabled or when intent or
// reason is blank. intent is capped at 1000 bytes and reason at 240 bytes
// before saving; both cuts fall on a UTF-8 character boundary. All failures
// are logged (when a logger is configured) and never propagated; a failed save
// stops processing.
func (o *Orchestrator) emitCapabilityGap(chatID, userID, intent, reason string) {
	if !o.enableCapabilityGap {
		return
	}
	intent = strings.TrimSpace(intent)
	reason = strings.TrimSpace(reason)
	if intent == "" || reason == "" {
		return
	}

	// clip shortens s to at most limit bytes without splitting a multi-byte
	// character: it steps back over UTF-8 continuation bytes (10xxxxxx).
	clip := func(s string, limit int) string {
		if len(s) <= limit {
			return s
		}
		cut := limit
		for cut > 0 && s[cut]&0xC0 == 0x80 {
			cut--
		}
		return s[:cut]
	}
	intent = clip(intent, 1000) // guard against oversized payloads
	reason = clip(reason, 240)  // keep the reason compact for storage

	if err := o.store.SaveCapabilityGap(chatID, userID, intent, reason); err != nil {
		if o.log != nil {
			o.log.Warnf("save capability gap failed chat_id=%s user_id=%s err=%v", chatID, userID, err)
		}
		// Stop here whether or not a logger is configured.
		return
	}

	// Look for frequent gaps inside the lookback window and draft skills for them.
	clusters, err := o.store.TopCapabilityGapClusters(20, time.Now().UTC().Add(-o.gapLookbackDuration))
	if err != nil {
		if o.log != nil {
			o.log.Warnf("query capability gap clusters failed err=%v", err)
		}
		return
	}
	for _, c := range clusters {
		if c.Count < o.gapDraftTriggerCount {
			continue
		}
		path, created, draftErr := knowledge.GenerateSkillDraft(c, o.autoSkillDir)
		if draftErr != nil {
			if o.log != nil {
				o.log.Warnf("generate skill draft failed intent_key=%s reason=%s err=%v", c.IntentKey, c.Reason, draftErr)
			}
			continue
		}
		if !created {
			continue
		}
		if o.log != nil {
			o.log.Infof("capability gap draft generated path=%s intent_key=%s reason=%s count=%d", path, c.IntentKey, c.Reason, c.Count)
		}
		// A new draft was written: reload so it becomes available immediately.
		if reloadErr := o.ReloadSkills(); reloadErr != nil && o.log != nil {
			o.log.Warnf("auto reload skills failed after generation path=%s err=%v", path, reloadErr)
		}
	}
}
// ReloadSkills reloads skills and skill summaries from skillsDir and swaps them
// in under the write lock, so no restart is needed. If either load fails the
// error is returned and the current skills stay in place.
func (o *Orchestrator) ReloadSkills() error {
	loadedSkills, err := knowledge.LoadSkillSet(o.skillsDir)
	if err != nil {
		return err
	}
	loadedSummaries, err := knowledge.LoadSkillSummaries(o.skillsDir)
	if err != nil {
		return err
	}
	summariesCopy := copySkillSummaries(loadedSummaries)

	// Swap both views together so readers never see a mixed state.
	o.skillsMu.Lock()
	o.skills, o.skillSummaries = loadedSkills, summariesCopy
	o.skillsMu.Unlock()

	if o.log != nil {
		o.log.Infof("skills hot reloaded count=%d dir=%s", len(loadedSkills), o.skillsDir)
	}
	return nil
}
// getSkillsSnapshot returns a copy of the current skill list taken under the
// read lock. The result is never nil.
func (o *Orchestrator) getSkillsSnapshot() []knowledge.Skill {
	o.skillsMu.RLock()
	snapshot := make([]knowledge.Skill, len(o.skills))
	copy(snapshot, o.skills)
	o.skillsMu.RUnlock()
	return snapshot
}
// getSkillSummariesSnapshot returns a copy of the current skill summaries
// taken under the read lock.
func (o *Orchestrator) getSkillSummariesSnapshot() []knowledge.SkillSummary {
	o.skillsMu.RLock()
	snapshot := copySkillSummaries(o.skillSummaries)
	o.skillsMu.RUnlock()
	return snapshot
}
// BuildCapabilityGapReport renders a text report of the most frequent
// capability-gap clusters inside the lookback window, at most limit entries,
// followed by the draft directory and a hint about reloading skills. It
// returns a fixed notice when no gaps were recorded and an error when the
// store query fails.
func (o *Orchestrator) BuildCapabilityGapReport(limit int) (string, error) {
	since := time.Now().UTC().Add(-o.gapLookbackDuration)
	clusters, err := o.store.TopCapabilityGapClusters(limit, since)
	if err != nil {
		return "", err
	}
	if len(clusters) == 0 {
		return "最近没有采集到能力缺口记录。", nil
	}

	var report strings.Builder
	report.WriteString("高频能力缺口清单:\n")
	for idx, cluster := range clusters {
		fmt.Fprintf(&report, "%d) intent=%s | reason=%s | count=%d | last_seen=%s\n", idx+1, cluster.IntentKey, cluster.Reason, cluster.Count, cluster.LastSeenAt.Format("2006-01-02 15:04:05"))
	}
	report.WriteString("\n草稿目录")
	report.WriteString(o.autoSkillDir)
	report.WriteString("\n系统会在达到阈值后自动生成并热加载技能你也可以手动发送 /reload_skills。")
	return report.String(), nil
}
// formatSkillSummariesForPrompt renders the skill overview section of the
// system prompt: one "- [dir] name => description" line per summary, ordered
// by directory name and then by skill name (both case-insensitive). Summaries
// without a name are skipped. Descriptions are capped at 220 bytes, with the
// cut placed on a UTF-8 character boundary. It returns "(none)" when there are
// no summaries.
func (o *Orchestrator) formatSkillSummariesForPrompt() string {
	summaries := o.getSkillSummariesSnapshot()
	if len(summaries) == 0 {
		return "(none)"
	}
	// Sorting the snapshot is safe: it is a private copy.
	sort.Slice(summaries, func(i, j int) bool {
		left := strings.ToLower(strings.TrimSpace(summaries[i].DirName))
		right := strings.ToLower(strings.TrimSpace(summaries[j].DirName))
		if left == right {
			return strings.ToLower(strings.TrimSpace(summaries[i].Name)) < strings.ToLower(strings.TrimSpace(summaries[j].Name))
		}
		return left < right
	})

	const maxDescBytes = 220
	var b strings.Builder
	for _, summary := range summaries {
		dir := strings.TrimSpace(summary.DirName)
		name := strings.TrimSpace(summary.Name)
		desc := strings.TrimSpace(summary.Description)
		if name == "" {
			continue
		}
		if len(desc) > maxDescBytes {
			cut := maxDescBytes
			// Step back over UTF-8 continuation bytes (10xxxxxx) so a
			// multi-byte character is never cut in half.
			for cut > 0 && desc[cut]&0xC0 == 0x80 {
				cut--
			}
			desc = desc[:cut]
		}
		b.WriteString("- ")
		if dir != "" {
			b.WriteString("[")
			b.WriteString(dir)
			b.WriteString("] ")
		}
		b.WriteString(name)
		if desc != "" {
			b.WriteString(" => ")
			b.WriteString(desc)
		}
		b.WriteString("\n")
	}
	return strings.TrimSpace(b.String())
}
// copySkillSummaries returns a new slice holding copies of in with DirName,
// Name and Description trimmed of surrounding whitespace. The result is never
// nil.
func copySkillSummaries(in []knowledge.SkillSummary) []knowledge.SkillSummary {
	trimmed := make([]knowledge.SkillSummary, len(in))
	for i, summary := range in {
		summary.DirName = strings.TrimSpace(summary.DirName)
		summary.Name = strings.TrimSpace(summary.Name)
		summary.Description = strings.TrimSpace(summary.Description)
		trimmed[i] = summary
	}
	return trimmed
}
// formatToolErrorObservation renders a tool failure as a single line,
// "ERROR_CODE=<code>; TOOL=<action>; REASON=<reason>", for the model to read.
// Each part is trimmed; blank parts default to "TOOL_EXEC_ERROR", "unknown"
// and "unknown error" respectively.
func formatToolErrorObservation(code, action, reason string) string {
	orDefault := func(value, fallback string) string {
		if trimmed := strings.TrimSpace(value); trimmed != "" {
			return trimmed
		}
		return fallback
	}
	codePart := orDefault(code, "TOOL_EXEC_ERROR")
	toolPart := orDefault(action, "unknown")
	reasonPart := orDefault(reason, "unknown error")
	return "ERROR_CODE=" + codePart + "; TOOL=" + toolPart + "; REASON=" + reasonPart
}
// formatSkills renders skills as consecutive "## <name>" sections, each
// followed by the skill content and a blank line; leading and trailing
// whitespace of the whole result is trimmed.
func formatSkills(skills []knowledge.Skill) string {
	var doc strings.Builder
	for i := range skills {
		doc.WriteString("## " + skills[i].Name + "\n")
		doc.WriteString(skills[i].Content + "\n\n")
	}
	return strings.TrimSpace(doc.String())
}
// formatToolDoc renders the registered tools as "- <name>: <description>"
// lines ordered by name, for embedding in the legacy system prompt. It returns
// "(none)" when no tools are registered.
func (o *Orchestrator) formatToolDoc() string {
	registered := o.tools.List()
	if len(registered) == 0 {
		return "(none)"
	}
	sort.Slice(registered, func(a, b int) bool {
		return registered[a].Name() < registered[b].Name()
	})

	var doc strings.Builder
	for _, tool := range registered {
		doc.WriteString("- " + tool.Name() + ": " + tool.Description() + "\n")
	}
	return strings.TrimSpace(doc.String())
}