Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Agent 类内部原理

agents 库的核心是 Agent 类。你只需继承它,重写少数几个方法,就能免费获得状态管理、WebSockets、调度、RPC 等能力。本页一层一层地解释 Agent 是如何构建的,帮助你理解底层都发生了什么。

本页中的代码片段仅作示意,并不一定代表最佳实践。完整 API 请参阅 API reference源码 ↗

Agent 是什么?

Agent 类是 DurableObject 的扩展 —— agent 本质上就是 Durable Object。如果你不熟悉 Durable Objects,请先阅读 What are Durable Objects。简而言之,Durable Object 是全局可寻址的(每个实例都有唯一 ID)、单线程的、自带长期存储(KV 与 SQLite)的计算实例。

Agent 并不直接继承 DurableObject,而是继承 partyserver ↗ 包中的 Server,而 Server 才继承 DurableObject。可以把它看作分层结构:DurableObject > Server > Agent

Layer 0: Durable Object

我们先简单看看 Durable Objects 都暴露了哪些原语,这样才能理解外层是如何使用它们的。Durable Object 类提供:

constructor

TypeScript


constructor(ctx: DurableObjectState, env: Env) {}


Workers 运行时始终会在内部处理时调用构造函数。这意味着两点:

  1. 虽然每次 Durable Object 初始化都会调用构造函数,但它的签名是固定的。开发者不能在构造函数里增加或修改参数。
  2. 开发者不能手动实例化该类,而必须通过 binding API,经由 DurableObjectNamespace 完成。

RPC

只要你写的 Durable Object 类继承自内置的 DurableObject,其公开方法就会作为 RPC 方法暴露出来,开发者可以通过 Worker 中的 DurableObjectStub 来调用它们。

TypeScript


// This instance could've been active, hibernated,

// not initialized or maybe had never even been created!

const stub = env.MY_DO.getByName("foo");


// We can call any public method on the class. The runtime

// ensures the constructor is called if the instance was not active.

await stub.bar();


fetch()

Durable Object 可以接收 Worker 发来的 Request 并返回 Response。这只能通过开发者实现的 fetch 方法完成。

WebSockets

Durable Object 对 WebSockets 提供一流支持。它可以在 fetch 中接受由 Request 带来的 WebSocket,然后“忘掉“它。基类提供的方法可由开发者实现,以回调的形式被调用,从而无需事件监听器。

基类提供 webSocketMessage(ws, message)webSocketClose(ws, code, reason, wasClean)webSocketError(ws , error)(API)。

TypeScript


export class MyDurableObject extends DurableObject {

  async fetch(request) {

    // Creates two ends of a WebSocket connection.

    const webSocketPair = new WebSocketPair();

    const [client, server] = Object.values(webSocketPair);


    // Calling `acceptWebSocket()` connects the WebSocket to the Durable Object, allowing the WebSocket to send and receive messages.

    this.ctx.acceptWebSocket(server);


    return new Response(null, {

      status: 101,

      webSocket: client,

    });

  }


  async webSocketMessage(ws, message) {

    ws.send(message);

  }

}


Explain Code

alarm()

HTTP 与 RPC 请求并不是 Durable Object 的唯一入口。Alarm 让开发者可以安排一个事件在稍后触发。下一个 alarm 到期时,运行时会调用开发者实现的 alarm() 方法。

要安排 alarm,可以使用 this.ctx.storage.setAlarm()。详见 Alarms

this.ctx

DurableObject 基类把 DurableObjectState 设到 this.ctx。其上有许多有趣的方法与属性,我们重点关注 this.ctx.storage

this.ctx.storage

DurableObjectStorage 是与 Durable Object 持久化机制交互的主要接口,既包含 KV 也包含 SQLITE,且都是 同步 API。

TypeScript


const sql = this.ctx.storage.sql;


// Synchronous SQL query

const rows = sql.exec("SELECT * FROM contacts WHERE country = ?", "US");


// Key-value storage

const token = this.ctx.storage.get("someToken");


this.ctx.env

最后值得一提的是,Durable Object 也通过 this.env 暴露了 Worker 的 Env。详见 Bindings

Layer 1: Server(partyserver)

了解了 Durable Objects 自带的能力之后,partyserver ↗ 中的 Server 类就更容易理解了。它是一个有主见的 DurableObject 包装层,把底层原语替换成更易用的回调。

Server 自身不增加任何存储操作 —— 它只是包装了 Durable Object 的生命周期。

寻址

partyserver 提供按名称寻址 Durable Object 的辅助方法,而不必手动走 binding。其中包括 URL 路由方案(<your-worker>/servers/:durableClass/:durableName),Agent 层就建立在它之上。

TypeScript


// Note the await here!

const stub = await getServerByName(env.MY_DO, "foo");


// We can still call RPC methods.

await stub.bar();


这套 URL 方案也支持请求路由。在 Agent 层,它被重新导出为 routeAgentRequest:

TypeScript


  async fetch(request: Request, env: Env, ctx: ExecutionContext) {

    const res = await routeAgentRequest(request, env);


    if (res) return res;


    return new Response("Not found", { status: 404 });

  }


onStart

寻址层让 Server 可以暴露一个 onStart 回调,在 Durable Object 每次启动(从被驱逐、休眠或首次创建)且在任何 fetch 或 RPC 调用之前运行。

TypeScript


class MyServer extends Server {

  onStart() {

    // Some initialization logic that you wish

    // to run every time the DO is started up.

    const sql = this.ctx.storage.sql;

    sql.exec(`...`);

  }

}


onRequestonConnect

Server 已经为底层 Durable Object 实现了 fetch,并暴露了两个回调供开发者使用:onRequest 处理 HTTP 请求,onConnect 处理新进的 WS 连接(WebSocket 连接默认会被接受)。

TypeScript


class MyServer extends Server {

  async onRequest(request: Request) {

    const url = new URL(request.url);


    return new Response(`Hello from ${url.origin}!`);

  }


  async onConnect(conn, ctx) {

    const { request } = ctx;

    const url = new URL(request.url);


    // Connections are a WebSocket wrapper

    conn.send(`Hello from ${url.origin}!`);

  }

}


Explain Code

WebSockets

正如 onConnect 是每个新连接的回调,Server 也基于 DurableObject 默认回调提供了包装:onMessageonCloseonError

还有 this.broadcast,可以向所有已连接的客户端发送 WS 消息(没什么魔法,只是对 this.getConnections() 做了一个循环!)。

this.name

从 Durable Object 内部很难拿到它的 namepartyserver 尝试通过 this.name 提供它,但并不是完美的方案。详见这个 GitHub issue ↗

Layer 2: Agent

终于到了 Agent 类。Agent 继承 Server,为有状态、可调度、可观测的 agent 提供有主见的原语,这些 agent 可以通过 RPC、WebSockets,甚至(!)email 进行通信。

this.statethis.setState()

Agent 的核心特性之一是 状态自动持久化。开发者通过泛型参数与 initialState(仅在存储中尚无 state 时使用)定义状态的形状,Agent 会自动处理状态的加载、保存与广播(参见上文 Server 中的 this.broadcast())。

this.state 是一个 getter,会从存储(SQL)中惰性加载 state。当通过 this.setState() 更新时,state 会被自动序列化并写回存储,从而在 Durable Object 被驱逐之间保留下来。

还有 this.onStateChanged 可以重写,以响应状态变化。

TypeScript


class MyAgent extends Agent<Env, { count: number }> {

  initialState = { count: 0 };


  increment() {

    this.setState({ count: this.state.count + 1 });

  }


  onStateChanged(state, source) {

    console.log("State updated:", state);

  }

}


Explain Code

State 存储在 cf_agents_state SQL 表中。状态消息以 type: "cf_agent_state" 发送(从客户端到服务端,反向亦然)。由于 agents 提供了 JS 与 React 客户端,实时状态更新开箱即用。

this.sql

Agent 提供了一个便捷的 sql 模板标签,用于针对 Durable Object 的 SQL 存储执行查询。它会构造参数化查询并执行。底层使用的是 this.ctx.storage.sql 提供的 同步 SQL API。

TypeScript


class MyAgent extends Agent {

  onStart() {

    this.sql`

      CREATE TABLE IF NOT EXISTS users (

        id TEXT PRIMARY KEY,

        name TEXT

      )

    `;


    const userId = "1";

    const userName = "Alice";

    this.sql`INSERT INTO users (id, name) VALUES (${userId}, ${userName})`;


    const users = this.sql<{ id: string; name: string }>`

      SELECT * FROM users WHERE id = ${userId}

    `;

    console.log(users); // [{ id: "1", name: "Alice" }]

  }

}


Explain Code

RPC 与可调用方法

agents 把 Durable Objects 的 RPC 又往前推进了一步:通过 WebSocket 实现 RPC,客户端可以直接调用 Agent 上的方法。要让一个方法可通过 WebSocket 调用,使用 @callable() 装饰器。方法可以返回可序列化的值,或者一个流(使用 @callable({ stream: true }) 时)。

TypeScript


class MyAgent extends Agent {

  @callable({ description: "Add two numbers" })

  async add(a: number, b: number) {

    return a + b;

  }

}


客户端可以通过发送一条 WebSocket 消息来调用此方法:


{

  "type": "rpc",

  "id": "unique-request-id",

  "method": "add",

  "args": [2, 3]

}


例如,使用提供的 React 客户端非常简单:

TypeScript


const { stub } = useAgent({ name: "my-agent" });

const result = await stub.add(2, 3);

console.log(result); // 5


this.queue 及相关方法

Agent 内置了一个用于延迟执行的任务队列,可用于卸载工作或重试操作。可用方法包括:this.queuethis.dequeuethis.dequeueAllthis.dequeueAllByCallbackthis.getQueuethis.getQueues

TypeScript


class MyAgent extends Agent {

  async onConnect() {

    // Queue a task to be executed later

    await this.queue("processTask", { userId: "123" });

  }


  async processTask(payload: { userId: string }, queueItem: QueueItem) {

    console.log("Processing task for user:", payload.userId);

  }

}


Explain Code

任务存储在 cf_agents_queues SQL 表中,会按顺序自动 flush。如果任务执行成功,会自动出队。

this.schedule 及相关方法

Agent 通过包装 Durable Object 的 alarm() 来支持方法的定时执行。可用方法包括:this.schedulethis.getSchedulethis.getSchedulesthis.cancelSchedule。Schedule 可以是一次性、延迟,或重复(使用 cron 表达式)。

由于 Durable Object 一次只允许设置一个 alarm,Agent 类通过把多个 schedule 存在 SQL 中、只用一个 alarm 来绕开这个限制。

TypeScript


class MyAgent extends Agent {

  async foo() {

    // Schedule at a specific time

    await this.schedule(new Date("2025-12-25T00:00:00Z"), "sendGreeting", {

      message: "Merry Christmas!",

    });


    // Schedule with a delay (in seconds)

    await this.schedule(60, "checkStatus", { check: "health" });


    // Schedule with a cron expression

    await this.schedule("0 0 * * *", "dailyTask", { type: "cleanup" });

  }


  async sendGreeting(payload: { message: string }) {

    console.log(payload.message);

  }


  async checkStatus(payload: { check: string }) {

    console.log("Running check:", payload.check);

  }


  async dailyTask(payload: { type: string }) {

    console.log("Daily task:", payload.type);

  }

}


Explain Code

Schedule 存储在 cf_agents_schedules SQL 表中。Cron 类型的 schedule 在执行后会自动重新调度,而一次性 schedule 在执行后会被删除。

this.mcp 及相关方法

Agent 内置了一个多服务器 MCP 客户端,使你的 Agent 能够与暴露 MCP 接口的外部服务交互。MCP 客户端的完整文档见 MCP client API

TypeScript


class MyAgent extends Agent {

  async onStart() {

    // Add an HTTP MCP server (callbackHost only needed for OAuth servers)

    await this.addMcpServer("GitHub", "https://mcp.github.com/mcp", {

      callbackHost: "https://my-worker.example.workers.dev",

    });


    // Add an MCP server via RPC (Durable Object binding, no HTTP overhead)

    await this.addMcpServer("internal-tools", this.env.MyMCP);

  }

}


Explain Code

邮件处理

Agent 可以使用 Cloudflare 的 Email Routing 接收并回复邮件。

TypeScript


class MyAgent extends Agent {

  async onEmail(email: AgentEmail) {

    console.log("Received email from:", email.from);

    console.log("Subject:", email.headers.get("subject"));


    const raw = await email.getRaw();

    console.log("Raw email size:", raw.length);


    // Reply to the email

    await this.replyToEmail(email, {

      fromName: "My Agent",

      subject: "Re: " + email.headers.get("subject"),

      body: "Thanks for your email!",

      contentType: "text/plain",

    });

  }

}


Explain Code

要把邮件路由到你的 Agent,在 Worker 的 email handler 中使用 routeAgentEmail:

TypeScript


export default {

  async email(message, env, ctx) {

    await routeAgentEmail(message, env, {

      resolver: createAddressBasedEmailResolver("my-agent"),

    });

  },

} satisfies ExportedHandler<Env>;


上下文管理

agentsAsyncLocalStorage 包装你的所有方法,以在请求生命周期内维护上下文。这样你可以在代码任何位置访问当前的 agent、connection、request 或 email(取决于正在处理的事件):

TypeScript


import { getCurrentAgent } from "agents";


function someUtilityFunction() {

  const { agent, connection, request, email } = getCurrentAgent();


  if (agent) {

    console.log("Current agent:", agent.name);

  }


  if (connection) {

    console.log("WebSocket connection ID:", connection.id);

  }

}


Explain Code

this.onError

Agent 扩展了 ServeronError,因此可用于处理不一定是 WebSocket 错误的情况。它的参数可能是 Connectionunknown 错误。

TypeScript


class MyAgent extends Agent {

  onError(connectionOrError: Connection | unknown, error?: unknown) {

    if (error) {

      // WebSocket connection error

      console.error("Connection error:", error);

    } else {

      // Server error

      console.error("Server error:", connectionOrError);

    }


    // Optionally throw to propagate the error

    throw connectionOrError;

  }

}


Explain Code

this.destroy

this.destroy() 会删除所有表、清除 alarm、清空存储,并中止上下文。为了确保 Durable Object 完全被驱逐,this.ctx.abort() 会通过 setTimeout() 异步调用,以便让任何正在执行的 handler(如计划任务)先完成清理。

这意味着 this.ctx.abort() 会抛出一个不可捕获的错误,会出现在你的日志里,但它是在让出事件循环后才抛出的(详见 abort())。

destroy() 方法可以安全地在计划任务中调用。当从一个 schedule 回调中调用时,Agent 会设置一个内部标志以跳过任何剩余的数据库更新,并把 ctx.abort() 让给事件循环,确保 alarm handler 在 Agent 被驱逐前能干净地结束。

TypeScript


class MyAgent extends Agent {

  async onStart() {

    console.log("Agent is starting up...");

    // Initialize your agent

  }


  async cleanup() {

    // This wipes everything!

    await this.destroy();

  }


  async selfDestruct() {

    // Safe to call from within a scheduled task

    await this.schedule(60, "destroyAfterDelay", {});

  }


  async destroyAfterDelay() {

    // This will safely destroy the Agent even when

    // called from within the alarm handler

    await this.destroy();

  }

}


Explain Code

在计划任务中使用 destroy()

你可以安全地从一个计划任务回调内部调用 this.destroy()。Agent SDK 会设置一个内部标志,防止销毁后还进行数据库更新,并将上下文的中止延后,以确保 alarm handler 干净地结束。

static options

通过在你的类上重写 static options 来配置 agent 行为。所有字段都是可选的 —— 默认值在运行时会被应用。

TypeScript


export class MyAgent extends Agent {

  static options = {

    hibernate: true,

    sendIdentityOnConnect: false,

    retry: { maxAttempts: 5, baseDelayMs: 200, maxDelayMs: 5000 },

  };

}


选项类型默认值描述
hibernatebooleantrueagent 在不活跃时是否休眠。Durable Object 休眠时 WebSocket 连接仍然保持
sendIdentityOnConnectbooleantrue在 WebSocket 连接时向客户端发送身份(agent 名、实例名)。设为 false 可隐藏敏感的实例名
hungScheduleTimeoutSecondsnumber30一个运行中的间隔 schedule 在被认为“卡住“并强制重置之前的超时时间。回调耗时较长时可调大
keepAliveIntervalMsnumber30000keepAlive() alarm 心跳的毫秒间隔。值越小恢复越快,但 alarm 频次越高
retryRetryOptions{ maxAttempts: 3, baseDelayMs: 100, maxDelayMs: 3000 }schedule()、queue() 与 this.retry() 的默认重试选项。每个任务可单独覆盖这些默认值

this.keepAlive()this.keepAliveWhile()

Durable Object 在不活跃一段时间后会被驱逐(通常在没有任何请求、WebSocket 消息或 alarm 的情况下 70 到 140 秒)。在长时间运行的操作中 —— 比如流式 LLM 响应、等待外部 API、运行多步计算 —— Agent 可能在中途就被驱逐。

keepAlive() 创建一个 alarm 心跳来防止驱逐。keepAliveWhile() 包装一个异步函数,并保证清理。

TypeScript


class MyAgent extends Agent {

  async handleLongTask() {

    // Option 1: manual dispose

    const dispose = await this.keepAlive();

    try {

      await longRunningComputation();

    } finally {

      dispose();

    }


    // Option 2: automatic cleanup (recommended)

    const result = await this.keepAliveWhile(async () => {

      return await longRunningComputation();

    });

  }

}


Explain Code

AIChatAgent 内部使用 keepAliveWhile 在流式 LLM 响应期间保持 agent 存活。详见 Schedule tasks — Keeping the agent alive

路由

Agent 类把 寻址辅助方法 重新导出为 getAgentByNamerouteAgentRequest

TypeScript


const stub = await getAgentByName(env.MY_DO, "foo");

await stub.someMethod();


const res = await routeAgentRequest(request, env);

if (res) return res;


return new Response("Not found", { status: 404 });


Layer 3: AIChatAgent

@cloudflare/ai-chat 中的 AIChatAgent 类在 Agent 之上又叠加了一层有主见的 AI 聊天能力。它增加了消息自动持久化到 SQLite、可恢复流式传输、工具支持(服务端、客户端和 human-in-the-loop),以及一个用于构建聊天 UI 的 React hook(useAgentChat)。

完整层级结构是:DurableObject > Server > Agent > AIChatAgent

如果你在构建聊天 agent,从 AIChatAgent 开始。如果你需要更底层的控制,或者并不在做聊天界面,直接使用 Agent