好,我已经读完了所有核心模块的源码。下面开始全面分析。

lovia 框架深度解析

一、整体架构

分层模型

┌──────────────────────────────────────────────────────────────┐
│                         web/                                 │
│   FastAPI + SSE + Jinja2 UI  (optional, lovia[web])         │
├──────────────────────────────────────────────────────────────┤
│                        sandbox/                              │
│   Filesystem + Process boundary  (Sandbox.local, protocols) │
├──────────────────────────────────────────────────────────────┤
│                         core/                                │
│  agent ← runner ← tools / output / schema / skills         │
│           ↓                                                  │
│  providers/ · session · hooks · guardrails · context_policy │
└──────────────────────────────────────────────────────────────┘

三层单向依赖:core 不导入 sandbox/web,sandbox 不导入 web。import lovia 只加载 core,MCP/web/search 等功能按需延迟导入。

核心数据流(一次 Runner.run 的生命周期)

Runner.run(agent, input)
  │
  ├─ 1. 构建 RunContext(注入 context、usage、session)
  ├─ 2. 渲染 system prompt(base + fragments + skills + sandbox)
  ├─ 3. 运行 input guardrails
  ├─ 4. ── TURN LOOP ──────────────────────────────────────────
  │     ├─ ContextPolicy.apply(items)   ← 可能触发摘要压缩
  │     ├─ Provider.stream(items, tools, response_format)
  │     │    └─ yield ItemDelta → 组装 AssistantMessage
  │     │
  │     ├─ 若有 tool_calls:
  │     │    ├─ 检查 needs_approval → emit ApprovalRequired
  │     │    ├─ run_tool(tool, args, ctx)  ← 策略链 + retry + timeout
  │     │    └─ 追加 ToolCallOutputItem,继续下一轮
  │     │
  │     ├─ 若遇到 _HandoffSignal → 切换 active_agent,继续 loop
  │     │
  │     └─ 若 finish_reason == "stop" 且无 tool_calls → 解析 output
  │
  ├─ 5. 运行 output guardrails
  ├─ 6. 持久化 session(append / replace)
  └─ 7. 返回 RunResult

二、逐文件深度解析

2.1 agent.py — 声明式 Agent 规格

Why(为什么这样设计)

Agent 是纯声明配置,没有任何运行时可变状态。这样做的好处是:

How(实现要点)

@dataclass
class Agent(Generic[TContext]):
    name: str
    instructions: str | InstructionsFn = ""
    model: str | Provider | list[str | Provider] = "openai:gpt-4o-mini"
    tools: list[Tool] = ...
    output_type: Any = str
    ...
    _fragments: tuple[InstructionsFn, ...] = ...   # 动态 system_prompt 片段

What(示例)

agent = Agent(name="assistant", instructions="You are helpful.")

@agent.system_prompt
async def add_context(ctx) -> str:
    return f"User plan: {ctx.context.plan}"

# per-request clone,不影响原对象
req_agent = agent.clone(settings=ModelSettings(temperature=0))

2.2 runner.py — 运行时调度核心

Why

所有可变状态(消息历史、usage、active_agent)都在 Runner 的调用栈里,不污染 Agent 声明。

How

Runner 有三个入口:

内部的 turn loop 通过两个小型状态类管理:

@dataclass
class _TurnState:
    # 当前轮产出的 items、是否需要继续、是否切换了 agent
    ...

@dataclass  
class _BootstrapState:
    # 第一轮的初始化数据(system prompt、items 预加载等)
    ...

关键逻辑片段(伪代码)

while turn < max_turns:
    items = await context_policy.apply(items, ctx=policy_ctx)
    
    async for delta in provider.stream(items, tools=tools, ...):
        # 组装 AssistantMessage,emit TextDelta / ReasoningDelta
        ...
    
    if assistant_msg.tool_calls:
        for call in assistant_msg.tool_calls:
            if tool.requires_approval(args, ctx):
                approved = await _resolve_approval(call, ctx)
            result = await run_tool(tool, args, ctx, ...)
            if isinstance(result, _HandoffSignal):
                active_agent = result.target
                # 可选 input_filter 重写 transcript
                break
        continue   # 进入下一 turn
    
    # finish → parse output → 检查 output guardrails → 返回
    output = parse_output(output_spec, assistant_msg.content)
    break

Provider fallback chain

providers = agent.resolve_providers()  # 可能是 [gpt-4o, claude-3-5-sonnet]
for provider in providers:
    try:
        return await _run_with_provider(provider, ...)
    except ProviderError as e:
        if not e.retryable or provider is providers[-1]:
            raise
        # 尝试下一个 provider

2.3 tools/__init__.py — 工具系统

Why

工具是框架最频繁被扩展的部分,设计上做到:

  1. sync/async 函数一视同仁(sync 自动 asyncio.to_thread,不阻塞 event loop);

  2. RunContext 注入通过类型注解自动识别,不需要特殊参数名;

  3. 策略链(ToolPolicy)提供比 wrap 更结构化的组合方式。

How

@tool 装饰器的核心是 _find_context_param()

def _find_context_param(func, sig) -> str | None:
    hints = get_type_hints(func, include_extras=False)
    for pname in sig.parameters:
        origin = get_origin(hints.get(pname)) or hints.get(pname)
        if origin is RunContext:
            return pname   # 找到了,记住参数名
    return None

生成的 Tool.invoke 会在调用时自动注入 ctx:

async def invoke(args, ctx):
    cleaned = validate_args(func, args)   # pydantic 验证 + 类型强制
    if context_param:
        cleaned[context_param] = ctx
    if is_async:
        return await func(**cleaned)
    return await asyncio.to_thread(lambda: func(**cleaned))

策略链(ToolPolicy)实现

async def apply_tool_policies(invoke, policies, args, ctx):
    next_in_chain = invoke
    for policy in reversed(policies):       # 从最外层开始套
        inner = next_in_chain
        async def wrapped(a, c, _p=policy, _i=inner):
            return await _p(_i, a, c)
        next_in_chain = wrapped
    return await next_in_chain(args, ctx)

这是经典的洋葱/中间件模式。reversed 使得第一个 policy 在调用链的最外层。

retry + timeout 围在整个策略链之外

for attempt in range(1, attempts + 1):
    try:
        if timeout:
            return await asyncio.wait_for(one_attempt(args, ctx), timeout)
        return await one_attempt(args, ctx)
    except Exception:
        await asyncio.sleep(min(0.5, 0.05 * (2 ** (attempt - 1))))  # 指数退避,上限 0.5s

内置工具一览

工具说明
read_file / write_file / edit_filesandbox 支撑的文件操作
list_dir / glob / shell目录列举 / 通配 / shell 执行
http_fetchhttpx 封装的 HTTP 请求
duckduckgo_search_toollovia[tools],opt-in
ask_humanHumanChannel 双向通信(human-in-the-loop)
think占位"思考"工具,让模型显式推理
now / sleep时间工具
todo_toolsTodoList CRUD
coding_tools()工厂函数,组合文件+shell 工具

2.4 messages.py — 消息格式

Why

选择 OpenAI Chat Completions wire format 作为内部格式,理由很直接:

What

ChatMessage(role, content, tool_calls, tool_call_id, name, reasoning_content)
ToolCall(id, name, arguments)          # arguments 是 raw JSON string,延迟解析
Usage(input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
AssistantMessage → to_chat_message()   # 组装后转标准格式

值得注意:content 可以是 str | list[ContentBlock] | None,支持多模态(图片等)。

2.5 handoff.py — 多智能体协作

Why

两种模式都统一实现为普通 Tool,runner 不需要任何特殊路径感知它们——只需识别 _HandoffSignal 返回值。

How

模式 1:Handoff(控制转移,共享历史)

# 构建一个 transfer_to_<name> 工具
def build_handoff_tool(handoff: Handoff) -> Tool:
    async def invoke(args, ctx):
        if handoff.on_handoff:
            await handoff.on_handoff(args, ctx)
        return _HandoffSignal(target=handoff.target, ...)  # runner 识别这个 sentinel
    ...

Runner 收到 _HandoffSignal 后:

  1. 切换 active_agent

  2. 可选运行 input_filter(如 drop_stale_tool_calls,清除新 agent 不认识的工具调用);

  3. 用新 agent 的 system prompt 继续当前 turn loop。

模式 2:agent_as_tool(隔离子运行)

async def invoke(args, ctx):
    result = await Runner.run(agent, args["input"], context=ctx.context,
                              _parent_usage=ctx.usage)   # 累计 usage
    return result.output

子 agent 在完全独立的对话上下文里运行,结果作为字符串返回给父 agent。

伪代码对比

Handoff:                          agent_as_tool:
                                   
Parent                             Parent
  │ transfer_to_B(reason=...)        │ ask_B(input=...)
  ↓                                  ↓
[_HandoffSignal]                   Runner.run(B, input)
  │                                  │
Runner switches agent                B runs isolated
  │                                  │ → returns output str
B continues in same loop           Parent resumes
  (shares transcript)

2.6 output.py — 结构化输出

Why

不同 provider 对结构化输出的支持程度不同,需要两套策略:

  1. 原生 JSON Schema(OpenAI response_format):直接让模型输出 JSON;

  2. 工具回调 fallback:注入一个合成的 final_output 工具,模型必须调用它一次。

How

def build_output_spec(output_type, supports_response_format) -> OutputSpec | None:
    if output_type is str:
        return None    # 纯文本,不需要结构化
    schema = model_json_schema(output_type)
    return OutputSpec(
        output_type=output_type,
        use_tool_fallback=not supports_response_format,
        schema=schema,
    )

输出修复机制(OutputRepairStrategy)

class DefaultOutputRepair:
    max_attempts: int = 1
    
    def build_prompt(self, exc: OutputValidationError, attempt: int) -> str | None:
        if attempt > self.max_attempts:
            return None
        return "Your previous response could not be parsed... Please reply again..."

失败时,runner 追加一条 user 消息要求模型重试一次,而不是直接抛异常。用户也可以实现自己的 OutputRepairStrategy 协议,控制重试次数和提示语言(如中文提示)。

2.7 schema.py — JSON Schema 生成

Why

LLM 工具调用需要 JSON Schema 描述参数,pydantic v2 的 TypeAdaptercreate_model 可以处理几乎所有 Python 类型,避免重新造轮子。

How

核心技巧:对函数参数动态构建一个 pydantic model:

def function_args_schema(fn) -> tuple[dict, list[str]]:
    fields = {}
    for name, param, annotation in _iter_arg_params(fn):
        default = param.default if param.default is not empty else ...
        fields[name] = (annotation, default)
    
    Model = create_model(f"{fn.__name__}Args", **fields)
    schema = _strip_titles(Model.model_json_schema())
    schema.setdefault("additionalProperties", False)
    return schema, list(fields)

_strip_titles() 递归删除 pydantic 自动生成的 title 字段,减少 token 消耗。

Annotated 支持

# 用户可以这样写:
def search(query: Annotated[str, "the search query"], top_k: int = 5):
    ...
# 也可以用 pydantic Field:
def search(query: Annotated[str, Field(description="...", min_length=1)]):
    ...

_normalize_annotation() 将 bare string 转换成 Field(description=...),统一交给 pydantic 处理。

2.8 context_policy.py — 上下文窗口管理

Why

长对话最终会打满 context window,这是生产环境必须处理的问题。lovia 把这个策略做成可插拔协议,而不是 hard-code。

How

协议只有两个方法:

class ContextPolicy(Protocol):
    async def apply(self, items, *, ctx) -> list[Item]: ...         # 每轮调用(主动压缩)
    async def apply_reactive(self, items, *, ctx) -> list[Item]: ... # provider 溢出时触发

SummarizingContextPolicy 的完整流程:

apply() 被调用
  │
  ├─ 估算当前 tokens(取 last_prompt_tokens 和 estimate_tokens 的较大值)
  │                    ↑ 注意:上一轮的 usage 会低估,因为新消息已追加
  │
  ├─ tokens < threshold?
  │   └─ 是 → _maybe_micro_compact()(替换旧 tool result 为占位符)→ 返回
  │
  └─ 否 → summarize(items) → [summary_item, *safe_window(items, tail=10)]
                │
                └─ circuit breaker:连续失败 3 次后停止调用,避免烧钱

summary item 格式:

[Conversation summary — prior turns compacted]
1. User goal(s): ...
2. Key findings & decisions: ...
3. Files / artifacts changed: ...
4. Outstanding work: ...
5. User constraints & preferences: ...
[End summary]

2.9 skills.py — 知识目录

Why

Agent 有时需要访问大量知识文档(如产品手册、操作规程),一次性全部塞进 system prompt 会浪费 token。Skill 系统提供按需加载机制。

How

目录结构:

skills/
  refund-policy/
    SKILL.md       ← YAML frontmatter + body
    references/    ← 补充文档
    scripts/       ← 可执行脚本
    assets/        ← 模板等

两种模式:

框架自动注入 list_skills / load_skill / read_skill_file 三个工具。

2.10 hooks.py — 生命周期订阅

Why

上一版 lovia 设计了一堆 on_tool_called() / on_run_completed() 等具名方法,API 面太宽。现在用订阅者模式:一个 on(event_type) 搞定所有事件。

How

hooks = AgentHooks()

@hooks.on(events.ToolCallStarted)
async def log_tool(ev):
    print(f"→ {ev.call.name}")

@hooks.on((events.RunCompleted, events.ErrorOccurred))
def at_end(ev):
    ...

@hooks.on_any
def trace_all(ev):
    ...

dispatch 时用 isinstance 而不是精确类型匹配,支持父类捕获子类事件。sync/async handler 均支持,内部用 _maybe_await() 统一处理。

2.11 guardrails.py — 守护栏

Why

输入/输出验证属于横切关注点,需要从主逻辑中分离。

How

协议极简:

GuardrailFn = Callable[[Any, RunContext], GuardrailVerdict | Awaitable[GuardrailVerdict]]
# 返回 None/False = 通过;True 或 非空字符串 = 拒绝

典型用法:

async def no_pii(messages, ctx) -> str | None:
    if any("ssn" in m.text.lower() for m in messages):
        return "PII detected in input"   # 触发 GuardrailTripped

agent = Agent(..., input_guardrails=[no_pii])

2.12 events.py — 事件系统

Why

流式 API 和可观测性复用同一套事件类型,不需要维护两套并行机制。

What(事件层次)

Event
├── RunEvent:     RunStarted, RunCompleted
├── TurnEvent:    TurnStarted, TurnEnded
├── DeltaEvent:   TextDelta, ReasoningDelta
├── MessageEvent: MessageCompleted(items)
├── ToolEvent:    ToolCallStarted, ToolCallCompleted, ApprovalRequired
├── TransitionEvent: HandoffOccurred
├── ErrorEvent:   ErrorOccurred
└── ContextEvent: ContextCompacted

ApprovalRequired 比较特殊——它携带了一个 _channel: ApprovalChannel,消费者可以在事件处理器里直接调用 ev.approve() / ev.reject(),实现流式审批流程。

2.13 providers/base.py — Provider 协议

Why

Provider 只需实现一个 stream() 方法(AsyncIterator),不需要非流式 generate,简化了 adapter 开发。

How

class Provider(Protocol):
    name: str
    def stream(self, input: list[Item], *, tools, response_format, settings
               ) -> AsyncIterator[ItemDelta]: ...

ItemDelta 是一个联合类型(TextDelta / ReasoningDelta / ToolCallDelta / UsageDelta / FinishDelta),runner 根据类型组装完整的 AssistantMessage

context window 相关的两个可选方法(estimate_tokens / context_window)通过 getattr 检测而不要求 Protocol 声明,保持向后兼容。

2.14 session.py — 会话协议

class Session(Protocol):
    async def load(self, session_id: str) -> list[Item]: ...
    async def append(self, session_id: str, items: list[Item]) -> None: ...
    async def replace(self, session_id: str, items: list[Item]) -> None: ...  # for compaction
    async def clear(self, session_id: str) -> None: ...

Item 而不是 ChatMessage 作为存储单元,原因是 Item 保留了 reasoning、server-side tool call id 等 provider-specific 元数据,能无损 round-trip。

2.15 exceptions.py — 异常体系

所有异常继承自 LoviaError,统一携带 hint 字段:

LoviaError
├── UserError           # 配置错误(用户可修)
├── ProviderError       # LLM API 错误(含 retryable 字段)
├── ToolError           # 工具执行失败
├── MaxTurnsExceeded    # turn 上限
├── OutputValidationError # 结构化输出解析失败(含 raw 字段)
├── BudgetExceeded      # RunBudget 超限
├── RunCancelled        # CancelToken 触发
├── GuardrailTripped    # 守护栏拦截
└── ContextOverflowError # provider 上下文超限

ProviderError.retryable 让 runner 可以做智能重试决策;ContextOverflowError 被 runner 捕获后触发 context_policy.apply_reactive()

三、总体评价

优点

1. 设计哲学贯彻始终

"简洁 > 轻量 > 易扩展 > 通用"的优先级不只是口号,从代码里能清晰感受到:

2. Tool 系统设计优雅

3. 多智能体模式正交

Handoff 和 agent_as_tool 都通过普通 Tool 实现,runner 只需识别一个 sentinel 值,不需要特殊分支,保持了核心循环的简洁。

4. 上下文管理完备

SummarizingContextPolicy 的设计很成熟:

5. 异常设计实用

每个异常都有 hint 字段,ProviderError.retryable 让框架可以做有根据的重试决策,ContextOverflowError 作为统一信号支持跨 vendor。

缺点 / 局限

1. 并发工具调用支持不明

代码中 tool calls 是串行执行的。OpenAI 支持 parallel_tool_calls,模型可能在一轮返回多个 tool call。ModelSettings 里有 parallel_tool_calls: bool | None,但没有看到 runner 侧并发执行多个 tool 的逻辑,可能存在性能损失。

2. Skill 系统 YAML 解析过于简陋

def _parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
    for line in parts[1].strip().splitlines():
        if ":" in line:
            k, v = line.split(":", 1)

手工解析 YAML,不支持列表值(triggers 通过字符串 strip [] 处理)。如果 value 含有 :(如 URL),会错误截断。应用 PyYAMLtomllib 会更健壮,但增加依赖——这是轻量原则的代价。

3. 没有内置 Structured Concurrency / 任务取消

CancelTokenRunBudget 存在于异常定义中,但没有看到系统性的取消传播机制(如 asyncio.TaskGroup),大型并发 agent 场景可能需要用户自己管理。

4. Memory 接口轻描淡写

Agent.memory: Memory | None 字段存在,但 Memory 协议没有在本次读到的代码里体现完整实现,长期记忆(semantic search、embedding)的集成完整度待确认。

5. 缺乏原生 streaming structured output

当前结构化输出通过 final_output tool fallback 实现,这意味着在 tool_fallback 路径下无法边流边解析。原生 response_format 路径支持流式,但仅限 OpenAI。

与其他框架对比

维度loviaOpenAI Agents SDKLangChainLlamaIndex
核心复杂度★★☆(~2k 行)★★★★★★★★★★★★
硬依赖数2(httpx+pydantic)~10+50+30+
Agent 定义风格dataclassdataclass类继承/LCEL类继承
Provider 可换✅ Protocol⚠️ 主要 OpenAI✅ 但复杂✅ 但复杂
流式支持✅ 原生⚠️ 部分⚠️ 部分
上下文压缩✅ 内置策略❌(需插件)⚠️ 部分
多智能体✅ Handoff+as_tool✅ Handoff✅ 但复杂⚠️
Human-in-loop✅ HumanChannel⚠️ 有限⚠️
Skill/知识目录✅ 独特🔧 RAG🔧 RAG

lovia 的定位与 OpenAI Agents SDK 最接近,但前者是 provider-neutral 的,后者深度绑定 OpenAI。相较于 LangChain/LlamaIndex,lovia 刻意保持了极小的核心,放弃了生态广度换来可读性和可维护性。

改进方向

  1. 并行工具调用:runner 侧用 asyncio.gather() 并发执行同一轮的多个 tool call,再汇总结果追加;

  2. 更健壮的 frontmatter 解析:考虑 tomllib(Python 3.11+ 标准库,零依赖)替代手写 YAML 解析;

  3. Structured output streaming:对 response_format 路径提供增量 JSON 解析(ijson 或累积 buffer),让调用者在结构化输出完成前也能得到 delta;

  4. Tracer 协议的参考实现Agent.tracer 字段存在但没有开箱即用的 OpenTelemetry/Langfuse adapter,提供官方示例会降低集成门槛;

  5. Memory 完整化:提供基于 sqlite-vecchromadb 的轻量向量存储实现,与 ContextCompacted 事件自动联动,让长期记忆真正开箱即用;

  6. Provider 扩展文档:目前只有代码,添加一份"如何实现自己的 Provider"的 COOKBOOK 文档,会极大降低社区贡献门槛。

总结:lovia 是一个思路清晰、代码质量较高的框架。它用极小的代码量覆盖了 Agent 框架最关键的功能点,设计决策大多有明确的 why,适合作为学习标本,也适合需要轻量可控 agent 基础设施的团队。主要缺口在于生态深度(Memory、Tracer 完整实现)和若干生产细节(并行 tool call、cancellation 传播)。