Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

可调用方法

可调用方法允许客户端通过 WebSocket 使用 RPC(远程过程调用)调用 agent 方法。用 @callable() 标记方法,即可将其暴露给浏览器、移动应用或其他服务等外部客户端。

概览

JavaScript


import { Agent, callable } from "agents";


export class MyAgent extends Agent {

  @callable()

  async greet(name) {

    return `Hello, ${name}!`;

  }

}


TypeScript


import { Agent, callable } from "agents";


export class MyAgent extends Agent {

  @callable()

  async greet(name: string): Promise<string> {

    return `Hello, ${name}!`;

  }

}


JavaScript


// Client

const result = await agent.stub.greet("World");

console.log(result); // "Hello, World!"


TypeScript


// Client

const result = await agent.stub.greet("World");

console.log(result); // "Hello, World!"


工作原理

sequenceDiagram participant Client participant Agent Client->>Agent: agent.stub.greet(“World”) Note right of Agent: Check @callable
Execute method Agent–>>Client: “Hello, World!”

何时使用 @callable()

场景使用方式
浏览器/移动端调用 agent@callable()
外部服务调用 agent@callable()
Worker 调用 agent(同一代码库)Durable Object RPC(无需装饰器)
Agent 调用另一个 agent通过 getAgentByName() 使用 Durable Object RPC

@callable() 装饰器专门用于来自外部客户端的、基于 WebSocket 的 RPC。如果是在同一个 Worker 内或另一个 agent 中调用,直接使用标准的 Durable Object RPC

基本用法

定义可调用方法

为任何想要暴露的方法添加 @callable() 装饰器:

JavaScript


import { Agent, callable } from "agents";


export class CounterAgent extends Agent {

  initialState = { count: 0, items: [] };


  @callable()

  increment() {

    this.setState({ ...this.state, count: this.state.count + 1 });

    return this.state.count;

  }


  @callable()

  decrement() {

    this.setState({ ...this.state, count: this.state.count - 1 });

    return this.state.count;

  }


  @callable()

  async addItem(item) {

    this.setState({ ...this.state, items: [...this.state.items, item] });

    return this.state.items;

  }


  @callable()

  getStats() {

    return {

      count: this.state.count,

      itemCount: this.state.items.length,

    };

  }

}


TypeScript


import { Agent, callable } from "agents";


export type CounterState = {

  count: number;

  items: string[];

};


export class CounterAgent extends Agent<Env, CounterState> {

  initialState: CounterState = { count: 0, items: [] };


  @callable()

  increment(): number {

    this.setState({ ...this.state, count: this.state.count + 1 });

    return this.state.count;

  }


  @callable()

  decrement(): number {

    this.setState({ ...this.state, count: this.state.count - 1 });

    return this.state.count;

  }


  @callable()

  async addItem(item: string): Promise<string[]> {

    this.setState({ ...this.state, items: [...this.state.items, item] });

    return this.state.items;

  }


  @callable()

  getStats(): { count: number; itemCount: number } {

    return {

      count: this.state.count,

      itemCount: this.state.items.length,

    };

  }

}


从客户端调用

有两种方式在客户端调用方法:

使用 agent.stub(推荐):

JavaScript


// Clean, typed syntax

const count = await agent.stub.increment();

const items = await agent.stub.addItem("new item");

const stats = await agent.stub.getStats();


TypeScript


// Clean, typed syntax

const count = await agent.stub.increment();

const items = await agent.stub.addItem("new item");

const stats = await agent.stub.getStats();


使用 agent.call():

JavaScript


// Explicit method name as string

const count = await agent.call("increment");

const items = await agent.call("addItem", ["new item"]);

const stats = await agent.call("getStats");


TypeScript


// Explicit method name as string

const count = await agent.call("increment");

const items = await agent.call("addItem", ["new item"]);

const stats = await agent.call("getStats");


stub 代理提供了更好的开发体验和 TypeScript 支持。

方法签名

可序列化类型

参数和返回值必须可被 JSON 序列化:

JavaScript


// Valid - primitives and plain objects

class MyAgent extends Agent {

  @callable()

  processData(input) {

    return { result: true };

  }

}


// Valid - arrays

class MyAgent extends Agent {

  @callable()

  processItems(items) {

    return items.map((item) => item.length);

  }

}


// Invalid - non-serializable types

// Functions, Dates, Maps, Sets, etc. cannot be serialized


TypeScript


// Valid - primitives and plain objects

class MyAgent extends Agent {

  @callable()

  processData(input: { name: string; count: number }): { result: boolean } {

    return { result: true };

  }

}


// Valid - arrays

class MyAgent extends Agent {

  @callable()

  processItems(items: string[]): number[] {

    return items.map((item) => item.length);

  }

}


// Invalid - non-serializable types

// Functions, Dates, Maps, Sets, etc. cannot be serialized


异步方法

同步和异步方法都可以使用:

JavaScript


// Sync method

class MyAgent extends Agent {

  @callable()

  add(a, b) {

    return a + b;

  }

}


// Async method

class MyAgent extends Agent {

  @callable()

  async fetchUser(id) {

    const user = await this.sql`SELECT * FROM users WHERE id = ${id}`;

    return user[0];

  }

}


TypeScript


// Sync method

class MyAgent extends Agent {

  @callable()

  add(a: number, b: number): number {

    return a + b;

  }

}


// Async method

class MyAgent extends Agent {

  @callable()

  async fetchUser(id: string): Promise<User> {

    const user = await this.sql`SELECT * FROM users WHERE id = ${id}`;

    return user[0];

  }

}


Void 方法

不返回值的方法:

JavaScript


class MyAgent extends Agent {

  @callable()

  async logEvent(event) {

    await this.sql`INSERT INTO events (name) VALUES (${event})`;

  }

}


TypeScript


class MyAgent extends Agent {

  @callable()

  async logEvent(event: string): Promise<void> {

    await this.sql`INSERT INTO events (name) VALUES (${event})`;

  }

}


在客户端,这些方法仍返回一个 Promise,在方法执行完成时 resolve:

JavaScript


await agent.stub.logEvent("user-clicked");

// Resolves when the server confirms execution


TypeScript


await agent.stub.logEvent("user-clicked");

// Resolves when the server confirms execution


流式响应

对于会随时间产生数据的方法(例如 AI 文本生成),使用流式响应:

定义流式方法

JavaScript


import { Agent, callable } from "agents";


export class AIAgent extends Agent {

  @callable({ streaming: true })

  async generateText(stream, prompt) {

    // First parameter is always StreamingResponse for streaming methods


    for await (const chunk of this.llm.stream(prompt)) {

      stream.send(chunk); // Send each chunk to the client

    }


    stream.end(); // Signal completion

  }


  @callable({ streaming: true })

  async streamNumbers(stream, count) {

    for (let i = 0; i < count; i++) {

      stream.send(i);

      await new Promise((resolve) => setTimeout(resolve, 100));

    }

    stream.end(count); // Optional final value

  }

}


TypeScript


import { Agent, callable, type StreamingResponse } from "agents";


export class AIAgent extends Agent {

  @callable({ streaming: true })

  async generateText(stream: StreamingResponse, prompt: string) {

    // First parameter is always StreamingResponse for streaming methods


    for await (const chunk of this.llm.stream(prompt)) {

      stream.send(chunk); // Send each chunk to the client

    }


    stream.end(); // Signal completion

  }


  @callable({ streaming: true })

  async streamNumbers(stream: StreamingResponse, count: number) {

    for (let i = 0; i < count; i++) {

      stream.send(i);

      await new Promise((resolve) => setTimeout(resolve, 100));

    }

    stream.end(count); // Optional final value

  }

}


在客户端消费流

JavaScript


// Preferred format (supports timeout and other options)

await agent.call("generateText", [prompt], {

  stream: {

    onChunk: (chunk) => {

      // Called for each chunk

      appendToOutput(chunk);

    },

    onDone: (finalValue) => {

      // Called when stream ends

      console.log("Stream complete", finalValue);

    },

    onError: (error) => {

      // Called if an error occurs

      console.error("Stream error:", error);

    },

  },

});


// Legacy format (still supported for backward compatibility)

await agent.call("generateText", [prompt], {

  onChunk: (chunk) => appendToOutput(chunk),

  onDone: (finalValue) => console.log("Done", finalValue),

  onError: (error) => console.error("Error:", error),

});


TypeScript


// Preferred format (supports timeout and other options)

await agent.call("generateText", [prompt], {

  stream: {

    onChunk: (chunk) => {

      // Called for each chunk

      appendToOutput(chunk);

    },

    onDone: (finalValue) => {

      // Called when stream ends

      console.log("Stream complete", finalValue);

    },

    onError: (error) => {

      // Called if an error occurs

      console.error("Stream error:", error);

    },

  },

});


// Legacy format (still supported for backward compatibility)

await agent.call("generateText", [prompt], {

  onChunk: (chunk) => appendToOutput(chunk),

  onDone: (finalValue) => console.log("Done", finalValue),

  onError: (error) => console.error("Error:", error),

});


StreamingResponse API

方法描述
send(chunk)向客户端发送一个数据块
end(finalChunk?)结束流,可选地附带一个最终值
error(message)向客户端发送错误并关闭流

JavaScript


class MyAgent extends Agent {

  @callable({ streaming: true })

  async processWithProgress(stream, items) {

    for (let i = 0; i < items.length; i++) {

      await this.process(items[i]);

      stream.send({ progress: (i + 1) / items.length, item: items[i] });

    }

    stream.end({ completed: true, total: items.length });

  }

}


TypeScript


class MyAgent extends Agent {

  @callable({ streaming: true })

  async processWithProgress(stream: StreamingResponse, items: string[]) {

    for (let i = 0; i < items.length; i++) {

      await this.process(items[i]);

      stream.send({ progress: (i + 1) / items.length, item: items[i] });

    }

    stream.end({ completed: true, total: items.length });

  }

}


TypeScript 集成

类型化的客户端调用

将 agent 类作为类型参数传入,以获得完整的类型安全:

JavaScript


import { useAgent } from "agents/react";

function App() {

  const agent = useAgent({

    agent: "MyAgent",

    name: "default",

  });


  async function handleGreet() {

    // TypeScript knows the method signature

    const result = await agent.stub.greet("World");

    // ^? string

  }


  // TypeScript catches errors

  // await agent.stub.greet(123); // Error: Argument of type 'number' is not assignable

  // await agent.stub.nonExistent(); // Error: Property 'nonExistent' does not exist

}


TypeScript


import { useAgent } from "agents/react";

import type { MyAgent } from "./server";


function App() {

  const agent = useAgent<MyAgent>({

    agent: "MyAgent",

    name: "default",

  });


  async function handleGreet() {

    // TypeScript knows the method signature

    const result = await agent.stub.greet("World");

    // ^? string

  }


  // TypeScript catches errors

  // await agent.stub.greet(123); // Error: Argument of type 'number' is not assignable

  // await agent.stub.nonExistent(); // Error: Property 'nonExistent' does not exist

}


排除非可调用方法

如果你有未使用 @callable() 装饰的方法,可以将它们从类型中排除:

JavaScript


class MyAgent extends Agent {

  @callable()

  publicMethod() {

    return "public";

  }


  // Not callable from clients

  internalMethod() {

    // internal logic

  }

}


// Exclude internal methods from the client type

const agent = useAgent({

  agent: "MyAgent",

});


agent.stub.publicMethod(); // Works

// agent.stub.internalMethod(); // TypeScript error


TypeScript


class MyAgent extends Agent {

  @callable()

  publicMethod(): string {

    return "public";

  }


  // Not callable from clients

  internalMethod(): void {

    // internal logic

  }

}


// Exclude internal methods from the client type

const agent = useAgent<Omit<MyAgent, "internalMethod">>({

  agent: "MyAgent",

});


agent.stub.publicMethod(); // Works

// agent.stub.internalMethod(); // TypeScript error


错误处理

在可调用方法中抛出错误

可调用方法中抛出的错误会被传播到客户端:

JavaScript


class MyAgent extends Agent {

  @callable()

  async riskyOperation(data) {

    if (!isValid(data)) {

      throw new Error("Invalid data format");

    }


    try {

      await this.processData(data);

    } catch (e) {

      throw new Error("Processing failed: " + e.message);

    }

  }

}


TypeScript


class MyAgent extends Agent {

  @callable()

  async riskyOperation(data: unknown): Promise<void> {

    if (!isValid(data)) {

      throw new Error("Invalid data format");

    }


    try {

      await this.processData(data);

    } catch (e) {

      throw new Error("Processing failed: " + e.message);

    }

  }

}


客户端错误处理

JavaScript


try {

  const result = await agent.stub.riskyOperation(data);

} catch (error) {

  // Error thrown by the agent method

  console.error("RPC failed:", error.message);

}


TypeScript


try {

  const result = await agent.stub.riskyOperation(data);

} catch (error) {

  // Error thrown by the agent method

  console.error("RPC failed:", error.message);

}


流式错误处理

对于流式方法,使用 onError 回调:

JavaScript


await agent.call("streamData", [input], {

  stream: {

    onChunk: (chunk) => handleChunk(chunk),

    onError: (errorMessage) => {

      console.error("Stream error:", errorMessage);

      showErrorUI(errorMessage);

    },

    onDone: (result) => handleComplete(result),

  },

});


TypeScript


await agent.call("streamData", [input], {

  stream: {

    onChunk: (chunk) => handleChunk(chunk),

    onError: (errorMessage) => {

      console.error("Stream error:", errorMessage);

      showErrorUI(errorMessage);

    },

    onDone: (result) => handleComplete(result),

  },

});


在服务端,你可以使用 stream.error() 优雅地在流中途发送错误:

JavaScript


class MyAgent extends Agent {

  @callable({ streaming: true })

  async processItems(stream, items) {

    for (const item of items) {

      try {

        const result = await this.process(item);

        stream.send(result);

      } catch (e) {

        stream.error(`Failed to process ${item}: ${e.message}`);

        return; // Stream is now closed

      }

    }

    stream.end();

  }

}


TypeScript


class MyAgent extends Agent {

  @callable({ streaming: true })

  async processItems(stream: StreamingResponse, items: string[]) {

    for (const item of items) {

      try {

        const result = await this.process(item);

        stream.send(result);

      } catch (e) {

        stream.error(`Failed to process ${item}: ${e.message}`);

        return; // Stream is now closed

      }

    }

    stream.end();

  }

}


连接错误

如果在 RPC 调用挂起期间 WebSocket 连接关闭,这些调用会自动以 “Connection closed” 错误 reject:

JavaScript


try {

  const result = await agent.call("longRunningMethod", []);

} catch (error) {

  if (error.message === "Connection closed") {

    // Handle disconnection

    console.log("Lost connection to agent");

  }

}


TypeScript


try {

  const result = await agent.call("longRunningMethod", []);

} catch (error) {

  if (error.message === "Connection closed") {

    // Handle disconnection

    console.log("Lost connection to agent");

  }

}


重连后重试

客户端在断开后会自动重连。如果想在重连后重试一个失败的调用,在重试前 await agent.ready:

JavaScript


async function callWithRetry(agent, method, args = []) {

  try {

    return await agent.call(method, args);

  } catch (error) {

    if (error.message === "Connection closed") {

      await agent.ready; // Wait for reconnection

      return await agent.call(method, args); // Retry once

    }

    throw error;

  }

}


// Usage

const result = await callWithRetry(agent, "processData", [data]);


TypeScript


async function callWithRetry<T>(

  agent: AgentClient,

  method: string,

  args: unknown[] = [],

): Promise<T> {

  try {

    return await agent.call(method, args);

  } catch (error) {

    if (error.message === "Connection closed") {

      await agent.ready; // Wait for reconnection

      return await agent.call(method, args); // Retry once

    }

    throw error;

  }

}


// Usage

const result = await callWithRetry(agent, "processData", [data]);


注意

只对幂等操作进行重试。如果服务器收到了请求但响应到达前连接就断开,重试可能导致重复执行。

何时不该使用 @callable

Worker 调用 Agent

当从同一个 Worker 调用 agent 时(例如在你的 fetch 处理函数中),直接使用 Durable Object RPC:

JavaScript


import { getAgentByName } from "agents";


export default {

  async fetch(request, env) {

    // Get the agent stub

    const agent = await getAgentByName(env.MyAgent, "instance-name");


    // Call methods directly - no @callable needed

    const result = await agent.processData(data);


    return Response.json(result);

  },

};


TypeScript


import { getAgentByName } from "agents";


export default {

  async fetch(request: Request, env: Env) {

    // Get the agent stub

    const agent = await getAgentByName(env.MyAgent, "instance-name");


    // Call methods directly - no @callable needed

    const result = await agent.processData(data);


    return Response.json(result);

  },

} satisfies ExportedHandler<Env>;


Agent 之间的调用

当一个 agent 需要调用另一个 agent 时:

JavaScript


class OrchestratorAgent extends Agent {

  async delegateWork(taskId) {

    // Get another agent

    const worker = await getAgentByName(this.env.WorkerAgent, taskId);


    // Call its methods directly

    const result = await worker.doWork();


    return result;

  }

}


TypeScript


class OrchestratorAgent extends Agent {

  async delegateWork(taskId: string) {

    // Get another agent

    const worker = await getAgentByName(this.env.WorkerAgent, taskId);


    // Call its methods directly

    const result = await worker.doWork();


    return result;

  }

}


为什么要做这种区分?

RPC 类型传输方式使用场景
@callableWebSocket外部客户端(浏览器、应用)
Durable Object RPC内部Worker 到 Agent、Agent 到 Agent

由于 Durable Object RPC 不需要经过 WebSocket 序列化,因此对内部调用更高效。@callable 装饰器为外部客户端添加了必要的 WebSocket RPC 处理。

API 参考

@callable(metadata?) 装饰器

将方法标记为可被外部客户端调用。

JavaScript


import { callable } from "agents";


class MyAgent extends Agent {

  @callable()

  method() {}


  @callable({ streaming: true })

  streamingMethod(stream) {}


  @callable({ description: "Fetches user data" })

  getUser(id) {}

}


TypeScript


import { callable } from "agents";


class MyAgent extends Agent {

  @callable()

  method(): void {}


  @callable({ streaming: true })

  streamingMethod(stream: StreamingResponse): void {}


  @callable({ description: "Fetches user data" })

  getUser(id: string): User {}

}


CallableMetadata 类型

TypeScript


type CallableMetadata = {

  /** Optional description of what the method does */

  description?: string;

  /** Whether the method supports streaming responses */

  streaming?: boolean;

};


StreamingResponse 类

在流式可调用方法中用于向客户端发送数据。

JavaScript


import {} from "agents";


class MyAgent extends Agent {

  @callable({ streaming: true })

  async streamData(stream, input) {

    stream.send("chunk 1");

    stream.send("chunk 2");

    stream.end("final");

  }

}


TypeScript


import { type StreamingResponse } from "agents";


class MyAgent extends Agent {

  @callable({ streaming: true })

  async streamData(stream: StreamingResponse, input: string) {

    stream.send("chunk 1");

    stream.send("chunk 2");

    stream.end("final");

  }

}


方法签名描述
send(chunk: unknown) => void向客户端发送一个数据块
end(finalChunk?: unknown) => void结束流
error(message: string) => void发送错误并关闭流

客户端方法

方法签名描述
agent.call(method, args?, options?) => Promise按名称调用方法
agent.stubProxy类型化的方法调用

JavaScript


// Using call()

await agent.call("methodName", [arg1, arg2]);

await agent.call("streamMethod", [arg], {

  stream: { onChunk, onDone, onError },

});


// With timeout (rejects if call does not complete in time)

await agent.call("slowMethod", [], { timeout: 5000 });


// Using stub

await agent.stub.methodName(arg1, arg2);


TypeScript


// Using call()

await agent.call("methodName", [arg1, arg2]);

await agent.call("streamMethod", [arg], {

  stream: { onChunk, onDone, onError },

});


// With timeout (rejects if call does not complete in time)

await agent.call("slowMethod", [], { timeout: 5000 });


// Using stub

await agent.stub.methodName(arg1, arg2);


CallOptions 类型

TypeScript


type CallOptions = {

  /** Timeout in milliseconds. Rejects if call does not complete in time. */

  timeout?: number;

  /** Streaming options */

  stream?: {

    onChunk?: (chunk: unknown) => void;

    onDone?: (finalChunk: unknown) => void;

    onError?: (error: string) => void;

  };

};


注意

旧格式 { onChunk, onDone, onError }(不嵌套在 stream 下)仍然受支持。客户端会自动检测你使用的是哪种格式。

getCallableMethods() 方法

返回 agent 上所有可调用方法及其元数据的 map。便于内省和自动生成文档。

JavaScript


const methods = agent.getCallableMethods();

// Map<string, CallableMetadata>


for (const [name, meta] of methods) {

  console.log(`${name}: ${meta.description || "(no description)"}`);

  if (meta.streaming) console.log("  (streaming)");

}


TypeScript


const methods = agent.getCallableMethods();

// Map<string, CallableMetadata>


for (const [name, meta] of methods) {

  console.log(`${name}: ${meta.description || "(no description)"}`);

  if (meta.streaming) console.log("  (streaming)");

}


故障排查

SyntaxError: Invalid or unexpected token

如果在使用 @callable() 时,你的 dev 服务器报 SyntaxError: Invalid or unexpected token,需要做两件事:

1. 添加 agents/vite 插件 — Vite 8 使用 Oxc 进行转译,Oxc 目前还不支持 TC39 装饰器。这个插件加上了所需的转换:

vite.config.ts


import agents from "agents/vite";


export default defineConfig({

  plugins: [agents(), react(), cloudflare()],

});


2. 继承 agents/tsconfig — 这会设置 "target": "ES2021" 以及所有其他推荐的编译器选项:

tsconfig.json


{

  "extends": "agents/tsconfig"

}


如果你无法继承共享配置,在 tsconfig.json 里手动设置 "target": "ES2021"

警告

不要在 tsconfig.json 中设置 "experimentalDecorators": true。Agents SDK 使用的是 TC39 标准装饰器 ↗,不是 TypeScript 的旧版装饰器。启用 experimentalDecorators 会应用一个不兼容的转换,在运行时悄悄破坏 @callable()

下一步

Agents API Agents SDK 的完整 API 参考。

WebSockets 与客户端的实时双向通信。

状态管理 在 agent 与客户端之间同步状态。