第7课:Go 实现核心循环 Go
🎯 面试价值:⭐⭐⭐⭐⭐ · 现场手写代码最爱考 · 笔试必出
一、整体结构:loop.go
第 6 课定义了接口,现在把它们实现成一个可运行的 Agent Loop。核心文件是 agent/loop.go:
// agent/loop.go — Agent Loop 核心实现
type agentImpl struct {
config *agentConfig
wm WorkingMemory // 工作记忆(当前上下文)
store MemoryStore // 持久化记忆
llm LLMProvider // LLM 提供者
tools ToolRegistry // 工具注册表
hooks []Hook // 插件钩子
}
// Run 是 Agent 的入口
func (a *agentImpl) Run(ctx context.Context, req *Request) (*Response, error) {
// 1. 加载长期记忆
mems, _ := a.store.Search(ctx, req.SessionID, req.Input, 5)
for _, m := range mems {
a.wm.Push(&Message{
Role: "system",
Content: fmt.Sprintf("[记忆] %s", m.Content),
})
}
// 2. 注入系统 prompt
a.wm.Push(&Message{Role: "system", Content: a.buildSystemPrompt(req)})
// 3. 注入用户输入
a.wm.Push(&Message{Role: "user", Content: req.Input})
// 4. 进入 ReAct 循环
resp, err := a.reactLoop(ctx, req)
if err != nil {
return nil, fmt.Errorf("agent loop: %w", err)
}
// 5. 保存记忆(提取关键事实)
facts := extractFacts(req.Input, resp.Content)
for _, f := range facts {
_ = a.store.Save(ctx, req.SessionID, &MemoryEntry{
Content: f, Type: "fact", CreatedAt: time.Now(),
})
}
return resp, nil
}
二、ReAct 循环:核心 5 步
Agent Loop (ReAct):
┌──────────┐
│ 开始 │
└────┬─────┘
│
┌───────▼────────┐
│ 1. LLM 调用 │◄──── 触发钩子: pre_llm
│ (观察 + 思考) │────── 触发钩子: post_llm
└───────┬────────┘
│
┌───────▼────────┐
│ 2. 检查工具调用│
└───────┬────────┘
│
┌────────▼────────┐
│ 有工具调用? │
└───┬────────┬────┘
No│ │Yes
│ ┌▼───────────────┐
│ │ 3. 执行工具 │
│ │ 触发钩子: pre │
│ │ tool/post_tool │
│ └───────┬───────────┘
│ │
│ │ (结果追加到上下文)
│ ▼
│ ┌──────────────┐
│ │ 4. 轮数检查 │
│ │ iteration++ │
│ │ 超预算? │
│ └───┬────┬──────┘
│ No│ │Yes
│ │ ▼
│ │ ╔══════════════╗
│ │ ║ 超限回复 ║
│ │ ╚══════════════╝
│ │
│ ▼
│ (回到 LLM 调用)
│
┌──────▼──────┐
│ 5. 最终回复 │
└─────────────┘
// reactLoop — ReAct 循环核心
func (a *agentImpl) reactLoop(ctx context.Context, req *Request) (*Response, error) {
start := time.Now()
iter := 0
toolCalls := []ToolCallResult{}
for {
// ── 轮数检查 ──
iter++
if iter > a.config.MaxIterations {
return &Response{
Content: a.timeoutResponse(iter),
ToolCalls: toolCalls,
Usage: a.estimateUsage(),
Duration: time.Since(start),
}, nil
}
// ── 上下文预算检查 ──
tokens, _ := a.llm.CountTokens(ctx, a.wm.String())
if tokens > a.config.MaxContextTokens {
err := a.wm.Compress(SummarizeStrategy)
if err != nil {
return nil, fmt.Errorf("compress failed: %w", err)
}
}
// ── 步骤 1: LLM 调用 ──
resp, err := a.callLLM(ctx)
if err != nil {
if isTransient(err) && iter < 3 {
continue // 临时错误自动重试
}
return nil, fmt.Errorf("llm call iter %d: %w", iter, err)
}
// ── 步骤 2: 检查工具调用 ──
if len(resp.ToolCalls) == 0 {
// LLM 直接回复 → 循环结束
return &Response{
Content: resp.Message.Content,
ToolCalls: toolCalls,
Usage: resp.Usage,
Duration: time.Since(start),
}, nil
}
// ── 步骤 3: 执行工具 ──
for _, tc := range resp.ToolCalls {
result, err := a.executeTool(ctx, tc)
if err != nil {
result = fmt.Sprintf("工具调用失败: %v", err)
}
// 工具结果追加到上下文
a.wm.Push(&Message{
Role: "tool",
ToolID: tc.ID,
Content: result,
})
toolCalls = append(toolCalls, ToolCallResult{
ToolName: tc.Name,
Result: result,
})
}
// ── 继续循环 ──
}
}
💡 这段代码的核心设计决策:
1. 轮数检查在循环顶部——进入就检查,避免最后一次 LLM 调用浪费 token
2. 上下文压缩在 LLM 调用前——确保发送给 LLM 的不超过预算
3. 临时错误自动重试(最多 3 次)——网络抖动不崩
4. 空 ToolCalls = 结束——LLM 直接回复即认为任务完成(ReAct 的终止条件)
三、LLM 调用实现
// callLLM 对 LLM 做一次完整调用(含钩子)
func (a *agentImpl) callLLM(ctx context.Context) (*LLMResponse, error) {
// 1. 触发 pre_llm 钩子(让插件有机会拦截)
event := &Event{Type: EventPreLLM}
for _, hook := range a.hooks {
if hook.OnEvent == EventPreLLM {
if err := hook.Handler(ctx, event); err != nil {
return nil, err
}
}
}
// 2. 提取工具定义
toolDefs := a.tools.List()
for i := range toolDefs {
params, _ := a.tools.Get(toolDefs[i].Name)
// 这里可以给工具定义附加当前上下文
}
// 3. 调用 LLM
resp, err := a.llm.Chat(ctx, &LLMRequest{
Messages: a.wm.Context(),
Tools: toolDefs,
MaxTokens: a.config.MaxTokens,
Temperature: a.config.Temperature,
})
if err != nil {
return nil, err
}
// 4. 追加 LLM 回复到工作记忆
a.wm.Push(resp.Message)
// 5. 触发 post_llm 钩子
event = &Event{Type: EventPostLLM, Data: resp}
for _, hook := range a.hooks {
if hook.OnEvent == EventPostLLM {
_ = hook.Handler(ctx, event) // 钩子失败不阻塞主流程
}
}
return resp, nil
}
四、工具执行实现
// executeTool 执行单个工具调用
func (a *agentImpl) executeTool(ctx context.Context, tc *ToolCall) (string, error) {
// 1. 查找工具
tool, ok := a.tools.Get(tc.Name)
if !ok {
return "", fmt.Errorf("unknown tool: %s", tc.Name)
}
// 2. 解析参数
params := make(map[string]any)
if err := json.Unmarshal([]byte(tc.Args), ¶ms); err != nil {
return "", fmt.Errorf("parse args for %s: %w", tc.Name, err)
}
// 3. 触发 pre_tool 钩子
event := &Event{Type: EventPreTool, Data: tc}
for _, hook := range a.hooks {
if hook.OnEvent == EventPreTool {
_ = hook.Handler(ctx, event)
}
}
// 4. 执行工具(含超时控制)
execCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
result, err := tool.Execute(execCtx, params)
// 5. 触发 post_tool 钩子
event = &Event{Type: EventPostTool, Data: &ToolResult{
Name: tc.Name, Result: result, Error: err,
}}
for _, hook := range a.hooks {
if hook.OnEvent == EventPostTool {
_ = hook.Handler(ctx, event)
}
}
if err != nil {
return "", err
}
return result, nil
}
五、流式输出
非流式版本是一次返回完整结果。流式版本用 channel 逐步推送:
// Stream 实现流式输出
func (a *agentImpl) Stream(ctx context.Context, req *Request) (<-chan *Chunk, error) {
ch := make(chan *Chunk, 16)
go func() {
defer close(ch)
// 复用 reactLoop,但通过中间人收集结果
innerReq := &Request{Input: req.Input, SessionID: req.SessionID}
// 在独立的 goroutine 中运行 Agent Loop
resp, err := a.Run(ctx, innerReq)
if err != nil {
ch <- &Chunk{Error: err}
return
}
// 将完整回复分块推送
sentences := splitSentences(resp.Content)
for _, s := range sentences {
select {
case ch <- &Chunk{Text: s}:
time.Sleep(20 * time.Millisecond) // 模拟逐句推送
case <-ctx.Done():
return
}
}
// 最后推送 usage 信息
ch <- &Chunk{Done: true, Usage: resp.Usage}
}()
return ch, nil
}
⚠️ 生产级优化:上面的流式实现是简化版。真正的流式场景下,
1. LLM 的 SSE 输出应该逐 token 转发,而不是等全部结果后再分段
2. 工具调用结果也应该流式转推(tool call → 等待执行 → tool result)
3. 需要处理 client disconnect(context cancellation 传递)
4. 更严谨的做法:把 Agent 的 think/act/observe 各阶段也通过流式事件暴露
六、压缩策略实现
// 三种压缩策略
type CompressionStrategy int
const (
SlidingWindow CompressionStrategy = iota // 滑动窗口(丢弃最早消息)
SummarizeStrategy // LLM 摘要压缩
HybridStrategy // 混合策略
)
// Compress 实现工作记忆压缩
func (wm *workingMemory) Compress(strategy CompressionStrategy) error {
switch strategy {
case SlidingWindow:
return wm.slide()
case SummarizeStrategy:
return wm.summarize()
case HybridStrategy:
return wm.hybrid()
}
return nil
}
// 滑动窗口:保留最近的 N 条消息
func (wm *workingMemory) slide() error {
if len(wm.msgs) <= wm.windowSize {
return nil
}
// 保留系统 prompt + 最近的 N 条
keep := make([]*Message, 0, wm.windowSize+2)
// 系统 prompt 总是保留
for _, m := range wm.msgs {
if m.Role == "system" {
keep = append(keep, m)
}
}
// 保留最近的窗口
start := len(wm.msgs) - wm.windowSize
if start < 0 {
start = 0
}
keep = append(keep, wm.msgs[start:]...)
wm.msgs = keep
return nil
}
// LLM 摘要:把历史消息汇总为一句话
func (wm *workingMemory) summarize() error {
// 分割:找到可压缩的部分(中间的老消息)
if len(wm.msgs) < 3 {
return nil
}
// 保留 system prompt + 最近的 2 轮对话
// 中间的部分做摘要
var toCompress []*Message
var keep []*Message
for i, m := range wm.msgs {
if m.Role == "system" {
keep = append(keep, m)
continue
}
if i >= len(wm.msgs)-4 {
keep = append(keep, m)
} else {
toCompress = append(toCompress, m)
}
}
// 这里会调用 LLM 进行摘要
// summary := a.llm.Chat(ctx, &LLMRequest{
// Messages: append([]*Message{{Role:"user",Content:"请摘要以下对话"}}, toCompress...),
// })
// keep = append(keep, &Message{Role:"system", Content: summary})
wm.msgs = keep
return nil
}
七、完整的工具实现示例
一个实际可用的搜索工具:
// tool/websearch.go — 搜索引擎工具
type WebSearchTool struct {
apiKey string
}
func NewWebSearchTool(apiKey string) *WebSearchTool {
return &WebSearchTool{apiKey: apiKey}
}
func (w *WebSearchTool) Name() string { return "web_search" }
func (w *WebSearchTool) Description() string {
return "搜索互联网上的最新信息,返回搜索结果摘要。适合查询实时新闻、数据、文档。"
}
func (w *WebSearchTool) Parameters(ctx context.Context) (map[string]any, error) {
return map[string]any{
"type": "object",
"properties": map[string]any{
"query": map[string]any{
"type": "string",
"description": "搜索关键词,支持中文",
},
"count": map[string]any{
"type": "integer",
"description": "返回结果数(默认 5)",
"default": 5,
},
},
"required": []string{"query"},
}, nil
}
func (w *WebSearchTool) Execute(ctx context.Context, params map[string]any) (string, error) {
query, _ := params["query"].(string)
if query == "" {
return "", errors.New("query is required")
}
// 这里调用真实的搜索 API...
// 模拟返回
return fmt.Sprintf("搜索 %q 的结果:\n1. 相关链接A\n2. 相关链接B", query), nil
}
八、装配起来:cmd/agentd/main.go
// cmd/agentd/main.go — 框架入口
func main() {
ctx := context.Background()
// 1. 创建 LLM
llm := llm.NewAnthropicProvider(llm.Config{
APIKey: os.Getenv("ANTHROPIC_API_KEY"),
Model: "claude-sonnet-4",
})
// 2. 创建记忆存储
store, err := memory.NewSQLiteStore("data/memory.db")
if err != nil {
log.Fatal(err)
}
defer store.Close()
// 3. 创建工具
tools := []Tool{
tool.NewWebSearchTool(os.Getenv("SEARCH_API_KEY")),
tool.NewCalculator(),
tool.NewFileReader(),
}
// 4. 创建中间件
middlewares := []Middleware{
middleware.Logging(slog.Default()),
middleware.Retry(3),
middleware.RateLimit(100, time.Minute),
}
// 5. 装配 Agent
agent, err := agent.NewAgent(
agent.WithLLM(llm),
agent.WithTools(tools...),
agent.WithStore(store),
agent.WithMiddleware(middlewares...),
agent.WithMaxIterations(50),
agent.WithMaxContextTokens(8000),
)
if err != nil {
log.Fatal(err)
}
// 6. 启动 HTTP 服务
http.HandleFunc("/chat", func(w http.ResponseWriter, r *http.Request) {
// ... 解析请求 → agent.Run → 返回响应
})
log.Println("Agent server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
九、与 Hermes Agent 对比
| 概念 | Hermes Agent (Python) | 我们的 Go 实现 |
| Agent 循环 | agent_init.py 中的 AgentLoop | agent/loop.go 中的 reactLoop() |
| LLM 调用 | LLMProvider 接口(provider/ 目录) | LLMProvider 接口(llm/provider.go) |
| 工具注册 | ToolRegistry(tools/registry.py) | ToolRegistry(tool/registry.go) |
| 记忆 | memory_manager.py(SQLite + 向量) | MemoryStore(sqlite.go + redis.go) |
| 上下文压缩 | conversation_compression.py | workingMemory.Compress() |
| 中间件/钩子 | 框架内置在 agent 执行流程中 | Middleware 链 + Plugin 钩子系统 |
| 多 Agent | delegate_task 工具 + async_delegation.py | orchestrator/ 包(第8课展开) |
| 配置 | YAML config + YAML 文件 | Options 模式 + 代码内配置 |
| 流式输出 | SSE 逐 token 推送 | Stream() channel 模式 |
十、面试 3 道题
🎯 Q1:"请手写一个 Agent Loop 的伪代码或真实代码。"
30 秒骨架:
"Agent Loop 的核心是 for 循环 + LLM 调用 + 工具执行:
for iter := 0; iter < max; iter++ {
resp := llm.Chat(ctx, messages, tools)
if resp.ToolCalls == nil { return resp.Content }
for _, tc := range resp.ToolCalls {
result := tool.Execute(ctx, tc)
messages = append(messages, ToolResult(tc, result))
}
}"
2 分钟展开:
1. 先检查迭代预算(防止无限循环)
2. 再检查上下文预算(必要时压缩)
3. 调用 LLM(触发 pre/post 钩子)
4. 解析响应——无工具调用说明结束
5. 执行工具(含超时 + 错误处理)
6. 工具结果追加到上下文——继续循环
重点提:context cancellation 传递、错误分类(临时 vs 永久)、压缩时机
🎯 Q2:"上下文满了怎么办?讲出你的压缩策略。"
答案框架:
三种策略,按场景选择:
1. 滑动窗口(简单快速)
- 保留系统 prompt + 最近 N 条消息
- 复杂度 O(1),不消耗 LLM token
- 问题:丢失中间信息,Agent 可能"失忆"
2. LLM 摘要(信息保留好,但慢且贵)
- 把历史消息发给 LLM 生成一段摘要
- 用摘要消息替换原有消息
- 问题:每次压缩都调用一次 LLM
3. 混合策略(推荐生产用)
- 正常用滑动窗口
- 当 LLM 表现出"遗忘"现象时,触发一次摘要压缩
- 更进阶:按重要性评分保留消息,低分消息汇总
关键点:压缩必须在 LLM 调用之前完成,不然发出去就超预算了。
🎯 Q3:"工具调用超时了怎么处理?"
答案框架:
"我会分层处理:
1. 工具级别超时:每个工具独立 context.WithTimeout(ctx, 30s)——不拖累其他工具
2. 分类错误:超时属于临时错误(transient),自动重试 1-2 次
3. 重试后仍超时:返回给 LLM 一个特殊的 tool result:'工具执行超时,请考虑换一种方式',让 LLM 自己决定是重试还是换策略
4. 兜底:如果所有工具都超时,Agent 至少可以回复'我尝试了搜索但当前不可用'——不能静默失败
额外注意:超时释放 goroutine——defer cancel() 确保不会 goroutine 泄漏。
如果工具是 HTTP 请求,http.Client 必须用 ctx 创建的 transport,否则 context.WithTimeout 只取消了自己,HTTP 请求还在跑。"
十一、课后行动
- ✅ 理解 ReAct 循环的 5 步实现(LLM → 工具 → 循环 → 检查 → 结束)
- ✅ 记住压缩策略的 3 种方式(滑动窗口 / LLM 摘要 / 混合)
- ✅ 能白板写出 Agent Loop 的 Go 版伪代码(面试高频)
- ✅ 理解流式输出的 channel 模式和 context 传递
- 📦 完整源码可以去 nickliqian/agent-framework 参考(Go 实现)
- ⏭ 下节课:生产部署 + 面试准备——系统设计题、避坑指南、全场景覆盖