WebSockets
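Agents support WebSocket connections for real-time, bidirectional communication. This page covers server-side WebSocket handling. For client-side connections, refer to the Client SDK.
Lifecycle hooks
Agents have several lifecycle hooks that fire at different points:
| Hook | When called |
|---|---|
| onStart(props?) | Once, when the agent first starts (before any connections) |
| onRequest(request) | When an HTTP request is received (non-WebSocket) |
| onConnect(connection, ctx) | When a new WebSocket connection is established |
| onMessage(connection, message) | When a WebSocket message is received |
| onClose(connection, code, reason, wasClean) | When a WebSocket connection closes |
| onError(connection, error) | When a WebSocket error occurs on a connection |
| onError(error) | When a server-level error occurs (not tied to a specific connection) |
| shouldSendProtocolMessages(connection, ctx) | Whether to send protocol messages (identity, state, MCP) to this connection. Default: true |
onStart
onStart() is called once when the agent first starts, before any connections are established: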
JavaScript
export class MyAgent extends Agent {
async onStart() {
// Initialize resources
console.log(`Agent ${this.name} starting...`);
// Load data from storage
const savedData = this.sql`SELECT * FROM cache`;
for (const row of savedData) {
// Rebuild in-memory state from persistent storage
}
}
onConnect(connection) {
// By the time connections arrive, onStart has completed
}
}
TypeScript
export class MyAgent extends Agent {
async onStart() {
// Initialize resources
console.log(`Agent ${this.name} starting...`);
// Load data from storage
const savedData = this.sql`SELECT * FROM cache`;
for (const row of savedData) {
// Rebuild in-memory state from persistent storage
}
}
onConnect(connection: Connection) {
// By the time connections arrive, onStart has completed
}
}
Handling connections
Define onConnect and onMessage methods on your agent to accept WebSocket connections:
JavaScript
import { Agent } from "agents";
export class ChatAgent extends Agent {
async onConnect(connection, ctx) {
// Connections are automatically accepted
// Access the original request for auth, headers, cookies
const url = new URL(ctx.request.url);
const token = url.searchParams.get("token");
if (!token) {
connection.close(4001, "Unauthorized");
return;
}
// Store user info on this connection
connection.setState({ authenticated: true });
}
async onMessage(connection, message) {
if (typeof message === "string") {
// Handle text message
const data = JSON.parse(message);
connection.send(JSON.stringify({ received: data }));
}
}
}
TypeScript
import { Agent, Connection, ConnectionContext, WSMessage } from "agents";
export class ChatAgent extends Agent {
async onConnect(connection: Connection, ctx: ConnectionContext) {
// Connections are automatically accepted
// Access the original request for auth, headers, cookies
const url = new URL(ctx.request.url);
const token = url.searchParams.get("token");
if (!token) {
connection.close(4001, "Unauthorized");
return;
}
// Store user info on this connection
connection.setState({ authenticated: true });
}
async onMessage(connection: Connection, message: WSMessage) {
if (typeof message === "string") {
// Handle text message
const data = JSON.parse(message);
connection.send(JSON.stringify({ received: data }));
}
}
}
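The onMessage handlers above call JSON.parse directly, which throws if a client sends malformed text. A minimal sketch of a defensive parser follows. The names `parseJsonMessage` and `ParseResult` are hypothetical helpers for illustration, not part of the Agents SDK.

```typescript
// Hypothetical helper (not part of the Agents SDK): parse a text frame
// without letting malformed JSON throw inside onMessage.
type ParseResult =
  | { ok: true; data: unknown }
  | { ok: false; error: string };

function parseJsonMessage(message: string): ParseResult {
  try {
    return { ok: true, data: JSON.parse(message) };
  } catch (err) {
    // JSON.parse throws a SyntaxError on invalid input.
    const error = err instanceof Error ? err.message : String(err);
    return { ok: false, error };
  }
}
```

Inside onMessage, a handler could call `parseJsonMessage(message)` and, when `ok` is false, reply with an error payload and return early instead of letting the exception propagate.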
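The Connection object
Each connected client has a unique Connection object:
| Property/Method | Type | Description |
|---|---|---|
| id | string | Unique identifier for this connection |
| uri | string \| null | URL of the original WebSocket upgrade request. Persists across hibernation |
| state | State | Per-connection state object |
| setState(state) | void | Update the connection state |
| send(message) | void | Send a message to this client |
| close(code?, reason?) | void | Close the connection |
| tags | readonly string[] | Tags assigned via getConnectionTags. The first tag is always the connection ID |
| server | string | Agent instance name (same as this.name on the agent) |
Per-connection state
Store data that is specific to each connection (user info, preferences, and so on):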
JavaScript
export class ChatAgent extends Agent {
async onConnect(connection, ctx) {
const userId = new URL(ctx.request.url).searchParams.get("userId");
connection.setState({
userId: userId || "anonymous",
role: "user",
joinedAt: Date.now(),
});
}
async onMessage(connection, message) {
// Access connection-specific state
console.log(`Message from ${connection.state.userId}`);
}
}
TypeScript
interface ConnectionState {
userId: string;
role: "admin" | "user";
joinedAt: number;
}
export class ChatAgent extends Agent {
async onConnect(
connection: Connection<ConnectionState>,
ctx: ConnectionContext,
) {
const userId = new URL(ctx.request.url).searchParams.get("userId");
connection.setState({
userId: userId || "anonymous",
role: "user",
joinedAt: Date.now(),
});
}
async onMessage(connection: Connection<ConnectionState>, message: WSMessage) {
// Access connection-specific state
console.log(`Message from ${connection.state.userId}`);
}
}
Broadcasting to all clients
Use this.broadcast() to send a message to all connected clients:
JavaScript
export class ChatAgent extends Agent {
async onMessage(connection, message) {
// Broadcast to all connected clients
this.broadcast(
JSON.stringify({
from: connection.id,
message: message,
timestamp: Date.now(),
}),
);
}
// Broadcast from any method
async notifyAll(event, data) {
this.broadcast(JSON.stringify({ event, data }));
}
}
TypeScript
export class ChatAgent extends Agent {
async onMessage(connection: Connection, message: WSMessage) {
// Broadcast to all connected clients
this.broadcast(
JSON.stringify({
from: connection.id,
message: message,
timestamp: Date.now(),
}),
);
}
// Broadcast from any method
async notifyAll(event: string, data: unknown) {
this.broadcast(JSON.stringify({ event, data }));
}
}
Excluding connections
Pass an array of connection IDs to exclude them from the broadcast:
JavaScript
// Broadcast to everyone except the sender
this.broadcast(
JSON.stringify({ type: "user-typing", userId: "123" }),
[connection.id], // Do not send to the originator
);
TypeScript
// Broadcast to everyone except the sender
this.broadcast(
JSON.stringify({ type: "user-typing", userId: "123" }),
[connection.id], // Do not send to the originator
);
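To make the exclusion semantics concrete, here is a small in-memory model of a broadcast that skips the listed connection IDs. `FakeConnection` and `broadcastExcept` are illustrative stand-ins written for this sketch. They are not the SDK's implementation.

```typescript
// Illustrative stand-in for a connection: records what was sent to it.
interface FakeConnection {
  id: string;
  sent: string[];
}

// Deliver `message` to every connection whose id is not listed in `without`.
function broadcastExcept(
  connections: Iterable<FakeConnection>,
  message: string,
  without: string[] = [],
): void {
  const excluded = new Set(without);
  for (const conn of connections) {
    if (!excluded.has(conn.id)) conn.sent.push(message);
  }
}

const alice: FakeConnection = { id: "alice", sent: [] };
const bob: FakeConnection = { id: "bob", sent: [] };

// alice is the sender, so she is excluded; only bob receives the message.
broadcastExcept([alice, bob], "user-typing", [alice.id]);
```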
Connection tags
Tag connections so that you can filter them later. Override getConnectionTags() to assign tags when a connection is established:
JavaScript
export class ChatAgent extends Agent {
getConnectionTags(connection, ctx) {
const url = new URL(ctx.request.url);
const role = url.searchParams.get("role");
const tags = [];
if (role === "admin") tags.push("admin");
if (role === "moderator") tags.push("moderator");
return tags; // Up to 9 tags, max 256 chars each
}
// Later, broadcast only to admins
notifyAdmins(message) {
for (const conn of this.getConnections("admin")) {
conn.send(message);
}
}
}
TypeScript
export class ChatAgent extends Agent {
getConnectionTags(connection: Connection, ctx: ConnectionContext): string[] {
const url = new URL(ctx.request.url);
const role = url.searchParams.get("role");
const tags: string[] = [];
if (role === "admin") tags.push("admin");
if (role === "moderator") tags.push("moderator");
return tags; // Up to 9 tags, max 256 chars each
}
// Later, broadcast only to admins
notifyAdmins(message: string) {
for (const conn of this.getConnections("admin")) {
conn.send(message);
}
}
}
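The comments in the examples above mention limits of up to 9 tags and 256 characters per tag. A small sketch of a pre-flight check follows. `findTagProblems` is a hypothetical helper, and measuring length with `String.prototype.length` (UTF-16 code units) is an assumption, since the text does not say how characters are counted or what happens when a limit is exceeded.

```typescript
// Limits taken from the comments in the examples above.
const MAX_TAGS = 9;
const MAX_TAG_LENGTH = 256;

// Hypothetical helper: returns human-readable problems, or [] if tags look fine.
function findTagProblems(tags: string[]): string[] {
  const problems: string[] = [];
  if (tags.length > MAX_TAGS) {
    problems.push(`too many tags: ${tags.length} (limit ${MAX_TAGS})`);
  }
  for (const tag of tags) {
    // Assumption: length is counted in UTF-16 code units.
    if (tag.length > MAX_TAG_LENGTH) {
      problems.push(`tag too long: ${tag.length} characters (limit ${MAX_TAG_LENGTH})`);
    }
  }
  return problems;
}
```

Calling such a check at the end of getConnectionTags() during development can surface oversized tag sets before they reach the runtime.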
Connection management methods
| Method | Signature | Description |
|---|---|---|
| getConnections | (tag?: string) => Iterable<Connection> | Get all connections, optionally filtered by tag |
| getConnection | (id: string) => Connection \| undefined | Get a connection by ID |
| getConnectionTags | (connection, ctx) => string[] | Override to tag connections |
| broadcast | (message, without?: string[]) => void | Send to all connections, except the IDs listed in without |
| isConnectionReadonly | (connection) => boolean | Check whether a connection is read-only |
| isConnectionProtocolEnabled | (connection) => boolean | Check whether protocol messages are enabled for this connection |
Handling binary data
Messages can be strings or binary (ArrayBuffer / ArrayBufferView):
JavaScript
export class FileAgent extends Agent {
async onMessage(connection, message) {
if (message instanceof ArrayBuffer) {
// Handle binary upload
const bytes = new Uint8Array(message);
await this.processFile(bytes);
connection.send(
JSON.stringify({ status: "received", size: bytes.length }),
);
} else if (typeof message === "string") {
// Handle text command
const command = JSON.parse(message);
// ...
}
}
}
TypeScript
export class FileAgent extends Agent {
async onMessage(connection: Connection, message: WSMessage) {
if (message instanceof ArrayBuffer) {
// Handle binary upload
const bytes = new Uint8Array(message);
await this.processFile(bytes);
connection.send(
JSON.stringify({ status: "received", size: bytes.length }),
);
} else if (typeof message === "string") {
// Handle text command
const command = JSON.parse(message);
// ...
}
}
}
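Note
Agents automatically send JSON text frames (identity, state, MCP servers) to every connection. If your client only handles binary data and cannot process these frames, use shouldSendProtocolMessages to suppress them.
Error and close handling
Handle connection errors and disconnects. The onError method has two overloads: one for WebSocket connection errors and one for server-level errors: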
JavaScript
export class ChatAgent extends Agent {
// WebSocket connection error
// Server-level error (not tied to a specific connection)
onError(connectionOrError, error) {
if (error) {
console.error(`Connection ${connectionOrError.id} error:`, error);
} else {
console.error("Server error:", connectionOrError);
}
}
async onClose(connection, code, reason, wasClean) {
console.log(`Connection ${connection.id} closed: ${code} ${reason}`);
this.broadcast(
JSON.stringify({
event: "user-left",
userId: connection.state?.userId,
}),
);
}
}
TypeScript
export class ChatAgent extends Agent {
// WebSocket connection error
onError(connection: Connection, error: unknown): void;
// Server-level error (not tied to a specific connection)
onError(error: unknown): void;
onError(connectionOrError: Connection | unknown, error?: unknown) {
if (error) {
console.error(
`Connection ${(connectionOrError as Connection).id} error:`,
error,
);
} else {
console.error("Server error:", connectionOrError);
}
}
async onClose(
connection: Connection,
code: number,
reason: string,
wasClean: boolean,
) {
console.log(`Connection ${connection.id} closed: ${code} ${reason}`);
this.broadcast(
JSON.stringify({
event: "user-left",
userId: connection.state?.userId,
}),
);
}
}
The default onError implementation logs the error and rethrows it. Override it to add custom error handling, reporting, or recovery logic.
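The onClose hook receives a numeric close code, and the earlier authentication example closes with 4001. As a sketch, the helper below maps a code to a short description using the ranges defined by the WebSocket protocol (RFC 6455), where 4000 to 4999 is reserved for private, application-defined use. `describeCloseCode` is a hypothetical name, and it covers only a few well-known codes.

```typescript
// Hypothetical helper: describe a WebSocket close code for logging.
// Code meanings follow RFC 6455; only a few common codes are listed.
function describeCloseCode(code: number): string {
  if (code === 1000) return "normal closure";
  if (code === 1001) return "going away";
  if (code === 1006) return "abnormal closure (no close frame received)";
  if (code >= 3000 && code <= 3999) return "registered library/framework code";
  if (code >= 4000 && code <= 4999) return "application-defined code";
  return "other protocol code";
}
```

In onClose, logging `describeCloseCode(code)` alongside `reason` and `wasClean` can make it easier to tell deliberate application closes, such as 4001, apart from network drops.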
Message types
| Type | Description |
|---|---|
| string | Text message (typically JSON) |
| ArrayBuffer | Binary data |
| ArrayBufferView | Typed array view of binary data |
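Because binary messages can arrive either as an ArrayBuffer or as a view over one, it can help to normalize both to a Uint8Array before processing. The sketch below shows one way to do that. `toBytes` is a hypothetical helper name, not an SDK function.

```typescript
// Hypothetical helper: normalize either binary message shape to a Uint8Array.
function toBytes(message: ArrayBuffer | ArrayBufferView): Uint8Array {
  if (message instanceof ArrayBuffer) {
    return new Uint8Array(message);
  }
  // A view may cover only part of its underlying buffer, so respect its
  // byteOffset and byteLength instead of exposing the whole buffer.
  return new Uint8Array(message.buffer, message.byteOffset, message.byteLength);
}
```

The returned array shares memory with the original message rather than copying it, so mutating one is visible through the other.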
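Hibernation
Agents support hibernation: they can sleep while inactive and wake when needed. This saves resources while keeping WebSocket connections open.
Enabling hibernation
Hibernation is enabled by default. To disable it: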
JavaScript
export class AlwaysOnAgent extends Agent {
static options = { hibernate: false };
}
TypeScript
export class AlwaysOnAgent extends Agent {
static options = { hibernate: false };
}
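How hibernation works
- The agent is active and handling connections
- After a period with no messages, the agent hibernates (sleeps)
- WebSocket connections stay open (handled by Cloudflare)
- When a message arrives, the agent wakes up
- onMessage is called as usual
What persists across hibernation
| Persists | Does not persist |
|---|---|
| this.state (agent state) | In-memory variables |
| connection.state | Timers/intervals |
| SQLite data (this.sql) | In-flight promises |
| Connection metadata | Local caches |
Store important data in this.state or SQLite, not in class properties: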
JavaScript
export class MyAgent extends Agent {
initialState = { counter: 0 };
// Do not do this - lost on hibernation
localCounter = 0;
onMessage(connection, message) {
// Persists across hibernation
this.setState({ counter: this.state.counter + 1 });
// Lost after hibernation
this.localCounter++;
}
}
TypeScript
export class MyAgent extends Agent<Env, { counter: number }> {
initialState = { counter: 0 };
// Do not do this - lost on hibernation
private localCounter = 0;
onMessage(connection: Connection, message: WSMessage) {
// Persists across hibernation
this.setState({ counter: this.state.counter + 1 });
// Lost after hibernation
this.localCounter++;
}
}
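One rough mental model, which is not the runtime's actual mechanism, is to treat hibernation as the in-memory instance being discarded and later reconstructed, with only stored state surviving. The sketch below simulates that with plain objects. `FakeStorage` and `CounterModel` are hypothetical stand-ins and do not use the Agents SDK.

```typescript
// Stand-in for durable storage that outlives any single in-memory instance.
type FakeStorage = Map<string, number>;

class CounterModel {
  // In-memory only: starts from 0 whenever a new instance is constructed.
  localCounter = 0;
  private storage: FakeStorage;

  constructor(storage: FakeStorage) {
    this.storage = storage;
  }

  get persistedCounter(): number {
    return this.storage.get("counter") ?? 0;
  }

  onMessage(): void {
    this.storage.set("counter", this.persistedCounter + 1); // survives a "wake"
    this.localCounter++; // lost on a "wake"
  }
}

const storage: FakeStorage = new Map();
const beforeSleep = new CounterModel(storage);
beforeSleep.onMessage();
beforeSleep.onMessage();

// Simulate hibernate + wake: same storage, fresh in-memory instance.
const afterWake = new CounterModel(storage);
afterWake.onMessage();
// afterWake.persistedCounter is 3, while afterWake.localCounter is only 1.
```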
Common patterns
Presence tracking
Use per-connection state to track who is online. Connection state is cleaned up automatically when a user disconnects:
JavaScript
export class PresenceAgent extends Agent {
onConnect(connection, ctx) {
const url = new URL(ctx.request.url);
const name = url.searchParams.get("name") || "Anonymous";
connection.setState({
name,
joinedAt: Date.now(),
lastSeen: Date.now(),
});
// Send current presence to new user
connection.send(
JSON.stringify({
type: "presence",
users: this.getPresence(),
}),
);
// Notify others that someone joined
this.broadcastPresence();
}
onClose(connection) {
// No manual cleanup needed - connection state is automatically gone
this.broadcastPresence();
}
onMessage(connection, message) {
if (message === "ping") {
connection.setState((prev) => ({
...prev,
lastSeen: Date.now(),
}));
connection.send("pong");
}
}
getPresence() {
const users = {};
for (const conn of this.getConnections()) {
if (conn.state) {
users[conn.id] = {
name: conn.state.name,
lastSeen: conn.state.lastSeen,
};
}
}
return users;
}
broadcastPresence() {
this.broadcast(
JSON.stringify({
type: "presence",
users: this.getPresence(),
}),
);
}
}
TypeScript
type UserState = {
name: string;
joinedAt: number;
lastSeen: number;
};
export class PresenceAgent extends Agent {
onConnect(connection: Connection<UserState>, ctx: ConnectionContext) {
const url = new URL(ctx.request.url);
const name = url.searchParams.get("name") || "Anonymous";
connection.setState({
name,
joinedAt: Date.now(),
lastSeen: Date.now(),
});
// Send current presence to new user
connection.send(
JSON.stringify({
type: "presence",
users: this.getPresence(),
}),
);
// Notify others that someone joined
this.broadcastPresence();
}
onClose(connection: Connection) {
// No manual cleanup needed - connection state is automatically gone
this.broadcastPresence();
}
onMessage(connection: Connection<UserState>, message: WSMessage) {
if (message === "ping") {
connection.setState((prev) => ({
...prev!,
lastSeen: Date.now(),
}));
connection.send("pong");
}
}
private getPresence() {
const users: Record<string, { name: string; lastSeen: number }> = {};
for (const conn of this.getConnections<UserState>()) {
if (conn.state) {
users[conn.id] = {
name: conn.state.name,
lastSeen: conn.state.lastSeen,
};
}
}
return users;
}
private broadcastPresence() {
this.broadcast(
JSON.stringify({
type: "presence",
users: this.getPresence(),
}),
);
}
}
Chat room with broadcast
JavaScript
export class ChatRoom extends Agent {
onConnect(connection, ctx) {
const url = new URL(ctx.request.url);
const username = url.searchParams.get("username") || "Anonymous";
connection.setState({ username });
// Notify others
this.broadcast(
JSON.stringify({
type: "join",
user: username,
timestamp: Date.now(),
}),
[connection.id], // Do not send to the joining user
);
}
onMessage(connection, message) {
if (typeof message !== "string") return;
const { username } = connection.state;
this.broadcast(
JSON.stringify({
type: "message",
user: username,
text: message,
timestamp: Date.now(),
}),
);
}
onClose(connection) {
const { username } = connection.state || {};
if (username) {
this.broadcast(
JSON.stringify({
type: "leave",
user: username,
timestamp: Date.now(),
}),
);
}
}
}
TypeScript
type Message = {
type: "message" | "join" | "leave";
user: string;
text?: string;
timestamp: number;
};
export class ChatRoom extends Agent {
onConnect(connection: Connection, ctx: ConnectionContext) {
const url = new URL(ctx.request.url);
const username = url.searchParams.get("username") || "Anonymous";
connection.setState({ username });
// Notify others
this.broadcast(
JSON.stringify({
type: "join",
user: username,
timestamp: Date.now(),
} satisfies Message),
[connection.id], // Do not send to the joining user
);
}
onMessage(connection: Connection, message: WSMessage) {
if (typeof message !== "string") return;
const { username } = connection.state as { username: string };
this.broadcast(
JSON.stringify({
type: "message",
user: username,
text: message,
timestamp: Date.now(),
} satisfies Message),
);
}
onClose(connection: Connection) {
const { username } = (connection.state as { username: string }) || {};
if (username) {
this.broadcast(
JSON.stringify({
type: "leave",
user: username,
timestamp: Date.now(),
} satisfies Message),
);
}
}
}
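Suppressing protocol messages
By default, agents send JSON text frames (identity, state sync, MCP server list) to every connection. Override shouldSendProtocolMessages to suppress these messages for specific connections, for example binary-only clients that cannot handle JSON text frames: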
JavaScript
export class IoTAgent extends Agent {
shouldSendProtocolMessages(connection, ctx) {
const url = new URL(ctx.request.url);
return url.searchParams.get("protocol") !== "binary";
}
}
TypeScript
export class IoTAgent extends Agent {
shouldSendProtocolMessages(
connection: Connection,
ctx: ConnectionContext,
): boolean {
const url = new URL(ctx.request.url);
return url.searchParams.get("protocol") !== "binary";
}
}
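When it returns false, that connection no longer receives identity, state, or MCP server list frames, either on connect or via broadcast. The connection can still send and receive regular messages, use RPC, and take part in all non-protocol communication.
Use isConnectionProtocolEnabled(connection) to check the status of any connection at runtime.
Agent properties
These properties are available through this inside any agent method:
| Property | Type | Description |
|---|---|---|
| this.name | string | Instance name of this agent |
| this.state | State | Current agent state (lazily loaded from SQLite) |
| this.env | Env | Worker environment bindings |
| this.ctx | DurableObjectState | Durable Object context (storage, alarms, and so on) |
| this.sql | template tag | SQL template tag for running queries against the agent's SQLite storage |
| this.mcp | MCPClientManager | MCP client manager for connecting to external MCP servers |
Connecting from clients
For browser connections, use the Agents client SDK:
- Vanilla JS: AgentClient from agents/client
- React: the useAgent hook from agents/react
Refer to the Client SDK for full documentation.
Next steps
State synchronization: Sync state between agents and clients.
Callable methods: RPC method calls over WebSocket.
Cross-domain authentication: Secure WebSocket connections across domains.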