第7课:Go 实现核心循环 Go

🎯 面试价值:⭐⭐⭐⭐⭐ · 现场手写代码最爱考 · 笔试必出

← 上一章:第6课下一章 → 第8课

一、整体结构:loop.go

第 6 课定义了接口,现在把它们实现成一个可运行的 Agent Loop。核心文件是 agent/loop.go

// agent/loop.go — Agent Loop 核心实现 type agentImpl struct { config *agentConfig wm WorkingMemory // 工作记忆(当前上下文) store MemoryStore // 持久化记忆 llm LLMProvider // LLM 提供者 tools ToolRegistry // 工具注册表 hooks []Hook // 插件钩子 } // Run 是 Agent 的入口 func (a *agentImpl) Run(ctx context.Context, req *Request) (*Response, error) { // 1. 加载长期记忆 mems, _ := a.store.Search(ctx, req.SessionID, req.Input, 5) for _, m := range mems { a.wm.Push(&Message{ Role: "system", Content: fmt.Sprintf("[记忆] %s", m.Content), }) } // 2. 注入系统 prompt a.wm.Push(&Message{Role: "system", Content: a.buildSystemPrompt(req)}) // 3. 注入用户输入 a.wm.Push(&Message{Role: "user", Content: req.Input}) // 4. 进入 ReAct 循环 resp, err := a.reactLoop(ctx, req) if err != nil { return nil, fmt.Errorf("agent loop: %w", err) } // 5. 保存记忆(提取关键事实) facts := extractFacts(req.Input, resp.Content) for _, f := range facts { _ = a.store.Save(ctx, req.SessionID, &MemoryEntry{ Content: f, Type: "fact", CreatedAt: time.Now(), }) } return resp, nil }

二、ReAct 循环:核心 5 步

Agent Loop (ReAct): ┌──────────┐ │ 开始 │ └────┬─────┘ │ ┌───────▼────────┐ │ 1. LLM 调用 │◄──── 触发钩子: pre_llm │ (观察 + 思考) │────── 触发钩子: post_llm └───────┬────────┘ │ ┌───────▼────────┐ │ 2. 检查工具调用│ └───────┬────────┘ │ ┌────────▼────────┐ │ 有工具调用? │ └───┬────────┬────┘ No│ │Yes │ ┌▼───────────────┐ │ │ 3. 执行工具 │ │ │ 触发钩子: pre │ │ │ tool/post_tool │ │ └───────┬───────────┘ │ │ │ │ (结果追加到上下文) │ ▼ │ ┌──────────────┐ │ │ 4. 轮数检查 │ │ │ iteration++ │ │ │ 超预算? │ │ └───┬────┬──────┘ │ No│ │Yes │ │ ▼ │ │ ╔══════════════╗ │ │ ║ 超限回复 ║ │ │ ╚══════════════╝ │ │ │ ▼ │ (回到 LLM 调用) │ ┌──────▼──────┐ │ 5. 最终回复 │ └─────────────┘
// reactLoop — ReAct 循环核心 func (a *agentImpl) reactLoop(ctx context.Context, req *Request) (*Response, error) { start := time.Now() iter := 0 toolCalls := []ToolCallResult{} for { // ── 轮数检查 ── iter++ if iter > a.config.MaxIterations { return &Response{ Content: a.timeoutResponse(iter), ToolCalls: toolCalls, Usage: a.estimateUsage(), Duration: time.Since(start), }, nil } // ── 上下文预算检查 ── tokens, _ := a.llm.CountTokens(ctx, a.wm.String()) if tokens > a.config.MaxContextTokens { err := a.wm.Compress(SummarizeStrategy) if err != nil { return nil, fmt.Errorf("compress failed: %w", err) } } // ── 步骤 1: LLM 调用 ── resp, err := a.callLLM(ctx) if err != nil { if isTransient(err) && iter < 3 { continue // 临时错误自动重试 } return nil, fmt.Errorf("llm call iter %d: %w", iter, err) } // ── 步骤 2: 检查工具调用 ── if len(resp.ToolCalls) == 0 { // LLM 直接回复 → 循环结束 return &Response{ Content: resp.Message.Content, ToolCalls: toolCalls, Usage: resp.Usage, Duration: time.Since(start), }, nil } // ── 步骤 3: 执行工具 ── for _, tc := range resp.ToolCalls { result, err := a.executeTool(ctx, tc) if err != nil { result = fmt.Sprintf("工具调用失败: %v", err) } // 工具结果追加到上下文 a.wm.Push(&Message{ Role: "tool", ToolID: tc.ID, Content: result, }) toolCalls = append(toolCalls, ToolCallResult{ ToolName: tc.Name, Result: result, }) } // ── 继续循环 ── } }
💡 这段代码的核心设计决策:
1. 轮数检查在循环顶部——进入就检查,避免最后一次 LLM 调用浪费 token
2. 上下文压缩在 LLM 调用前——确保发送给 LLM 的不超过预算
3. 临时错误自动重试(最多 3 次)——网络抖动不崩
4. 空 ToolCalls = 结束——LLM 直接回复即认为任务完成(ReAct 的终止条件)

三、LLM 调用实现

// callLLM 对 LLM 做一次完整调用(含钩子) func (a *agentImpl) callLLM(ctx context.Context) (*LLMResponse, error) { // 1. 触发 pre_llm 钩子(让插件有机会拦截) event := &Event{Type: EventPreLLM} for _, hook := range a.hooks { if hook.OnEvent == EventPreLLM { if err := hook.Handler(ctx, event); err != nil { return nil, err } } } // 2. 提取工具定义 toolDefs := a.tools.List() for i := range toolDefs { params, _ := a.tools.Get(toolDefs[i].Name) // 这里可以给工具定义附加当前上下文 } // 3. 调用 LLM resp, err := a.llm.Chat(ctx, &LLMRequest{ Messages: a.wm.Context(), Tools: toolDefs, MaxTokens: a.config.MaxTokens, Temperature: a.config.Temperature, }) if err != nil { return nil, err } // 4. 追加 LLM 回复到工作记忆 a.wm.Push(resp.Message) // 5. 触发 post_llm 钩子 event = &Event{Type: EventPostLLM, Data: resp} for _, hook := range a.hooks { if hook.OnEvent == EventPostLLM { _ = hook.Handler(ctx, event) // 钩子失败不阻塞主流程 } } return resp, nil }

四、工具执行实现

// executeTool 执行单个工具调用 func (a *agentImpl) executeTool(ctx context.Context, tc *ToolCall) (string, error) { // 1. 查找工具 tool, ok := a.tools.Get(tc.Name) if !ok { return "", fmt.Errorf("unknown tool: %s", tc.Name) } // 2. 解析参数 params := make(map[string]any) if err := json.Unmarshal([]byte(tc.Args), ¶ms); err != nil { return "", fmt.Errorf("parse args for %s: %w", tc.Name, err) } // 3. 触发 pre_tool 钩子 event := &Event{Type: EventPreTool, Data: tc} for _, hook := range a.hooks { if hook.OnEvent == EventPreTool { _ = hook.Handler(ctx, event) } } // 4. 执行工具(含超时控制) execCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() result, err := tool.Execute(execCtx, params) // 5. 触发 post_tool 钩子 event = &Event{Type: EventPostTool, Data: &ToolResult{ Name: tc.Name, Result: result, Error: err, }} for _, hook := range a.hooks { if hook.OnEvent == EventPostTool { _ = hook.Handler(ctx, event) } } if err != nil { return "", err } return result, nil }

五、流式输出

非流式版本是一次返回完整结果。流式版本用 channel 逐步推送:

// Stream 实现流式输出 func (a *agentImpl) Stream(ctx context.Context, req *Request) (<-chan *Chunk, error) { ch := make(chan *Chunk, 16) go func() { defer close(ch) // 复用 reactLoop,但通过中间人收集结果 innerReq := &Request{Input: req.Input, SessionID: req.SessionID} // 在独立的 goroutine 中运行 Agent Loop resp, err := a.Run(ctx, innerReq) if err != nil { ch <- &Chunk{Error: err} return } // 将完整回复分块推送 sentences := splitSentences(resp.Content) for _, s := range sentences { select { case ch <- &Chunk{Text: s}: time.Sleep(20 * time.Millisecond) // 模拟逐句推送 case <-ctx.Done(): return } } // 最后推送 usage 信息 ch <- &Chunk{Done: true, Usage: resp.Usage} }() return ch, nil }
⚠️ 生产级优化:上面的流式实现是简化版。真正的流式场景下,
1. LLM 的 SSE 输出应该逐 token 转发,而不是等全部结果后再分段
2. 工具调用结果也应该流式转推(tool call → 等待执行 → tool result)
3. 需要处理 client disconnect(context cancellation 传递)
4. 更严谨的做法:把 Agent 的 think/act/observe 各阶段也通过流式事件暴露

六、压缩策略实现

// 三种压缩策略 type CompressionStrategy int const ( SlidingWindow CompressionStrategy = iota // 滑动窗口(丢弃最早消息) SummarizeStrategy // LLM 摘要压缩 HybridStrategy // 混合策略 ) // Compress 实现工作记忆压缩 func (wm *workingMemory) Compress(strategy CompressionStrategy) error { switch strategy { case SlidingWindow: return wm.slide() case SummarizeStrategy: return wm.summarize() case HybridStrategy: return wm.hybrid() } return nil } // 滑动窗口:保留最近的 N 条消息 func (wm *workingMemory) slide() error { if len(wm.msgs) <= wm.windowSize { return nil } // 保留系统 prompt + 最近的 N 条 keep := make([]*Message, 0, wm.windowSize+2) // 系统 prompt 总是保留 for _, m := range wm.msgs { if m.Role == "system" { keep = append(keep, m) } } // 保留最近的窗口 start := len(wm.msgs) - wm.windowSize if start < 0 { start = 0 } keep = append(keep, wm.msgs[start:]...) wm.msgs = keep return nil } // LLM 摘要:把历史消息汇总为一句话 func (wm *workingMemory) summarize() error { // 分割:找到可压缩的部分(中间的老消息) if len(wm.msgs) < 3 { return nil } // 保留 system prompt + 最近的 2 轮对话 // 中间的部分做摘要 var toCompress []*Message var keep []*Message for i, m := range wm.msgs { if m.Role == "system" { keep = append(keep, m) continue } if i >= len(wm.msgs)-4 { keep = append(keep, m) } else { toCompress = append(toCompress, m) } } // 这里会调用 LLM 进行摘要 // summary := a.llm.Chat(ctx, &LLMRequest{ // Messages: append([]*Message{{Role:"user",Content:"请摘要以下对话"}}, toCompress...), // }) // keep = append(keep, &Message{Role:"system", Content: summary}) wm.msgs = keep return nil }

七、完整的工具实现示例

一个实际可用的搜索工具:

// tool/websearch.go — 搜索引擎工具 type WebSearchTool struct { apiKey string } func NewWebSearchTool(apiKey string) *WebSearchTool { return &WebSearchTool{apiKey: apiKey} } func (w *WebSearchTool) Name() string { return "web_search" } func (w *WebSearchTool) Description() string { return "搜索互联网上的最新信息,返回搜索结果摘要。适合查询实时新闻、数据、文档。" } func (w *WebSearchTool) Parameters(ctx context.Context) (map[string]any, error) { return map[string]any{ "type": "object", "properties": map[string]any{ "query": map[string]any{ "type": "string", "description": "搜索关键词,支持中文", }, "count": map[string]any{ "type": "integer", "description": "返回结果数(默认 5)", "default": 5, }, }, "required": []string{"query"}, }, nil } func (w *WebSearchTool) Execute(ctx context.Context, params map[string]any) (string, error) { query, _ := params["query"].(string) if query == "" { return "", errors.New("query is required") } // 这里调用真实的搜索 API... // 模拟返回 return fmt.Sprintf("搜索 %q 的结果:\n1. 相关链接A\n2. 相关链接B", query), nil }

八、装配起来:cmd/agentd/main.go

// cmd/agentd/main.go — 框架入口 func main() { ctx := context.Background() // 1. 创建 LLM llm := llm.NewAnthropicProvider(llm.Config{ APIKey: os.Getenv("ANTHROPIC_API_KEY"), Model: "claude-sonnet-4", }) // 2. 创建记忆存储 store, err := memory.NewSQLiteStore("data/memory.db") if err != nil { log.Fatal(err) } defer store.Close() // 3. 创建工具 tools := []Tool{ tool.NewWebSearchTool(os.Getenv("SEARCH_API_KEY")), tool.NewCalculator(), tool.NewFileReader(), } // 4. 创建中间件 middlewares := []Middleware{ middleware.Logging(slog.Default()), middleware.Retry(3), middleware.RateLimit(100, time.Minute), } // 5. 装配 Agent agent, err := agent.NewAgent( agent.WithLLM(llm), agent.WithTools(tools...), agent.WithStore(store), agent.WithMiddleware(middlewares...), agent.WithMaxIterations(50), agent.WithMaxContextTokens(8000), ) if err != nil { log.Fatal(err) } // 6. 启动 HTTP 服务 http.HandleFunc("/chat", func(w http.ResponseWriter, r *http.Request) { // ... 解析请求 → agent.Run → 返回响应 }) log.Println("Agent server starting on :8080") log.Fatal(http.ListenAndServe(":8080", nil)) }

九、与 Hermes Agent 对比

概念Hermes Agent (Python)我们的 Go 实现
Agent 循环agent_init.py 中的 AgentLoopagent/loop.go 中的 reactLoop()
LLM 调用LLMProvider 接口(provider/ 目录)LLMProvider 接口(llm/provider.go)
工具注册ToolRegistry(tools/registry.py)ToolRegistry(tool/registry.go)
记忆memory_manager.py(SQLite + 向量)MemoryStore(sqlite.go + redis.go)
上下文压缩conversation_compression.pyworkingMemory.Compress()
中间件/钩子框架内置在 agent 执行流程中Middleware 链 + Plugin 钩子系统
多 Agentdelegate_task 工具 + async_delegation.pyorchestrator/ 包(第8课展开)
配置YAML config + YAML 文件Options 模式 + 代码内配置
流式输出SSE 逐 token 推送Stream() channel 模式

十、面试 3 道题

🎯 Q1:"请手写一个 Agent Loop 的伪代码或真实代码。"

30 秒骨架:
"Agent Loop 的核心是 for 循环 + LLM 调用 + 工具执行:
for iter := 0; iter < max; iter++ {
  resp := llm.Chat(ctx, messages, tools)
  if resp.ToolCalls == nil { return resp.Content }
  for _, tc := range resp.ToolCalls {
    result := tool.Execute(ctx, tc)
    messages = append(messages, ToolResult(tc, result))
  }
}"

2 分钟展开:
1. 先检查迭代预算(防止无限循环)
2. 再检查上下文预算(必要时压缩)
3. 调用 LLM(触发 pre/post 钩子)
4. 解析响应——无工具调用说明结束
5. 执行工具(含超时 + 错误处理)
6. 工具结果追加到上下文——继续循环
重点提:context cancellation 传递、错误分类(临时 vs 永久)、压缩时机
🎯 Q2:"上下文满了怎么办?讲出你的压缩策略。"

答案框架:
三种策略,按场景选择:

1. 滑动窗口(简单快速)
- 保留系统 prompt + 最近 N 条消息
- 复杂度 O(1),不消耗 LLM token
- 问题:丢失中间信息,Agent 可能"失忆"

2. LLM 摘要(信息保留好,但慢且贵)
- 把历史消息发给 LLM 生成一段摘要
- 用摘要消息替换原有消息
- 问题:每次压缩都调用一次 LLM

3. 混合策略(推荐生产用)
- 正常用滑动窗口
- 当 LLM 表现出"遗忘"现象时,触发一次摘要压缩
- 更进阶:按重要性评分保留消息,低分消息汇总

关键点:压缩必须在 LLM 调用之前完成,不然发出去就超预算了。
🎯 Q3:"工具调用超时了怎么处理?"

答案框架:
"我会分层处理:

1. 工具级别超时:每个工具独立 context.WithTimeout(ctx, 30s)——不拖累其他工具
2. 分类错误:超时属于临时错误(transient),自动重试 1-2 次
3. 重试后仍超时:返回给 LLM 一个特殊的 tool result:'工具执行超时,请考虑换一种方式',让 LLM 自己决定是重试还是换策略
4. 兜底:如果所有工具都超时,Agent 至少可以回复'我尝试了搜索但当前不可用'——不能静默失败

额外注意:超时释放 goroutine——defer cancel() 确保不会 goroutine 泄漏。
如果工具是 HTTP 请求,http.Client 必须用 ctx 创建的 transport,否则 context.WithTimeout 只取消了自己,HTTP 请求还在跑。"

十一、课后行动

  1. ✅ 理解 ReAct 循环的 5 步实现(LLM → 工具 → 循环 → 检查 → 结束)
  2. ✅ 记住压缩策略的 3 种方式(滑动窗口 / LLM 摘要 / 混合)
  3. ✅ 能白板写出 Agent Loop 的 Go 版伪代码(面试高频)
  4. ✅ 理解流式输出的 channel 模式和 context 传递
  5. 📦 完整源码可以去 nickliqian/agent-framework 参考(Go 实现)
  6. ⏭ 下节课:生产部署 + 面试准备——系统设计题、避坑指南、全场景覆盖
← 上一章:第6课下一章 → 第8课