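Callable methods
Callable methods let clients invoke agent methods over WebSocket using RPC (Remote Procedure Call). Mark a method with @callable() to expose it to external clients such as browsers, mobile apps, or other services.
Overview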
JavaScript
import { Agent, callable } from "agents";
export class MyAgent extends Agent {
@callable()
async greet(name) {
return `Hello, ${name}!`;
}
}
TypeScript
import { Agent, callable } from "agents";
export class MyAgent extends Agent {
@callable()
async greet(name: string): Promise<string> {
return `Hello, ${name}!`;
}
}
JavaScript
// Client
const result = await agent.stub.greet("World");
console.log(result); // "Hello, World!"
TypeScript
// Client
const result = await agent.stub.greet("World");
console.log(result); // "Hello, World!"
How it works
sequenceDiagram
participant Client
participant Agent
Client->>Agent: agent.stub.greet("World")
Note right of Agent: Check @callable<br/>Execute method
Agent-->>Client: "Hello, World!"
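The two server-side steps in the diagram (check that the method is callable, then execute it) can be pictured with a small, self-contained sketch. This is a conceptual model only, not the SDK's actual implementation: the `RpcRequest` and `RpcResponse` shapes, the `callableRegistry` set, and the `dispatch` function are hypothetical names invented for illustration, and the sketch is synchronous for brevity.

```typescript
// Conceptual sketch only: names and message shapes are hypothetical,
// not the Agents SDK's real wire format.
type RpcRequest = { id: string; method: string; args: unknown[] };
type RpcResponse =
  | { id: string; success: true; result: unknown }
  | { id: string; success: false; error: string };

type Methods = Record<string, (...args: any[]) => unknown>;

function dispatch(
  target: Methods,
  callableRegistry: Set<string>,
  request: RpcRequest,
): RpcResponse {
  // Step 1: refuse anything that was not explicitly marked as callable.
  if (!callableRegistry.has(request.method)) {
    return {
      id: request.id,
      success: false,
      error: `Method ${request.method} is not callable`,
    };
  }
  // Step 2: execute the method and reply under the same request id.
  try {
    const result = target[request.method](...request.args);
    return { id: request.id, success: true, result };
  } catch (e) {
    const message = e instanceof Error ? e.message : String(e);
    return { id: request.id, success: false, error: message };
  }
}

// Stand-ins for an agent's methods and its set of decorated method names.
const agentMethods: Methods = {
  greet: (name: string) => `Hello, ${name}!`,
  internal: () => "secret",
};
const registry = new Set(["greet"]);

const ok = dispatch(agentMethods, registry, { id: "1", method: "greet", args: ["World"] });
const denied = dispatch(agentMethods, registry, { id: "2", method: "internal", args: [] });
```

In this model, `ok` carries the greeting back to the caller, while `denied` is rejected before `internal` ever runs, which is the protection the decorator check provides.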
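When to use @callable()
| Scenario | Use |
|---|---|
| Browser or mobile app calling an agent | @callable() |
| External service calling an agent | @callable() |
| Worker calling an agent (same codebase) | Durable Object RPC (no decorator needed) |
| Agent calling another agent | Durable Object RPC via getAgentByName() |
The @callable() decorator is specifically for WebSocket-based RPC from external clients. When calling from within the same Worker or from another agent, use standard Durable Object RPC directly.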
Basic usage
Defining callable methods
Add the @callable() decorator to any method you want to expose:
JavaScript
import { Agent, callable } from "agents";
export class CounterAgent extends Agent {
initialState = { count: 0, items: [] };
@callable()
increment() {
this.setState({ ...this.state, count: this.state.count + 1 });
return this.state.count;
}
@callable()
decrement() {
this.setState({ ...this.state, count: this.state.count - 1 });
return this.state.count;
}
@callable()
async addItem(item) {
this.setState({ ...this.state, items: [...this.state.items, item] });
return this.state.items;
}
@callable()
getStats() {
return {
count: this.state.count,
itemCount: this.state.items.length,
};
}
}
TypeScript
import { Agent, callable } from "agents";
export type CounterState = {
count: number;
items: string[];
};
export class CounterAgent extends Agent<Env, CounterState> {
initialState: CounterState = { count: 0, items: [] };
@callable()
increment(): number {
this.setState({ ...this.state, count: this.state.count + 1 });
return this.state.count;
}
@callable()
decrement(): number {
this.setState({ ...this.state, count: this.state.count - 1 });
return this.state.count;
}
@callable()
async addItem(item: string): Promise<string[]> {
this.setState({ ...this.state, items: [...this.state.items, item] });
return this.state.items;
}
@callable()
getStats(): { count: number; itemCount: number } {
return {
count: this.state.count,
itemCount: this.state.items.length,
};
}
}
Calling from the client
There are two ways to call methods from the client:
Using agent.stub (recommended):
JavaScript
// Clean, typed syntax
const count = await agent.stub.increment();
const items = await agent.stub.addItem("new item");
const stats = await agent.stub.getStats();
TypeScript
// Clean, typed syntax
const count = await agent.stub.increment();
const items = await agent.stub.addItem("new item");
const stats = await agent.stub.getStats();
Using agent.call():
JavaScript
// Explicit method name as string
const count = await agent.call("increment");
const items = await agent.call("addItem", ["new item"]);
const stats = await agent.call("getStats");
TypeScript
// Explicit method name as string
const count = await agent.call("increment");
const items = await agent.call("addItem", ["new item"]);
const stats = await agent.call("getStats");
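The stub proxy provides a better developer experience and TypeScript support.
Method signatures
Serializable types
Arguments and return values must be JSON-serializable: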
JavaScript
// Valid - primitives and plain objects
class MyAgent extends Agent {
@callable()
processData(input) {
return { result: true };
}
}
// Valid - arrays
class MyAgent extends Agent {
@callable()
processItems(items) {
return items.map((item) => item.length);
}
}
// Invalid - non-serializable types
// Functions, Dates, Maps, Sets, etc. cannot be serialized
TypeScript
// Valid - primitives and plain objects
class MyAgent extends Agent {
@callable()
processData(input: { name: string; count: number }): { result: boolean } {
return { result: true };
}
}
// Valid - arrays
class MyAgent extends Agent {
@callable()
processItems(items: string[]): number[] {
return items.map((item) => item.length);
}
}
// Invalid - non-serializable types
// Functions, Dates, Maps, Sets, etc. cannot be serialized
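To see why these types are a problem, it helps to run a value through a plain JSON round trip. The sketch below uses only standard `JSON.stringify` and `JSON.parse`; the `original` and `safePayload` objects are made-up examples, and the conversion at the end is just one possible workaround.

```typescript
// What a JSON round trip does to values that JSON cannot represent.
const original = {
  when: new Date(0),
  lookup: new Map([["a", 1]]),
  tags: new Set(["x"]),
  handler: () => "hi",
  plain: { ok: true, list: [1, 2, 3] },
};

const roundTripped = JSON.parse(JSON.stringify(original));

console.log(typeof roundTripped.when); // "string" (an ISO timestamp, no longer a Date)
console.log(roundTripped.lookup); // {} (Map entries are lost)
console.log(roundTripped.tags); // {} (Set entries are lost)
console.log("handler" in roundTripped); // false (functions are dropped)
console.log(roundTripped.plain.list); // [ 1, 2, 3 ] (plain data survives)

// One possible workaround: convert to JSON-friendly shapes before sending.
const safePayload = {
  when: original.when.toISOString(), // Date -> string
  lookup: Array.from(original.lookup), // Map -> array of [key, value] pairs
  tags: Array.from(original.tags), // Set -> array
};
```

The receiving side can rebuild the richer types if it needs them, for example with `new Date(payload.when)` or `new Map(payload.lookup)`.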
Async methods
Both synchronous and asynchronous methods work:
JavaScript
// Sync method
class MyAgent extends Agent {
@callable()
add(a, b) {
return a + b;
}
}
// Async method
class MyAgent extends Agent {
@callable()
async fetchUser(id) {
const user = await this.sql`SELECT * FROM users WHERE id = ${id}`;
return user[0];
}
}
TypeScript
// Sync method
class MyAgent extends Agent {
@callable()
add(a: number, b: number): number {
return a + b;
}
}
// Async method
class MyAgent extends Agent {
@callable()
async fetchUser(id: string): Promise<User> {
const user = await this.sql`SELECT * FROM users WHERE id = ${id}`;
return user[0];
}
}
Void methods
Methods that do not return a value:
JavaScript
class MyAgent extends Agent {
@callable()
async logEvent(event) {
await this.sql`INSERT INTO events (name) VALUES (${event})`;
}
}
TypeScript
class MyAgent extends Agent {
@callable()
async logEvent(event: string): Promise<void> {
await this.sql`INSERT INTO events (name) VALUES (${event})`;
}
}
On the client, these methods still return a Promise that resolves when the method finishes executing:
JavaScript
await agent.stub.logEvent("user-clicked");
// Resolves when the server confirms execution
TypeScript
await agent.stub.logEvent("user-clicked");
// Resolves when the server confirms execution
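Streaming responses
For methods that produce data over time (for example, AI text generation), use streaming responses:
Defining streaming methods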
JavaScript
import { Agent, callable } from "agents";
export class AIAgent extends Agent {
@callable({ streaming: true })
async generateText(stream, prompt) {
// First parameter is always StreamingResponse for streaming methods
for await (const chunk of this.llm.stream(prompt)) {
stream.send(chunk); // Send each chunk to the client
}
stream.end(); // Signal completion
}
@callable({ streaming: true })
async streamNumbers(stream, count) {
for (let i = 0; i < count; i++) {
stream.send(i);
await new Promise((resolve) => setTimeout(resolve, 100));
}
stream.end(count); // Optional final value
}
}
TypeScript
import { Agent, callable, type StreamingResponse } from "agents";
export class AIAgent extends Agent {
@callable({ streaming: true })
async generateText(stream: StreamingResponse, prompt: string) {
// First parameter is always StreamingResponse for streaming methods
for await (const chunk of this.llm.stream(prompt)) {
stream.send(chunk); // Send each chunk to the client
}
stream.end(); // Signal completion
}
@callable({ streaming: true })
async streamNumbers(stream: StreamingResponse, count: number) {
for (let i = 0; i < count; i++) {
stream.send(i);
await new Promise((resolve) => setTimeout(resolve, 100));
}
stream.end(count); // Optional final value
}
}
Consuming streams on the client
JavaScript
// Preferred format (supports timeout and other options)
await agent.call("generateText", [prompt], {
stream: {
onChunk: (chunk) => {
// Called for each chunk
appendToOutput(chunk);
},
onDone: (finalValue) => {
// Called when stream ends
console.log("Stream complete", finalValue);
},
onError: (error) => {
// Called if an error occurs
console.error("Stream error:", error);
},
},
});
// Legacy format (still supported for backward compatibility)
await agent.call("generateText", [prompt], {
onChunk: (chunk) => appendToOutput(chunk),
onDone: (finalValue) => console.log("Done", finalValue),
onError: (error) => console.error("Error:", error),
});
TypeScript
// Preferred format (supports timeout and other options)
await agent.call("generateText", [prompt], {
stream: {
onChunk: (chunk) => {
// Called for each chunk
appendToOutput(chunk);
},
onDone: (finalValue) => {
// Called when stream ends
console.log("Stream complete", finalValue);
},
onError: (error) => {
// Called if an error occurs
console.error("Stream error:", error);
},
},
});
// Legacy format (still supported for backward compatibility)
await agent.call("generateText", [prompt], {
onChunk: (chunk) => appendToOutput(chunk),
onDone: (finalValue) => console.log("Done", finalValue),
onError: (error) => console.error("Error:", error),
});
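When the chunks need to be gathered rather than rendered one by one, the three callbacks can be bundled into a small collector. The sketch below is a hypothetical helper, not part of the Agents SDK: `createStreamCollector`, `StreamCallbacks`, and `StreamState` are names invented here, and the callback signatures are assumed to match the `onChunk`, `onDone`, and `onError` shapes used above. The callbacks are invoked by hand to simulate a server stream.

```typescript
// Hypothetical helper: not part of the Agents SDK.
type StreamCallbacks = {
  onChunk: (chunk: unknown) => void;
  onDone: (finalValue: unknown) => void;
  onError: (error: string) => void;
};

type StreamState = {
  chunks: unknown[];
  status: "streaming" | "done" | "error";
  finalValue?: unknown;
  error?: string;
};

function createStreamCollector(): { state: StreamState; callbacks: StreamCallbacks } {
  const state: StreamState = { chunks: [], status: "streaming" };
  const callbacks: StreamCallbacks = {
    // Record every chunk in arrival order.
    onChunk: (chunk) => {
      state.chunks.push(chunk);
    },
    // Remember the optional final value and mark the stream finished.
    onDone: (finalValue) => {
      state.status = "done";
      state.finalValue = finalValue;
    },
    // Keep the error message so the caller can surface it.
    onError: (error) => {
      state.status = "error";
      state.error = error;
    },
  };
  return { state, callbacks };
}

// Simulate a server that sends two chunks and then ends the stream.
const collector = createStreamCollector();
collector.callbacks.onChunk("Hel");
collector.callbacks.onChunk("lo");
collector.callbacks.onDone(undefined);

const text = collector.state.chunks.join("");
console.log(text, collector.state.status);
```

With a real connection, the same object could be passed as `{ stream: collector.callbacks }` in the options argument of `agent.call`, assuming the preferred options format shown above.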
StreamingResponse API
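| Method | Description |
|---|---|
| send(chunk) | Sends a chunk of data to the client |
| end(finalChunk?) | Ends the stream, optionally with a final value |
| error(message) | Sends an error to the client and closes the stream |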
JavaScript
class MyAgent extends Agent {
@callable({ streaming: true })
async processWithProgress(stream, items) {
for (let i = 0; i < items.length; i++) {
await this.process(items[i]);
stream.send({ progress: (i + 1) / items.length, item: items[i] });
}
stream.end({ completed: true, total: items.length });
}
}
TypeScript
class MyAgent extends Agent {
@callable({ streaming: true })
async processWithProgress(stream: StreamingResponse, items: string[]) {
for (let i = 0; i < items.length; i++) {
await this.process(items[i]);
stream.send({ progress: (i + 1) / items.length, item: items[i] });
}
stream.end({ completed: true, total: items.length });
}
}
TypeScript integration
Typed client calls
Pass the agent class as a type parameter for full type safety:
JavaScript
import { useAgent } from "agents/react";
function App() {
const agent = useAgent({
agent: "MyAgent",
name: "default",
});
async function handleGreet() {
// TypeScript knows the method signature
const result = await agent.stub.greet("World");
// ^? string
}
// TypeScript catches errors
// await agent.stub.greet(123); // Error: Argument of type 'number' is not assignable
// await agent.stub.nonExistent(); // Error: Property 'nonExistent' does not exist
}
TypeScript
import { useAgent } from "agents/react";
import type { MyAgent } from "./server";
function App() {
const agent = useAgent<MyAgent>({
agent: "MyAgent",
name: "default",
});
async function handleGreet() {
// TypeScript knows the method signature
const result = await agent.stub.greet("World");
// ^? string
}
// TypeScript catches errors
// await agent.stub.greet(123); // Error: Argument of type 'number' is not assignable
// await agent.stub.nonExistent(); // Error: Property 'nonExistent' does not exist
}
Excluding non-callable methods
If you have methods that are not decorated with @callable(), you can exclude them from the type:
JavaScript
class MyAgent extends Agent {
@callable()
publicMethod() {
return "public";
}
// Not callable from clients
internalMethod() {
// internal logic
}
}
// Exclude internal methods from the client type
const agent = useAgent({
agent: "MyAgent",
});
agent.stub.publicMethod(); // Works
// agent.stub.internalMethod(); // TypeScript error
TypeScript
class MyAgent extends Agent {
@callable()
publicMethod(): string {
return "public";
}
// Not callable from clients
internalMethod(): void {
// internal logic
}
}
// Exclude internal methods from the client type
const agent = useAgent<Omit<MyAgent, "internalMethod">>({
agent: "MyAgent",
});
agent.stub.publicMethod(); // Works
// agent.stub.internalMethod(); // TypeScript error
Error handling
Throwing errors in callable methods
Errors thrown in callable methods are propagated to the client:
JavaScript
class MyAgent extends Agent {
@callable()
async riskyOperation(data) {
if (!isValid(data)) {
throw new Error("Invalid data format");
}
try {
await this.processData(data);
} catch (e) {
throw new Error("Processing failed: " + e.message);
}
}
}
TypeScript
class MyAgent extends Agent {
@callable()
async riskyOperation(data: unknown): Promise<void> {
if (!isValid(data)) {
throw new Error("Invalid data format");
}
try {
await this.processData(data);
} catch (e) {
throw new Error("Processing failed: " + (e instanceof Error ? e.message : String(e)));
}
}
}
Client-side error handling
JavaScript
try {
const result = await agent.stub.riskyOperation(data);
} catch (error) {
// Error thrown by the agent method
console.error("RPC failed:", error.message);
}
TypeScript
try {
const result = await agent.stub.riskyOperation(data);
} catch (error) {
// Error thrown by the agent method
console.error("RPC failed:", error instanceof Error ? error.message : error);
}
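In TypeScript with strict settings, the variable in a `catch` clause is typed as `unknown`, so reading `.message` from it needs a type check first. A small helper keeps that check in one place. This is a minimal sketch: `toErrorMessage` and `describeFailure` are hypothetical names, not SDK functions.

```typescript
// Hypothetical helper, not part of the Agents SDK.
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  return "Unknown error";
}

// Runs an operation and turns any thrown value into a readable message.
function describeFailure(run: () => void): string {
  try {
    run();
    return "ok";
  } catch (error) {
    return `RPC failed: ${toErrorMessage(error)}`;
  }
}

const fromError = describeFailure(() => {
  throw new Error("Invalid data format");
});
const fromString = describeFailure(() => {
  throw "boom";
});
console.log(fromError); // "RPC failed: Invalid data format"
console.log(fromString); // "RPC failed: boom"
```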
Streaming error handling
For streaming methods, use the onError callback:
JavaScript
await agent.call("streamData", [input], {
stream: {
onChunk: (chunk) => handleChunk(chunk),
onError: (errorMessage) => {
console.error("Stream error:", errorMessage);
showErrorUI(errorMessage);
},
onDone: (result) => handleComplete(result),
},
});
TypeScript
await agent.call("streamData", [input], {
stream: {
onChunk: (chunk) => handleChunk(chunk),
onError: (errorMessage) => {
console.error("Stream error:", errorMessage);
showErrorUI(errorMessage);
},
onDone: (result) => handleComplete(result),
},
});
On the server, you can use stream.error() to gracefully send an error mid-stream:
JavaScript
class MyAgent extends Agent {
@callable({ streaming: true })
async processItems(stream, items) {
for (const item of items) {
try {
const result = await this.process(item);
stream.send(result);
} catch (e) {
stream.error(`Failed to process ${item}: ${e.message}`);
return; // Stream is now closed
}
}
stream.end();
}
}
TypeScript
class MyAgent extends Agent {
@callable({ streaming: true })
async processItems(stream: StreamingResponse, items: string[]) {
for (const item of items) {
try {
const result = await this.process(item);
stream.send(result);
} catch (e) {
stream.error(`Failed to process ${item}: ${e instanceof Error ? e.message : String(e)}`);
return; // Stream is now closed
}
}
stream.end();
}
}
Connection errors
If the WebSocket connection closes while RPC calls are pending, those calls automatically reject with a "Connection closed" error:
JavaScript
try {
const result = await agent.call("longRunningMethod", []);
} catch (error) {
if (error.message === "Connection closed") {
// Handle disconnection
console.log("Lost connection to agent");
}
}
TypeScript
try {
const result = await agent.call("longRunningMethod", []);
} catch (error) {
if (error instanceof Error && error.message === "Connection closed") {
// Handle disconnection
console.log("Lost connection to agent");
}
}
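Retrying after reconnection
The client reconnects automatically after a disconnect. To retry a failed call after reconnecting, await agent.ready before retrying: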
JavaScript
async function callWithRetry(agent, method, args = []) {
try {
return await agent.call(method, args);
} catch (error) {
if (error.message === "Connection closed") {
await agent.ready; // Wait for reconnection
return await agent.call(method, args); // Retry once
}
throw error;
}
}
// Usage
const result = await callWithRetry(agent, "processData", [data]);
TypeScript
async function callWithRetry<T>(
agent: AgentClient,
method: string,
args: unknown[] = [],
): Promise<T> {
try {
return await agent.call(method, args);
} catch (error) {
if (error instanceof Error && error.message === "Connection closed") {
await agent.ready; // Wait for reconnection
return await agent.call(method, args); // Retry once
}
throw error;
}
}
// Usage
const result = await callWithRetry(agent, "processData", [data]);
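Note
Only retry idempotent operations. If the server received the request but the connection dropped before the response arrived, retrying could cause the operation to run twice.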
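One common way to make an operation safe to retry is to have the client send a unique key with each logical request and have the server remember which keys it has already handled. The sketch below illustrates the idea with an in-memory map. It is a hypothetical pattern, not something the SDK provides: `IdempotentRunner` and the `"order-42"` key are invented for this example, and a real agent would need to persist the completed results (for instance in its own storage) so that they survive restarts.

```typescript
// Hypothetical sketch: dedupe retried requests by a client-supplied key.
class IdempotentRunner<T> {
  private completed = new Map<string, T>();

  run(requestKey: string, operation: () => T): T {
    // A retry with the same key returns the stored result instead of re-running.
    if (this.completed.has(requestKey)) {
      return this.completed.get(requestKey) as T;
    }
    const result = operation();
    this.completed.set(requestKey, result);
    return result;
  }
}

let charges = 0;
const runner = new IdempotentRunner<number>();
const chargeOnce = () => ++charges; // a side effect that must not repeat

const first = runner.run("order-42", chargeOnce);
const retried = runner.run("order-42", chargeOnce); // simulated retry after a dropped response

console.log(first, retried, charges); // 1 1 1
```

The retry sees the same result as the first attempt, and the side effect runs only once.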
When not to use @callable
Worker calling an agent
When calling an agent from the same Worker (for example, in your fetch handler), use Durable Object RPC directly:
JavaScript
import { getAgentByName } from "agents";
export default {
async fetch(request, env) {
// Get the agent stub
const agent = await getAgentByName(env.MyAgent, "instance-name");
// Call methods directly - no @callable needed
const result = await agent.processData(data);
return Response.json(result);
},
};
TypeScript
import { getAgentByName } from "agents";
export default {
async fetch(request: Request, env: Env) {
// Get the agent stub
const agent = await getAgentByName(env.MyAgent, "instance-name");
// Call methods directly - no @callable needed
const result = await agent.processData(data);
return Response.json(result);
},
} satisfies ExportedHandler<Env>;
Agent-to-agent calls
When one agent needs to call another:
JavaScript
class OrchestratorAgent extends Agent {
async delegateWork(taskId) {
// Get another agent
const worker = await getAgentByName(this.env.WorkerAgent, taskId);
// Call its methods directly
const result = await worker.doWork();
return result;
}
}
TypeScript
class OrchestratorAgent extends Agent {
async delegateWork(taskId: string) {
// Get another agent
const worker = await getAgentByName(this.env.WorkerAgent, taskId);
// Call its methods directly
const result = await worker.doWork();
return result;
}
}
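Why the distinction?
| RPC type | Transport | Use case |
|---|---|---|
| @callable | WebSocket | External clients (browsers, apps) |
| Durable Object RPC | Internal | Worker to agent, agent to agent |
Durable Object RPC is more efficient for internal calls because it does not go through WebSocket serialization. The @callable decorator adds the WebSocket RPC handling that external clients need.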
API reference
@callable(metadata?) decorator
Marks a method as callable from external clients.
JavaScript
import { Agent, callable } from "agents";
class MyAgent extends Agent {
@callable()
method() {}
@callable({ streaming: true })
streamingMethod(stream) {}
@callable({ description: "Fetches user data" })
getUser(id) {}
}
TypeScript
import { Agent, callable, type StreamingResponse } from "agents";
class MyAgent extends Agent {
@callable()
method(): void {}
@callable({ streaming: true })
streamingMethod(stream: StreamingResponse): void {}
@callable({ description: "Fetches user data" })
getUser(id: string): User {}
}
CallableMetadata type
TypeScript
type CallableMetadata = {
/** Optional description of what the method does */
description?: string;
/** Whether the method supports streaming responses */
streaming?: boolean;
};
StreamingResponse class
Used in streaming callable methods to send data to the client.
JavaScript
import { Agent, callable } from "agents";
class MyAgent extends Agent {
@callable({ streaming: true })
async streamData(stream, input) {
stream.send("chunk 1");
stream.send("chunk 2");
stream.end("final");
}
}
TypeScript
import { Agent, callable, type StreamingResponse } from "agents";
class MyAgent extends Agent {
@callable({ streaming: true })
async streamData(stream: StreamingResponse, input: string) {
stream.send("chunk 1");
stream.send("chunk 2");
stream.end("final");
}
}
| Method | Signature | Description |
|---|---|---|
| send | (chunk: unknown) => void | Sends a chunk of data to the client |
| end | (finalChunk?: unknown) => void | Ends the stream |
| error | (message: string) => void | Sends an error and closes the stream |
Client methods
| Method | Signature | Description |
|---|---|---|
| agent.call | (method, args?, options?) => Promise | Calls a method by name |
| agent.stub | Proxy | Typed method calls |
JavaScript
// Using call()
await agent.call("methodName", [arg1, arg2]);
await agent.call("streamMethod", [arg], {
stream: { onChunk, onDone, onError },
});
// With timeout (rejects if call does not complete in time)
await agent.call("slowMethod", [], { timeout: 5000 });
// Using stub
await agent.stub.methodName(arg1, arg2);
TypeScript
// Using call()
await agent.call("methodName", [arg1, arg2]);
await agent.call("streamMethod", [arg], {
stream: { onChunk, onDone, onError },
});
// With timeout (rejects if call does not complete in time)
await agent.call("slowMethod", [], { timeout: 5000 });
// Using stub
await agent.stub.methodName(arg1, arg2);
CallOptions type
TypeScript
type CallOptions = {
/** Timeout in milliseconds. Rejects if call does not complete in time. */
timeout?: number;
/** Streaming options */
stream?: {
onChunk?: (chunk: unknown) => void;
onDone?: (finalChunk: unknown) => void;
onError?: (error: string) => void;
};
};
Note
The legacy format { onChunk, onDone, onError } (not nested under stream) is still supported. The client automatically detects which format you are using.
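To make the difference between the two formats concrete, here is one way such detection could work. This is a sketch under assumptions, not the SDK's actual logic: `normalizeCallOptions`, `StreamOptions`, and `LegacyOptions` are hypothetical names, and the real client may decide differently.

```typescript
// Hypothetical sketch of format detection; the SDK's real logic may differ.
type StreamOptions = {
  onChunk?: (chunk: unknown) => void;
  onDone?: (finalChunk: unknown) => void;
  onError?: (error: string) => void;
};
type CallOptions = { timeout?: number; stream?: StreamOptions };
type LegacyOptions = StreamOptions;

function normalizeCallOptions(options: CallOptions | LegacyOptions = {}): CallOptions {
  // Preferred format: callbacks nested under `stream`, possibly with `timeout`.
  if ("stream" in options || "timeout" in options) {
    return options as CallOptions;
  }
  // Legacy format: callbacks sit at the top level, so wrap them under `stream`.
  const legacy = options as LegacyOptions;
  if (legacy.onChunk || legacy.onDone || legacy.onError) {
    return { stream: legacy };
  }
  // No callbacks and no timeout: nothing to normalize.
  return {};
}

const noop = () => {};
const fromLegacy = normalizeCallOptions({ onChunk: noop });
const fromPreferred = normalizeCallOptions({ timeout: 5000, stream: { onDone: noop } });
const fromEmpty = normalizeCallOptions();
```

After normalization, the rest of the code only has to deal with one shape, with the callbacks always found under `stream`.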
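getCallableMethods() method
Returns a map of all callable methods on the agent along with their metadata. Useful for introspection and automatic documentation generation.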
JavaScript
const methods = agent.getCallableMethods();
// Map<string, CallableMetadata>
for (const [name, meta] of methods) {
console.log(`${name}: ${meta.description || "(no description)"}`);
if (meta.streaming) console.log(" (streaming)");
}
TypeScript
const methods = agent.getCallableMethods();
// Map<string, CallableMetadata>
for (const [name, meta] of methods) {
console.log(`${name}: ${meta.description || "(no description)"}`);
if (meta.streaming) console.log(" (streaming)");
}
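As an illustration of the documentation use case, the map can be rendered into a simple table. The sketch below is self-contained: `renderMethodTable` is a hypothetical helper, the `CallableMetadata` type is repeated locally, and `sample` stands in for the value that `agent.getCallableMethods()` would return.

```typescript
// Hypothetical helper: renders callable-method metadata as a pipe table.
type CallableMetadata = { description?: string; streaming?: boolean };

function renderMethodTable(methods: Map<string, CallableMetadata>): string {
  const rows = Array.from(methods.entries())
    // Sort by method name so the output is stable.
    .sort(([a], [b]) => a.localeCompare(b))
    .map(([name, meta]) => {
      const description = meta.description ?? "(no description)";
      const kind = meta.streaming ? "streaming" : "regular";
      return `| ${name} | ${kind} | ${description} |`;
    });
  return ["| Method | Kind | Description |", "|---|---|---|", ...rows].join("\n");
}

// Stand-in for the Map returned by agent.getCallableMethods().
const sample = new Map<string, CallableMetadata>([
  ["streamData", { streaming: true }],
  ["getUser", { description: "Fetches user data" }],
]);

const table = renderMethodTable(sample);
console.log(table);
```

For the sample above, the table lists `getUser` as a regular method with its description, followed by `streamData` as a streaming method with the placeholder text.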
Troubleshooting
SyntaxError: Invalid or unexpected token
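If your dev server reports SyntaxError: Invalid or unexpected token when using @callable(), two things are needed:
1. Add the agents/vite plugin. Vite 8 transpiles with Oxc, which does not yet support TC39 decorators. The plugin adds the required transform: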
vite.config.ts
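import { defineConfig } from "vite";
import react from "@vitejs/plugin-react";
import { cloudflare } from "@cloudflare/vite-plugin";
import agents from "agents/vite";
export default defineConfig({
plugins: [agents(), react(), cloudflare()],
});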
2. Extend agents/tsconfig. This sets "target": "ES2021" along with all other recommended compiler options:
tsconfig.json
{
"extends": "agents/tsconfig"
}
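If you cannot extend the shared config, set "target": "ES2021" manually in your tsconfig.json.
Warning
Do not set "experimentalDecorators": true in tsconfig.json. The Agents SDK uses TC39 standard decorators, not TypeScript's legacy decorators. Enabling experimentalDecorators applies an incompatible transform that silently breaks @callable() at runtime.
Next steps
Agents API: Complete API reference for the Agents SDK.
WebSockets: Real-time bidirectional communication with clients.
State management: Sync state between agents and clients.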