Engine 模块详细设计
本文档展开 kgent/engine/ 的代码设计。
Engine 模块负责隔离 Pydantic AI,让 kgent runtime 不直接依赖具体 agent engine API。
目标关联
该设计支撑:
- Configurable Professional Identity
- Skill-Driven Capability System
- Tool-Mediated Action System
- Sandbox-First Work Execution
- Quality and Governance System
模块范围
包含:
AgentEngine抽象。AgentRunRequest。AgentResult。AgentError到ErrorInfo的转换。PydanticAIAgentEngine实现。MockAgentEngine测试实现。
不包含:
- profile 语义。
- skill registry。
- memory policy。
- sandbox layout。
- deliverable validation。
- multi-agent orchestration。
文件结构
src/kgent/engine/
__init__.py
base.py
pydantic_ai.py
mock.py
数据模型
位置:base.py
class AgentRunRequest:
context: RunContext
prompt: str
system_prompt: str
tools: list[ToolDefinition]
timeout_seconds: int | None
max_steps: int | None
stream: bool
cancellation_token: object | None
event_callback: object | None
class AgentResult:
status: Literal["completed", "failed", "interrupted"]
final_text: str | None
usage: dict | None
error: ErrorInfo | None
AgentResult.status 表达 engine 调用本身的结果,不直接等同于最终 RunStatus。
Runtime 负责将 engine 结果、governance 结果和 deliverable 检查合并为最终 run state:
| Engine status | 后续检查 | RunStatus |
|---|---|---|
completed | required deliverables present | completed |
completed | required deliverables missing | incomplete |
failed | 不继续交付物检查 | failed |
interrupted | timeout、max_steps、cancellation 等可恢复中断 | incomplete |
Phase 1 不把可恢复中断折叠为普通 failed,以便未来 reviewer、scheduler 或人工打回后能回到同一 session_id/task_id 继续迭代。
AgentEngine 抽象
class AgentEngine:
def run(self, request: AgentRunRequest) -> AgentResult:
...
要求:
- 不依赖 process-global state。
- 不直接写 artifacts。
- 不直接校验 deliverables。
- 所有异常转换为
AgentResult(error=ErrorInfo(...))。
Pydantic AI Adapter
位置:pydantic_ai.py
class PydanticAIAgentEngine(AgentEngine):
def run(self, request: AgentRunRequest) -> AgentResult:
...
职责:
- 从
request.context.config.model构建 Pydantic AI model。 - 创建 Pydantic AI Agent。
- 传入
system_prompt。 - 通过
tools/pydantic_adapter.py将request.tools转换为 Pydantic AI tools。 - 注册转换后的 Pydantic AI tools。
- 执行 prompt。
- 支持 streaming 配置。
- 返回
AgentResult。 - 将异常转换为
ErrorInfo(category="engine")。
Mock Engine
位置:mock.py
class MockAgentEngine(AgentEngine):
def run(self, request: AgentRunRequest) -> AgentResult:
...
用途:
- integration tests 不调用真实模型。
- 模拟 completed。
- 模拟 failed。
- 模拟 interrupted。
- 模拟写 deliverable 前后的状态。
Mock engine 可以通过配置指定:
- final_text。
- 是否写入 deliverables。
- 是否抛出 engine failure。
Scheduler Control Surface
AgentRunRequest 必须保留:
timeout_secondsmax_stepscancellation_tokenevent_callbackstream
Phase 1 可只实现部分底层支持,但接口必须存在。
事件集成
Engine 不直 接持有 recorder,但可以通过 event_callback 或 runtime wrapper 产生:
engine.startedengine.completedengine.failedengine.interrupted
Phase 1 也可由 runtime 在调用 engine 前后发出这些事件。
错误转换
错误转换建议:
provider auth error -> ErrorInfo(code="engine.auth_failed", retryable=false)
rate limit -> ErrorInfo(code="engine.rate_limited", retryable=true)
timeout -> ErrorInfo(code="engine.timeout", retryable=true)
tool protocol error -> ErrorInfo(code="engine.tool_error", retryable=false)
unknown -> ErrorInfo(code="engine.unknown", retryable=false)
不要把 secret 或完整 provider response 默认写入 transcript。
Tool Adapter 边界
Engine 不实现工具业务逻辑。
边界:
tools/
kgent-native tool handlers
tools/pydantic_adapter.py
kgent tools -> Pydantic AI tools
engine/pydantic_ai.py
consumes kgent-native ToolDefinition and adapts them internally
这样未来替换 engine 时,不需要重写工具业务。
测试设计
单元测试:
MockAgentEnginecompleted。MockAgentEnginefailed。MockAgentEngineinterrupted。AgentRunRequest包含 scheduler control fields。- engine exception 转换为
ErrorInfo。 - engine 不直接写 deliverable validation。
集成测试:
- runtime 使用 mock engine 完成最小 run。
- runtime 使用 mock engine failure 进入 failed state。
- runtime 使用 mock engine interrupted 进入 incomplete state。
- engine result final_text 写入 transcript summary。
性能和调度考虑
- Adapter 应保持薄,避免重建不必要状态。
- Streaming 用于改善感知延迟。
- Usage 可用时记录,但不是 Phase 1 硬依赖。
- Engine 无全局状态,未来 worker pool 可并行实例化。
验收标准
AgentEngine抽象存在。- Pydantic AI 代码隔离在
engine/pydantic_ai.py。 - Mock engine 支持 integration tests。
- Engine errors 转换为
ErrorInfo。 - Adapter 暴露调度控制字段。
- Product concepts 不泄漏进 engine。