Files
LaodingBot/cmd/bot/main.go

299 lines
9.7 KiB
Go
Raw Normal View History

2026-02-21 23:01:39 +08:00
package main
import (
"context"
"fmt"
"os"
2026-02-21 23:01:39 +08:00
"os/signal"
"syscall"
"time"
"laodingbot/internal/agent"
"laodingbot/internal/config"
"laodingbot/internal/knowledge"
"laodingbot/internal/llm"
"laodingbot/internal/logger"
"laodingbot/internal/memory"
"laodingbot/internal/runtimews"
"laodingbot/internal/toolhost"
2026-02-21 23:01:39 +08:00
"laodingbot/internal/tools"
"laodingbot/internal/transport/feishu"
"laodingbot/internal/transport/telegram"
"laodingbot/internal/transport/webui"
2026-02-21 23:01:39 +08:00
)
// main 是程序的入口点。它负责初始化环境、加载配置、注册工具并启动消息通道。
2026-02-21 23:01:39 +08:00
func main() {
// 设置优雅监听上下文,接收中断和终止信号
2026-02-21 23:01:39 +08:00
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// 检查是否作为 Toolhost 的子进程运行
isToolhostChild := len(os.Args) > 1 && os.Args[1] == "--toolhost"
workspaceRoot, err := runtimews.PrepareFromEnv()
if err != nil {
panic(fmt.Sprintf("prepare runtime workspace failed: %v", err))
}
2026-02-21 23:01:39 +08:00
// 加载应用配置
2026-02-21 23:01:39 +08:00
cfg, err := config.Load()
if err != nil {
panic(fmt.Sprintf("load config failed: %v", err))
}
// 如果是作为子进程运行,则启动工具宿主端
if isToolhostChild {
if err := toolhost.RunChild(ctx, cfg, nil); err != nil && ctx.Err() == nil {
panic(fmt.Sprintf("toolhost child failed: %v", err))
}
return
}
2026-02-21 23:01:39 +08:00
// 初始化日志系统
2026-02-21 23:01:39 +08:00
appLogger, err := logger.New(cfg.LogLevel)
if err != nil {
panic(fmt.Sprintf("init logger failed: %v", err))
}
appLogger = appLogger.WithComponent("main")
appLogger.Infof("config loaded; channel=%s, log_level=%s workspace=%s", cfg.MessageChannel, cfg.LogLevel, workspaceRoot)
2026-02-21 23:01:39 +08:00
// 初始化 SQLite 数据库存储层(例如记忆存储等)
2026-02-21 23:01:39 +08:00
store, err := memory.NewSQLiteStore(cfg.SQLitePath, appLogger.WithComponent("memory"))
if err != nil {
appLogger.Errorf("init memory store failed: %v", err)
panic(err)
}
defer store.Close()
// 注册内部系统工具
2026-02-21 23:01:39 +08:00
toolRegistry := tools.NewRegistry(appLogger.WithComponent("tools.registry"))
exePath, err := os.Executable()
if err != nil {
appLogger.Errorf("resolve executable path failed: %v", err)
panic(err)
}
// 初始化工具宿主客户端,以便运行独立进程内的工具
tc, err := toolhost.NewClient(toolhost.ClientConfig{
ExecutablePath: exePath,
Args: []string{"--toolhost"},
WorkDir: ".",
CallTimeout: time.Duration(cfg.ToolCallTimeoutSec) * time.Second,
HeartbeatInterval: 5 * time.Second,
MaxConcurrency: 4,
}, appLogger.WithComponent("toolhost.client"))
if err != nil {
appLogger.Errorf("init toolhost client failed: %v", err)
panic(err)
}
defer tc.Close()
2026-02-21 23:01:39 +08:00
// 获取支持的工具列表并将其注册
listCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
toolInfos, err := tc.ToolList(listCtx)
cancel()
2026-02-21 23:01:39 +08:00
if err != nil {
appLogger.Errorf("toolhost list failed: %v", err)
2026-02-21 23:01:39 +08:00
panic(err)
}
if len(toolInfos) == 0 {
panic("toolhost returned empty tool list")
}
for _, info := range toolInfos {
toolRegistry.Register(toolhost.NewRemoteTool(info.Name, info.Description, time.Duration(cfg.ToolCallTimeoutSec)*time.Second, tc))
}
// 加载 AI 角色的基础信息 (Soul)
soul, err := knowledge.LoadSoul(cfg.SoulPath)
2026-02-21 23:01:39 +08:00
if err != nil {
appLogger.Errorf("load soul failed path=%s err=%v", cfg.SoulPath, err)
2026-02-21 23:01:39 +08:00
panic(err)
}
// 加载所有可用技能
skillSet, err := knowledge.LoadSkillSet(cfg.SkillsDir)
if err != nil {
appLogger.Errorf("load skill set failed dir=%s err=%v", cfg.SkillsDir, err)
panic(err)
}
// 加载技能总结,用于后续路由和匹配
skillSummaries, err := knowledge.LoadSkillSummaries(cfg.SkillsDir)
if err != nil {
appLogger.Errorf("load skill summaries failed dir=%s err=%v", cfg.SkillsDir, err)
panic(err)
}
2026-02-21 23:01:39 +08:00
appLogger.Infof("knowledge loaded soul_path=%s skills_dir=%s", cfg.SoulPath, cfg.SkillsDir)
// 实例化 LLM 客户端
2026-02-21 23:01:39 +08:00
llmClient := llm.NewOpenAICompatibleClient(cfg.LLM, appLogger.WithComponent("llm"))
feat: implement streaming chat, skill routing, and SAFe PI planning tools - Add /api/chat/stream endpoint with Server-Sent Events (SSE) for real-time message streaming * Implement StreamEvent types (thought, tool_call, tool_result, final, error) * Add StreamEventCallback mechanism for event propagation * Create StreamChatHandler in webui/bot with proper HTTP headers and flushing - Implement LLM-based skill router for intelligent capability selection * Add optional routerLLM client for semantic routing * Implement routeSkillsWithLLM() to match user intent to available skills * Add matchSkillsByName() for fuzzy skill matching * Update buildUnifiedSystemPrompt() to use routed skills - Add streaming support to ReAct pipeline * Implement runUnifiedReActStream() for streaming thought/action/observation * Emit StreamEvent at each ReAct step * Support callback error handling in streaming mode - Integrate three new DevOps tools * tools/filedoc: Extract document content from file_id via OpenAI * tools/giteaticket: Create Gitea issues from PI plan items with SAFe metadata * tools/piplan: Publish PI planning blueprints with dependency tracking - Add SAFe PI Planning skill * Implement PM/SA/RTE (iron triangle) workflow * Support for Feature, Enabler, and Dependency definition * Automatic task decomposition and Gitea integration - Create frontend integration documentation * Complete SSE protocol specification * TypeScript fetch + ReadableStream example * LLM-ready refactoring template for other projects - Simplify file handling * Remove legacy file context structures and dual-mode processing * Consolidate file operations into UploadAndCacheFiles() * Remove FilePromptMode configuration and related complexity - Update configuration * Add Router model support (LLM_ROUTER_MODEL) * Add Gitea configuration (BaseURL, Token, Owner, Repo) * WebSearch and additional tool infrastructure Tests: All 22 test packages passing, 8/8 webui tests including 3 new stream tests
2026-03-11 17:58:19 +08:00
// 实例化路由 LLM 客户端(如果配置了独立的路由模型)
var routerLLMClient llm.Client
if cfg.LLM.RouterModel != "" {
routerCfg := cfg.LLM
routerCfg.Model = cfg.LLM.RouterModel
routerLLMClient = llm.NewOpenAICompatibleClient(routerCfg, appLogger.WithComponent("llm.router"))
appLogger.Infof("skill router enabled, model=%s", cfg.LLM.RouterModel)
}
// 创建编排器,整合 LLM、记忆系统、知识技能库与各种工具
2026-02-21 23:01:39 +08:00
engine := agent.NewOrchestrator(
llmClient,
feat: implement streaming chat, skill routing, and SAFe PI planning tools - Add /api/chat/stream endpoint with Server-Sent Events (SSE) for real-time message streaming * Implement StreamEvent types (thought, tool_call, tool_result, final, error) * Add StreamEventCallback mechanism for event propagation * Create StreamChatHandler in webui/bot with proper HTTP headers and flushing - Implement LLM-based skill router for intelligent capability selection * Add optional routerLLM client for semantic routing * Implement routeSkillsWithLLM() to match user intent to available skills * Add matchSkillsByName() for fuzzy skill matching * Update buildUnifiedSystemPrompt() to use routed skills - Add streaming support to ReAct pipeline * Implement runUnifiedReActStream() for streaming thought/action/observation * Emit StreamEvent at each ReAct step * Support callback error handling in streaming mode - Integrate three new DevOps tools * tools/filedoc: Extract document content from file_id via OpenAI * tools/giteaticket: Create Gitea issues from PI plan items with SAFe metadata * tools/piplan: Publish PI planning blueprints with dependency tracking - Add SAFe PI Planning skill * Implement PM/SA/RTE (iron triangle) workflow * Support for Feature, Enabler, and Dependency definition * Automatic task decomposition and Gitea integration - Create frontend integration documentation * Complete SSE protocol specification * TypeScript fetch + ReadableStream example * LLM-ready refactoring template for other projects - Simplify file handling * Remove legacy file context structures and dual-mode processing * Consolidate file operations into UploadAndCacheFiles() * Remove FilePromptMode configuration and related complexity - Update configuration * Add Router model support (LLM_ROUTER_MODEL) * Add Gitea configuration (BaseURL, Token, Owner, Repo) * WebSearch and additional tool infrastructure Tests: All 22 test packages passing, 8/8 webui tests including 3 new stream tests
2026-03-11 17:58:19 +08:00
routerLLMClient,
2026-02-21 23:01:39 +08:00
store,
toolRegistry,
soul,
skillSet,
skillSummaries,
cfg.SkillsDir,
2026-02-21 23:01:39 +08:00
cfg.ReactMaxSteps,
cfg.EnableCapabilityGap,
cfg.AutoSkillDir,
cfg.GapDraftTriggerCount,
time.Duration(cfg.GapClusterLookbackHours)*time.Hour,
2026-02-21 23:01:39 +08:00
appLogger.WithComponent("agent"),
)
appLogger.Infof("LaodingBot started, channel=%s", cfg.MessageChannel)
// 根据配置启动对应的信息通道
2026-02-21 23:01:39 +08:00
if err := runMessageChannel(ctx, cfg, engine, appLogger); err != nil && ctx.Err() == nil {
appLogger.Errorf("message channel run failed: %v", err)
panic(err)
}
appLogger.Infof("LaodingBot stopped")
}
// runMessageChannel 负责初始化并运行配置指定的消息通道(如 telegram 或 feishu
2026-02-21 23:01:39 +08:00
func runMessageChannel(ctx context.Context, cfg config.Config, engine *agent.Orchestrator, lg *logger.Logger) error {
switch cfg.MessageChannel {
case "telegram":
tg, err := telegram.NewBot(cfg.Telegram.Token, cfg.Telegram.PollTimeoutSeconds, lg.WithComponent("transport.telegram"))
if err != nil {
return fmt.Errorf("init telegram bot failed: %w", err)
}
lg.Infof("starting telegram transport")
return tg.Run(ctx, func(ctx context.Context, msg telegram.IncomingMessage) (string, error) {
return engine.HandleMessage(ctx, msg.ChatID, msg.UserID, msg.Text)
})
case "feishu":
fs, err := feishu.NewBot(
cfg.Feishu.AppID,
cfg.Feishu.AppSecret,
cfg.Feishu.VerifyToken,
cfg.Feishu.ListenAddr,
cfg.Feishu.EventPath,
lg.WithComponent("transport.feishu"),
)
if err != nil {
return fmt.Errorf("init feishu bot failed: %w", err)
}
lg.Infof("starting feishu transport")
return fs.Run(ctx, func(ctx context.Context, msg feishu.IncomingMessage) (string, error) {
if msg.MsgType == "file" && len(msg.FileBytes) > 0 {
content := msg.FileBytes
if msg.FilePath != "" {
if b, err := os.ReadFile(msg.FilePath); err == nil && len(b) > 0 {
content = b
} else if lg != nil {
lg.Warnf("read local file failed path=%s err=%v; fallback to in-memory bytes", msg.FilePath, err)
}
}
files := []llm.InputFile{{
FileName: msg.FileName,
MimeType: msg.FileMime,
Content: content,
}}
// Feishu file event and user question are split into separate messages.
// Use empty text so file IDs are cached and consumed by the next text query.
return engine.HandleMessageWithFiles(ctx, msg.ChatID, msg.UserID, "", files)
}
2026-02-21 23:01:39 +08:00
return engine.HandleMessage(ctx, msg.ChatID, msg.UserID, msg.Text)
})
case "webui":
wb, err := webui.NewBot(cfg.WebUI, lg.WithComponent("transport.webui"))
if err != nil {
return fmt.Errorf("init webui bot failed: %w", err)
}
lg.Infof("starting webui transport listen_addr=%s", cfg.WebUI.ListenAddr)
return wb.Run(
ctx,
func(ctx context.Context, msg webui.IncomingMessage) (string, error) {
return engine.HandleMessage(ctx, msg.ChatID, msg.UserID, msg.Text)
},
feat: implement streaming chat, skill routing, and SAFe PI planning tools - Add /api/chat/stream endpoint with Server-Sent Events (SSE) for real-time message streaming * Implement StreamEvent types (thought, tool_call, tool_result, final, error) * Add StreamEventCallback mechanism for event propagation * Create StreamChatHandler in webui/bot with proper HTTP headers and flushing - Implement LLM-based skill router for intelligent capability selection * Add optional routerLLM client for semantic routing * Implement routeSkillsWithLLM() to match user intent to available skills * Add matchSkillsByName() for fuzzy skill matching * Update buildUnifiedSystemPrompt() to use routed skills - Add streaming support to ReAct pipeline * Implement runUnifiedReActStream() for streaming thought/action/observation * Emit StreamEvent at each ReAct step * Support callback error handling in streaming mode - Integrate three new DevOps tools * tools/filedoc: Extract document content from file_id via OpenAI * tools/giteaticket: Create Gitea issues from PI plan items with SAFe metadata * tools/piplan: Publish PI planning blueprints with dependency tracking - Add SAFe PI Planning skill * Implement PM/SA/RTE (iron triangle) workflow * Support for Feature, Enabler, and Dependency definition * Automatic task decomposition and Gitea integration - Create frontend integration documentation * Complete SSE protocol specification * TypeScript fetch + ReadableStream example * LLM-ready refactoring template for other projects - Simplify file handling * Remove legacy file context structures and dual-mode processing * Consolidate file operations into UploadAndCacheFiles() * Remove FilePromptMode configuration and related complexity - Update configuration * Add Router model support (LLM_ROUTER_MODEL) * Add Gitea configuration (BaseURL, Token, Owner, Repo) * WebSearch and additional tool infrastructure Tests: All 22 test packages passing, 8/8 webui tests including 3 new stream tests
2026-03-11 17:58:19 +08:00
func(ctx context.Context, msg webui.IncomingMessage, callback webui.StreamEventCallback) (string, error) {
return engine.HandleMessageStream(ctx, msg.ChatID, msg.UserID, msg.Text, buildWebUIStreamForwarder(callback, cfg.WebUI.ExposeReasoning))
feat: implement streaming chat, skill routing, and SAFe PI planning tools - Add /api/chat/stream endpoint with Server-Sent Events (SSE) for real-time message streaming * Implement StreamEvent types (thought, tool_call, tool_result, final, error) * Add StreamEventCallback mechanism for event propagation * Create StreamChatHandler in webui/bot with proper HTTP headers and flushing - Implement LLM-based skill router for intelligent capability selection * Add optional routerLLM client for semantic routing * Implement routeSkillsWithLLM() to match user intent to available skills * Add matchSkillsByName() for fuzzy skill matching * Update buildUnifiedSystemPrompt() to use routed skills - Add streaming support to ReAct pipeline * Implement runUnifiedReActStream() for streaming thought/action/observation * Emit StreamEvent at each ReAct step * Support callback error handling in streaming mode - Integrate three new DevOps tools * tools/filedoc: Extract document content from file_id via OpenAI * tools/giteaticket: Create Gitea issues from PI plan items with SAFe metadata * tools/piplan: Publish PI planning blueprints with dependency tracking - Add SAFe PI Planning skill * Implement PM/SA/RTE (iron triangle) workflow * Support for Feature, Enabler, and Dependency definition * Automatic task decomposition and Gitea integration - Create frontend integration documentation * Complete SSE protocol specification * TypeScript fetch + ReadableStream example * LLM-ready refactoring template for other projects - Simplify file handling * Remove legacy file context structures and dual-mode processing * Consolidate file operations into UploadAndCacheFiles() * Remove FilePromptMode configuration and related complexity - Update configuration * Add Router model support (LLM_ROUTER_MODEL) * Add Gitea configuration (BaseURL, Token, Owner, Repo) * WebSearch and additional tool infrastructure Tests: All 22 test packages passing, 8/8 webui tests including 3 new stream tests
2026-03-11 17:58:19 +08:00
},
func(ctx context.Context, chatID, userID string, files []llm.InputFile) ([]string, error) {
return engine.UploadAndCacheFiles(ctx, chatID, userID, files)
},
func(ctx context.Context, chatID string, limit int) ([]memory.Message, error) {
return engine.GetHistory(chatID, limit)
},
)
2026-02-21 23:01:39 +08:00
default:
return fmt.Errorf("unsupported message channel: %s", cfg.MessageChannel)
}
}
func buildWebUIStreamForwarder(callback webui.StreamEventCallback, exposeReasoning bool) agent.StreamEventCallback {
const finalChunkRunes = 12
const finalChunkInterval = 25 * time.Millisecond
return func(event agent.StreamEvent) error {
if callback == nil {
return nil
}
switch event.Type {
case agent.StreamEventTypeThought, agent.StreamEventTypeToolCall, agent.StreamEventTypeToolResult:
if !exposeReasoning {
return nil
}
return callback(webui.StreamEvent{
Type: webui.StreamEventType(event.Type),
Content: event.Content,
Step: event.Step,
ToolName: event.ToolName,
})
case agent.StreamEventTypeFinal:
runes := []rune(event.Content)
if len(runes) == 0 {
return callback(webui.StreamEvent{Type: webui.StreamEventTypeFinal, Content: "", Step: event.Step})
}
start := 0
for start < len(runes) {
end := start + finalChunkRunes
if end > len(runes) {
end = len(runes)
}
if err := callback(webui.StreamEvent{
Type: webui.StreamEventTypeFinal,
Content: string(runes[start:end]),
Step: event.Step,
}); err != nil {
return err
}
start = end
if start < len(runes) {
time.Sleep(finalChunkInterval)
}
}
return nil
case agent.StreamEventTypeError:
return callback(webui.StreamEvent{
Type: webui.StreamEventTypeError,
Content: event.Content,
Step: event.Step,
})
case agent.StreamEventTypeWorkspaceStart, agent.StreamEventTypeWorkspaceDelta, agent.StreamEventTypeWorkspaceEnd:
return callback(webui.StreamEvent{
Type: webui.StreamEventType(event.Type),
Content: event.Content,
Step: event.Step,
ToolName: event.ToolName,
WorkspaceTitle: event.WorkspaceTitle,
})
default:
return nil
}
}
}