WebSockets

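Agents support WebSocket connections for real-time, bidirectional communication. This page covers server-side WebSocket handling. For client-side connections, see the Client SDK.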

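Lifecycle hooks

Agents have several lifecycle hooks that fire at different times:

| Hook | When it is called |
| --- | --- |
| onStart(props?) | Once, when the Agent first starts (before any connections) |
| onRequest(request) | When an HTTP request is received (non-WebSocket) |
| onConnect(connection, ctx) | When a new WebSocket connection is established |
| onMessage(connection, message) | When a WebSocket message is received |
| onClose(connection, code, reason, wasClean) | When a WebSocket connection closes |
| onError(connection, error) | When a WebSocket error occurs on a connection |
| onError(error) | When a server-level error occurs (not tied to a specific connection) |
| shouldSendProtocolMessages(connection, ctx) | Whether to send protocol messages (identity, state, MCP) to this connection. Default: true |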

onStart

onStart() is called once when the Agent first starts, before any connection is established:

JavaScript


import { Agent } from "agents";

export class MyAgent extends Agent {
  async onStart() {
    // Initialize resources
    console.log(`Agent ${this.name} starting...`);

    // Load data from storage
    const savedData = this.sql`SELECT * FROM cache`;
    for (const row of savedData) {
      // Rebuild in-memory state from persistent storage
    }
  }

  onConnect(connection) {
    // By the time connections arrive, onStart has completed
  }
}

TypeScript


import { Agent, type Connection } from "agents";

export class MyAgent extends Agent {
  async onStart() {
    // Initialize resources
    console.log(`Agent ${this.name} starting...`);

    // Load data from storage
    const savedData = this.sql`SELECT * FROM cache`;
    for (const row of savedData) {
      // Rebuild in-memory state from persistent storage
    }
  }

  onConnect(connection: Connection) {
    // By the time connections arrive, onStart has completed
  }
}

Handling connections

Define the onConnect and onMessage methods on your Agent to accept WebSocket connections:

JavaScript


// Connection, ConnectionContext and WSMessage are only needed as types,
// so the JavaScript version imports Agent alone
import { Agent } from "agents";

export class ChatAgent extends Agent {
  async onConnect(connection, ctx) {
    // Connections are automatically accepted
    // Access the original request for auth, headers, cookies
    const url = new URL(ctx.request.url);
    const token = url.searchParams.get("token");

    if (!token) {
      connection.close(4001, "Unauthorized");
      return;
    }

    // Store user info on this connection
    connection.setState({ authenticated: true });
  }

  async onMessage(connection, message) {
    if (typeof message === "string") {
      // Handle text message
      const data = JSON.parse(message);
      connection.send(JSON.stringify({ received: data }));
    }
  }
}

TypeScript


import {
  Agent,
  type Connection,
  type ConnectionContext,
  type WSMessage,
} from "agents";

export class ChatAgent extends Agent {
  async onConnect(connection: Connection, ctx: ConnectionContext) {
    // Connections are automatically accepted
    // Access the original request for auth, headers, cookies
    const url = new URL(ctx.request.url);
    const token = url.searchParams.get("token");

    if (!token) {
      connection.close(4001, "Unauthorized");
      return;
    }

    // Store user info on this connection
    connection.setState({ authenticated: true });
  }

  async onMessage(connection: Connection, message: WSMessage) {
    if (typeof message === "string") {
      // Handle text message
      const data = JSON.parse(message);
      connection.send(JSON.stringify({ received: data }));
    }
  }
}
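The onMessage handler above calls JSON.parse directly, which throws on malformed input. One way to guard against that is a small parsing helper. The sketch below is an illustration only: parseJsonMessage and ParseResult are hypothetical names, not part of the agents package.

```typescript
// Result of attempting to decode an incoming frame as JSON.
type ParseResult =
  | { ok: true; data: unknown }
  | { ok: false; error: string };

// Hypothetical helper (not part of the agents package): parse a text frame
// as JSON without letting a malformed payload throw inside onMessage.
function parseJsonMessage(message: unknown): ParseResult {
  if (typeof message !== "string") {
    return { ok: false, error: "expected a text frame" };
  }
  try {
    return { ok: true, data: JSON.parse(message) };
  } catch {
    return { ok: false, error: "invalid JSON" };
  }
}
```

Inside onMessage, a failed result could be answered with an error frame instead of letting the exception reach onError.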

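The Connection object

Each connected client has a unique Connection object:

| Property/Method | Type | Description |
| --- | --- | --- |
| id | string | Unique identifier for this connection |
| uri | string \| null | URL of the original WebSocket upgrade request. Survives hibernation |
| state | State | Per-connection state object |
| setState(state) | void | Update the connection state |
| send(message) | void | Send a message to this client |
| close(code?, reason?) | void | Close the connection |
| tags | readonly string[] | Tags assigned through getConnectionTags. The first tag is always the connection ID |
| server | string | Agent instance name (same as this.name on the Agent) |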

Per-connection state

Store data specific to each connection (user info, preferences, and so on):

JavaScript


import { Agent } from "agents";

export class ChatAgent extends Agent {
  async onConnect(connection, ctx) {
    const userId = new URL(ctx.request.url).searchParams.get("userId");

    connection.setState({
      userId: userId || "anonymous",
      role: "user",
      joinedAt: Date.now(),
    });
  }

  async onMessage(connection, message) {
    // Access connection-specific state
    console.log(`Message from ${connection.state.userId}`);
  }
}

TypeScript


import {
  Agent,
  type Connection,
  type ConnectionContext,
  type WSMessage,
} from "agents";

interface ConnectionState {
  userId: string;
  role: "admin" | "user";
  joinedAt: number;
}

export class ChatAgent extends Agent {
  async onConnect(
    connection: Connection<ConnectionState>,
    ctx: ConnectionContext,
  ) {
    const userId = new URL(ctx.request.url).searchParams.get("userId");

    connection.setState({
      userId: userId || "anonymous",
      role: "user",
      joinedAt: Date.now(),
    });
  }

  async onMessage(connection: Connection<ConnectionState>, message: WSMessage) {
    // Access connection-specific state (may be unset before onConnect runs)
    console.log(`Message from ${connection.state?.userId}`);
  }
}

Broadcasting to all clients

Use this.broadcast() to send a message to every connected client:

JavaScript


import { Agent } from "agents";

export class ChatAgent extends Agent {
  async onMessage(connection, message) {
    // Broadcast to all connected clients
    this.broadcast(
      JSON.stringify({
        from: connection.id,
        message: message,
        timestamp: Date.now(),
      }),
    );
  }

  // Broadcast from any method
  async notifyAll(event, data) {
    this.broadcast(JSON.stringify({ event, data }));
  }
}

TypeScript


import { Agent, type Connection, type WSMessage } from "agents";

export class ChatAgent extends Agent {
  async onMessage(connection: Connection, message: WSMessage) {
    // Broadcast to all connected clients
    this.broadcast(
      JSON.stringify({
        from: connection.id,
        message: message,
        timestamp: Date.now(),
      }),
    );
  }

  // Broadcast from any method
  async notifyAll(event: string, data: unknown) {
    this.broadcast(JSON.stringify({ event, data }));
  }
}

Excluding connections

Pass an array of connection IDs to exclude them from the broadcast:

JavaScript


// Broadcast to everyone except the sender
this.broadcast(
  JSON.stringify({ type: "user-typing", userId: "123" }),
  [connection.id], // Do not send to the originator
);


TypeScript


// Broadcast to everyone except the sender
this.broadcast(
  JSON.stringify({ type: "user-typing", userId: "123" }),
  [connection.id], // Do not send to the originator
);
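To make the exclusion semantics concrete, here is a toy model of a broadcast that skips a list of connection IDs. It is a sketch under stated assumptions, not the library's implementation: FakeConnection and broadcastExcept are hypothetical stand-ins, and the real Agent tracks its own connections.

```typescript
// Minimal stand-in for a connection: just an id and a send function.
interface FakeConnection {
  id: string;
  send(message: string): void;
}

// Toy model of broadcast-with-exclusions. Returns how many connections
// actually received the message.
function broadcastExcept(
  connections: readonly FakeConnection[],
  message: string,
  without: string[] = [],
): number {
  const excluded = new Set(without);
  let delivered = 0;
  for (const conn of connections) {
    if (excluded.has(conn.id)) continue; // skip excluded IDs
    conn.send(message);
    delivered++;
  }
  return delivered;
}
```

Passing the sender's own ID in the exclusion list is what keeps a typing indicator from echoing back to the client that produced it.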


Connection tags

Tag connections so you can filter them later. Override getConnectionTags() to assign tags when a connection is established:

JavaScript


import { Agent } from "agents";

export class ChatAgent extends Agent {
  getConnectionTags(connection, ctx) {
    const url = new URL(ctx.request.url);
    const role = url.searchParams.get("role");

    const tags = [];
    if (role === "admin") tags.push("admin");
    if (role === "moderator") tags.push("moderator");

    return tags; // Up to 9 tags, max 256 chars each
  }

  // Later, broadcast only to admins
  notifyAdmins(message) {
    for (const conn of this.getConnections("admin")) {
      conn.send(message);
    }
  }
}

TypeScript


import { Agent, type Connection, type ConnectionContext } from "agents";

export class ChatAgent extends Agent {
  getConnectionTags(connection: Connection, ctx: ConnectionContext): string[] {
    const url = new URL(ctx.request.url);
    const role = url.searchParams.get("role");

    const tags: string[] = [];
    if (role === "admin") tags.push("admin");
    if (role === "moderator") tags.push("moderator");

    return tags; // Up to 9 tags, max 256 chars each
  }

  // Later, broadcast only to admins
  notifyAdmins(message: string) {
    for (const conn of this.getConnections("admin")) {
      conn.send(message);
    }
  }
}
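The comment in the example above mentions a limit of 9 tags of at most 256 characters each. If tags are derived from user input, it may help to enforce those limits before returning them. The following is a minimal sketch; normalizeTags and the two constants are hypothetical names introduced here, and how the library reacts to out-of-range tags is not covered by this page.

```typescript
// Limits taken from the comment in the getConnectionTags example.
const MAX_TAGS = 9;
const MAX_TAG_LENGTH = 256;

// Hypothetical helper: drop empty or oversized tags, remove duplicates,
// and keep at most MAX_TAGS entries in their original order.
function normalizeTags(candidates: string[]): string[] {
  const unique = new Set<string>();
  for (const tag of candidates) {
    if (tag.length === 0 || tag.length > MAX_TAG_LENGTH) continue;
    unique.add(tag);
    if (unique.size === MAX_TAGS) break;
  }
  return Array.from(unique);
}
```

A getConnectionTags override could then end with return normalizeTags(tags).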

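Connection management methods

| Method | Signature | Description |
| --- | --- | --- |
| getConnections | (tag?: string) => Iterable<Connection> | Get all connections, optionally filtered by tag |
| getConnection | (id: string) => Connection \| undefined | Get a connection by ID |
| getConnectionTags | (connection, ctx) => string[] | Override to tag connections |
| broadcast | (message, without?: string[]) => void | Send to all connections |
| isConnectionReadonly | (connection) => boolean | Check whether a connection is read-only |
| isConnectionProtocolEnabled | (connection) => boolean | Check whether protocol messages are enabled for a connection |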

Handling binary data

Messages can be strings or binary (ArrayBuffer / ArrayBufferView):

JavaScript


import { Agent } from "agents";

export class FileAgent extends Agent {
  async onMessage(connection, message) {
    if (message instanceof ArrayBuffer) {
      // Handle binary upload
      const bytes = new Uint8Array(message);
      await this.processFile(bytes);
      connection.send(
        JSON.stringify({ status: "received", size: bytes.length }),
      );
    } else if (typeof message === "string") {
      // Handle text command
      const command = JSON.parse(message);
      // ...
    }
  }

  // Placeholder for application-specific handling (not provided by Agent)
  async processFile(bytes) {
    // ...
  }
}

TypeScript


import { Agent, type Connection, type WSMessage } from "agents";

export class FileAgent extends Agent {
  async onMessage(connection: Connection, message: WSMessage) {
    if (message instanceof ArrayBuffer) {
      // Handle binary upload
      const bytes = new Uint8Array(message);
      await this.processFile(bytes);
      connection.send(
        JSON.stringify({ status: "received", size: bytes.length }),
      );
    } else if (typeof message === "string") {
      // Handle text command
      const command = JSON.parse(message);
      // ...
    }
  }

  // Placeholder for application-specific handling (not provided by Agent)
  private async processFile(bytes: Uint8Array): Promise<void> {
    // ...
  }
}
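The example above only handles ArrayBuffer, while binary messages may also arrive as an ArrayBufferView. A view can cover just part of its underlying buffer, so converting it to bytes should respect its offset and length. Here is a minimal sketch; toBytes is a hypothetical helper name, not an API of the agents package.

```typescript
// Hypothetical helper: normalise any binary message to a Uint8Array
// without copying the underlying data.
function toBytes(message: ArrayBuffer | ArrayBufferView): Uint8Array {
  if (message instanceof ArrayBuffer) {
    return new Uint8Array(message);
  }
  // A view may cover only part of its buffer, so respect offset and length.
  return new Uint8Array(message.buffer, message.byteOffset, message.byteLength);
}
```

Note that passing a multi-byte view such as Uint16Array directly to new Uint8Array(view) would convert element by element rather than expose the raw bytes, which is why the helper goes through the buffer instead.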

Note

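Agents automatically send JSON text frames (identity, state, MCP servers) to every connection. If your client only handles binary data and cannot process these frames, use shouldSendProtocolMessages to suppress them.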

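Error and close handling

Handle connection errors and disconnects. The onError method has two overloads: one for WebSocket connection errors and one for server-level errors: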

JavaScript


import { Agent } from "agents";

export class ChatAgent extends Agent {
  // Called as onError(connection, error) for a WebSocket connection error,
  // or as onError(error) for a server-level error (no specific connection)
  onError(connectionOrError, error) {
    if (error !== undefined) {
      console.error(`Connection ${connectionOrError.id} error:`, error);
    } else {
      console.error("Server error:", connectionOrError);
    }
  }

  async onClose(connection, code, reason, wasClean) {
    console.log(`Connection ${connection.id} closed: ${code} ${reason}`);

    this.broadcast(
      JSON.stringify({
        event: "user-left",
        userId: connection.state?.userId,
      }),
    );
  }
}

TypeScript


import { Agent, type Connection } from "agents";

export class ChatAgent extends Agent {
  // WebSocket connection error
  onError(connection: Connection, error: unknown): void;
  // Server-level error (not tied to a specific connection)
  onError(error: unknown): void;
  onError(connectionOrError: Connection | unknown, error?: unknown) {
    if (error !== undefined) {
      console.error(
        `Connection ${(connectionOrError as Connection).id} error:`,
        error,
      );
    } else {
      console.error("Server error:", connectionOrError);
    }
  }

  async onClose(
    // Typed state so that connection.state?.userId type-checks
    connection: Connection<{ userId?: string }>,
    code: number,
    reason: string,
    wasClean: boolean,
  ) {
    console.log(`Connection ${connection.id} closed: ${code} ${reason}`);

    this.broadcast(
      JSON.stringify({
        event: "user-left",
        userId: connection.state?.userId,
      }),
    );
  }
}

The default onError implementation logs the error and rethrows it. Override it to add custom error handling, reporting, or recovery logic.
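Because the two onError overloads differ only in argument count, one way to tell them apart is to branch on how many arguments were passed rather than on whether the second one is truthy. The sketch below illustrates that idea in isolation; classifyError, ErrorInfo, and ConnectionLike are hypothetical names used for illustration.

```typescript
// Minimal shape needed from a connection for reporting purposes.
interface ConnectionLike {
  id: string;
}

type ErrorInfo =
  | { scope: "connection"; connectionId: string; error: unknown }
  | { scope: "server"; error: unknown };

// Hypothetical helper: distinguish the (connection, error) call shape from
// the (error) call shape by argument count.
function classifyError(
  ...args: [connection: ConnectionLike, error: unknown] | [error: unknown]
): ErrorInfo {
  if (args.length === 2) {
    return { scope: "connection", connectionId: args[0].id, error: args[1] };
  }
  return { scope: "server", error: args[0] };
}
```

An onError override could forward its arguments to a helper like this and then route the result to logging or reporting code.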

Message types

| Type | Description |
| --- | --- |
| string | Text message (usually JSON) |
| ArrayBuffer | Binary data |
| ArrayBufferView | Typed-array view of binary data |

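Hibernation

Agents support hibernation: they can sleep while inactive and wake when needed. This saves resources while keeping WebSocket connections open.

Enabling hibernation

Hibernation is enabled by default. To disable it: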

JavaScript


import { Agent } from "agents";

export class AlwaysOnAgent extends Agent {
  static options = { hibernate: false };
}


TypeScript


import { Agent } from "agents";

export class AlwaysOnAgent extends Agent {
  static options = { hibernate: false };
}


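How hibernation works

  1. The Agent is active and handling connections
  2. After a period with no messages, the Agent hibernates (sleeps)
  3. WebSocket connections stay open (handled by Cloudflare)
  4. When a message arrives, the Agent wakes up
  5. onMessage is called as usual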

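What persists across hibernation

| Persists | Does not persist |
| --- | --- |
| this.state (Agent state) | In-memory variables |
| connection.state | Timers / intervals |
| SQLite data (this.sql) | In-flight Promises |
| Connection metadata | Local caches |

Store important data in this.state or SQLite rather than in class properties: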

JavaScript


import { Agent } from "agents";

export class MyAgent extends Agent {
  initialState = { counter: 0 };

  // Do not do this - lost on hibernation
  localCounter = 0;

  onMessage(connection, message) {
    // Persists across hibernation
    this.setState({ counter: this.state.counter + 1 });

    // Lost after hibernation
    this.localCounter++;
  }
}

TypeScript


import { Agent, type Connection, type WSMessage } from "agents";

export class MyAgent extends Agent<Env, { counter: number }> {
  initialState = { counter: 0 };

  // Do not do this - lost on hibernation
  private localCounter = 0;

  onMessage(connection: Connection, message: WSMessage) {
    // Persists across hibernation
    this.setState({ counter: this.state.counter + 1 });

    // Lost after hibernation
    this.localCounter++;
  }
}
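The difference between persisted state and class fields can be illustrated without the runtime. The toy model below simulates hibernation by discarding an instance and constructing a new one, with a module-level Map standing in for durable storage. ToyAgent and durableStore are hypothetical; this is an analogy, not how the Agent is implemented.

```typescript
// Stand-in for durable storage that outlives any single in-memory instance.
const durableStore = new Map<string, number>();

class ToyAgent {
  // Plain field: re-initialised every time the class is constructed.
  localCounter = 0;

  // Persisted value: always read from the durable stand-in.
  get counter(): number {
    return durableStore.get("counter") ?? 0;
  }

  onMessage(): void {
    durableStore.set("counter", this.counter + 1);
    this.localCounter++;
  }
}

// Two messages, then a simulated hibernation and wake-up, then one more.
let agent = new ToyAgent();
agent.onMessage();
agent.onMessage();
agent = new ToyAgent(); // "wake up" as a fresh instance
agent.onMessage();
// agent.counter is 3, agent.localCounter is 1
```

The persisted counter reflects all three messages, while the field only counts the one handled since the simulated wake-up.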

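Common patterns

Presence tracking

Use per-connection state to track who is online. Connection state is cleaned up automatically when a user disconnects: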

JavaScript


import { Agent } from "agents";

export class PresenceAgent extends Agent {
  onConnect(connection, ctx) {
    const url = new URL(ctx.request.url);
    const name = url.searchParams.get("name") || "Anonymous";

    connection.setState({
      name,
      joinedAt: Date.now(),
      lastSeen: Date.now(),
    });

    // Send current presence to new user
    connection.send(
      JSON.stringify({
        type: "presence",
        users: this.getPresence(),
      }),
    );

    // Notify others that someone joined
    this.broadcastPresence();
  }

  onClose(connection) {
    // No manual cleanup needed - connection state is automatically gone
    this.broadcastPresence();
  }

  onMessage(connection, message) {
    if (message === "ping") {
      connection.setState((prev) => ({
        ...prev,
        lastSeen: Date.now(),
      }));
      connection.send("pong");
    }
  }

  getPresence() {
    const users = {};
    for (const conn of this.getConnections()) {
      if (conn.state) {
        users[conn.id] = {
          name: conn.state.name,
          lastSeen: conn.state.lastSeen,
        };
      }
    }
    return users;
  }

  broadcastPresence() {
    this.broadcast(
      JSON.stringify({
        type: "presence",
        users: this.getPresence(),
      }),
    );
  }
}

TypeScript


import {
  Agent,
  type Connection,
  type ConnectionContext,
  type WSMessage,
} from "agents";

type UserState = {
  name: string;
  joinedAt: number;
  lastSeen: number;
};

export class PresenceAgent extends Agent {
  onConnect(connection: Connection<UserState>, ctx: ConnectionContext) {
    const url = new URL(ctx.request.url);
    const name = url.searchParams.get("name") || "Anonymous";

    connection.setState({
      name,
      joinedAt: Date.now(),
      lastSeen: Date.now(),
    });

    // Send current presence to new user
    connection.send(
      JSON.stringify({
        type: "presence",
        users: this.getPresence(),
      }),
    );

    // Notify others that someone joined
    this.broadcastPresence();
  }

  onClose(connection: Connection) {
    // No manual cleanup needed - connection state is automatically gone
    this.broadcastPresence();
  }

  onMessage(connection: Connection<UserState>, message: WSMessage) {
    if (message === "ping") {
      connection.setState((prev) => ({
        ...prev!,
        lastSeen: Date.now(),
      }));
      connection.send("pong");
    }
  }

  private getPresence() {
    const users: Record<string, { name: string; lastSeen: number }> = {};
    for (const conn of this.getConnections<UserState>()) {
      if (conn.state) {
        users[conn.id] = {
          name: conn.state.name,
          lastSeen: conn.state.lastSeen,
        };
      }
    }
    return users;
  }

  private broadcastPresence() {
    this.broadcast(
      JSON.stringify({
        type: "presence",
        users: this.getPresence(),
      }),
    );
  }
}
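The presence example records lastSeen on every ping, which makes it possible to flag users who have gone quiet. The sketch below shows one way to derive that from the presence map; findIdleUsers and the maxIdleMs threshold are hypothetical and not part of the example above.

```typescript
// Same shape as the map built by getPresence() in the example.
type PresenceMap = Record<string, { name: string; lastSeen: number }>;

// Hypothetical helper: return the connection IDs whose last ping is older
// than maxIdleMs relative to `now` (both in milliseconds).
function findIdleUsers(
  users: PresenceMap,
  now: number,
  maxIdleMs: number,
): string[] {
  return Object.entries(users)
    .filter(([, user]) => now - user.lastSeen > maxIdleMs)
    .map(([id]) => id);
}
```

A client UI could use the result to grey out idle users, or the Agent could include an idle flag when it broadcasts presence.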

Chat room with broadcast

JavaScript


import { Agent } from "agents";

export class ChatRoom extends Agent {
  onConnect(connection, ctx) {
    const url = new URL(ctx.request.url);
    const username = url.searchParams.get("username") || "Anonymous";

    connection.setState({ username });

    // Notify others
    this.broadcast(
      JSON.stringify({
        type: "join",
        user: username,
        timestamp: Date.now(),
      }),
      [connection.id], // Do not send to the joining user
    );
  }

  onMessage(connection, message) {
    if (typeof message !== "string") return;

    // Fall back to an empty object if state has not been set yet
    const { username = "Anonymous" } = connection.state || {};

    this.broadcast(
      JSON.stringify({
        type: "message",
        user: username,
        text: message,
        timestamp: Date.now(),
      }),
    );
  }

  onClose(connection) {
    const { username } = connection.state || {};
    if (username) {
      this.broadcast(
        JSON.stringify({
          type: "leave",
          user: username,
          timestamp: Date.now(),
        }),
      );
    }
  }
}

TypeScript


import {
  Agent,
  type Connection,
  type ConnectionContext,
  type WSMessage,
} from "agents";

type Message = {
  type: "message" | "join" | "leave";
  user: string;
  text?: string;
  timestamp: number;
};

export class ChatRoom extends Agent {
  onConnect(connection: Connection, ctx: ConnectionContext) {
    const url = new URL(ctx.request.url);
    const username = url.searchParams.get("username") || "Anonymous";

    connection.setState({ username });

    // Notify others
    this.broadcast(
      JSON.stringify({
        type: "join",
        user: username,
        timestamp: Date.now(),
      } satisfies Message),
      [connection.id], // Do not send to the joining user
    );
  }

  onMessage(connection: Connection, message: WSMessage) {
    if (typeof message !== "string") return;

    const { username } = connection.state as { username: string };

    this.broadcast(
      JSON.stringify({
        type: "message",
        user: username,
        text: message,
        timestamp: Date.now(),
      } satisfies Message),
    );
  }

  onClose(connection: Connection) {
    const { username } = (connection.state as { username: string }) || {};
    if (username) {
      this.broadcast(
        JSON.stringify({
          type: "leave",
          user: username,
          timestamp: Date.now(),
        } satisfies Message),
      );
    }
  }
}
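Clients receiving these broadcasts get plain JSON strings, so it can be useful to validate the shape before trusting it. The following type guard is a sketch for the Message type used above; isChatMessage is a hypothetical helper and the Message type is repeated only to keep the example self-contained.

```typescript
// Same shape as the Message type in the chat room example.
type Message = {
  type: "message" | "join" | "leave";
  user: string;
  text?: string;
  timestamp: number;
};

// Hypothetical type guard: check that a parsed JSON value looks like a Message.
function isChatMessage(value: unknown): value is Message {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    (v.type === "message" || v.type === "join" || v.type === "leave") &&
    typeof v.user === "string" &&
    (v.text === undefined || typeof v.text === "string") &&
    typeof v.timestamp === "number"
  );
}
```

A receiver would typically call this on the result of JSON.parse and ignore anything that does not pass.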

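Suppressing protocol messages

By default, Agents send JSON text frames (identity, state sync, MCP server list) to every connection. Override shouldSendProtocolMessages to suppress these messages for specific connections, for example binary-only clients that cannot handle JSON text frames: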

JavaScript


import { Agent } from "agents";

export class IoTAgent extends Agent {
  shouldSendProtocolMessages(connection, ctx) {
    const url = new URL(ctx.request.url);
    return url.searchParams.get("protocol") !== "binary";
  }
}


TypeScript


import { Agent, type Connection, type ConnectionContext } from "agents";

export class IoTAgent extends Agent {
  shouldSendProtocolMessages(
    connection: Connection,
    ctx: ConnectionContext,
  ): boolean {
    const url = new URL(ctx.request.url);
    return url.searchParams.get("protocol") !== "binary";
  }
}


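When it returns false, the connection no longer receives identity, state, or MCP server list frames, either on connect or through broadcasts. The connection can still send and receive messages normally, use RPC, and take part in all non-protocol communication.

Use isConnectionProtocolEnabled(connection) to check the status of any connection at runtime.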
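Pulling the decision out of shouldSendProtocolMessages into a pure function makes it easy to test without a running Agent. The sketch below mirrors the query-parameter check from the example; wantsProtocolMessages is a hypothetical name, and the URLs in the usage note are placeholders.

```typescript
// Hypothetical helper: decide from the upgrade request URL whether the client
// should receive protocol frames. Mirrors the "protocol=binary" convention
// used in the IoTAgent example.
function wantsProtocolMessages(requestUrl: string): boolean {
  const url = new URL(requestUrl);
  return url.searchParams.get("protocol") !== "binary";
}
```

With this helper, wantsProtocolMessages("wss://example.com/device?protocol=binary") returns false, while a URL without the parameter returns true.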

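Agent properties

These properties are available through this inside any Agent method:

| Property | Type | Description |
| --- | --- | --- |
| this.name | string | Instance name of this Agent |
| this.state | State | Current Agent state (lazily loaded from SQLite) |
| this.env | Env | Worker environment bindings |
| this.ctx | DurableObjectState | Durable Object context (storage, alarms, and so on) |
| this.sql | template tag | SQL template tag for running queries against the Agent's SQLite storage |
| this.mcp | MCPClientManager | MCP client manager for connecting to external MCP servers |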

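Connecting from clients

For browser connections, use the Agents client SDK:

  • Vanilla JS: AgentClient from agents/client
  • React: the useAgent hook from agents/react

See the Client SDK for full documentation.

Next steps

State synchronization: Sync state between Agents and clients.

Callable methods: Make RPC method calls over WebSocket.

Cross-domain authentication: Secure WebSocket connections in cross-domain environments.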