持久化执行 (Durable execution)
运行可在 Durable Object 被驱逐后存活的工作。runFiber() 会在 SQLite 中注册一个任务,在执行期间保持 Agent 活跃,允许你通过 stash() 检查点中间状态,并在 Agent 在任务执行中途被驱逐时,在下次激活时调用 onFiberRecovered()。
Note
关于 fiber 如何融入构建可运行数周乃至数月的 Agent 的全局图景,请参阅 长期运行的 Agent。
快速开始
TypeScript
import { Agent } from "agents";
import type { FiberRecoveryContext } from "agents";
class MyAgent extends Agent {
async doWork() {
await this.runFiber("my-task", async (ctx) => {
const step1 = await expensiveOperation();
ctx.stash({ step1 });
const step2 = await anotherExpensiveOperation(step1);
this.setState({ ...this.state, result: step2 });
});
}
async onFiberRecovered(ctx: FiberRecoveryContext) {
if (ctx.name !== "my-task") return;
const snapshot = ctx.snapshot as { step1: unknown } | null;
if (snapshot) {
const step2 = await anotherExpensiveOperation(snapshot.step1);
this.setState({ ...this.state, result: step2 });
}
}
}
Explain Code
为什么需要 fiber
Durable Object 会因为以下三种原因被驱逐:
- 不活动超时 — 大约 70–140 秒没有传入请求或开放的 WebSocket
- 代码更新 / 运行时重启 — 不确定的时间,每天 1–2 次
- Alarm handler 超时 — 15 分钟
当驱逐发生在工作进行中时,与上游(LLM provider、API、数据库)的 HTTP 连接会被永久切断。内存中的状态——流式缓冲区、部分响应、循环计数器——都会丢失。多轮 Agent 循环也会完全失去其执行位置。
keepAlive() 减少被驱逐的可能性。runFiber() 让驱逐变得可恢复。
如果你的工作应该独立于 Agent 运行,并具有逐步重试和多步编排能力,请改用 Workflows。Fiber 用于属于 Agent 自身执行的一部分的工作。请参阅 长期运行的 Agent:Workflows 与 agent-internal 模式 以进行对比。
keepAlive
通过创建一个 30 秒的 alarm 心跳来重置不活动计时器,从而防止空闲被驱逐。
TypeScript
class Agent {
keepAlive(): Promise<() => void>;
keepAliveWhile<T>(fn: () => Promise<T>): Promise<T>;
}
keepAliveWhile() 是推荐用法——它运行一个 async 函数,并在其完成或抛出时自动清理心跳:
TypeScript
const result = await this.keepAliveWhile(async () => {
return await slowAPICall();
});
如需手动控制,keepAlive() 返回一个 disposer。结束时务必调用它——否则心跳会无限期持续:
TypeScript
const dispose = await this.keepAlive();
try {
await longWork();
} finally {
dispose();
}
工作原理
只要持有任何 keepAlive 引用,就会每 30 秒触发一次 alarm 来重置不活动计时器。当所有 disposer 都被调用后,alarm 停止,DO 可以自然进入空闲。
心跳对 getSchedules() 是不可见的——不会创建任何 schedule 行。它不会与你自己的 schedule 冲突;alarm 系统通过单一的 alarm 槽位复用所有 schedule 与 keepAlive 心跳。
可配置的间隔
默认值:30 秒。不活动超时大约为 70–140 秒,因此 30 秒提供了充足的缓冲。可通过静态选项覆盖:
TypeScript
class MyAgent extends Agent {
static options = { keepAliveIntervalMs: 2_000 };
}
何时使用 keepAlive vs runFiber
keepAlive 防止驱逐,但对恢复无能为力。如果 Agent 即使有心跳仍被驱逐(代码更新、alarm 超时、资源限制),所有进行中的工作都会丢失。
runFiber 内部会调用 keepAlive,并 将工作持久化到 SQLite,以便可恢复。当工作很容易重做或不需要检查点时,单独使用 keepAlive。当工作昂贵且你需要从中断处恢复时,使用 runFiber。
| 场景 | 使用 |
|---|---|
| 等待一次较慢的 API 调用 | keepAlive() |
| 流式传输 LLM 响应(通过 AIChatAgent) | 自动(内置) |
| 带中间结果的多步计算 | runFiber() |
| 需要 10+ 分钟的后台研究循环 | runFiber() 配合 stash() |
runFiber
带检查点和恢复机制的持久化执行。
TypeScript
class Agent {
runFiber<T>(name: string, fn: (ctx: FiberContext) => Promise<T>): Promise<T>;
stash(data: unknown): void;
onFiberRecovered(ctx: FiberRecoveryContext): Promise<void>;
}
type FiberContext = {
id: string;
stash(data: unknown): void;
snapshot: unknown | null;
};
type FiberRecoveryContext = {
id: string;
name: string;
snapshot: unknown | null;
};
Explain Code
生命周期
正常执行
runFiber("work", fn)
├─ INSERT row into cf_agents_runs
├─ keepAlive() — heartbeat starts
├─ Execute fn(ctx)
│ ├─ ctx.stash(data) → UPDATE snapshot in SQLite
│ ├─ ctx.stash(data) → UPDATE snapshot in SQLite
│ └─ return result
├─ DELETE row from cf_agents_runs
├─ keepAlive dispose — heartbeat stops
└─ Return result to caller
Explain Code
驱逐与恢复
[DO evicted — all in-memory state lost]
On next activation:
├─ Request/connection → onStart() → check for orphaned fibers [primary path]
│ OR
├─ Persisted alarm fires → housekeeping check [fallback path]
Recovery:
├─ SELECT * FROM cf_agents_runs
├─ For each orphaned row:
│ ├─ Parse snapshot from JSON
│ ├─ Call onFiberRecovered(ctx)
│ └─ DELETE the row
└─ If onFiberRecovered calls runFiber() again → new row, normal execution
Explain Code
两种恢复路径都会调用同一个 hook。alarm 路径对没有传入客户端连接的后台 Agent 至关重要——持久化的 alarm 会自行唤醒 Agent。
执行期间出错
fn(ctx) throws Error
├─ DELETE row from cf_agents_runs
├─ keepAlive dispose
└─ Error propagates to caller (or logged if fire-and-forget)
不会自动重试。恢复逻辑应放在 onFiberRecovered 中,此时你拥有 snapshot 和关于错误的完整上下文。
内联 vs fire-and-forget
runFiber() 同时支持两种模式:
TypeScript
// Inline — await the result
const result = await this.runFiber("work", async (ctx) => {
return computeExpensiveThing();
});
// Fire-and-forget — caller does not wait
void this.runFiber("background", async (ctx) => {
await longRunningProcess();
});
如果在内联 await 期间 DO 被驱逐,调用者已经消失。在恢复时,onFiberRecovered 触发——但它无法把结果返回给原始调用者。这是跨进程边界的持久化执行所固有的限制。对可能超出单个 DO 生命周期的长期运行工作来说,fire-and-forget 配合 checkpoint/recovery 是更安全的模式。
使用 stash 进行检查点
ctx.stash(data) 同步 写入 SQLite。在“我决定保存“和“它已被保存“之间不存在异步间隙。如果驱逐发生在 stash() 返回之后,数据保证已经在 SQLite 中。
每次调用都会完全替换之前的 snapshot——这不是合并。请写入你需要的完整恢复状态:
TypeScript
await this.runFiber("research", async (ctx) => {
const steps = ["search", "analyze", "synthesize"];
const completed: string[] = [];
const results: Record<string, unknown> = {};
for (const step of steps) {
results[step] = await executeStep(step);
completed.push(step);
ctx.stash({
completed,
results,
pendingSteps: steps.slice(completed.length)
});
}
});
Explain Code
this.stash vs ctx.stash
两者作用相同。ctx.stash() 通过对 fiber ID 的直接闭包工作。this.stash() 使用 AsyncLocalStorage 找到当前正在执行的 fiber——即使在并发 fiber 中也能正确工作,因为每个 fiber 的 ALS 上下文是独立的。
this.stash() 在从无法访问 ctx 的嵌套函数中调用时很方便。如果在 runFiber 回调之外调用,它会抛出异常。
恢复
重写 onFiberRecovered 来处理被中断的 fiber。默认实现会记录一条警告并删除该行。
TypeScript
class ResearchAgent extends Agent {
async onFiberRecovered(ctx: FiberRecoveryContext) {
if (ctx.name !== "research") return;
const snapshot = ctx.snapshot as {
completed: string[];
results: Record<string, unknown>;
pendingSteps: string[];
} | null;
if (snapshot && snapshot.pendingSteps.length > 0) {
void this.runFiber("research", async (fiberCtx) => {
const { completed, results, pendingSteps } = snapshot;
for (const step of pendingSteps) {
results[step] = await this.executeStep(step);
completed.push(step);
fiberCtx.stash({
completed,
results,
pendingSteps: pendingSteps.slice(pendingSteps.indexOf(step) + 1)
});
}
});
}
}
}
Explain Code
要点:
- 原始的 lambda 已经消失。 在恢复时,你只能拿到
name和snapshot。lambda 无法被序列化——恢复逻辑必须放在 hook 中。 - 该行在 hook 运行后被删除。 如果你想继续工作,在 hook 内再次调用
runFiber()——这会创建一个新的行。 - 你可以决定恢复意味着什么。 从头重试、从检查点继续、跳过并通知用户、或者什么都不做。框架不强制特定策略。
- 如果 hook 抛出异常,该行仍会被删除。 不会有第二次恢复机会。如果你的恢复逻辑可能失败,请捕获错误并处理(例如安排重试、记录日志,或重新创建 fiber)。
聊天恢复
AIChatAgent 在 fiber 之上构建了 LLM 流恢复。当启用 chatRecovery 时,每个聊天回合都会自动包装在一个 fiber 中。框架处理内部恢复路径,并暴露 onChatRecovery 用于针对特定 provider 的策略。详情请参阅 长期运行的 Agent:恢复中断的 LLM 流。
并发 fiber
多个 fiber 可以同时运行。每个在 SQLite 中都有自己的行和自己的 snapshot,并各自独立调用 keepAlive()(引用计数,因此 DO 在所有 fiber 完成前都保持活跃)。
TypeScript
void this.runFiber("fetch-data", async (ctx) => {
/* ... */
});
void this.runFiber("process-queue", async (ctx) => {
/* ... */
});
在恢复时,所有遗留行都会被遍历,并对每一个调用 onFiberRecovered。在恢复 hook 中使用 ctx.name 区分不同的 fiber 类型。
本地测试
在 wrangler dev 中,fiber 恢复与生产环境的工作方式完全相同。SQLite 和 alarm 状态在重启之间持久保存到磁盘。
- 启动你的 Agent 并触发一个 fiber (
runFiber) - Kill wrangler 进程(Ctrl-C 或 SIGKILL)
- 重启 wrangler
- 恢复会自动触发——若有请求到达则通过
onStart(),若无客户端连接则通过持久化的 alarm
API 参考
runFiber(name, fn)
执行一个持久化的 fiber。在 fn 运行前,fiber 已注册到 SQLite,完成(或抛出)后被删除。keepAlive() 在整个过程中保持。
name— fiber 的标识符,用于在onFiberRecovered中区分 fiber 类型。不要求唯一——多个 fiber 可以共享一个 name。fn— 接收FiberContext的 async 函数。闭包自然地工作(this和局部变量被捕获)。- 返回值 —
fn返回的值。如果 DO 在完成前被驱逐,返回值会丢失;恢复通过 hook 进行。
stash(data) / ctx.stash(data)
检查点当前 fiber 的状态。同步写入 SQLite。每次调用都会完全替换之前的 snapshot。data 必须可 JSON 序列化。
onFiberRecovered(ctx)
在 Agent 重启时,每个遗留 fiber 行都会调用一次。重写以实现恢复。该行在此 hook 返回后被删除。
ctx.id— 唯一的 fiber IDctx.name— 传给runFiber()的 namectx.snapshot— 最后一次stash()的数据,如果从未调用过stash()则为null
keepAlive()
创建一个 30 秒的 alarm 心跳。返回一个 disposer 函数。幂等——多次调用 disposer 是安全的。
keepAliveWhile(fn)
在保持 DO 活跃的同时运行一个 async 函数。心跳在 fn 之前开始,在其完成或抛出时停止。返回 fn 返回的值。
相关资源
- 长期运行的 Agent — fiber 如何与 schedule、plan 和异步操作组合
- 调度任务 —
keepAlive详情和 alarm 系统 - Workflows — Agent 之外的持久化多步执行
- 聊天 Agent —
chatRecovery与onChatRecovery