Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

可观测性

Agent 会为每一次重要操作发出结构化事件 —— RPC 调用、状态变更、调度执行、工作流状态切换、MCP 连接等等。这些事件被发布到 diagnostics channel,并默认静默(没有人监听时零开销)。

事件结构

每个事件都包含以下字段:

TypeScript


{

  type: "rpc",                        // what happened

  agent: "MyAgent",                   // which agent class emitted it

  name: "user-123",                   // which agent instance (Durable Object name)

  payload: { method: "getWeather" },  // details

  timestamp: 1758005142787            // when (ms since epoch)

}


agentname 用于标识事件来源 —— agent 是类名,name 是 Durable Object 实例名。

通道

事件根据类型被路由到八个具名通道:

通道事件类型描述
agents:statestate:update状态同步事件
agents:rpcrpc, rpc:errorRPC 方法调用与失败
agents:messagemessage:request, message:response, message:clear, message:cancel, message:error, tool:result, tool:approval聊天消息和工具生命周期
agents:scheduleschedule:create, schedule:execute, schedule:cancel, schedule:retry, schedule:error, queue:create, queue:retry, queue:error定时任务和队列任务的生命周期
agents:lifecycleconnect, disconnect, destroyAgent 连接和销毁
agents:workflowworkflow:start, workflow:event, workflow:approved, workflow:rejected, workflow:terminated, workflow:paused, workflow:resumed, workflow:restarted工作流状态切换
agents:mcpmcp:client:preconnect, mcp:client:connect, mcp:client:authorize, mcp:client:discoverMCP client 操作
agents:emailemail:receive, email:reply邮件处理

订阅事件

类型化的 subscribe 帮助函数

agents/observability 提供的 subscribe() 函数可以以类型安全的方式访问指定通道上的事件:

JavaScript


import { subscribe } from "agents/observability";


const unsub = subscribe("rpc", (event) => {

  if (event.type === "rpc") {

    console.log(`RPC call: ${event.payload.method}`);

  }

  if (event.type === "rpc:error") {

    console.error(

      `RPC failed: ${event.payload.method} — ${event.payload.error}`,

    );

  }

});


// Clean up when done

unsub();


Explain Code

TypeScript


import { subscribe } from "agents/observability";


const unsub = subscribe("rpc", (event) => {

  if (event.type === "rpc") {

    console.log(`RPC call: ${event.payload.method}`);

  }

  if (event.type === "rpc:error") {

    console.error(

      `RPC failed: ${event.payload.method} — ${event.payload.error}`,

    );

  }

});


// Clean up when done

unsub();


Explain Code

回调函数有完整的类型提示 —— event 会被收窄为该通道里实际会出现的事件类型。

直接使用 diagnostics_channel

你也可以直接使用 Node.js API 进行订阅:

JavaScript


import { subscribe } from "node:diagnostics_channel";


subscribe("agents:schedule", (event) => {

  console.log(event);

});


TypeScript


import { subscribe } from "node:diagnostics_channel";


subscribe("agents:schedule", (event) => {

  console.log(event);

});


Tail Workers(生产环境)

在生产环境中,所有 diagnostics channel 上的消息都会自动转发到 Tail Workers。Agent 内部不需要写任何订阅代码 —— 只要附加一个 Tail Worker,然后通过 event.diagnosticsChannelEvents 访问事件:

JavaScript


export default {

  async tail(events) {

    for (const event of events) {

      for (const msg of event.diagnosticsChannelEvents) {

        // msg.channel is "agents:rpc", "agents:workflow", etc.

        // msg.message is the typed event payload

        console.log(msg.timestamp, msg.channel, msg.message);

      }

    }

  },

};


Explain Code

TypeScript


export default {

  async tail(events) {

    for (const event of events) {

      for (const msg of event.diagnosticsChannelEvents) {

        // msg.channel is "agents:rpc", "agents:workflow", etc.

        // msg.message is the typed event payload

        console.log(msg.timestamp, msg.channel, msg.message);

      }

    }

  },

};


Explain Code

这让你能在生产环境获得结构化、可过滤的可观测性,同时不会给 Agent 的热路径带来任何额外开销。

自定义可观测性

你可以通过提供自己的 Observability 接口来覆盖默认实现:

JavaScript


import { Agent } from "agents";

const myObservability = {

  emit(event) {

    // Send to your logging service, filter events, etc.

    if (event.type === "rpc:error") {

      console.error(event.payload.method, event.payload.error);

    }

  },

};


class MyAgent extends Agent {

  observability = myObservability;

}


Explain Code

TypeScript


import { Agent } from "agents";

import type { Observability } from "agents/observability";


const myObservability: Observability = {

  emit(event) {

    // Send to your logging service, filter events, etc.

    if (event.type === "rpc:error") {

      console.error(event.payload.method, event.payload.error);

    }

  },

};


class MyAgent extends Agent {

  override observability = myObservability;

}


Explain Code

observability 设置为 undefined 可以关闭所有事件发送:

JavaScript


import { Agent } from "agents";


class MyAgent extends Agent {

  observability = undefined;

}


TypeScript


import { Agent } from "agents";


class MyAgent extends Agent {

  override observability = undefined;

}


事件参考

RPC 事件

类型Payload触发时机
rpc{ method, streaming? }调用了带 @callable 的方法
rpc:error{ method, error }带 @callable 的方法抛出异常

State 事件

类型Payload触发时机
state:update{}调用了 setState()

Message 与 tool 事件(AIChatAgent)

这些事件由 @cloudflare/ai-chatAIChatAgent 发出,用于跟踪聊天消息的生命周期,包括客户端工具的交互。

类型Payload触发时机
message:request{}收到一条聊天消息
message:response{}一次聊天响应流式输出完成
message:clear{}聊天历史被清空
message:cancel{ requestId }一个流式请求被取消
message:error{ error }一次聊天流出现错误
tool:result{ toolCallId, toolName }收到客户端工具的结果
tool:approval{ toolCallId, approved }一个工具调用被批准或拒绝

Schedule 与 queue 事件

类型Payload触发时机
schedule:create{ callback, id }创建了一个调度
schedule:execute{ callback, id }一个调度回调开始执行
schedule:cancel{ callback, id }一个调度被取消
schedule:retry{ callback, id, attempt, maxAttempts }一个调度回调进入重试
schedule:error{ callback, id, error, attempts }一个调度回调在所有重试都失败后失败
queue:create{ callback, id }一个任务被入队
queue:retry{ callback, id, attempt, maxAttempts }一个队列回调进入重试
queue:error{ callback, id, error, attempts }一个队列回调在所有重试都失败后失败

Lifecycle 事件

类型Payload触发时机
connect{ connectionId }建立了一个 WebSocket 连接
disconnect{ connectionId, code, reason }一个 WebSocket 连接被关闭
destroy{}Agent 被销毁

Workflow 事件

类型Payload触发时机
workflow:start{ workflowId, workflowName? }一个工作流实例启动
workflow:event{ workflowId, eventType? }一个事件被发送到某个工作流
workflow:approved{ workflowId, reason? }一个工作流被批准
workflow:rejected{ workflowId, reason? }一个工作流被拒绝
workflow:terminated{ workflowId, workflowName? }一个工作流被终止
workflow:paused{ workflowId, workflowName? }一个工作流被暂停
workflow:resumed{ workflowId, workflowName? }一个工作流被恢复
workflow:restarted{ workflowId, workflowName? }一个工作流被重新启动

MCP 事件

类型Payload触发时机
mcp:client:preconnect{ serverId }即将连接到 MCP 服务器之前
mcp:client:connect{ url, transport, state, error? }MCP 连接尝试完成或失败
mcp:client:authorize{ serverId, authUrl, clientId? }MCP OAuth 流程开始
mcp:client:discover{ url?, state?, error?, capability? }MCP 能力发现成功或失败

Email 事件

类型Payload触发时机
email:receive{ from, to, subject? }收到一封邮件
email:reply{ from, to, subject? }发送了一封回复邮件

下一步

Configuration wrangler.jsonc 设置与部署。

Tail Workers 把 diagnostics channel 事件转发到 Tail Worker,实现生产环境监控。

Agents API Agents SDK 的完整 API 参考。