Files
LaodingBot/internal/agent/orchestrator.go

579 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agent
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
"laodingbot/internal/knowledge"
"laodingbot/internal/llm"
"laodingbot/internal/logger"
"laodingbot/internal/memory"
"laodingbot/internal/tools"
)
// Orchestrator turns an incoming chat message into a reply. It persists the
// conversation, asks the LLM to pick matching skills, and then either runs a
// skill-guided ReAct loop or falls back to a plain LLM answer.
type Orchestrator struct {
	// Collaborators injected through NewOrchestrator.
	llm   llm.Client
	store *memory.SQLiteStore
	tools *tools.Registry

	// soul is the persona text embedded near the top of every system prompt.
	soul string

	// skills is the currently loaded skill set. ReloadSkills replaces it and
	// getSkillsSnapshot copies it, both while holding skillsMu.
	skills []knowledge.Skill
	// skillsDir is the directory ReloadSkills loads skills from.
	skillsDir string
	// autoSkillDir is passed to knowledge.GenerateSkillDraft when a capability
	// gap cluster reaches the trigger count.
	autoSkillDir string

	// gapDraftTriggerCount is the minimum cluster count before a skill draft is
	// generated; gapLookbackDuration bounds how far back clusters are queried.
	gapDraftTriggerCount int
	gapLookbackDuration  time.Duration

	// reactMaxStep caps the number of iterations of the ReAct loop.
	reactMaxStep int
	// enableCapabilityGap switches capability-gap recording on or off.
	enableCapabilityGap bool

	// log may be nil; every use in this file is guarded by a nil check.
	log *logger.Logger

	skillsMu sync.RWMutex
}
// NewOrchestrator wires an Orchestrator from its collaborators and tuning
// values. Unset or invalid tuning values are replaced by defaults:
//
//   - reactMaxStep <= 0         -> 4
//   - gapDraftTriggerCount <= 0 -> 3
//   - gapLookbackDuration <= 0  -> 7 days
//   - blank autoSkillDir        -> skillsDir
func NewOrchestrator(
	llmClient llm.Client,
	store *memory.SQLiteStore,
	registry *tools.Registry,
	soul string,
	skills []knowledge.Skill,
	skillsDir string,
	reactMaxStep int,
	enableCapabilityGap bool,
	autoSkillDir string,
	gapDraftTriggerCount int,
	gapLookbackDuration time.Duration,
	log *logger.Logger,
) *Orchestrator {
	// Start from the defaults, then let every valid argument override its slot.
	orch := &Orchestrator{
		llm:                  llmClient,
		store:                store,
		tools:                registry,
		soul:                 soul,
		skills:               skills,
		skillsDir:            skillsDir,
		autoSkillDir:         skillsDir,
		gapDraftTriggerCount: 3,
		gapLookbackDuration:  7 * 24 * time.Hour,
		reactMaxStep:         4,
		enableCapabilityGap:  enableCapabilityGap,
		log:                  log,
	}
	if reactMaxStep > 0 {
		orch.reactMaxStep = reactMaxStep
	}
	if gapDraftTriggerCount > 0 {
		orch.gapDraftTriggerCount = gapDraftTriggerCount
	}
	if gapLookbackDuration > 0 {
		orch.gapLookbackDuration = gapLookbackDuration
	}
	// A non-blank autoSkillDir is stored exactly as given (not trimmed).
	if strings.TrimSpace(autoSkillDir) != "" {
		orch.autoSkillDir = autoSkillDir
	}
	return orch
}
// HandleMessage processes one inbound chat message and returns the reply text.
//
// Steps, in order:
//  1. A fresh trace ID is attached to ctx and prefixed to every log line.
//  2. The commands "/reload_skills" and "/capability_gaps" (compared
//     case-insensitively after trimming whitespace) are served immediately.
//     They are not stored, and their failures are reported in the reply text
//     with a nil error.
//  3. The user message is saved, recent history is loaded with
//     LoadRecent(chatID, 16) and passed through memory.CompressForPrompt with
//     a limit argument of 6000.
//  4. Skills are matched via the LLM router; if none match, a bootstrap
//     "skill builder" skill is looked up by keyword.
//  5. With skills selected the ReAct loop runs; otherwise a capability gap is
//     emitted and the LLM answers directly.
//  6. The assistant reply is saved and returned.
//
// Storage and generation errors are returned to the caller with an empty reply.
func (o *Orchestrator) HandleMessage(ctx context.Context, chatID, userID, text string) (string, error) {
	traceID := logger.NewTraceID()
	ctx = logger.WithTraceID(ctx, traceID)
	tag := "trace_id=" + traceID
	logging := o.log != nil

	if logging {
		o.log.Infof("%s handle message chat_id=%s user_id=%s text_len=%d", tag, chatID, userID, len(text))
		o.log.Debugf("%s handle message text=%q", tag, text)
	}

	// Built-in commands short-circuit before anything is persisted.
	switch cmd := strings.TrimSpace(text); {
	case strings.EqualFold(cmd, "/reload_skills"):
		if err := o.ReloadSkills(); err != nil {
			return "技能热加载失败: " + err.Error(), nil
		}
		return "技能已热加载完成。", nil
	case strings.EqualFold(cmd, "/capability_gaps"):
		report, err := o.BuildCapabilityGapReport(10)
		if err != nil {
			return "缺口报告生成失败: " + err.Error(), nil
		}
		return report, nil
	}

	if err := o.store.SaveMessage(chatID, userID, "user", text); err != nil {
		if logging {
			o.log.Errorf("%s save user message failed chat_id=%s err=%v", tag, chatID, err)
		}
		return "", err
	}

	history, err := o.store.LoadRecent(chatID, 16)
	if err != nil {
		if logging {
			o.log.Errorf("%s load recent failed chat_id=%s err=%v", tag, chatID, err)
		}
		return "", err
	}

	promptCtx := memory.CompressForPrompt(history, 6000)
	if logging {
		o.log.Debugf("%s prompt context prepared chat_id=%s recent_count=%d compressed_len=%d", tag, chatID, len(history), len(promptCtx))
	}

	selected := o.matchSkills(ctx, promptCtx, text)
	if len(selected) == 0 {
		// Nothing matched: fall back to a skill-building skill when one is loaded.
		bootstrap, found := o.findSkillByKeyword("创建skill", "skill builder", "skill 创建", "构建技能")
		if found {
			selected = []knowledge.Skill{bootstrap}
			if logging {
				o.log.Infof("%s fallback bootstrap skill selected name=%s", tag, bootstrap.Name)
			}
		}
	}

	var reply string
	if len(selected) > 0 {
		if logging {
			names := make([]string, len(selected))
			for i, s := range selected {
				names[i] = s.Name
				o.log.Infof("%s skill selected name=%s source=%s", tag, s.Name, s.Source)
				o.log.Debugf("%s skill selected content name=%s content=%q", tag, s.Name, s.Content)
			}
			o.log.Infof("%s skills matched chat_id=%s skills=%s", tag, chatID, strings.Join(names, ","))
		}
		reply, err = o.runReAct(ctx, chatID, userID, promptCtx, text, selected)
	} else {
		if logging {
			o.log.Infof("%s no skill matched; use direct llm chat_id=%s", tag, chatID)
		}
		o.emitCapabilityGap(chatID, userID, text, "no_skill_matched")
		reply, err = o.runDirectLLM(ctx, promptCtx, text)
	}
	if err != nil {
		if logging {
			o.log.Errorf("%s message generation failed chat_id=%s err=%v", tag, chatID, err)
		}
		return "", err
	}

	if err := o.store.SaveMessage(chatID, userID, "assistant", reply); err != nil {
		if logging {
			o.log.Errorf("%s save assistant response failed chat_id=%s err=%v", tag, chatID, err)
		}
		return "", err
	}

	if logging {
		o.log.Infof("%s message handled chat_id=%s response_len=%d", tag, chatID, len(reply))
	}
	return reply, nil
}
// runDirectLLM answers the user without any skill or tool: it sends a system
// prompt made of the persona text plus two fixed instructions, and a user
// prompt made of the compressed history followed by the question. The LLM
// client's result and error are returned unchanged.
func (o *Orchestrator) runDirectLLM(ctx context.Context, compressedContext, userInput string) (string, error) {
	var system strings.Builder
	system.WriteString("你是一个个人自动化助手,必须遵循如下人格设定并保持一致:")
	system.WriteString("\n")
	system.WriteString(o.soul)
	// One empty line separates the persona from the instructions.
	system.WriteString("\n\n")
	system.WriteString("如果当前问题没有匹配到已定义技能,请直接回答用户。")
	system.WriteString("\n")
	system.WriteString("当你判断必须依赖外部工具结果才能可靠回答时,请明确告知用户需要进一步操作信息。")

	question := "历史上下文:" + "\n" + compressedContext + "\n\n" + "用户问题:" + "\n" + userInput

	return o.llm.Generate(ctx, system.String(), question)
}
// reactDecision is the JSON object the model is asked to emit on every ReAct
// step (fields thought, action, action_input, final).
type reactDecision struct {
	// Thought is the model's reasoning for this step; it is copied into the scratchpad.
	Thought string `json:"thought"`
	// Action names the tool to call. Empty or "none" means the step is final.
	Action string `json:"action"`
	// ActionInput is handed to the chosen tool as its input.
	ActionInput string `json:"action_input"`
	// Final is the reply returned to the user when Action is "none".
	Final string `json:"final"`
}
// runReAct drives a bounded ReAct loop for the selected skills.
//
// On each of at most o.reactMaxStep steps the LLM is asked for a JSON
// reactDecision. The loop then does one of the following:
//   - decision cannot be parsed: records a "react_parse_failed" capability gap
//     and falls back to runDirectLLM;
//   - action is empty or "none": returns decision.Final (or a fixed apology
//     when it is blank);
//   - action names an unknown tool: records the miss in the scratchpad and as
//     a "tool_not_found:<action>" gap, then continues with the next step;
//   - otherwise: calls the tool and appends thought, action, input and the
//     observation to the scratchpad that is fed into the next prompt.
//
// Observations are capped at 2000 bytes before entering the scratchpad; the cut
// is moved back to a UTF-8 rune boundary so multi-byte characters are never
// split. When all steps are used up a "react_step_exhausted" gap is recorded
// and a fixed message is returned with a nil error.
//
// Only errors from the LLM client are returned as errors.
func (o *Orchestrator) runReAct(ctx context.Context, chatID, userID, compressedContext, userInput string, selectedSkills []knowledge.Skill) (string, error) {
	// Upper bound, in bytes, of a single observation kept in the scratchpad.
	const maxObservationBytes = 2000

	traceID := logger.TraceIDFromContext(ctx)
	traceLogPrefix := "trace_id=" + traceID
	selectedSkillsDoc := formatSkills(selectedSkills)
	toolDoc := o.formatToolDoc()
	if o.log != nil {
		names := make([]string, 0, len(selectedSkills))
		for _, s := range selectedSkills {
			names = append(names, s.Name)
		}
		o.log.Infof("%s react start steps=%d skills=%s", traceLogPrefix, o.reactMaxStep, strings.Join(names, ","))
		o.log.Debugf("%s react selected_skills_doc=%q", traceLogPrefix, selectedSkillsDoc)
		o.log.Debugf("%s react tools_doc=%q", traceLogPrefix, toolDoc)
	}
	systemPrompt := strings.Join([]string{
		"你是一个个人自动化助手,必须遵循如下人格设定并保持一致:",
		o.soul,
		"",
		"已匹配到的 skills只可按下列技能执行",
		selectedSkillsDoc,
		"",
		"可用工具:",
		toolDoc,
		"",
		"你必须使用 ReAct 模式做决策。",
		"只有当技能明确需要工具能力时才调用工具。",
		"如果问题可直接回答,不要调用工具。",
		"你的输出必须是 JSON对象字段为 thought, action, action_input, final。",
		"规则:",
		"1) 当需要调工具时final 置空action 必须是可用工具之一action_input 为工具输入。",
		"2) 当可以最终回答时action 置 noneaction_input 置空final 填最终回复。",
		"3) 不要输出 JSON 之外内容。",
	}, "\n")

	// scratchpad accumulates the step-by-step record shown to the model.
	var scratchpad strings.Builder
	for step := 1; step <= o.reactMaxStep; step++ {
		if o.log != nil {
			o.log.Infof("%s react step start step=%d/%d", traceLogPrefix, step, o.reactMaxStep)
			o.log.Debugf("%s react scratchpad_before step=%d content=%q", traceLogPrefix, step, scratchpad.String())
		}
		prompt := strings.Join([]string{
			"历史上下文:",
			compressedContext,
			"",
			"用户问题:",
			userInput,
			"",
			"当前推理记录(按时间顺序):",
			scratchpad.String(),
			"",
			fmt.Sprintf("请输出下一步 JSON 决策。当前步骤: %d/%d", step, o.reactMaxStep),
		}, "\n")
		raw, err := o.llm.Generate(ctx, systemPrompt, prompt)
		if err != nil {
			return "", err
		}
		if o.log != nil {
			o.log.Infof("%s react step llm output step=%d raw=%q", traceLogPrefix, step, raw)
		}
		decision, err := parseDecision(raw)
		if err != nil {
			if o.log != nil {
				o.log.Warnf("%s react parse failed, fallback to direct llm err=%v", traceLogPrefix, err)
			}
			o.emitCapabilityGap(chatID, userID, userInput, "react_parse_failed")
			return o.runDirectLLM(ctx, compressedContext, userInput)
		}
		if o.log != nil {
			o.log.Infof("%s react step decision step=%d thought=%q action=%q action_input=%q final=%q", traceLogPrefix, step, decision.Thought, decision.Action, decision.ActionInput, decision.Final)
		}

		// A missing action is treated the same as an explicit "none".
		action := strings.ToLower(strings.TrimSpace(decision.Action))
		if action == "" {
			action = "none"
		}
		if action == "none" {
			finalText := strings.TrimSpace(decision.Final)
			if finalText == "" {
				finalText = "我已完成思考,但当前没有足够信息给出稳定结论。"
			}
			if o.log != nil {
				o.log.Infof("%s react final step=%d final=%q", traceLogPrefix, step, finalText)
			}
			return finalText, nil
		}

		stepLabel := "Step " + strconv.Itoa(step)
		tool, ok := o.tools.Get(action)
		if !ok {
			if o.log != nil {
				o.log.Warnf("%s react step tool missing step=%d tool=%s", traceLogPrefix, step, action)
			}
			// Tell the model about the miss so it can pick another tool or finish.
			scratchpad.WriteString(stepLabel + " Thought: " + decision.Thought + "\n")
			scratchpad.WriteString(stepLabel + " Observation: " + formatToolErrorObservation("TOOL_NOT_FOUND", action, "tool not found") + "\n")
			o.emitCapabilityGap(chatID, userID, userInput, "tool_not_found:"+action)
			continue
		}

		// Log before invoking so a slow or hanging tool is still visible in the log.
		if o.log != nil {
			o.log.Infof("%s react step tool call step=%d tool=%s input=%q", traceLogPrefix, step, action, decision.ActionInput)
		}
		toolOut, toolErr := tool.Call(ctx, decision.ActionInput)
		obs := strings.TrimSpace(toolOut)
		if obs == "" {
			obs = "(empty output)"
		}
		if toolErr != nil {
			// Keep whatever output the tool produced alongside the error.
			obs = formatToolErrorObservation("TOOL_EXEC_ERROR", action, toolErr.Error()) + "\nOUTPUT:\n" + obs
			o.emitCapabilityGap(chatID, userID, userInput, "tool_call_failed:"+action)
		}
		if o.log != nil {
			o.log.Infof("%s react step observation step=%d tool=%s observation=%q", traceLogPrefix, step, action, obs)
		}
		if len(obs) > maxObservationBytes {
			cut := maxObservationBytes
			// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut lands
			// on a rune boundary. A rune has at most 3 continuation bytes.
			for back := 0; back < 3 && obs[cut]&0xC0 == 0x80; back++ {
				cut--
			}
			obs = obs[:cut]
		}
		scratchpad.WriteString(stepLabel + " Thought: " + decision.Thought + "\n")
		scratchpad.WriteString(stepLabel + " Action: " + action + "\n")
		scratchpad.WriteString(stepLabel + " ActionInput: " + decision.ActionInput + "\n")
		scratchpad.WriteString(stepLabel + " Observation: " + obs + "\n")
	}
	o.emitCapabilityGap(chatID, userID, userInput, "react_step_exhausted")
	return "我尝试了多轮思考与工具调用,但仍未得到稳定结论。请给我更具体的约束或允许我继续尝试。", nil
}
// matchSkills asks the LLM to act as a skill router and returns up to two
// loaded skills whose names it picked.
//
// The model is expected to answer with {"skills":[...]}. Names are compared
// case-insensitively with surrounding whitespace ignored; blank, repeated or
// unknown names are skipped. nil is returned when no skills are loaded, when
// the LLM call fails, or when its answer cannot be decoded.
func (o *Orchestrator) matchSkills(ctx context.Context, compressedContext, userInput string) []knowledge.Skill {
	candidates := o.getSkillsSnapshot()
	if len(candidates) == 0 {
		return nil
	}

	const routerPrompt = "你是技能路由器。" + "\n" +
		"任务:根据用户问题,从候选技能中选择 0-2 个最相关技能名称。" + "\n" +
		"输出必须是 JSON{\"skills\":[\"name1\",\"name2\"]}" + "\n" +
		"如果没有匹配技能,返回 {\"skills\":[]}。" + "\n" +
		"不要输出 JSON 之外内容。"

	request := "候选技能:" + "\n" + formatSkillCatalog(candidates) + "\n\n" +
		"历史上下文:" + "\n" + compressedContext + "\n\n" +
		"用户问题:" + "\n" + userInput

	reply, err := o.llm.Generate(ctx, routerPrompt, request)
	if err != nil {
		if o.log != nil {
			o.log.Warnf("skill match llm failed err=%v", err)
		}
		return nil
	}
	if o.log != nil {
		o.log.Infof("skill router output raw=%q", reply)
	}

	var parsed struct {
		Skills []string `json:"skills"`
	}
	if err := json.Unmarshal([]byte(normalizeJSON(reply)), &parsed); err != nil {
		if o.log != nil {
			o.log.Warnf("skill match parse failed err=%v", err)
		}
		return nil
	}

	// Index candidates by normalised name; the first skill with a given name wins.
	byName := make(map[string]knowledge.Skill, len(candidates))
	for _, s := range candidates {
		key := strings.ToLower(strings.TrimSpace(s.Name))
		if _, exists := byName[key]; !exists {
			byName[key] = s
		}
	}

	picked := make([]knowledge.Skill, 0, 2)
	taken := make(map[string]struct{})
	for _, rawName := range parsed.Skills {
		if len(picked) >= 2 {
			break
		}
		key := strings.TrimSpace(strings.ToLower(rawName))
		if key == "" {
			continue
		}
		if _, dup := taken[key]; dup {
			continue
		}
		skill, known := byName[key]
		if !known {
			continue
		}
		picked = append(picked, skill)
		taken[key] = struct{}{}
	}

	if o.log != nil {
		names := make([]string, len(picked))
		for i, s := range picked {
			names[i] = s.Name
		}
		o.log.Infof("skill router selected skills=%s", strings.Join(names, ","))
	}
	return picked
}
// emitCapabilityGap records that the bot could not serve intent for the given
// reason and, when a cluster of similar gaps is frequent enough, generates a
// skill draft and hot-reloads the skills.
//
// It is a no-op when capability-gap collection is disabled or when intent or
// reason is blank after trimming. intent is capped at 1000 bytes and reason at
// 240 bytes, cut on a UTF-8 rune boundary. All failures are logged (when a
// logger is configured) and otherwise swallowed: the function is best-effort
// and never reports an error to the caller.
func (o *Orchestrator) emitCapabilityGap(chatID, userID, intent, reason string) {
	const (
		maxIntentBytes = 1000
		maxReasonBytes = 240
	)
	if !o.enableCapabilityGap {
		return
	}
	intent = strings.TrimSpace(intent)
	reason = strings.TrimSpace(reason)
	if intent == "" || reason == "" {
		return
	}

	// clip shortens s to at most limit bytes without splitting a multi-byte
	// UTF-8 rune: it steps back over continuation bytes (10xxxxxx), of which a
	// rune has at most three.
	clip := func(s string, limit int) string {
		if len(s) <= limit {
			return s
		}
		cut := limit
		for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		return s[:cut]
	}
	intent = clip(intent, maxIntentBytes)
	reason = clip(reason, maxReasonBytes)

	// Stop on a failed save whether or not a logger is configured; clustering
	// on top of a failed write would act on data that does not include this gap.
	if err := o.store.SaveCapabilityGap(chatID, userID, intent, reason); err != nil {
		if o.log != nil {
			o.log.Warnf("save capability gap failed chat_id=%s user_id=%s err=%v", chatID, userID, err)
		}
		return
	}

	clusters, err := o.store.TopCapabilityGapClusters(20, time.Now().UTC().Add(-o.gapLookbackDuration))
	if err != nil {
		if o.log != nil {
			o.log.Warnf("query capability gap clusters failed err=%v", err)
		}
		return
	}
	for _, c := range clusters {
		if c.Count < o.gapDraftTriggerCount {
			continue
		}
		path, created, draftErr := knowledge.GenerateSkillDraft(c, o.autoSkillDir)
		if draftErr != nil {
			if o.log != nil {
				o.log.Warnf("generate skill draft failed intent_key=%s reason=%s err=%v", c.IntentKey, c.Reason, draftErr)
			}
			continue
		}
		if !created {
			continue
		}
		if o.log != nil {
			o.log.Infof("capability gap draft generated path=%s intent_key=%s reason=%s count=%d", path, c.IntentKey, c.Reason, c.Count)
		}
		// Reload after a draft is created so the loaded skill set is refreshed.
		if reloadErr := o.ReloadSkills(); reloadErr != nil && o.log != nil {
			o.log.Warnf("auto reload skills failed after generation path=%s err=%v", path, reloadErr)
		}
	}
}
// ReloadSkills re-reads the skill set from o.skillsDir and swaps it in under
// the write lock. If loading fails the error is returned and the current
// skills stay in place.
func (o *Orchestrator) ReloadSkills() error {
	loaded, err := knowledge.LoadSkillSet(o.skillsDir)
	if err != nil {
		return err
	}

	// Hold the lock only for the pointer swap.
	func() {
		o.skillsMu.Lock()
		defer o.skillsMu.Unlock()
		o.skills = loaded
	}()

	if o.log == nil {
		return nil
	}
	o.log.Infof("skills hot reloaded count=%d dir=%s", len(loaded), o.skillsDir)
	return nil
}
// getSkillsSnapshot returns a copy of the loaded skills taken under the read
// lock, so callers can iterate without holding skillsMu. The result is always
// a non-nil slice, possibly empty.
func (o *Orchestrator) getSkillsSnapshot() []knowledge.Skill {
	o.skillsMu.RLock()
	snapshot := make([]knowledge.Skill, len(o.skills))
	copy(snapshot, o.skills)
	o.skillsMu.RUnlock()
	return snapshot
}
// BuildCapabilityGapReport renders a text report of the top capability gap
// clusters seen within the lookback window, one numbered line per cluster,
// followed by the draft directory and a usage hint. limit is passed through
// to the store query. A fixed message is returned when there are no clusters;
// a store error is returned unchanged.
func (o *Orchestrator) BuildCapabilityGapReport(limit int) (string, error) {
	since := time.Now().UTC().Add(-o.gapLookbackDuration)
	clusters, err := o.store.TopCapabilityGapClusters(limit, since)
	switch {
	case err != nil:
		return "", err
	case len(clusters) == 0:
		return "最近没有采集到能力缺口记录。", nil
	}

	var report strings.Builder
	report.WriteString("高频能力缺口清单:\n")
	for idx, c := range clusters {
		fmt.Fprintf(&report, "%d) intent=%s | reason=%s | count=%d | last_seen=%s\n",
			idx+1, c.IntentKey, c.Reason, c.Count, c.LastSeenAt.Format("2006-01-02 15:04:05"))
	}
	report.WriteString("\n草稿目录" + o.autoSkillDir)
	report.WriteString("\n系统会在达到阈值后自动生成并热加载技能你也可以手动发送 /reload_skills。")
	return report.String(), nil
}
// findSkillByKeyword returns the first loaded skill whose name or content
// contains any of the given keywords. Matching is case-insensitive and ignores
// surrounding whitespace; blank keywords are skipped. Skills are checked in
// load order and, within a skill, keywords in argument order. The boolean is
// false when nothing matches.
func (o *Orchestrator) findSkillByKeyword(keywords ...string) (knowledge.Skill, bool) {
	if len(keywords) == 0 {
		return knowledge.Skill{}, false
	}

	// Normalise the keywords once instead of once per skill.
	needles := make([]string, 0, len(keywords))
	for _, kw := range keywords {
		if n := strings.ToLower(strings.TrimSpace(kw)); n != "" {
			needles = append(needles, n)
		}
	}

	for _, candidate := range o.getSkillsSnapshot() {
		lowerName := strings.ToLower(strings.TrimSpace(candidate.Name))
		lowerContent := strings.ToLower(strings.TrimSpace(candidate.Content))
		for _, n := range needles {
			if strings.Contains(lowerName, n) || strings.Contains(lowerContent, n) {
				return candidate, true
			}
		}
	}
	return knowledge.Skill{}, false
}
// formatToolErrorObservation renders a tool failure as a single line of the
// form "ERROR_CODE=<code>; TOOL=<action>; REASON=<reason>". Each argument is
// trimmed; a blank code becomes "TOOL_EXEC_ERROR", a blank action "unknown"
// and a blank reason "unknown error".
func formatToolErrorObservation(code, action, reason string) string {
	// orDefault trims value and substitutes fallback when nothing is left.
	orDefault := func(value, fallback string) string {
		if trimmed := strings.TrimSpace(value); trimmed != "" {
			return trimmed
		}
		return fallback
	}

	var line strings.Builder
	line.WriteString("ERROR_CODE=")
	line.WriteString(orDefault(code, "TOOL_EXEC_ERROR"))
	line.WriteString("; TOOL=")
	line.WriteString(orDefault(action, "unknown"))
	line.WriteString("; REASON=")
	line.WriteString(orDefault(reason, "unknown error"))
	return line.String()
}
// formatSkills renders the skills as consecutive sections, each a "## <name>"
// heading followed by the skill content and a blank line. Leading and trailing
// whitespace of the whole document is trimmed; an empty input yields "".
func formatSkills(skills []knowledge.Skill) string {
	var doc strings.Builder
	for i := range skills {
		doc.WriteString("## " + skills[i].Name + "\n" + skills[i].Content + "\n\n")
	}
	return strings.TrimSpace(doc.String())
}
// formatSkillCatalog renders a compact one-line-per-skill catalog for the
// skill router prompt: "- <name>: <summary>". The summary is the skill content
// with newlines replaced by spaces, trimmed and capped at 220 bytes; the cap
// is moved back to a UTF-8 rune boundary so multi-byte characters are never
// split. Skills with an empty summary are listed by name only. The whole
// catalog is trimmed of surrounding whitespace.
func formatSkillCatalog(skills []knowledge.Skill) string {
	const maxSummaryBytes = 220

	var b strings.Builder
	for _, skill := range skills {
		summary := strings.TrimSpace(strings.ReplaceAll(skill.Content, "\n", " "))
		if len(summary) > maxSummaryBytes {
			cut := maxSummaryBytes
			// Step back over UTF-8 continuation bytes (10xxxxxx); a rune has at
			// most three of them.
			for back := 0; back < 3 && summary[cut]&0xC0 == 0x80; back++ {
				cut--
			}
			summary = summary[:cut]
		}
		b.WriteString("- ")
		b.WriteString(skill.Name)
		if summary != "" {
			b.WriteString(": ")
			b.WriteString(summary)
		}
		b.WriteString("\n")
	}
	return strings.TrimSpace(b.String())
}
// formatToolDoc lists the registered tools as "- <name>: <description>" lines
// sorted by tool name, for inclusion in the ReAct system prompt. It returns
// "(none)" when the registry reports no tools.
func (o *Orchestrator) formatToolDoc() string {
	registered := o.tools.List()
	if len(registered) == 0 {
		return "(none)"
	}

	sort.Slice(registered, func(a, b int) bool {
		return registered[a].Name() < registered[b].Name()
	})

	lines := make([]string, len(registered))
	for i, t := range registered {
		lines[i] = "- " + t.Name() + ": " + t.Description()
	}
	return strings.TrimSpace(strings.Join(lines, "\n"))
}