双层架构设计
外层 QueryEngine 管理会话生命周期,内层 queryLoop 驱动实际的 Agent 行为循环。
外层 · 会话管理

QueryEngine(QueryEngine.ts · 1295行)

  • 管理跨轮次的消息历史 mutableMessages
  • 处理 SDK 输出流(yield SDKMessage)
  • 跟踪 Token 使用量与总费用
  • 消息转录持久化(sessionStorage)
  • USD 预算限制检查
  • 权限拒绝记录(permissionDenials)
内层 · 行为循环

queryLoop(query.ts · 1729行)

  • while(true) 无限循环主体
  • 流式调用 Anthropic API
  • 提取并执行 tool_use 块
  • 自动压缩 / Context Collapse
  • 错误恢复(prompt_too_long / max_tokens)
  • Token 预算 nudge 继续机制
工具执行层

StreamingToolExecutor(并发控制)

  • 工具在 API 流式完成前已开始执行
  • 并发安全工具批量并行(最多10个)
  • 非并发工具(写文件、Bash)严格串行
  • isConcurrencySafe() 每个工具自声明
  • 失败时自动 discard 孤儿 tool_result
主循环完整流程
queryLoop 在每次迭代中经历以下关键节点,实现一个完整的"思考-行动-观察"周期。
0

迭代前准备

AutoCompact检查 · snip压缩 · Microcompact · Context Collapse · Token阻塞判断

1

🌊 流式调用 Anthropic API

for await (message of deps.callModel({messages, systemPrompt, tools, signal}))
一旦 tool_use 块到达,立即投入 StreamingToolExecutor 排队执行

📥 API 响应中是否包含 tool_use 块?
✅ 否 → needsFollowUp = false
→ 进入终止判断
🔧 是 → needsFollowUp = true
→ 执行所有工具
2

🔧 工具执行(toolOrchestration)

runTools(toolUseBlocks, ...) — 按并发安全性分批执行
并发安全批:读文件/Glob/Grep/MCP工具 → 并行 | 非并发:写文件/Bash → 串行

3

🛑 终止条件检查

AbortController 中断 · maxTurns 限制 · Stop Hooks 阻止继续 · Token 预算耗尽

继续下一轮?
❌ 终止 → return { reason: 'XXX' }
completed / aborted / max_turns / max_budget
🔁 继续 → 更新 state.messages
将 tool_results 追加到消息历史
turnCount++ → continue
4

📤 QueryEngine 消费并输出

转录持久化 · 构建 SDKMessage · 提取最终结果文本 · yield result{type:'success'}

循环终止条件详解
query.ts 中定义了多种退出路径,每种对应不同的终止原因。
正常完成

reason: 'completed'

助手最终消息中无 tool_use 块(needsFollowUp = false),且 stop hooks 未阻止继续。

用户中断

reason: 'aborted_streaming'

检测到 abortController.signal.aborted,生成缺失工具结果的中断消息后退出。

轮次限制

reason: 'max_turns'

达到 maxTurns 上限,附加 max_turns_reached attachment,QueryEngine 转换为 error_max_turns 结果。

上下文阻塞

reason: 'blocking_limit'

Token 数超过阻塞阈值(且无法压缩),返回 PROMPT_TOO_LONG 错误消息。

Stop Hook 阻止

reason: 'stop_hook_prevented'

外部 stop hook 返回阻止信号,Agent 不继续下一轮,直接结束。

Token 预算继续

action: 'continue' nudge

TOKEN_BUDGET 特性启用时,若未达目标 token 数,自动注入 nudge 消息推动 Agent 继续工作。

三条错误恢复路径
query.ts 内建了完整的错误恢复机制,大多数可恢复错误无需暴露给用户。
🗜️

Reactive Compact — 响应式压缩

当 API 返回 413 prompt_too_long 时,临时扣留错误消息,尝试对历史消息进行摘要压缩(调用独立压缩 API),然后重试原请求。仅尝试一次,失败则将原始错误返回给用户。

// query.ts:1119-1175 if (isWithheld413 && reactiveCompact) { const compacted = await reactiveCompact.tryReactiveCompact({...}) if (compacted) { state = { messages: buildPostCompactMessages(compacted), ... } continue // 重试 } yield lastMessage // 恢复失败,返回原错误 }
📈

Max Output Tokens 恢复(最多3次重试)

当模型触发 max_output_tokens 限制时:第一次尝试升级到 64k max_tokens(ESCALATED_MAX_TOKENS),之后最多重试 3 次,每次注入 meta 消息 "Output token limit hit. Resume directly—no apology..." 驱动模型继续输出。

// 第一次:升级到64k state = { maxOutputTokensOverride: ESCALATED_MAX_TOKENS, ... } // 后续:注入nudge消息(最多3次) const recoveryMessage = createUserMessage({ content: 'Output token limit hit. Resume directly — no apology...', isMeta: true, })
🔀

模型 Fallback 切换

当 API 抛出 FallbackTriggeredError(通常因主力模型过载)时,自动切换到 fallbackModel,清空当前轮次的所有消息(Tombstone),用新模型重新发起请求。同时注入系统通知消息告知用户已切换。

// query.ts:894-942 catch (innerError) { if (innerError instanceof FallbackTriggeredError && fallbackModel) { currentModel = fallbackModel attemptWithFallback = true assistantMessages.length = 0 // 清空孤儿消息 yield createSystemMessage(`Switched to ${fallbackModel}...`) } }
关键实现代码
核心主循环与工具执行逻辑的代码片段(带源文件行号)
query.ts · 241-307
async function* queryLoop(params, consumedCommandUuids) { // 不可变参数 const { systemPrompt, userContext, canUseTool, maxTurns, ... } = params // 可变跨迭代状态 let state: State = { messages: params.messages, turnCount: 1, maxOutputTokensRecoveryCount: 0, ... } // 主循环 while (true) { let { toolUseContext } = state const { messages, turnCount, ... } = state // 1. 流式API调用 for await (const message of deps.callModel({ messages: prependUserContext(messagesForQuery, userContext), systemPrompt: fullSystemPrompt, tools: toolUseContext.options.tools, signal: toolUseContext.abortController.signal, })) { if (message.type === 'assistant') { assistantMessages.push(message) // 提取tool_use块,立即投入StreamingToolExecutor const toolBlocks = message.message.content.filter(c => c.type === 'tool_use') if (toolBlocks.length > 0) { toolUseBlocks.push(...toolBlocks) needsFollowUp = true streamingToolExecutor?.addTool(toolBlock, message) } } } // 2. 收集工具结果 for await (const update of runTools(toolUseBlocks, ...)) { if (update.message) { yield update.message toolResults.push(...normalizeMessagesForAPI([update.message], ...)) } } // 3. 终止判断 if (!needsFollowUp) return { reason: 'completed' } if (maxTurns && nextTurnCount > maxTurns) return { reason: 'max_turns' } // 4. 准备下一轮 - 追加消息历史 state = { messages: [...messagesForQuery, ...assistantMessages, ...toolResults], turnCount: nextTurnCount, transition: { reason: 'tool_use' }, ... } continue // → while(true) } }
Task 状态机
每个异步 Agent 任务都经历这5种状态。一旦进入终止态,就不再转换(isTerminalTaskStatus)。
pending 初始状态 spawn running 执行中 success error / API fail TaskStop / abort completed 终止态 ✓ failed 终止态 ✗ killed 终止态 ⊗ isTerminalTaskStatus() completed | failed | killed → 不再接收消息 → 从AppState清理