好,我已经读完了所有核心模块的源码。下面开始全面分析。
lovia 框架深度解析
一、整体架构
分层模型
┌──────────────────────────────────────────────────────────────┐
│ web/ │
│ FastAPI + SSE + Jinja2 UI (optional, lovia[web]) │
├──────────────────────────────────────────────────────────────┤
│ sandbox/ │
│ Filesystem + Process boundary (Sandbox.local, protocols) │
├──────────────────────────────────────────────────────────────┤
│ core/ │
│ agent ← runner ← tools / output / schema / skills │
│ ↓ │
│ providers/ · session · hooks · guardrails · context_policy │
└──────────────────────────────────────────────────────────────┘三层单向依赖:core 不导入 sandbox/web,sandbox 不导入 web。import lovia 只加载 core,MCP/web/search 等功能按需延迟导入。
核心数据流(一次 Runner.run 的生命周期)
Runner.run(agent, input)
│
├─ 1. 构建 RunContext(注入 context、usage、session)
├─ 2. 渲染 system prompt(base + fragments + skills + sandbox)
├─ 3. 运行 input guardrails
├─ 4. ── TURN LOOP ──────────────────────────────────────────
│ ├─ ContextPolicy.apply(items) ← 可能触发摘要压缩
│ ├─ Provider.stream(items, tools, response_format)
│ │ └─ yield ItemDelta → 组装 AssistantMessage
│ │
│ ├─ 若有 tool_calls:
│ │ ├─ 检查 needs_approval → emit ApprovalRequired
│ │ ├─ run_tool(tool, args, ctx) ← 策略链 + retry + timeout
│ │ └─ 追加 ToolCallOutputItem,继续下一轮
│ │
│ ├─ 若遇到 _HandoffSignal → 切换 active_agent,继续 loop
│ │
│ └─ 若 finish_reason == "stop" 且无 tool_calls → 解析 output
│
├─ 5. 运行 output guardrails
├─ 6. 持久化 session(append / replace)
└─ 7. 返回 RunResult二、逐文件深度解析
2.1 agent.py — 声明式 Agent 规格
Why(为什么这样设计)
Agent 是纯声明配置,没有任何运行时可变状态。这样做的好处是:
可在多个并发请求间安全共享同一个 Agent 实例;
克隆(
clone())成本极低,适合在请求时做 per-request 微调(如注入用户 id);和 dataclass +
replace()完全兼容,immutable by convention。
How(实现要点)
@dataclass
class Agent(Generic[TContext]):
name: str
instructions: str | InstructionsFn = ""
model: str | Provider | list[str | Provider] = "openai:gpt-4o-mini"
tools: list[Tool] = ...
output_type: Any = str
...
_fragments: tuple[InstructionsFn, ...] = ... # 动态 system_prompt 片段model支持字符串("openai:gpt-4o-mini")、Provider实例、或列表(fallback 链);_fragments是一个 tuple(不可变),通过@agent.system_prompt装饰器追加,克隆时直接复制不会共享可变状态;render_instructions()将 base + fragments + extra 串联(每段非空才追加),async 统一处理 sync/async callable。
What(示例)
agent = Agent(name="assistant", instructions="You are helpful.")
@agent.system_prompt
async def add_context(ctx) -> str:
return f"User plan: {ctx.context.plan}"
# per-request clone,不影响原对象
req_agent = agent.clone(settings=ModelSettings(temperature=0))2.2 runner.py — 运行时调度核心
Why
所有可变状态(消息历史、usage、active_agent)都在 Runner 的调用栈里,不污染 Agent 声明。
How
Runner 有三个入口:
Runner.run()— 返回RunResult(完整跑完再返回);Runner.stream()— 返回RunHandle(async context manager),边跑边 yield events;Runner.run_sync()— 包一层asyncio.run(),给脚本场景用。
内部的 turn loop 通过两个小型状态类管理:
@dataclass
class _TurnState:
# 当前轮产出的 items、是否需要继续、是否切换了 agent
...
@dataclass
class _BootstrapState:
# 第一轮的初始化数据(system prompt、items 预加载等)
...关键逻辑片段(伪代码)
while turn < max_turns:
items = await context_policy.apply(items, ctx=policy_ctx)
async for delta in provider.stream(items, tools=tools, ...):
# 组装 AssistantMessage,emit TextDelta / ReasoningDelta
...
if assistant_msg.tool_calls:
for call in assistant_msg.tool_calls:
if tool.requires_approval(args, ctx):
approved = await _resolve_approval(call, ctx)
result = await run_tool(tool, args, ctx, ...)
if isinstance(result, _HandoffSignal):
active_agent = result.target
# 可选 input_filter 重写 transcript
break
continue # 进入下一 turn
# finish → parse output → 检查 output guardrails → 返回
output = parse_output(output_spec, assistant_msg.content)
breakProvider fallback chain
providers = agent.resolve_providers() # 可能是 [gpt-4o, claude-3-5-sonnet]
for provider in providers:
try:
return await _run_with_provider(provider, ...)
except ProviderError as e:
if not e.retryable or provider is providers[-1]:
raise
# 尝试下一个 provider2.3 tools/__init__.py — 工具系统
Why
工具是框架最频繁被扩展的部分,设计上做到:
sync/async 函数一视同仁(sync 自动
asyncio.to_thread,不阻塞 event loop);RunContext注入通过类型注解自动识别,不需要特殊参数名;策略链(
ToolPolicy)提供比wrap更结构化的组合方式。
How
@tool 装饰器的核心是 _find_context_param():
def _find_context_param(func, sig) -> str | None:
hints = get_type_hints(func, include_extras=False)
for pname in sig.parameters:
origin = get_origin(hints.get(pname)) or hints.get(pname)
if origin is RunContext:
return pname # 找到了,记住参数名
return None生成的 Tool.invoke 会在调用时自动注入 ctx:
async def invoke(args, ctx):
cleaned = validate_args(func, args) # pydantic 验证 + 类型强制
if context_param:
cleaned[context_param] = ctx
if is_async:
return await func(**cleaned)
return await asyncio.to_thread(lambda: func(**cleaned))策略链(ToolPolicy)实现
async def apply_tool_policies(invoke, policies, args, ctx):
next_in_chain = invoke
for policy in reversed(policies): # 从最外层开始套
inner = next_in_chain
async def wrapped(a, c, _p=policy, _i=inner):
return await _p(_i, a, c)
next_in_chain = wrapped
return await next_in_chain(args, ctx)这是经典的洋葱/中间件模式。reversed 使得第一个 policy 在调用链的最外层。
retry + timeout 围在整个策略链之外:
for attempt in range(1, attempts + 1):
try:
if timeout:
return await asyncio.wait_for(one_attempt(args, ctx), timeout)
return await one_attempt(args, ctx)
except Exception:
await asyncio.sleep(min(0.5, 0.05 * (2 ** (attempt - 1)))) # 指数退避,上限 0.5s内置工具一览
| 工具 | 说明 |
|---|---|
read_file / write_file / edit_file | sandbox 支撑的文件操作 |
list_dir / glob / shell | 目录列举 / 通配 / shell 执行 |
http_fetch | httpx 封装的 HTTP 请求 |
duckduckgo_search_tool | 需 lovia[tools],opt-in |
ask_human | HumanChannel 双向通信(human-in-the-loop) |
think | 占位"思考"工具,让模型显式推理 |
now / sleep | 时间工具 |
todo_tools | TodoList CRUD |
coding_tools() | 工厂函数,组合文件+shell 工具 |
2.4 messages.py — 消息格式
Why
选择 OpenAI Chat Completions wire format 作为内部格式,理由很直接:
OpenAI adapter 直接 pass-through,零翻译;
Anthropic、DeepSeek 等 adapter 只需实现一个方向的转换;
保留
reasoning_content(DeepSeek/o系列 CoT),保证回显正确。
What
ChatMessage(role, content, tool_calls, tool_call_id, name, reasoning_content)
ToolCall(id, name, arguments) # arguments 是 raw JSON string,延迟解析
Usage(input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
AssistantMessage → to_chat_message() # 组装后转标准格式值得注意:content 可以是 str | list[ContentBlock] | None,支持多模态(图片等)。
2.5 handoff.py — 多智能体协作
Why
两种模式都统一实现为普通 Tool,runner 不需要任何特殊路径感知它们——只需识别 _HandoffSignal 返回值。
How
模式 1:Handoff(控制转移,共享历史)
# 构建一个 transfer_to_<name> 工具
def build_handoff_tool(handoff: Handoff) -> Tool:
async def invoke(args, ctx):
if handoff.on_handoff:
await handoff.on_handoff(args, ctx)
return _HandoffSignal(target=handoff.target, ...) # runner 识别这个 sentinel
...Runner 收到 _HandoffSignal 后:
切换
active_agent;可选运行
input_filter(如drop_stale_tool_calls,清除新 agent 不认识的工具调用);用新 agent 的 system prompt 继续当前 turn loop。
模式 2:agent_as_tool(隔离子运行)
async def invoke(args, ctx):
result = await Runner.run(agent, args["input"], context=ctx.context,
_parent_usage=ctx.usage) # 累计 usage
return result.output子 agent 在完全独立的对话上下文里运行,结果作为字符串返回给父 agent。
伪代码对比
Handoff: agent_as_tool:
Parent Parent
│ transfer_to_B(reason=...) │ ask_B(input=...)
↓ ↓
[_HandoffSignal] Runner.run(B, input)
│ │
Runner switches agent B runs isolated
│ │ → returns output str
B continues in same loop Parent resumes
(shares transcript)2.6 output.py — 结构化输出
Why
不同 provider 对结构化输出的支持程度不同,需要两套策略:
原生 JSON Schema(OpenAI
response_format):直接让模型输出 JSON;工具回调 fallback:注入一个合成的
final_output工具,模型必须调用它一次。
How
def build_output_spec(output_type, supports_response_format) -> OutputSpec | None:
if output_type is str:
return None # 纯文本,不需要结构化
schema = model_json_schema(output_type)
return OutputSpec(
output_type=output_type,
use_tool_fallback=not supports_response_format,
schema=schema,
)输出修复机制(OutputRepairStrategy)
class DefaultOutputRepair:
max_attempts: int = 1
def build_prompt(self, exc: OutputValidationError, attempt: int) -> str | None:
if attempt > self.max_attempts:
return None
return "Your previous response could not be parsed... Please reply again..."失败时,runner 追加一条 user 消息要求模型重试一次,而不是直接抛异常。用户也可以实现自己的 OutputRepairStrategy 协议,控制重试次数和提示语言(如中文提示)。
2.7 schema.py — JSON Schema 生成
Why
LLM 工具调用需要 JSON Schema 描述参数,pydantic v2 的 TypeAdapter 和 create_model 可以处理几乎所有 Python 类型,避免重新造轮子。
How
核心技巧:对函数参数动态构建一个 pydantic model:
def function_args_schema(fn) -> tuple[dict, list[str]]:
fields = {}
for name, param, annotation in _iter_arg_params(fn):
default = param.default if param.default is not empty else ...
fields[name] = (annotation, default)
Model = create_model(f"{fn.__name__}Args", **fields)
schema = _strip_titles(Model.model_json_schema())
schema.setdefault("additionalProperties", False)
return schema, list(fields)_strip_titles() 递归删除 pydantic 自动生成的 title 字段,减少 token 消耗。
Annotated 支持
# 用户可以这样写:
def search(query: Annotated[str, "the search query"], top_k: int = 5):
...
# 也可以用 pydantic Field:
def search(query: Annotated[str, Field(description="...", min_length=1)]):
..._normalize_annotation() 将 bare string 转换成 Field(description=...),统一交给 pydantic 处理。
2.8 context_policy.py — 上下文窗口管理
Why
长对话最终会打满 context window,这是生产环境必须处理的问题。lovia 把这个策略做成可插拔协议,而不是 hard-code。
How
协议只有两个方法:
class ContextPolicy(Protocol):
async def apply(self, items, *, ctx) -> list[Item]: ... # 每轮调用(主动压缩)
async def apply_reactive(self, items, *, ctx) -> list[Item]: ... # provider 溢出时触发SummarizingContextPolicy 的完整流程:
apply() 被调用
│
├─ 估算当前 tokens(取 last_prompt_tokens 和 estimate_tokens 的较大值)
│ ↑ 注意:上一轮的 usage 会低估,因为新消息已追加
│
├─ tokens < threshold?
│ └─ 是 → _maybe_micro_compact()(替换旧 tool result 为占位符)→ 返回
│
└─ 否 → summarize(items) → [summary_item, *safe_window(items, tail=10)]
│
└─ circuit breaker:连续失败 3 次后停止调用,避免烧钱summary item 格式:
[Conversation summary — prior turns compacted]
1. User goal(s): ...
2. Key findings & decisions: ...
3. Files / artifacts changed: ...
4. Outstanding work: ...
5. User constraints & preferences: ...
[End summary]2.9 skills.py — 知识目录
Why
Agent 有时需要访问大量知识文档(如产品手册、操作规程),一次性全部塞进 system prompt 会浪费 token。Skill 系统提供按需加载机制。
How
目录结构:
skills/
refund-policy/
SKILL.md ← YAML frontmatter + body
references/ ← 补充文档
scripts/ ← 可执行脚本
assets/ ← 模板等两种模式:
lazy(默认):system prompt 只列出 name + description,模型调
load_skill(name)拉取全文;eager:所有 SKILL.md 内联进 system prompt。
框架自动注入 list_skills / load_skill / read_skill_file 三个工具。
2.10 hooks.py — 生命周期订阅
Why
上一版 lovia 设计了一堆 on_tool_called() / on_run_completed() 等具名方法,API 面太宽。现在用订阅者模式:一个 on(event_type) 搞定所有事件。
How
hooks = AgentHooks()
@hooks.on(events.ToolCallStarted)
async def log_tool(ev):
print(f"→ {ev.call.name}")
@hooks.on((events.RunCompleted, events.ErrorOccurred))
def at_end(ev):
...
@hooks.on_any
def trace_all(ev):
...dispatch 时用 isinstance 而不是精确类型匹配,支持父类捕获子类事件。sync/async handler 均支持,内部用 _maybe_await() 统一处理。
2.11 guardrails.py — 守护栏
Why
输入/输出验证属于横切关注点,需要从主逻辑中分离。
How
协议极简:
GuardrailFn = Callable[[Any, RunContext], GuardrailVerdict | Awaitable[GuardrailVerdict]]
# 返回 None/False = 通过;True 或 非空字符串 = 拒绝典型用法:
async def no_pii(messages, ctx) -> str | None:
if any("ssn" in m.text.lower() for m in messages):
return "PII detected in input" # 触发 GuardrailTripped
agent = Agent(..., input_guardrails=[no_pii])2.12 events.py — 事件系统
Why
流式 API 和可观测性复用同一套事件类型,不需要维护两套并行机制。
What(事件层次)
Event
├── RunEvent: RunStarted, RunCompleted
├── TurnEvent: TurnStarted, TurnEnded
├── DeltaEvent: TextDelta, ReasoningDelta
├── MessageEvent: MessageCompleted(items)
├── ToolEvent: ToolCallStarted, ToolCallCompleted, ApprovalRequired
├── TransitionEvent: HandoffOccurred
├── ErrorEvent: ErrorOccurred
└── ContextEvent: ContextCompactedApprovalRequired 比较特殊——它携带了一个 _channel: ApprovalChannel,消费者可以在事件处理器里直接调用 ev.approve() / ev.reject(),实现流式审批流程。
2.13 providers/base.py — Provider 协议
Why
Provider 只需实现一个 stream() 方法(AsyncIterator),不需要非流式 generate,简化了 adapter 开发。
How
class Provider(Protocol):
name: str
def stream(self, input: list[Item], *, tools, response_format, settings
) -> AsyncIterator[ItemDelta]: ...ItemDelta 是一个联合类型(TextDelta / ReasoningDelta / ToolCallDelta / UsageDelta / FinishDelta),runner 根据类型组装完整的 AssistantMessage。
context window 相关的两个可选方法(estimate_tokens / context_window)通过 getattr 检测而不要求 Protocol 声明,保持向后兼容。
2.14 session.py — 会话协议
class Session(Protocol):
async def load(self, session_id: str) -> list[Item]: ...
async def append(self, session_id: str, items: list[Item]) -> None: ...
async def replace(self, session_id: str, items: list[Item]) -> None: ... # for compaction
async def clear(self, session_id: str) -> None: ...用 Item 而不是 ChatMessage 作为存储单元,原因是 Item 保留了 reasoning、server-side tool call id 等 provider-specific 元数据,能无损 round-trip。
2.15 exceptions.py — 异常体系
所有异常继承自 LoviaError,统一携带 hint 字段:
LoviaError
├── UserError # 配置错误(用户可修)
├── ProviderError # LLM API 错误(含 retryable 字段)
├── ToolError # 工具执行失败
├── MaxTurnsExceeded # turn 上限
├── OutputValidationError # 结构化输出解析失败(含 raw 字段)
├── BudgetExceeded # RunBudget 超限
├── RunCancelled # CancelToken 触发
├── GuardrailTripped # 守护栏拦截
└── ContextOverflowError # provider 上下文超限ProviderError.retryable 让 runner 可以做智能重试决策;ContextOverflowError 被 runner 捕获后触发 context_policy.apply_reactive()。
三、总体评价
优点
1. 设计哲学贯彻始终
"简洁 > 轻量 > 易扩展 > 通用"的优先级不只是口号,从代码里能清晰感受到:
Agent是纯 dataclass,无继承;Provider、Session、ContextPolicy都是Protocol,不是 ABC;没有全局注册表、没有单例 runner——所有状态在调用栈里;
核心两个硬依赖(
httpx+pydantic),非常克制。
2. Tool 系统设计优雅
sync/async 透明;
RunContext注入通过类型注解自动发现,不污染工具签名;ToolPolicy洋葱链比传统 middleware 更 composable;result_renderer分离了"运行"和"呈现"。
3. 多智能体模式正交
Handoff 和 agent_as_tool 都通过普通 Tool 实现,runner 只需识别一个 sentinel 值,不需要特殊分支,保持了核心循环的简洁。
4. 上下文管理完备
SummarizingContextPolicy 的设计很成熟:
双重 token 估算(防止 stale usage 低估);
circuit breaker(防止坏 summarizer 烧钱);
主动 + 被动两路触发;
微压缩(替换旧 tool result)降低对 summarizer 的依赖。
5. 异常设计实用
每个异常都有 hint 字段,ProviderError.retryable 让框架可以做有根据的重试决策,ContextOverflowError 作为统一信号支持跨 vendor。
缺点 / 局限
1. 并发工具调用支持不明
代码中 tool calls 是串行执行的。OpenAI 支持 parallel_tool_calls,模型可能在一轮返回多个 tool call。ModelSettings 里有 parallel_tool_calls: bool | None,但没有看到 runner 侧并发执行多个 tool 的逻辑,可能存在性能损失。
2. Skill 系统 YAML 解析过于简陋
def _parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
for line in parts[1].strip().splitlines():
if ":" in line:
k, v = line.split(":", 1)手工解析 YAML,不支持列表值(triggers 通过字符串 strip [] 处理)。如果 value 含有 :(如 URL),会错误截断。应用 PyYAML 或 tomllib 会更健壮,但增加依赖——这是轻量原则的代价。
3. 没有内置 Structured Concurrency / 任务取消
CancelToken 和 RunBudget 存在于异常定义中,但没有看到系统性的取消传播机制(如 asyncio.TaskGroup),大型并发 agent 场景可能需要用户自己管理。
4. Memory 接口轻描淡写
Agent.memory: Memory | None 字段存在,但 Memory 协议没有在本次读到的代码里体现完整实现,长期记忆(semantic search、embedding)的集成完整度待确认。
5. 缺乏原生 streaming structured output
当前结构化输出通过 final_output tool fallback 实现,这意味着在 tool_fallback 路径下无法边流边解析。原生 response_format 路径支持流式,但仅限 OpenAI。
与其他框架对比
| 维度 | lovia | OpenAI Agents SDK | LangChain | LlamaIndex |
|---|---|---|---|---|
| 核心复杂度 | ★★☆(~2k 行) | ★★★ | ★★★★★ | ★★★★ |
| 硬依赖数 | 2(httpx+pydantic) | ~10+ | 50+ | 30+ |
| Agent 定义风格 | dataclass | dataclass | 类继承/LCEL | 类继承 |
| Provider 可换 | ✅ Protocol | ⚠️ 主要 OpenAI | ✅ 但复杂 | ✅ 但复杂 |
| 流式支持 | ✅ 原生 | ✅ | ⚠️ 部分 | ⚠️ 部分 |
| 上下文压缩 | ✅ 内置策略 | ❌ | ❌(需插件) | ⚠️ 部分 |
| 多智能体 | ✅ Handoff+as_tool | ✅ Handoff | ✅ 但复杂 | ⚠️ |
| Human-in-loop | ✅ HumanChannel | ⚠️ 有限 | ✅ | ⚠️ |
| Skill/知识目录 | ✅ 独特 | ❌ | 🔧 RAG | 🔧 RAG |
lovia 的定位与 OpenAI Agents SDK 最接近,但前者是 provider-neutral 的,后者深度绑定 OpenAI。相较于 LangChain/LlamaIndex,lovia 刻意保持了极小的核心,放弃了生态广度换来可读性和可维护性。
改进方向
并行工具调用:runner 侧用
asyncio.gather()并发执行同一轮的多个 tool call,再汇总结果追加;更健壮的 frontmatter 解析:考虑
tomllib(Python 3.11+ 标准库,零依赖)替代手写 YAML 解析;Structured output streaming:对
response_format路径提供增量 JSON 解析(ijson或累积 buffer),让调用者在结构化输出完成前也能得到 delta;Tracer 协议的参考实现:
Agent.tracer字段存在但没有开箱即用的 OpenTelemetry/Langfuse adapter,提供官方示例会降低集成门槛;Memory 完整化:提供基于
sqlite-vec或chromadb的轻量向量存储实现,与ContextCompacted事件自动联动,让长期记忆真正开箱即用;Provider 扩展文档:目前只有代码,添加一份"如何实现自己的 Provider"的 COOKBOOK 文档,会极大降低社区贡献门槛。
总结:lovia 是一个思路清晰、代码质量较高的框架。它用极小的代码量覆盖了 Agent 框架最关键的功能点,设计决策大多有明确的 why,适合作为学习标本,也适合需要轻量可控 agent 基础设施的团队。主要缺口在于生态深度(Memory、Tracer 完整实现)和若干生产细节(并行 tool call、cancellation 传播)。