Chat Agents
Build AI-powered chat interfaces with AIChatAgent and useAgentChat. Messages are persisted to SQLite automatically, streams resume after a disconnect, and tool calls work on both the server and the client.
Overview
The @cloudflare/ai-chat package provides two main exports:
| Export | Import path | Purpose |
|---|---|---|
| AIChatAgent | @cloudflare/ai-chat | Server-side Agent class with message persistence and streaming |
| useAgentChat | @cloudflare/ai-chat/react | React hook for building chat UIs |
Built on the AI SDK ↗ and Cloudflare Durable Objects, you get:
- Automatic message persistence: conversations are stored in SQLite and survive restarts
- Resumable streaming: clients can pick a stream back up mid-response after a disconnect, without losing data
- Real-time sync: messages are broadcast over WebSocket to every connected client
- Tool support: server-side tools, client-side tools, and human-in-the-loop approval flows
- Data parts: attach typed JSON to messages alongside text (citations, progress, usage)
- Row-size guard: messages are compacted automatically as they approach the SQLite limit
Quick start
Install
Terminal window
npm install @cloudflare/ai-chat agents ai
Server
JavaScript
import { AIChatAgent } from "@cloudflare/ai-chat";
import { createWorkersAI } from "workers-ai-provider";
import { streamText, convertToModelMessages } from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
// Use any provider such as workers-ai-provider, openai, anthropic, google, etc.
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import { AIChatAgent } from "@cloudflare/ai-chat";
import { createWorkersAI } from "workers-ai-provider";
import { streamText, convertToModelMessages } from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
// Use any provider such as workers-ai-provider, openai, anthropic, google, etc.
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
Client
JavaScript
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const { messages, sendMessage, status } = useAgentChat({ agent });
return (
<div>
{messages.map((msg) => (
<div key={msg.id}>
<strong>{msg.role}:</strong>
{msg.parts.map((part, i) =>
part.type === "text" ? <span key={i}>{part.text}</span> : null,
)}
</div>
))}
<form
onSubmit={(e) => {
e.preventDefault();
const input = e.currentTarget.elements.namedItem("input");
sendMessage({ text: input.value });
input.value = "";
}}
>
<input name="input" placeholder="Type a message..." />
<button type="submit" disabled={status === "streaming"}>
Send
</button>
</form>
</div>
);
}
TypeScript
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const { messages, sendMessage, status } = useAgentChat({ agent });
return (
<div>
{messages.map((msg) => (
<div key={msg.id}>
<strong>{msg.role}:</strong>
{msg.parts.map((part, i) =>
part.type === "text" ? <span key={i}>{part.text}</span> : null,
)}
</div>
))}
<form
onSubmit={(e) => {
e.preventDefault();
const input = e.currentTarget.elements.namedItem(
"input",
) as HTMLInputElement;
sendMessage({ text: input.value });
input.value = "";
}}
>
<input name="input" placeholder="Type a message..." />
<button type="submit" disabled={status === "streaming"}>
Send
</button>
</form>
</div>
);
}
Wrangler configuration
JSONC
// wrangler.jsonc
{
"ai": { "binding": "AI" },
"durable_objects": {
"bindings": [{ "name": "ChatAgent", "class_name": "ChatAgent" }],
},
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["ChatAgent"] }],
}
The new_sqlite_classes migration is required. AIChatAgent uses SQLite to persist messages and cache streamed chunks.
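The quick start does not show the Worker entry point that forwards requests to the agent. A minimal sketch using the routeAgentRequest helper from the agents package (the module path and Env type are assumptions for illustration):
TypeScript
import { routeAgentRequest } from "agents";
export { ChatAgent } from "./chat-agent"; // hypothetical module path

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Routes agent requests (HTTP and WebSocket) to the matching Durable Object instance
    return (
      (await routeAgentRequest(request, env)) ??
      new Response("Not found", { status: 404 })
    );
  },
} satisfies ExportedHandler<Env>;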
How it works
sequenceDiagram
  participant Client as Client (useAgentChat)
  participant Agent as AIChatAgent
  participant DB as SQLite
  Client->>Agent: CF_AGENT_USE_CHAT_REQUEST (WebSocket)
  Agent->>DB: Persist messages
  Agent->>Agent: onChatMessage()
  loop Streaming response
    Agent-->>Client: CF_AGENT_USE_CHAT_RESPONSE (chunks)
    Agent->>DB: Buffer chunks
  end
  Agent->>DB: Persist final message
  Agent-->>Client: CF_AGENT_CHAT_MESSAGES (broadcast to all clients)
1. The client sends a message over WebSocket
2. AIChatAgent persists the message to SQLite and calls your onChatMessage method
3. Your method returns a streaming Response (usually from streamText)
4. Chunks stream back to the client in real time over WebSocket
5. When the stream ends, the final message is persisted and broadcast to all connections
Server API
AIChatAgent
Extends Agent from the agents package. Manages conversation state, persistence, and streaming.
JavaScript
import { AIChatAgent } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
// Access current messages
// this.messages: UIMessage[]
// Limit stored messages (optional)
maxPersistedMessages = 200;
async onChatMessage(onFinish, options) {
// onFinish: optional callback for streamText (cleanup is automatic)
// options.abortSignal: cancel signal
// options.body: custom data from client
// Return a Response (streaming or plain text)
}
}
TypeScript
import { AIChatAgent } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
// Access current messages
// this.messages: UIMessage[]
// Limit stored messages (optional)
maxPersistedMessages = 200;
async onChatMessage(onFinish?, options?) {
// onFinish: optional callback for streamText (cleanup is automatic)
// options.abortSignal: cancel signal
// options.body: custom data from client
// Return a Response (streaming or plain text)
}
}
onChatMessage
This is the main method you override. It receives the conversation context and must return a Response.
Streaming response (most common):
JavaScript
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
system: "You are a helpful assistant.",
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
system: "You are a helpful assistant.",
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
Plain text response:
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
return new Response("Hello! I am a simple agent.", {
headers: { "Content-Type": "text/plain" },
});
}
}
Accessing custom body data and the request ID:
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage(_onFinish, options) {
const { timezone, userId } = options?.body ?? {};
// Use these values in your LLM call or business logic
// options.requestId — unique identifier for this chat request,
// useful for logging and correlating events
console.log("Request ID:", options?.requestId);
}
}
this.messages
The current conversation history, loaded from SQLite. It is an array of AI SDK UIMessage objects. Messages are persisted automatically after every interaction.
maxPersistedMessages
Limits how many messages are stored in SQLite. Once the limit is exceeded, the oldest messages are deleted. This only controls storage; it does not affect what is sent to the LLM.
JavaScript
export class ChatAgent extends AIChatAgent {
maxPersistedMessages = 200;
}
TypeScript
export class ChatAgent extends AIChatAgent {
maxPersistedMessages = 200;
}
To control what is sent to the model, use the AI SDK's pruneMessages():
JavaScript
import { streamText, convertToModelMessages, pruneMessages } from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: pruneMessages({
messages: await convertToModelMessages(this.messages),
reasoning: "before-last-message",
toolCalls: "before-last-2-messages",
}),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import { streamText, convertToModelMessages, pruneMessages } from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: pruneMessages({
messages: await convertToModelMessages(this.messages),
reasoning: "before-last-message",
toolCalls: "before-last-2-messages",
}),
});
return result.toUIMessageStreamResponse();
}
}
waitForMcpConnections
Controls whether AIChatAgent waits for MCP server connections to settle before calling onChatMessage. This ensures this.mcp.getAITools() returns the full set of tools, which matters most when a Durable Object wakes from hibernation while connections are still being re-established in the background.
| Value | Behavior |
|---|---|
| { timeout: 10_000 } | Wait up to 10 seconds (default) |
| { timeout: N } | Wait up to N milliseconds |
| true | Wait indefinitely until all connections are ready |
| false | Do not wait (legacy behavior before 0.2.0) |
JavaScript
export class ChatAgent extends AIChatAgent {
// Default — waits up to 10 seconds
// waitForMcpConnections = { timeout: 10_000 };
// Wait forever
waitForMcpConnections = true;
// Disable waiting
waitForMcpConnections = false;
}
TypeScript
export class ChatAgent extends AIChatAgent {
// Default — waits up to 10 seconds
// waitForMcpConnections = { timeout: 10_000 };
// Wait forever
waitForMcpConnections = true;
// Disable waiting
waitForMcpConnections = false;
}
For finer-grained control, call this.mcp.waitForConnections() directly inside onChatMessage.
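For example, a sketch that opts out of the built-in wait and blocks only when the turn actually needs MCP tools (assuming waitForConnections() takes no arguments; check the actual signature):
TypeScript
export class ChatAgent extends AIChatAgent {
  waitForMcpConnections = false; // opt out of the automatic wait

  async onChatMessage() {
    // Block here, right before the tools are needed
    await this.mcp.waitForConnections();
    const workersai = createWorkersAI({ binding: this.env.AI });
    const result = streamText({
      model: workersai("@cf/zai-org/glm-4.7-flash"),
      messages: await convertToModelMessages(this.messages),
      tools: this.mcp.getAITools(),
    });
    return result.toUIMessageStreamResponse();
  }
}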
messageConcurrency
Controls how new user submissions are handled while a chat turn is already running or queued.
JavaScript
export class ChatAgent extends AIChatAgent {
messageConcurrency = "queue";
}
TypeScript
export class ChatAgent extends AIChatAgent {
messageConcurrency = "queue";
}
| Strategy | Behavior |
|---|---|
| "queue" (default) | Queue every submission and process them in order |
| "latest" | Keep only the most recent overlapping submission; superseded submissions still have their user messages persisted, but no model turn is started for them |
| "merge" | Queue overlapping submissions, then merge their trailing user messages into a single turn just before the latest queued turn starts |
| "drop" | Ignore overlapping submissions entirely |
| { strategy: "debounce", debounceMs?: number } | Trailing-edge latest with a quiet window (default 750 ms) |
This setting only applies to sendMessage() submissions. Regeneration, tool continuations, approvals, clears, and programmatic saveMessages() calls keep their existing serialized behavior.
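A short sketch of the debounce policy (the quiet-window value here is arbitrary):
TypeScript
export class ChatAgent extends AIChatAgent {
  // Collapse rapid-fire submissions into one turn after a 500 ms quiet window
  messageConcurrency = { strategy: "debounce", debounceMs: 500 } as const;
}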
persistMessages vs saveMessages
persistMessages stores messages in SQLite and broadcasts the update to all connected clients, but does not trigger a model turn. Use it to inject messages into the conversation without starting a new response.
saveMessages persists messages and also triggers onChatMessage() to generate a new response. It waits for any currently running chat turn to finish before starting, so scheduled or programmatic messages never overlap an in-progress stream.
JavaScript
// Store messages without triggering a response
await this.persistMessages(messages);
// Store messages AND trigger onChatMessage
const { requestId, status } = await this.saveMessages(messages);
TypeScript
// Store messages without triggering a response
await this.persistMessages(messages);
// Store messages AND trigger onChatMessage
const { requestId, status } = await this.saveMessages(messages);
saveMessages accepts either an array of messages or a function that derives the next message list from the latest this.messages. Use the function form to avoid a stale baseline when multiple calls are queued:
JavaScript
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: "Summarize the latest data" }],
createdAt: new Date(),
},
]);
TypeScript
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: "Summarize the latest data" }],
createdAt: new Date(),
},
]);
saveMessages returns { requestId, status }, where status is "completed" when the turn actually ran and "skipped" when the chat was cleared before it could start.
onChatResponse
Called after a chat turn completes and the assistant message has been persisted. The turn lock is released before this hook runs, so it is safe to call saveMessages inside it. It fires on every turn-completion path: WebSocket chat requests, saveMessages, and automatic continuations.
JavaScript
export class ChatAgent extends AIChatAgent {
async onChatResponse(result) {
if (result.status === "completed") {
console.log("Turn completed:", result.requestId);
}
if (result.status === "error") {
console.error("Turn failed:", result.error);
}
}
}
TypeScript
import type { ChatResponseResult } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
protected async onChatResponse(result: ChatResponseResult) {
if (result.status === "completed") {
console.log("Turn completed:", result.requestId);
}
if (result.status === "error") {
console.error("Turn failed:", result.error);
}
}
}
ChatResponseResult contains:
| Field | Type | Description |
|---|---|---|
| message | UIMessage | The final assistant message produced by this turn |
| requestId | string | The request ID associated with this turn |
| continuation | boolean | Whether this turn was a continuation of the previous assistant turn |
| status | "completed" \| "error" \| "aborted" | How the turn ended |
| error | string \| undefined | The error message when status is "error" |
Note
Responses triggered from inside onChatResponse (for example via saveMessages) do not recursively trigger onChatResponse.
sanitizeMessageForPersistence
Override this method to apply custom transformations to messages before they are persisted to storage. This hook runs after the built-in sanitization (stripping OpenAI metadata, truncating Anthropic provider-executed tool payloads, filtering empty reasoning parts).
JavaScript
export class ChatAgent extends AIChatAgent {
sanitizeMessageForPersistence(message) {
return {
...message,
parts: message.parts.map((part) => {
if (
"output" in part &&
typeof part.output === "string" &&
part.output.length > 1000
) {
return { ...part, output: "[redacted]" };
}
return part;
}),
};
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
protected sanitizeMessageForPersistence(message: UIMessage): UIMessage {
return {
...message,
parts: message.parts.map((part) => {
if (
"output" in part &&
typeof part.output === "string" &&
part.output.length > 1000
) {
return { ...part, output: "[redacted]" };
}
return part;
}),
};
}
}
Turn lifecycle helpers
The following methods help you coordinate programmatic turns and wait for pending interactions to finish.
hasPendingInteraction()
Returns true when an assistant message is waiting on a client tool result or an approval.
JavaScript
if (this.hasPendingInteraction()) {
console.log("Waiting for user to approve or provide tool output");
}
TypeScript
if (this.hasPendingInteraction()) {
console.log("Waiting for user to approve or provide tool output");
}
waitUntilStable()
Waits for the conversation to become fully stable: no active stream, no pending client tool interactions, and no queued continuation turns. Resolves to true once stable, or false if it times out before pending interactions complete.
JavaScript
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (stable) {
console.log("All turns complete, safe to proceed");
}
TypeScript
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (stable) {
console.log("All turns complete, safe to proceed");
}
This is especially useful for server-driven flows built on saveMessages:
JavaScript
await this.saveMessages((messages) => [...messages, syntheticUserMessage]);
await this.waitUntilStable({ timeout: 60_000 });
// The assistant has finished responding
TypeScript
await this.saveMessages((messages) => [...messages, syntheticUserMessage]);
await this.waitUntilStable({ timeout: 60_000 });
// The assistant has finished responding
resetTurnState()
Aborts the turn currently in flight and invalidates queued continuations. The built-in CF_AGENT_CHAT_CLEAR handler calls it automatically; you can also call it manually when needed.
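A sketch of a manual call, in a hypothetical method you might expose through your own routing or RPC (whether resetTurnState() returns a promise is not specified here, so the call is not awaited):
TypeScript
export class ChatAgent extends AIChatAgent {
  // Hypothetical admin-style helper
  stopEverything() {
    // Abort the in-flight turn and drop any queued continuations
    this.resetTurnState();
  }
}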
Lifecycle hooks
Override onConnect and onClose to add custom logic. Stream resumption and message sync are handled automatically:
JavaScript
export class ChatAgent extends AIChatAgent {
async onConnect(connection, ctx) {
// Your custom logic (e.g., logging, auth checks)
console.log("Client connected:", connection.id);
// Stream resumption and message sync are handled automatically
}
async onClose(connection, code, reason, wasClean) {
console.log("Client disconnected:", connection.id);
// Connection cleanup is handled automatically
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
async onConnect(connection, ctx) {
// Your custom logic (e.g., logging, auth checks)
console.log("Client connected:", connection.id);
// Stream resumption and message sync are handled automatically
}
async onClose(connection, code, reason, wasClean) {
console.log("Client disconnected:", connection.id);
// Connection cleanup is handled automatically
}
}
The destroy() method cancels all pending chat requests and cleans up streaming state. It is called automatically when the Durable Object is disposed, and you can also call it manually if needed.
Request cancellation
When a user clicks "stop" in the chat UI, the client sends a CF_AGENT_CHAT_REQUEST_CANCEL message. The server propagates it to the abortSignal in options:
JavaScript
export class ChatAgent extends AIChatAgent {
async onChatMessage(_onFinish, options) {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
abortSignal: options?.abortSignal, // Pass through for cancellation
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage(_onFinish, options) {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
abortSignal: options?.abortSignal, // Pass through for cancellation
});
return result.toUIMessageStreamResponse();
}
}
Warning
If you do not pass abortSignal to streamText, the LLM call keeps running in the background even after the user cancels. Always pass it through.
Stream recovery
When a Durable Object is evicted mid-stream (code update, idle timeout, resource limits), the connection to the LLM is severed for good and in-memory streaming state is lost. chatRecovery wraps each chat turn in a runFiber(), providing automatic keepAlive while streaming and a recovery hook on restart.
JavaScript
export class ChatAgent extends AIChatAgent {
chatRecovery = true;
}
TypeScript
export class ChatAgent extends AIChatAgent {
override chatRecovery = true;
}
Once enabled, every onChatMessage call runs inside a fiber. If the agent is evicted mid-stream, the fiber row survives in SQLite. On the next activation the framework detects the interrupted fiber, reconstructs the partial response from the cached stream chunks, and calls onChatRecovery.
onChatRecovery
Override it to implement provider-specific recovery strategies. The default behavior persists the partial response and schedules a continuation via continueLastTurn().
JavaScript
export class ChatAgent extends AIChatAgent {
chatRecovery = true;
async onChatRecovery(ctx) {
console.log(`Recovered ${ctx.partialText.length} chars of partial text`);
// Default: persist partial + schedule continuation
return {};
}
}
TypeScript
import type { ChatRecoveryContext, ChatRecoveryOptions } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
override chatRecovery = true;
override async onChatRecovery(
ctx: ChatRecoveryContext,
): Promise<ChatRecoveryOptions> {
console.log(`Recovered ${ctx.partialText.length} chars of partial text`);
// Default: persist partial + schedule continuation
return {};
}
}
ChatRecoveryContext:
| Field | Type | Description |
|---|---|---|
| streamId | string | ID of the interrupted stream |
| requestId | string | ID of the original chat request |
| partialText | string | Text generated before eviction |
| partialParts | MessagePart[] | Message parts generated before eviction (text, reasoning, tool calls) |
| recoveryData | unknown \| null | Data from this.stash(), fully user-controlled |
| messages | ChatMessage[] | Full conversation history |
| lastBody | Record<string, unknown> \| undefined | The original request body |
| lastClientTools | ClientToolSchema[] \| undefined | Client tool schemas carried by the original request |
ChatRecoveryOptions:
| Field | Default | Description |
|---|---|---|
| persist | true | Save the partial response as an assistant message |
| continue | true | Schedule a continuation via continueLastTurn() |
Common return values:
- {} persists the partial response and auto-continues (the default, suitable for providers that support assistant prefill)
- { continue: false } persists the partial response but does not auto-continue (handle continuation yourself)
- { persist: false, continue: false } handles everything yourself (for example, fetch the already-completed response from the provider)
continueLastTurn
Appends to the previous assistant message by re-invoking onChatMessage with the saved request body. The response streams as a continuation, appended to the existing assistant message rather than creating a new one. No synthetic user message is created.
TypeScript
protected continueLastTurn(body?: Record<string, unknown>): Promise<SaveMessagesResult>;
The default recovery path calls this automatically. You can also call it manually from scheduled callbacks or other entry points. The optional body argument is merged with the saved _lastBody.
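A sketch of a manual continuation from a scheduled callback (the method name is hypothetical; it assumes it was registered earlier with the base Agent's schedule() API):
TypeScript
export class ChatAgent extends AIChatAgent {
  // Hypothetical callback registered elsewhere, e.g. this.schedule(600, "resumeTurn")
  async resumeTurn() {
    if (!this.hasPendingInteraction()) {
      // Append to the last assistant message; the body merges into the saved _lastBody
      await this.continueLastTurn({ resumedBy: "schedule" });
    }
  }
}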
Stashing recovery data
Use this.stash() inside onChatMessage to persist provider-specific data for recovery. The stash is stored in the fiber's SQLite row, separate from agent state, and is exposed in onChatRecovery as ctx.recoveryData.
JavaScript
export class ChatAgent extends AIChatAgent {
chatRecovery = true;
async onChatMessage(_onFinish, options) {
const result = streamText({
model: openai("gpt-5.4"),
messages: await convertToModelMessages(this.messages),
providerOptions: { openai: { store: true } },
includeRawChunks: true,
onChunk: ({ chunk }) => {
if (chunk.type === "raw") {
const raw = chunk.rawValue;
if (raw?.type === "response.created" && raw.response?.id) {
this.stash({ responseId: raw.response.id });
}
}
},
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
override chatRecovery = true;
async onChatMessage(_onFinish, options) {
const result = streamText({
model: openai("gpt-5.4"),
messages: await convertToModelMessages(this.messages),
providerOptions: { openai: { store: true } },
includeRawChunks: true,
onChunk: ({ chunk }) => {
if (chunk.type === "raw") {
const raw = chunk.rawValue as {
type?: string;
response?: { id?: string };
};
if (raw?.type === "response.created" && raw.response?.id) {
this.stash({ responseId: raw.response.id });
}
}
},
});
return result.toUIMessageStreamResponse();
}
}
Per-provider recovery strategies
The right strategy depends on whether the provider supports assistant prefill and whether responses keep running server-side after a disconnect:
| Provider | Strategy | Token cost |
|---|---|---|
| Workers AI | continueLastTurn(); the model continues via assistant prefill | Low |
| OpenAI (Responses API) | Fetch the completed response by ID; zero wasted tokens | Zero |
| Anthropic | Persist the partial response and send a synthetic user message to continue | Medium |
To see where chat recovery fits into the bigger picture of long-running agents, see Long-running agents: Recovering interrupted LLM streams. For the underlying fiber API, see Durable Execution.
Client API
useAgentChat
A React hook that connects to an AIChatAgent over WebSocket. It wraps the AI SDK's useChat in a native WebSocket transport.
JavaScript
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const {
messages,
sendMessage,
clearHistory,
addToolOutput,
addToolApprovalResponse,
setMessages,
status,
} = useAgentChat({ agent });
// ...
}
TypeScript
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const {
messages,
sendMessage,
clearHistory,
addToolOutput,
addToolApprovalResponse,
setMessages,
status,
} = useAgentChat({ agent });
// ...
}
Options
| Option | Type | Default | Description |
|---|---|---|---|
| agent | ReturnType<typeof useAgent> | required | The agent connection from useAgent |
| onToolCall | ({ toolCall, addToolOutput }) => void | — | Handle execution of client-side tools |
| autoContinueAfterToolResult | boolean | true | Automatically continue the conversation after client tool results and approvals |
| resume | boolean | true | Enable automatic stream resumption on reconnect |
| body | object \| () => object | — | Custom data attached to every request |
| prepareSendMessagesRequest | (options) => { body?, headers? } | — | Advanced per-request customization |
| getInitialMessages | (options) => Promise<UIMessage[]> or null | — | Custom initial message loader. Set to null to skip the HTTP request entirely, useful when you supply messages yourself (see the sketch below) |
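A sketch of the getInitialMessages option in both forms (the /api/history endpoint is hypothetical):
TypeScript
// Skip the initial HTTP fetch entirely (you hydrate messages some other way)
const chat = useAgentChat({ agent, getInitialMessages: null });

// Or load history yourself
const chatWithLoader = useAgentChat({
  agent,
  getInitialMessages: async () => {
    const res = await fetch("/api/history"); // hypothetical endpoint
    return res.json();
  },
});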
Returns
| Property | Type | Description |
|---|---|---|
| messages | UIMessage[] | Current conversation messages |
| sendMessage | (message) => void | Send a message |
| clearHistory | () => void | Clear the conversation (client and server) |
| addToolOutput | ({ toolCallId, output }) => void | Provide output for a client-side tool |
| addToolApprovalResponse | ({ id, approved }) => void | Approve or reject a tool that requires approval |
| setMessages | (messages \| updater) => void | Set messages directly (synced to the server) |
| status | string | "idle", "submitted", "streaming", or "error" |
Tools
AIChatAgent supports three tool patterns, all built with the AI SDK's tool() function:
| Pattern | Runs on | Good for |
|---|---|---|
| Server-side | Server (automatic) | API calls, database queries, computation |
| Client-side | Browser (via onToolCall) | Geolocation, clipboard, camera, local storage |
| Approval | Server (after user approval) | Payments, deletions, external actions |
Server-side tools
Tools with an execute function run automatically on the server:
JavaScript
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
tools: {
getWeather: tool({
description: "Get weather for a city",
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => {
const data = await fetchWeather(city);
return { temperature: data.temp, condition: data.condition };
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
tools: {
getWeather: tool({
description: "Get weather for a city",
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => {
const data = await fetchWeather(city);
return { temperature: data.temp, condition: data.condition };
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
Client-side tools
Define a tool without execute on the server, then handle it on the client with onToolCall. This is for tools that need browser APIs.
Server:
JavaScript
tools: {
getLocation: tool({
description: "Get the user's location from the browser",
inputSchema: z.object({}),
// No execute — the client handles it
}),
}
TypeScript
tools: {
getLocation: tool({
description: "Get the user's location from the browser",
inputSchema: z.object({}),
// No execute — the client handles it
}),
}
Client:
JavaScript
const { messages, sendMessage } = useAgentChat({
agent,
onToolCall: async ({ toolCall, addToolOutput }) => {
if (toolCall.toolName === "getLocation") {
const pos = await new Promise((resolve, reject) =>
navigator.geolocation.getCurrentPosition(resolve, reject),
);
addToolOutput({
toolCallId: toolCall.toolCallId,
output: { lat: pos.coords.latitude, lng: pos.coords.longitude },
});
}
},
});
TypeScript
const { messages, sendMessage } = useAgentChat({
agent,
onToolCall: async ({ toolCall, addToolOutput }) => {
if (toolCall.toolName === "getLocation") {
const pos = await new Promise((resolve, reject) =>
navigator.geolocation.getCurrentPosition(resolve, reject),
);
addToolOutput({
toolCallId: toolCall.toolCallId,
output: { lat: pos.coords.latitude, lng: pos.coords.longitude },
});
}
},
});
When the LLM calls getLocation, the stream pauses. The onToolCall callback fires, your code provides the output, and the conversation continues.
Tool approvals (human-in-the-loop)
For tools that need user confirmation before executing, use needsApproval.
Server:
JavaScript
tools: {
processPayment: tool({
description: "Process a payment",
inputSchema: z.object({
amount: z.number(),
recipient: z.string(),
}),
needsApproval: async ({ amount }) => amount > 100,
execute: async ({ amount, recipient }) => charge(amount, recipient),
}),
}
TypeScript
tools: {
processPayment: tool({
description: "Process a payment",
inputSchema: z.object({
amount: z.number(),
recipient: z.string(),
}),
needsApproval: async ({ amount }) => amount > 100,
execute: async ({ amount, recipient }) => charge(amount, recipient),
}),
}
Client:
JavaScript
const { messages, addToolApprovalResponse } = useAgentChat({ agent });
// Render pending approvals from message parts
{
messages.map((msg) =>
msg.parts
.filter(
(part) => part.type === "tool" && part.state === "approval-required",
)
.map((part) => (
<div key={part.toolCallId}>
<p>Approve {part.toolName}?</p>
<button
onClick={() =>
addToolApprovalResponse({
id: part.toolCallId,
approved: true,
})
}
>
Approve
</button>
<button
onClick={() =>
addToolApprovalResponse({
id: part.toolCallId,
approved: false,
})
}
>
Reject
</button>
</div>
)),
);
}
TypeScript
const { messages, addToolApprovalResponse } = useAgentChat({ agent });
// Render pending approvals from message parts
{
messages.map((msg) =>
msg.parts
.filter(
(part) => part.type === "tool" && part.state === "approval-required",
)
.map((part) => (
<div key={part.toolCallId}>
<p>Approve {part.toolName}?</p>
<button
onClick={() =>
addToolApprovalResponse({
id: part.toolCallId,
approved: true,
})
}
>
Approve
</button>
<button
onClick={() =>
addToolApprovalResponse({
id: part.toolCallId,
approved: false,
})
}
>
Reject
</button>
</div>
)),
);
}
Custom rejection messages with addToolOutput
When a user rejects a tool, addToolApprovalResponse({ id, approved: false }) sets the tool state to output-denied with a generic message. If you want the LLM to know the specific reason for the rejection, use addToolOutput with state: "output-error" instead:
JavaScript
const { addToolOutput } = useAgentChat({ agent });
// Reject with a custom error message
addToolOutput({
toolCallId: part.toolCallId,
state: "output-error",
errorText: "User declined: insufficient budget for this quarter",
});
TypeScript
const { addToolOutput } = useAgentChat({ agent });
// Reject with a custom error message
addToolOutput({
toolCallId: part.toolCallId,
state: "output-error",
errorText: "User declined: insufficient budget for this quarter",
});
This sends a tool_result with your custom error text to the LLM, letting it respond sensibly (for example, suggest an alternative or ask a clarifying question).
addToolApprovalResponse (with approved: false) continues the conversation automatically when autoContinueAfterToolResult is enabled (the default). addToolOutput with state: "output-error" does not auto-continue; call sendMessage() afterwards if you want the LLM to react to the error, as shown below.
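A sketch combining the two calls (the follow-up prompt text is arbitrary):
TypeScript
addToolOutput({
  toolCallId: part.toolCallId,
  state: "output-error",
  errorText: "User declined: insufficient budget for this quarter",
});
// output-error does not auto-continue, so prompt the model explicitly
sendMessage({ text: "I rejected that action. Please suggest a cheaper alternative." });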
See Human-in-the-loop for more patterns.
Custom request data
Include custom data with every chat request via the body option:
JavaScript
const { messages, sendMessage } = useAgentChat({
agent,
body: {
timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
userId: currentUser.id,
},
});
TypeScript
const { messages, sendMessage } = useAgentChat({
agent,
body: {
timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
userId: currentUser.id,
},
});
For dynamic values, use a function:
JavaScript
body: () => ({
token: getAuthToken(),
timestamp: Date.now(),
});
TypeScript
body: () => ({
token: getAuthToken(),
timestamp: Date.now(),
});
Access these fields on the server:
JavaScript
export class ChatAgent extends AIChatAgent {
async onChatMessage(_onFinish, options) {
const { timezone, userId } = options?.body ?? {};
// ...
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage(_onFinish, options) {
const { timezone, userId } = options?.body ?? {};
// ...
}
}
For more advanced per-request customization (custom headers, different bodies per request), use prepareSendMessagesRequest:
JavaScript
const { messages, sendMessage } = useAgentChat({
agent,
prepareSendMessagesRequest: async ({ messages, trigger }) => ({
headers: { Authorization: `Bearer ${await getToken()}` },
body: { requestedAt: Date.now() },
}),
});
TypeScript
const { messages, sendMessage } = useAgentChat({
agent,
prepareSendMessagesRequest: async ({ messages, trigger }) => ({
headers: { Authorization: `Bearer ${await getToken()}` },
body: { requestedAt: Date.now() },
}),
});
Data parts
Data parts let you attach typed JSON to messages alongside text: progress indicators, source citations, token usage, or any structured data your UI needs.
Writing data parts (server)
Use createUIMessageStream with writer.write() to send data parts from the server:
JavaScript
import {
streamText,
convertToModelMessages,
createUIMessageStream,
createUIMessageStreamResponse,
} from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const stream = createUIMessageStream({
execute: async ({ writer }) => {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
// Merge the LLM stream
writer.merge(result.toUIMessageStream());
// Write a data part — persisted to message.parts
writer.write({
type: "data-sources",
id: "src-1",
data: { query: "agents", status: "searching", results: [] },
});
// Later: update the same part in-place (same type + id)
writer.write({
type: "data-sources",
id: "src-1",
data: {
query: "agents",
status: "found",
results: ["Agents SDK docs", "Durable Objects guide"],
},
});
},
});
return createUIMessageStreamResponse({ stream });
}
}
TypeScript
import {
streamText,
convertToModelMessages,
createUIMessageStream,
createUIMessageStreamResponse,
} from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const stream = createUIMessageStream({
execute: async ({ writer }) => {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
// Merge the LLM stream
writer.merge(result.toUIMessageStream());
// Write a data part — persisted to message.parts
writer.write({
type: "data-sources",
id: "src-1",
data: { query: "agents", status: "searching", results: [] },
});
// Later: update the same part in-place (same type + id)
writer.write({
type: "data-sources",
id: "src-1",
data: {
query: "agents",
status: "found",
results: ["Agents SDK docs", "Durable Objects guide"],
},
});
},
});
return createUIMessageStreamResponse({ stream });
}
}
Three modes
| Mode | How | Persisted? | Use case |
|---|---|---|---|
| Update in place | Same type + id, updated in place | Yes | Progressive status (searching → found) |
| Append | No id, or a different id, appended | Yes | Log entries, multiple citations |
| Transient | transient: true, never enters message.parts | No | Ephemeral status (thinking indicators) |
Transient parts are broadcast to connected clients in real time but are never persisted to SQLite and never appear in message.parts. Consume them with the onData callback.
Reading data parts (client)
Non-transient data parts appear in message.parts. Type them with the UIMessage generic:
JavaScript
import { useAgentChat } from "@cloudflare/ai-chat/react";
const { messages } = useAgentChat({ agent });
// Typed access — no casts needed
for (const msg of messages) {
for (const part of msg.parts) {
if (part.type === "data-sources") {
console.log(part.data.results); // string[]
}
}
}
TypeScript
import { useAgentChat } from "@cloudflare/ai-chat/react";
import type { UIMessage } from "ai";
type ChatMessage = UIMessage<
unknown,
{
sources: { query: string; status: string; results: string[] };
usage: { model: string; inputTokens: number; outputTokens: number };
}
>;
const { messages } = useAgentChat<unknown, ChatMessage>({ agent });
// Typed access — no casts needed
for (const msg of messages) {
for (const part of msg.parts) {
if (part.type === "data-sources") {
console.log(part.data.results); // string[]
}
}
}
Handling transient parts with onData
Transient data parts never appear in message.parts. Use the onData callback instead:
JavaScript
const [thinking, setThinking] = useState(false);
const { messages } = useAgentChat({
agent,
onData(part) {
if (part.type === "data-thinking") {
setThinking(true);
}
},
});
TypeScript
const [thinking, setThinking] = useState(false);
const { messages } = useAgentChat<unknown, ChatMessage>({
agent,
onData(part) {
if (part.type === "data-thinking") {
setThinking(true);
}
},
});
On the server, write transient parts with transient: true:
JavaScript
writer.write({
transient: true,
type: "data-thinking",
data: { model: "glm-4.7-flash", startedAt: new Date().toISOString() },
});
TypeScript
writer.write({
transient: true,
type: "data-thinking",
data: { model: "glm-4.7-flash", startedAt: new Date().toISOString() },
});
onData fires on every path: new messages, stream resumption, and cross-tab broadcasts.
Resumable streaming
When a client disconnects and then reconnects, the stream resumes automatically. No configuration is needed; it works out of the box.
While a stream is active:
- Every chunk is cached to SQLite as it is generated
- If the client disconnects, the server keeps generating and caching
- When the client reconnects, it receives all cached chunks and then continues with the live stream
Disable it with resume: false:
JavaScript
const { messages } = useAgentChat({ agent, resume: false });
TypeScript
const { messages } = useAgentChat({ agent, resume: false });
Storage management
Row-size guard
SQLite rows have a 2 MB maximum size. When a message approaches this limit (for example, a tool returns a very large output), AIChatAgent compacts the message automatically:
- Tool output compaction: oversized tool outputs are replaced with an LLM-friendly summary telling the model it can suggest re-running the tool
- Text truncation: if the message is still too large after tool compaction, text parts are truncated and a note is appended
Compacted messages carry metadata.compactedToolOutputs so clients can detect this and render it gracefully, as sketched below.
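A client-side sketch of detecting compaction. Only the metadata field name comes from above; the shape of its value is an assumption, so it is treated as a simple truthy flag:
TypeScript
{messages.map((msg) => {
  const compacted = (msg.metadata as { compactedToolOutputs?: unknown } | undefined)
    ?.compactedToolOutputs;
  return (
    <div key={msg.id}>
      {compacted ? <em>Some tool output was compacted to fit storage limits.</em> : null}
      {/* render msg.parts as usual */}
    </div>
  );
})}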
Controlling LLM context vs storage
Storage (maxPersistedMessages) and LLM context are independent:
| Concern | Controlled by | Scope |
|---|---|---|
| How many messages SQLite stores | maxPersistedMessages | Persistence |
| What the model actually sees | pruneMessages() | LLM context |
| Row size limit | Automatic compaction | Individual messages |
JavaScript
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: pruneMessages({
// LLM context limit
messages: await convertToModelMessages(this.messages),
reasoning: "before-last-message",
toolCalls: "before-last-2-messages",
}),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: pruneMessages({
// LLM context limit
messages: await convertToModelMessages(this.messages),
reasoning: "before-last-message",
toolCalls: "before-last-2-messages",
}),
});
return result.toUIMessageStreamResponse();
}
}
Using different AI providers
AIChatAgent works with any AI SDK-compatible provider. The server code decides which model to use; no client changes are required.
Workers AI (Cloudflare)
JavaScript
import { createWorkersAI } from "workers-ai-provider";
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
TypeScript
import { createWorkersAI } from "workers-ai-provider";
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
messages: await convertToModelMessages(this.messages),
});
OpenAI
JavaScript
import { createOpenAI } from "@ai-sdk/openai";
const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });
const result = streamText({
model: openai.chat("gpt-4o"),
messages: await convertToModelMessages(this.messages),
});
TypeScript
import { createOpenAI } from "@ai-sdk/openai";
const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });
const result = streamText({
model: openai.chat("gpt-4o"),
messages: await convertToModelMessages(this.messages),
});
Anthropic
JavaScript
import { createAnthropic } from "@ai-sdk/anthropic";
const anthropic = createAnthropic({ apiKey: this.env.ANTHROPIC_API_KEY });
const result = streamText({
model: anthropic("claude-sonnet-4-20250514"),
messages: await convertToModelMessages(this.messages),
});
TypeScript
import { createAnthropic } from "@ai-sdk/anthropic";
const anthropic = createAnthropic({ apiKey: this.env.ANTHROPIC_API_KEY });
const result = streamText({
model: anthropic("claude-sonnet-4-20250514"),
messages: await convertToModelMessages(this.messages),
});
Advanced patterns
Because onChatMessage gives you full control over the streamText call, you can use any AI SDK feature directly. The patterns below work out of the box; no special AIChatAgent configuration is needed.
Dynamic model and tool control
Use prepareStep ↗ to switch models, available tools, or the system prompt between steps of a multi-step agent loop:
JavaScript
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: cheapModel, // Default model for simple steps
messages: await convertToModelMessages(this.messages),
tools: {
search: searchTool,
analyze: analyzeTool,
summarize: summarizeTool,
},
stopWhen: stepCountIs(10),
prepareStep: async ({ stepNumber, messages }) => {
// Phase 1: Search (steps 0-2)
if (stepNumber <= 2) {
return {
activeTools: ["search"],
toolChoice: "required", // Force tool use
};
}
// Phase 2: Analyze with a stronger model (steps 3-5)
if (stepNumber <= 5) {
return {
model: expensiveModel,
activeTools: ["analyze"],
};
}
// Phase 3: Summarize
return { activeTools: ["summarize"] };
},
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: cheapModel, // Default model for simple steps
messages: await convertToModelMessages(this.messages),
tools: {
search: searchTool,
analyze: analyzeTool,
summarize: summarizeTool,
},
stopWhen: stepCountIs(10),
prepareStep: async ({ stepNumber, messages }) => {
// Phase 1: Search (steps 0-2)
if (stepNumber <= 2) {
return {
activeTools: ["search"],
toolChoice: "required", // Force tool use
};
}
// Phase 2: Analyze with a stronger model (steps 3-5)
if (stepNumber <= 5) {
return {
model: expensiveModel,
activeTools: ["analyze"],
};
}
// Phase 3: Summarize
return { activeTools: ["summarize"] };
},
});
return result.toUIMessageStreamResponse();
}
}
prepareStep runs before each step and can return overrides for model, activeTools, toolChoice, system, and messages. Use it to:
- Switch models: use a cheap model for simple steps and upgrade for reasoning
- Stage tool availability: restrict which tools each step may use
- Manage context: prune or transform messages to stay within token limits
- Force tool calls: use toolChoice: { type: "tool", toolName: "search" } to require a specific tool
Language model middleware
Use wrapLanguageModel ↗ to add guardrails, RAG, caching, or logging without touching your chat logic:
JavaScript
import { streamText, convertToModelMessages, wrapLanguageModel } from "ai";
const guardrailMiddleware = {
wrapGenerate: async ({ doGenerate }) => {
const { text, ...rest } = await doGenerate();
// Filter PII or sensitive content from the response
const cleaned = text?.replace(/\b\d{3}-\d{2}-\d{4}\b/g, "[REDACTED]");
return { text: cleaned, ...rest };
},
};
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const model = wrapLanguageModel({
model: baseModel,
middleware: [guardrailMiddleware],
});
const result = streamText({
model,
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import { streamText, convertToModelMessages, wrapLanguageModel } from "ai";
import type { LanguageModelV3Middleware } from "@ai-sdk/provider";
const guardrailMiddleware: LanguageModelV3Middleware = {
wrapGenerate: async ({ doGenerate }) => {
const { text, ...rest } = await doGenerate();
// Filter PII or sensitive content from the response
const cleaned = text?.replace(/\b\d{3}-\d{2}-\d{4}\b/g, "[REDACTED]");
return { text: cleaned, ...rest };
},
};
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const model = wrapLanguageModel({
model: baseModel,
middleware: [guardrailMiddleware],
});
const result = streamText({
model,
messages: await convertToModelMessages(this.messages),
});
return result.toUIMessageStreamResponse();
}
}
The AI SDK ships several built-in middlewares:
- extractReasoningMiddleware: expose the chain of thought from models such as DeepSeek R1
- defaultSettingsMiddleware: apply default temperature, max tokens, and more
- simulateStreamingMiddleware: add streaming to non-streaming models
Multiple middlewares compose in order: middleware: [first, second] is equivalent to first(second(model)).
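For example, combining a built-in middleware with the guardrail above (a sketch; baseModel and guardrailMiddleware are the names used in the previous example):
TypeScript
import { wrapLanguageModel, extractReasoningMiddleware } from "ai";

const model = wrapLanguageModel({
  model: baseModel,
  middleware: [
    // Outermost: pulls <think>...</think> content into reasoning parts
    extractReasoningMiddleware({ tagName: "think" }),
    // Innermost: runs closest to the model
    guardrailMiddleware,
  ],
});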
Structured output
Use generateObject ↗ inside tools for structured data extraction:
JavaScript
import {
streamText,
generateObject,
convertToModelMessages,
tool,
stepCountIs,
} from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: myModel,
messages: await convertToModelMessages(this.messages),
tools: {
extractContactInfo: tool({
description:
"Extract structured contact information from the conversation",
inputSchema: z.object({
text: z.string().describe("The text to extract contact info from"),
}),
execute: async ({ text }) => {
const { object } = await generateObject({
model: myModel,
schema: z.object({
name: z.string(),
email: z.string().email(),
phone: z.string().optional(),
}),
prompt: `Extract contact information from: ${text}`,
});
return object;
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import {
streamText,
generateObject,
convertToModelMessages,
tool,
stepCountIs,
} from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: myModel,
messages: await convertToModelMessages(this.messages),
tools: {
extractContactInfo: tool({
description:
"Extract structured contact information from the conversation",
inputSchema: z.object({
text: z.string().describe("The text to extract contact info from"),
}),
execute: async ({ text }) => {
const { object } = await generateObject({
model: myModel,
schema: z.object({
name: z.string(),
email: z.string().email(),
phone: z.string().optional(),
}),
prompt: `Extract contact information from: ${text}`,
});
return object;
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
Sub-agent delegation
Note
This section covers in-process sub-agents built with the AI SDK ToolLoopAgent. For Durable Object sub-agents with isolated storage and typed RPC, see Sub-agents. For streaming full LLM turns inside a sub-agent, see Think: Sub-agent RPC.
Tools can delegate work to focused sub-calls that have their own context. Define a reusable agent with ToolLoopAgent ↗, then call it from a tool's execute:
JavaScript
import {
ToolLoopAgent,
streamText,
convertToModelMessages,
tool,
stepCountIs,
} from "ai";
import { z } from "zod";
// Define a reusable research agent with its own tools and instructions
const researchAgent = new ToolLoopAgent({
model: researchModel,
instructions: "You are a research assistant. Be thorough and cite sources.",
tools: { webSearch: webSearchTool },
stopWhen: stepCountIs(10),
});
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: orchestratorModel,
messages: await convertToModelMessages(this.messages),
tools: {
deepResearch: tool({
description: "Research a topic in depth",
inputSchema: z.object({
topic: z.string().describe("The topic to research"),
}),
execute: async ({ topic }) => {
const { text } = await researchAgent.generate({
prompt: topic,
});
return { summary: text };
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
TypeScript
import {
ToolLoopAgent,
streamText,
convertToModelMessages,
tool,
stepCountIs,
} from "ai";
import { z } from "zod";
// Define a reusable research agent with its own tools and instructions
const researchAgent = new ToolLoopAgent({
model: researchModel,
instructions: "You are a research assistant. Be thorough and cite sources.",
tools: { webSearch: webSearchTool },
stopWhen: stepCountIs(10),
});
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const result = streamText({
model: orchestratorModel,
messages: await convertToModelMessages(this.messages),
tools: {
deepResearch: tool({
description: "Research a topic in depth",
inputSchema: z.object({
topic: z.string().describe("The topic to research"),
}),
execute: async ({ topic }) => {
const { text } = await researchAgent.generate({
prompt: topic,
});
return { summary: text };
},
}),
},
stopWhen: stepCountIs(5),
});
return result.toUIMessageStreamResponse();
}
}
The research agent runs in its own context; its token budget is completely separate from the orchestrator's. Only the final summary flows back to the parent model.
Note
ToolLoopAgent works best as a sub-agent, not as a replacement for streamText inside onChatMessage. The main onChatMessage has direct access to this.env, this.messages, and options.body, which a pre-configured ToolLoopAgent instance cannot reference.
Streaming progress with preliminary results
By default, a tool part shows as loading until execute returns. With an async generator (async function*), you can stream progress updates to the client while the tool is still working:
JavaScript
deepResearch: tool({
description: "Research a topic in depth",
inputSchema: z.object({
topic: z.string().describe("The topic to research"),
}),
async *execute({ topic }) {
// Preliminary result — the client sees "searching" immediately
yield { status: "searching", topic, summary: undefined };
const { text } = await researchAgent.generate({ prompt: topic });
// Final result — sent to the model for its next step
yield { status: "done", topic, summary: text };
},
});
TypeScript
deepResearch: tool({
description: "Research a topic in depth",
inputSchema: z.object({
topic: z.string().describe("The topic to research"),
}),
async *execute({ topic }) {
// Preliminary result — the client sees "searching" immediately
yield { status: "searching", topic, summary: undefined };
const { text } = await researchAgent.generate({ prompt: topic });
// Final result — sent to the model for its next step
yield { status: "done", topic, summary: text };
},
});
Each yield updates the tool part on the client in real time (with preliminary: true). The value of the final yield becomes the output the model sees.
This pattern is useful when you:
- Need to explore a large amount of information that would bloat the main context
- Want to show live progress for long-running tools
- Want independent research to run in parallel (multiple tool calls execute concurrently)
- Need different models or system prompts for different subtasks
See the AI SDK Agents docs ↗, Subagents ↗, and Preliminary Tool Results ↗ for more.
Multi-client sync
When multiple clients connect to the same agent instance, messages are broadcast to every connection automatically. If one client sends a message, all other connected clients receive the updated message list.
Client A ──── sendMessage("Hello") ────▶ AIChatAgent
│
persist + stream
│
Client A ◀── CF_AGENT_USE_CHAT_RESPONSE ──────┤
Client B ◀── CF_AGENT_CHAT_MESSAGES ──────────┘
The client that initiated the request receives the streaming response. All other clients receive the final messages via the CF_AGENT_CHAT_MESSAGES broadcast.
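A sketch of two tabs joining the same conversation, assuming the name option on useAgent selects the agent instance (the room identifier is arbitrary):
TypeScript
// Run this in any number of tabs or devices; they all share one conversation
const agent = useAgent({ agent: "ChatAgent", name: "support-room-42" });
const { messages, sendMessage } = useAgentChat({ agent });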
API reference
Exports
| Import path | Exports |
|---|---|
| @cloudflare/ai-chat | AIChatAgent, createToolsFromClientSchemas, ChatRecoveryContext, ChatRecoveryOptions |
| @cloudflare/ai-chat/react | useAgentChat |
| @cloudflare/ai-chat/types | MessageType, OutgoingMessage, IncomingMessage |
WebSocket protocol
The chat protocol uses typed JSON messages over WebSocket:
| Message | Direction | Purpose |
|---|---|---|
| CF_AGENT_USE_CHAT_REQUEST | Client → Server | Send a chat message |
| CF_AGENT_USE_CHAT_RESPONSE | Server → Client | Stream response chunks |
| CF_AGENT_CHAT_MESSAGES | Server → Client | Broadcast updated messages |
| CF_AGENT_CHAT_CLEAR | Both directions | Clear the conversation |
| CF_AGENT_CHAT_REQUEST_CANCEL | Client → Server | Cancel an in-progress stream |
| CF_AGENT_TOOL_RESULT | Client → Server | Submit tool output |
| CF_AGENT_TOOL_APPROVAL | Client → Server | Approve or reject a tool |
| CF_AGENT_MESSAGE_UPDATED | Server → Client | Notify that a message was updated |
| CF_AGENT_STREAM_RESUMING | Server → Client | Notify that a stream is resuming |
| CF_AGENT_STREAM_RESUME_REQUEST | Client → Server | Ask whether a stream can be resumed |
Deprecated APIs
The following APIs are deprecated and emit console warnings when used. They will be removed in a future release.
| Deprecated | Replacement | Notes |
|---|---|---|
| addToolResult({ toolCallId, result }) | addToolOutput({ toolCallId, output }) | Renamed to match AI SDK terminology |
| createToolsFromClientSchemas() | Client tools are now registered automatically | Manual schema conversion is no longer needed |
| extractClientToolSchemas() | Client tools are now registered automatically | Schemas are sent along with tool results |
| detectToolsRequiringConfirmation() | Use needsApproval on the tool definition | Approval is now per-tool instead of a global filter |
| tools option on useAgentChat | Define tools server-side in onChatMessage | All tool definitions belong on the server |
| toolsRequiringConfirmation option | Use needsApproval per tool | Per-tool approval replaces the global list |
If you are upgrading from an earlier version, replace the deprecated calls with their counterparts, as in the sketch below. Deprecated APIs still work, but they will be removed in a future major version.
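A before/after sketch of the most common rename (the values are illustrative):
TypeScript
// Before (deprecated)
addToolResult({ toolCallId: toolCall.toolCallId, result: { ok: true } });

// After
addToolOutput({ toolCallId: toolCall.toolCallId, output: { ok: true } });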
Next steps
Client SDK: the useAgent hook and the AgentClient class.
Human-in-the-loop: approval flows and human-intervention patterns.
Build a chat agent: a step-by-step tutorial for building your first chat agent.
Durable execution: runFiber(), stash(), and crash recovery for long-running work.
Long-running agents: lifecycle, recovery patterns, and per-provider strategies.