可观测性
Agent 会为每一次重要操作发出结构化事件 —— RPC 调用、状态变更、调度执行、工作流状态切换、MCP 连接等等。这些事件被发布到 diagnostics channel,并默认静默(没有人监听时零开销)。
事件结构
每个事件都包含以下字段:
TypeScript
{
type: "rpc", // what happened
agent: "MyAgent", // which agent class emitted it
name: "user-123", // which agent instance (Durable Object name)
payload: { method: "getWeather" }, // details
timestamp: 1758005142787 // when (ms since epoch)
}
agent 和 name 用于标识事件来源 —— agent 是类名,name 是 Durable Object 实例名。
通道
事件根据类型被路由到八个具名通道:
| 通道 | 事件类型 | 描述 |
|---|---|---|
| agents:state | state:update | 状态同步事件 |
| agents:rpc | rpc, rpc:error | RPC 方法调用与失败 |
| agents:message | message:request, message:response, message:clear, message:cancel, message:error, tool:result, tool:approval | 聊天消息和工具生命周期 |
| agents:schedule | schedule:create, schedule:execute, schedule:cancel, schedule:retry, schedule:error, queue:create, queue:retry, queue:error | 定时任务和队列任务的生命周期 |
| agents:lifecycle | connect, disconnect, destroy | Agent 连接和销毁 |
| agents:workflow | workflow:start, workflow:event, workflow:approved, workflow:rejected, workflow:terminated, workflow:paused, workflow:resumed, workflow:restarted | 工作流状态切换 |
| agents:mcp | mcp:client:preconnect, mcp:client:connect, mcp:client:authorize, mcp:client:discover | MCP client 操作 |
| agents:email | email:receive, email:reply | 邮件处理 |
订阅事件
类型化的 subscribe 帮助函数
agents/observability 提供的 subscribe() 函数可以以类型安全的方式访问指定通道上的事件:
JavaScript
import { subscribe } from "agents/observability";
const unsub = subscribe("rpc", (event) => {
if (event.type === "rpc") {
console.log(`RPC call: ${event.payload.method}`);
}
if (event.type === "rpc:error") {
console.error(
`RPC failed: ${event.payload.method} — ${event.payload.error}`,
);
}
});
// Clean up when done
unsub();
Explain Code
TypeScript
import { subscribe } from "agents/observability";
const unsub = subscribe("rpc", (event) => {
if (event.type === "rpc") {
console.log(`RPC call: ${event.payload.method}`);
}
if (event.type === "rpc:error") {
console.error(
`RPC failed: ${event.payload.method} — ${event.payload.error}`,
);
}
});
// Clean up when done
unsub();
Explain Code
回调函数有完整的类型提示 —— event 会被收窄为该通道里实际会出现的事件类型。
直接使用 diagnostics_channel
你也可以直接使用 Node.js API 进行订阅:
JavaScript
import { subscribe } from "node:diagnostics_channel";
subscribe("agents:schedule", (event) => {
console.log(event);
});
TypeScript
import { subscribe } from "node:diagnostics_channel";
subscribe("agents:schedule", (event) => {
console.log(event);
});
Tail Workers(生产环境)
在生产环境中,所有 diagnostics channel 上的消息都会自动转发到 Tail Workers。Agent 内部不需要写任何订阅代码 —— 只要附加一个 Tail Worker,然后通过 event.diagnosticsChannelEvents 访问事件:
JavaScript
export default {
async tail(events) {
for (const event of events) {
for (const msg of event.diagnosticsChannelEvents) {
// msg.channel is "agents:rpc", "agents:workflow", etc.
// msg.message is the typed event payload
console.log(msg.timestamp, msg.channel, msg.message);
}
}
},
};
Explain Code
TypeScript
export default {
async tail(events) {
for (const event of events) {
for (const msg of event.diagnosticsChannelEvents) {
// msg.channel is "agents:rpc", "agents:workflow", etc.
// msg.message is the typed event payload
console.log(msg.timestamp, msg.channel, msg.message);
}
}
},
};
Explain Code
这让你能在生产环境获得结构化、可过滤的可观测性,同时不会给 Agent 的热路径带来任何额外开销。
自定义可观测性
你可以通过提供自己的 Observability 接口来覆盖默认实现:
JavaScript
import { Agent } from "agents";
const myObservability = {
emit(event) {
// Send to your logging service, filter events, etc.
if (event.type === "rpc:error") {
console.error(event.payload.method, event.payload.error);
}
},
};
class MyAgent extends Agent {
observability = myObservability;
}
Explain Code
TypeScript
import { Agent } from "agents";
import type { Observability } from "agents/observability";
const myObservability: Observability = {
emit(event) {
// Send to your logging service, filter events, etc.
if (event.type === "rpc:error") {
console.error(event.payload.method, event.payload.error);
}
},
};
class MyAgent extends Agent {
override observability = myObservability;
}
Explain Code
把 observability 设置为 undefined 可以关闭所有事件发送:
JavaScript
import { Agent } from "agents";
class MyAgent extends Agent {
observability = undefined;
}
TypeScript
import { Agent } from "agents";
class MyAgent extends Agent {
override observability = undefined;
}
事件参考
RPC 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| rpc | { method, streaming? } | 调用了带 @callable 的方法 |
| rpc:error | { method, error } | 带 @callable 的方法抛出异常 |
State 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| state:update | {} | 调用了 setState() |
Message 与 tool 事件(AIChatAgent)
这些事件由 @cloudflare/ai-chat 的 AIChatAgent 发出,用于跟踪聊天消息的生命周期,包括客户端工具的交互。
| 类型 | Payload | 触发时机 |
|---|---|---|
| message:request | {} | 收到一条聊天消息 |
| message:response | {} | 一次聊天响应流式输出完成 |
| message:clear | {} | 聊天历史被清空 |
| message:cancel | { requestId } | 一个流式请求被取消 |
| message:error | { error } | 一次聊天流出现错误 |
| tool:result | { toolCallId, toolName } | 收到客户端工具的结果 |
| tool:approval | { toolCallId, approved } | 一个工具调用被批准或拒绝 |
Schedule 与 queue 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| schedule:create | { callback, id } | 创建了一个调度 |
| schedule:execute | { callback, id } | 一个调度回调开始执行 |
| schedule:cancel | { callback, id } | 一个调度被取消 |
| schedule:retry | { callback, id, attempt, maxAttempts } | 一个调度回调进入重试 |
| schedule:error | { callback, id, error, attempts } | 一个调度回调在所有重试都失败后失败 |
| queue:create | { callback, id } | 一个任务被入队 |
| queue:retry | { callback, id, attempt, maxAttempts } | 一个队列回调进入重试 |
| queue:error | { callback, id, error, attempts } | 一个队列回调在所有重试都失败后失败 |
Lifecycle 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| connect | { connectionId } | 建立了一个 WebSocket 连接 |
| disconnect | { connectionId, code, reason } | 一个 WebSocket 连接被关闭 |
| destroy | {} | Agent 被销毁 |
Workflow 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| workflow:start | { workflowId, workflowName? } | 一个工作流实例启动 |
| workflow:event | { workflowId, eventType? } | 一个事件被发送到某个工作流 |
| workflow:approved | { workflowId, reason? } | 一个工作流被批准 |
| workflow:rejected | { workflowId, reason? } | 一个工作流被拒绝 |
| workflow:terminated | { workflowId, workflowName? } | 一个工作流被终止 |
| workflow:paused | { workflowId, workflowName? } | 一个工作流被暂停 |
| workflow:resumed | { workflowId, workflowName? } | 一个工作流被恢复 |
| workflow:restarted | { workflowId, workflowName? } | 一个工作流被重新启动 |
MCP 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| mcp:client:preconnect | { serverId } | 即将连接到 MCP 服务器之前 |
| mcp:client:connect | { url, transport, state, error? } | MCP 连接尝试完成或失败 |
| mcp:client:authorize | { serverId, authUrl, clientId? } | MCP OAuth 流程开始 |
| mcp:client:discover | { url?, state?, error?, capability? } | MCP 能力发现成功或失败 |
Email 事件
| 类型 | Payload | 触发时机 |
|---|---|---|
| email:receive | { from, to, subject? } | 收到一封邮件 |
| email:reply | { from, to, subject? } | 发送了一封回复邮件 |
下一步
Configuration wrangler.jsonc 设置与部署。
Tail Workers 把 diagnostics channel 事件转发到 Tail Worker,实现生产环境监控。
Agents API Agents SDK 的完整 API 参考。