Chat Agents

Build AI-powered chat interfaces with AIChatAgent and useAgentChat. Messages are persisted to SQLite automatically, streams resume after a disconnect, and tool calls work on both the server and the client.

Overview

The @cloudflare/ai-chat package provides two main exports:

  • AIChatAgent (@cloudflare/ai-chat): server-side Agent class with message persistence and streaming
  • useAgentChat (@cloudflare/ai-chat/react): React hook for building chat UIs

Built on the AI SDK ↗ and Cloudflare Durable Objects, you get:

  • Automatic message persistence: conversations are stored in SQLite and survive restarts
  • Resumable streaming: clients can reconnect mid-stream without losing data
  • Real-time sync: messages are broadcast to all connected clients over WebSockets
  • Tool support: server-side tools, client-side tools, and human-in-the-loop approval patterns
  • Data parts: attach typed JSON alongside text in messages (citations, progress, usage)
  • Row size protection: messages are compacted automatically as they approach SQLite limits

Quick start

Installation

Terminal window


npm install @cloudflare/ai-chat agents ai


Server

JavaScript


import { AIChatAgent } from "@cloudflare/ai-chat";

import { createWorkersAI } from "workers-ai-provider";

import { streamText, convertToModelMessages } from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    // Use any provider such as workers-ai-provider, openai, anthropic, google, etc.

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import { AIChatAgent } from "@cloudflare/ai-chat";

import { createWorkersAI } from "workers-ai-provider";

import { streamText, convertToModelMessages } from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    // Use any provider such as workers-ai-provider, openai, anthropic, google, etc.

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}


Client

JavaScript


import { useAgent } from "agents/react";

import { useAgentChat } from "@cloudflare/ai-chat/react";


function Chat() {

  const agent = useAgent({ agent: "ChatAgent" });

  const { messages, sendMessage, status } = useAgentChat({ agent });


  return (

    <div>

      {messages.map((msg) => (

        <div key={msg.id}>

          <strong>{msg.role}:</strong>

          {msg.parts.map((part, i) =>

            part.type === "text" ? <span key={i}>{part.text}</span> : null,

          )}

        </div>

      ))}


      <form

        onSubmit={(e) => {

          e.preventDefault();

          const input = e.currentTarget.elements.namedItem("input");

          sendMessage({ text: input.value });

          input.value = "";

        }}

      >

        <input name="input" placeholder="Type a message..." />

        <button type="submit" disabled={status === "streaming"}>

          Send

        </button>

      </form>

    </div>

  );

}



TypeScript


import { useAgent } from "agents/react";

import { useAgentChat } from "@cloudflare/ai-chat/react";


function Chat() {

  const agent = useAgent({ agent: "ChatAgent" });

  const { messages, sendMessage, status } = useAgentChat({ agent });


  return (

    <div>

      {messages.map((msg) => (

        <div key={msg.id}>

          <strong>{msg.role}:</strong>

          {msg.parts.map((part, i) =>

            part.type === "text" ? <span key={i}>{part.text}</span> : null,

          )}

        </div>

      ))}


      <form

        onSubmit={(e) => {

          e.preventDefault();

          const input = e.currentTarget.elements.namedItem(

            "input",

          ) as HTMLInputElement;

          sendMessage({ text: input.value });

          input.value = "";

        }}

      >

        <input name="input" placeholder="Type a message..." />

        <button type="submit" disabled={status === "streaming"}>

          Send

        </button>

      </form>

    </div>

  );

}


Wrangler configuration

JSONC


// wrangler.jsonc

{

  "ai": { "binding": "AI" },

  "durable_objects": {

    "bindings": [{ "name": "ChatAgent", "class_name": "ChatAgent" }],

  },

  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["ChatAgent"] }],

}


The new_sqlite_classes migration is required: AIChatAgent uses SQLite to persist messages and cache streaming chunks.

How it works

sequenceDiagram
    participant Client as Client (useAgentChat)
    participant Agent as AIChatAgent
    participant DB as SQLite

Client->>Agent: CF_AGENT_USE_CHAT_REQUEST (WebSocket)
Agent->>DB: Persist messages
Agent->>Agent: onChatMessage()
loop Streaming response
    Agent-->>Client: CF_AGENT_USE_CHAT_RESPONSE (chunks)
    Agent->>DB: Buffer chunks
end
Agent->>DB: Persist final message
Agent-->>Client: CF_AGENT_CHAT_MESSAGES (broadcast to all clients)

  1. The client sends a message over WebSocket
  2. AIChatAgent persists the message to SQLite and calls your onChatMessage method
  3. Your method returns a streaming Response (typically from streamText)
  4. Chunks stream back to the client over WebSocket in real time
  5. When the stream ends, the final message is persisted and broadcast to every connection

Server API

AIChatAgent

Extends Agent from the agents package. Manages conversation state, persistence, and streaming.

JavaScript


import { AIChatAgent } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  // Access current messages

  // this.messages: UIMessage[]


  // Limit stored messages (optional)

  maxPersistedMessages = 200;


  async onChatMessage(onFinish, options) {

    // onFinish: optional callback for streamText (cleanup is automatic)

    // options.abortSignal: cancel signal

    // options.body: custom data from client

    // Return a Response (streaming or plain text)

  }

}



TypeScript


import { AIChatAgent } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  // Access current messages

  // this.messages: UIMessage[]


  // Limit stored messages (optional)

  maxPersistedMessages = 200;


  async onChatMessage(onFinish?, options?) {

    // onFinish: optional callback for streamText (cleanup is automatic)

    // options.abortSignal: cancel signal

    // options.body: custom data from client

    // Return a Response (streaming or plain text)

  }

}


onChatMessage

The main method you override. It receives the conversation context and must return a Response.

Streaming response (most common):

JavaScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      system: "You are a helpful assistant.",

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      system: "You are a helpful assistant.",

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}


Plain text response:

TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    return new Response("Hello! I am a simple agent.", {

      headers: { "Content-Type": "text/plain" },

    });

  }

}


Accessing custom body data and the request ID:

TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage(_onFinish, options) {

    const { timezone, userId } = options?.body ?? {};

    // Use these values in your LLM call or business logic


    // options.requestId — unique identifier for this chat request,

    // useful for logging and correlating events

    console.log("Request ID:", options?.requestId);

  }

}


this.messages

The current conversation history, loaded from SQLite. It is an array of AI SDK UIMessage objects. Messages are persisted automatically after every interaction.

maxPersistedMessages

Limits how many messages are stored in SQLite. Once the limit is exceeded, the oldest messages are deleted. This only controls storage; it does not affect what is sent to the LLM.

JavaScript


export class ChatAgent extends AIChatAgent {

  maxPersistedMessages = 200;

}


TypeScript


export class ChatAgent extends AIChatAgent {

  maxPersistedMessages = 200;

}


To control what is sent to the model, use the AI SDK's pruneMessages():

JavaScript


import { streamText, convertToModelMessages, pruneMessages } from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: pruneMessages({

        messages: await convertToModelMessages(this.messages),

        reasoning: "before-last-message",

        toolCalls: "before-last-2-messages",

      }),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import { streamText, convertToModelMessages, pruneMessages } from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: pruneMessages({

        messages: await convertToModelMessages(this.messages),

        reasoning: "before-last-message",

        toolCalls: "before-last-2-messages",

      }),

    });


    return result.toUIMessageStreamResponse();

  }

}


waitForMcpConnections

Controls whether AIChatAgent waits for MCP server connections to settle before calling onChatMessage. This ensures this.mcp.getAITools() returns the complete tool set, which matters most when the Durable Object wakes from hibernation and connections are still being restored in the background.

  • { timeout: 10_000 }: wait up to 10 seconds (default)
  • { timeout: N }: wait up to N milliseconds
  • true: wait until all connections are ready
  • false: do not wait (legacy behavior before 0.2.0)

JavaScript


export class ChatAgent extends AIChatAgent {

  // Default — waits up to 10 seconds

  // waitForMcpConnections = { timeout: 10_000 };


  // Wait forever

  waitForMcpConnections = true;


  // Disable waiting

  waitForMcpConnections = false;

}



TypeScript


export class ChatAgent extends AIChatAgent {

  // Default — waits up to 10 seconds

  // waitForMcpConnections = { timeout: 10_000 };


  // Wait forever

  waitForMcpConnections = true;


  // Disable waiting

  waitForMcpConnections = false;

}


For finer-grained control, call this.mcp.waitForConnections() directly inside onChatMessage, as in the sketch below.
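A minimal sketch of that pattern, assuming waitForConnections() takes no arguments and that the built-in wait has been disabled:

TypeScript

export class ChatAgent extends AIChatAgent {
  // Skip the automatic wait; this agent only needs MCP tools on some turns
  waitForMcpConnections = false;

  async onChatMessage() {
    // Block until MCP connections have settled, then collect the tools
    await this.mcp.waitForConnections();
    const tools = this.mcp.getAITools();
    // ... pass `tools` to streamText as usual and return its stream response
    return new Response("MCP tools ready: " + Object.keys(tools).length);
  }
}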

messageConcurrency

Controls how a new user submission is handled when a chat turn is already running or queued.

JavaScript


export class ChatAgent extends AIChatAgent {

  messageConcurrency = "queue";

}


TypeScript


export class ChatAgent extends AIChatAgent {

  messageConcurrency = "queue";

}


  • "queue" (default): queue every submission and process them in order
  • "latest": keep only the newest overlapping submission; superseded submissions still persist their user message but never start a model turn
  • "merge": queue overlapping submissions, then merge their trailing user messages into a single turn before the newest queued turn starts
  • "drop": ignore overlapping submissions entirely
  • { strategy: "debounce", debounceMs?: number }: trailing-edge latest with a quiet window (default 750 ms)
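For example, a minimal sketch using the debounce strategy from the list above, collapsing a rapid burst of submissions into a single turn after 500 ms of quiet:

TypeScript

export class ChatAgent extends AIChatAgent {
  // Trailing-edge debounce: only the latest burst of submissions starts a turn
  messageConcurrency = { strategy: "debounce", debounceMs: 500 } as const;
}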

This setting applies only to sendMessage() submissions. Regeneration, tool continuations, approvals, clears, and programmatic saveMessages() calls keep their existing serialized behavior.

persistMessages vs. saveMessages

persistMessages stores messages in SQLite and broadcasts the update to all connected clients, but does not trigger a model turn. Use it to inject messages into the conversation without starting a new response.

saveMessages both persists messages and triggers onChatMessage() to generate a new response. It waits for any currently running chat turn to finish before starting, so scheduled or programmatic messages never overlap an in-flight stream.

JavaScript


// Store messages without triggering a response

await this.persistMessages(messages);


// Store messages AND trigger onChatMessage

const { requestId, status } = await this.saveMessages(messages);


TypeScript


// Store messages without triggering a response

await this.persistMessages(messages);


// Store messages AND trigger onChatMessage

const { requestId, status } = await this.saveMessages(messages);


saveMessages accepts either an array of messages or a function that derives the next message list from the latest this.messages. Use the function form to avoid stale baselines when multiple calls are queued:

JavaScript


await this.saveMessages((messages) => [

  ...messages,

  {

    id: crypto.randomUUID(),

    role: "user",

    parts: [{ type: "text", text: "Summarize the latest data" }],

    createdAt: new Date(),

  },

]);


TypeScript


await this.saveMessages((messages) => [

  ...messages,

  {

    id: crypto.randomUUID(),

    role: "user",

    parts: [{ type: "text", text: "Summarize the latest data" }],

    createdAt: new Date(),

  },

]);


saveMessages returns { requestId, status }, where status is "completed" if the turn actually ran and "skipped" if the chat was cleared before the turn started.
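A short usage sketch; the reminder message appended here is illustrative:

TypeScript

const { status } = await this.saveMessages((messages) => [
  ...messages,
  {
    id: crypto.randomUUID(),
    role: "user",
    parts: [{ type: "text", text: "Any updates on the report?" }],
  },
]);
if (status === "skipped") {
  // The chat was cleared before the turn could start; nothing was generated
}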

onChatResponse

Called after a chat turn completes and the assistant message has been persisted. The turn lock is released before this hook runs, so it is safe to call saveMessages inside it. It fires on every turn-completion path: WebSocket chat requests, saveMessages, and automatic continuations.

JavaScript


export class ChatAgent extends AIChatAgent {

  async onChatResponse(result) {

    if (result.status === "completed") {

      console.log("Turn completed:", result.requestId);

    }

    if (result.status === "error") {

      console.error("Turn failed:", result.error);

    }

  }

}



TypeScript


import type { ChatResponseResult } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  protected async onChatResponse(result: ChatResponseResult) {

    if (result.status === "completed") {

      console.log("Turn completed:", result.requestId);

    }

    if (result.status === "error") {

      console.error("Turn failed:", result.error);

    }

  }

}


ChatResponseResult contains:

  • message (UIMessage): the final assistant message produced by the turn
  • requestId (string): the request ID associated with the turn
  • continuation (boolean): whether the turn continued a previous assistant turn
  • status ("completed" | "error" | "aborted"): how the turn ended
  • error (string | undefined): the error message when status is "error"

Note

Responses triggered from inside onChatResponse (for example via saveMessages) do not recursively trigger onChatResponse.

sanitizeMessageForPersistence

Override this method to transform messages before they are persisted to storage. The hook runs after the built-in sanitization (stripping OpenAI metadata, truncating Anthropic provider-executed tool payloads, filtering empty reasoning parts).

JavaScript


export class ChatAgent extends AIChatAgent {

  sanitizeMessageForPersistence(message) {

    return {

      ...message,

      parts: message.parts.map((part) => {

        if (

          "output" in part &&

          typeof part.output === "string" &&

          part.output.length > 1000

        ) {

          return { ...part, output: "[redacted]" };

        }

        return part;

      }),

    };

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  protected sanitizeMessageForPersistence(message: UIMessage): UIMessage {

    return {

      ...message,

      parts: message.parts.map((part) => {

        if (

          "output" in part &&

          typeof part.output === "string" &&

          part.output.length > 1000

        ) {

          return { ...part, output: "[redacted]" };

        }

        return part;

      }),

    };

  }

}


Turn lifecycle helpers

The following methods help you coordinate programmatic turns and wait for pending interactions to finish.

hasPendingInteraction()

Returns true when an assistant message is waiting on a client-side tool result or an approval.

JavaScript


if (this.hasPendingInteraction()) {

  console.log("Waiting for user to approve or provide tool output");

}


TypeScript


if (this.hasPendingInteraction()) {

  console.log("Waiting for user to approve or provide tool output");

}


waitUntilStable()

Waits until the conversation is fully stable: no stream in flight, no pending client tool interaction, and no queued continuation turn. Returns true when the conversation becomes stable, or false if the call times out before pending interactions complete.

JavaScript


const stable = await this.waitUntilStable({ timeout: 30_000 });

if (stable) {

  console.log("All turns complete, safe to proceed");

}


TypeScript


const stable = await this.waitUntilStable({ timeout: 30_000 });

if (stable) {

  console.log("All turns complete, safe to proceed");

}


This is especially useful for server-driven flows built on saveMessages:

JavaScript


await this.saveMessages((messages) => [...messages, syntheticUserMessage]);

await this.waitUntilStable({ timeout: 60_000 });

// The assistant has finished responding


TypeScript


await this.saveMessages((messages) => [...messages, syntheticUserMessage]);

await this.waitUntilStable({ timeout: 60_000 });

// The assistant has finished responding


resetTurnState()

Aborts the current in-flight turn and invalidates queued continuations. The built-in CF_AGENT_CHAT_CLEAR handler calls it automatically; you can also call it manually when needed, as sketched below.
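A minimal sketch of a manual call; the forceRestart method and the message it injects are illustrative:

TypeScript

export class ChatAgent extends AIChatAgent {
  async forceRestart(text: string) {
    // Abort the in-flight turn and drop any queued continuations...
    this.resetTurnState();
    // ...then start a fresh turn from a clean slate
    await this.saveMessages((messages) => [
      ...messages,
      {
        id: crypto.randomUUID(),
        role: "user",
        parts: [{ type: "text", text }],
      },
    ]);
  }
}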

Lifecycle hooks

Override onConnect and onClose to add custom logic. Stream resumption and message sync are handled automatically:

JavaScript


export class ChatAgent extends AIChatAgent {

  async onConnect(connection, ctx) {

    // Your custom logic (e.g., logging, auth checks)

    console.log("Client connected:", connection.id);

    // Stream resumption and message sync are handled automatically

  }


  async onClose(connection, code, reason, wasClean) {

    console.log("Client disconnected:", connection.id);

    // Connection cleanup is handled automatically

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  async onConnect(connection, ctx) {

    // Your custom logic (e.g., logging, auth checks)

    console.log("Client connected:", connection.id);

    // Stream resumption and message sync are handled automatically

  }


  async onClose(connection, code, reason, wasClean) {

    console.log("Client disconnected:", connection.id);

    // Connection cleanup is handled automatically

  }

}


The destroy() method cancels all pending chat requests and cleans up streaming state. It is called automatically when the Durable Object is disposed of, and you can call it manually if needed.

Request cancellation

When a user clicks "stop" in the chat UI, the client sends a CF_AGENT_CHAT_REQUEST_CANCEL message. The server propagates it to the abortSignal in options:

JavaScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage(_onFinish, options) {

    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

      abortSignal: options?.abortSignal, // Pass through for cancellation

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage(_onFinish, options) {

    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

      abortSignal: options?.abortSignal, // Pass through for cancellation

    });


    return result.toUIMessageStreamResponse();

  }

}


Warning

If you do not pass abortSignal to streamText, the LLM call keeps running in the background even after the user cancels. Always pass it through.

Chat recovery

When a Durable Object is evicted mid-stream (code update, idle timeout, resource limits), the connection to the LLM is severed permanently and in-memory streaming state is lost. chatRecovery wraps each chat turn in a runFiber(), providing automatic keepAlive while streaming and a recovery hook on restart.

JavaScript


export class ChatAgent extends AIChatAgent {

  chatRecovery = true;

}


TypeScript


export class ChatAgent extends AIChatAgent {

  override chatRecovery = true;

}


Once enabled, every onChatMessage call runs inside a fiber. If the agent is evicted mid-stream, the fiber row survives in SQLite. On the next activation, the framework detects the interrupted fiber, rebuilds the partial response from the cached stream chunks, and calls onChatRecovery.

onChatRecovery

Override it to implement provider-specific recovery strategies. The default behavior persists the partial response and schedules a continuation via continueLastTurn().

JavaScript


export class ChatAgent extends AIChatAgent {

  chatRecovery = true;


  async onChatRecovery(ctx) {

    console.log(`Recovered ${ctx.partialText.length} chars of partial text`);


    // Default: persist partial + schedule continuation

    return {};

  }

}



TypeScript


import type { ChatRecoveryContext, ChatRecoveryOptions } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  override chatRecovery = true;


  override async onChatRecovery(

    ctx: ChatRecoveryContext,

  ): Promise<ChatRecoveryOptions> {

    console.log(`Recovered ${ctx.partialText.length} chars of partial text`);


    // Default: persist partial + schedule continuation

    return {};

  }

}


ChatRecoveryContext:

  • streamId (string): ID of the interrupted stream
  • requestId (string): ID of the original chat request
  • partialText (string): text generated before the eviction
  • partialParts (MessagePart[]): message parts generated before the eviction (text, reasoning, tool calls)
  • recoveryData (unknown | null): data from this.stash(), entirely user-controlled
  • messages (ChatMessage[]): the full conversation history
  • lastBody (Record<string, unknown> | undefined): the body of the original request
  • lastClientTools (ClientToolSchema[] | undefined): client tool schemas carried by the original request

ChatRecoveryOptions:

  • persist (default true): save the partial response as an assistant message
  • continue (default true): schedule a continuation via continueLastTurn()

Common return values:

  • {}: persist the partial response and auto-continue (the default; for providers that support assistant prefill)
  • { continue: false }: persist the partial response but skip the automatic continuation (handle it yourself)
  • { persist: false, continue: false }: handle everything yourself (for example, fetch the already-completed response from the provider)

continueLastTurn

Re-invokes onChatMessage with the saved request body, appending to the last assistant message. The response streams as a continuation, appended to the existing assistant message rather than creating a new one. No synthetic user message is created.

TypeScript


protected continueLastTurn(body?: Record<string, unknown>): Promise<SaveMessagesResult>;


The default recovery path calls it automatically. You can also call it manually from a scheduled callback or another entry point, as in the sketch below. The optional body argument is merged with the saved _lastBody.
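A sketch of a manual invocation; the callback name is illustrative, and triggering it via the agents package's schedule() API is an assumption:

TypeScript

export class ChatAgent extends AIChatAgent {
  // Invoked later, e.g. via `await this.schedule(30, "resumeTurn")` (assumed
  // to be available from the underlying agents SDK)
  async resumeTurn() {
    // Re-invokes onChatMessage and appends to the last assistant message;
    // the extra body is merged with the saved _lastBody
    await this.continueLastTurn({ resumedBy: "scheduler" });
  }
}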

Stashing recovery data

Use this.stash() inside onChatMessage to persist provider-specific data for recovery. The stash is stored in the fiber's SQLite row, separate from agent state, and is available in onChatRecovery as ctx.recoveryData.

JavaScript


export class ChatAgent extends AIChatAgent {

  chatRecovery = true;


  async onChatMessage(_onFinish, options) {

    const result = streamText({

      model: openai("gpt-5.4"),

      messages: await convertToModelMessages(this.messages),

      providerOptions: { openai: { store: true } },

      includeRawChunks: true,

      onChunk: ({ chunk }) => {

        if (chunk.type === "raw") {

          const raw = chunk.rawValue;


          if (raw?.type === "response.created" && raw.response?.id) {

            this.stash({ responseId: raw.response.id });

          }

        }

      },

    });

    return result.toUIMessageStreamResponse();

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  override chatRecovery = true;


  async onChatMessage(_onFinish, options) {

    const result = streamText({

      model: openai("gpt-5.4"),

      messages: await convertToModelMessages(this.messages),

      providerOptions: { openai: { store: true } },

      includeRawChunks: true,

      onChunk: ({ chunk }) => {

        if (chunk.type === "raw") {

          const raw = chunk.rawValue as {

            type?: string;

            response?: { id?: string };

          };

          if (raw?.type === "response.created" && raw.response?.id) {

            this.stash({ responseId: raw.response.id });

          }

        }

      },

    });

    return result.toUIMessageStreamResponse();

  }

}


Per-provider recovery strategies

The right strategy depends on whether the provider supports assistant prefill and whether responses keep running server-side after a disconnect:

  • Workers AI: continueLastTurn(); the model continues via assistant prefill
  • OpenAI (Responses API): fetch the completed response by its ID; zero wasted tokens
  • Anthropic: persist the partial response and send a synthetic user message to continue
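As an illustration of the Anthropic row above, a hedged sketch of a custom onChatRecovery that keeps the partial response but drives the continuation itself with a synthetic user message (the schedule() call is an assumption borrowed from the agents package):

TypeScript

import type {
  ChatRecoveryContext,
  ChatRecoveryOptions,
} from "@cloudflare/ai-chat";

export class ChatAgent extends AIChatAgent {
  override chatRecovery = true;

  override async onChatRecovery(
    ctx: ChatRecoveryContext,
  ): Promise<ChatRecoveryOptions> {
    // Let the framework persist the partial assistant message,
    // but skip the automatic assistant-prefill continuation
    await this.schedule(0, "continueAfterRecovery");
    return { persist: true, continue: false };
  }

  async continueAfterRecovery() {
    // Nudge the model to pick up where the partial response left off
    await this.saveMessages((messages) => [
      ...messages,
      {
        id: crypto.randomUUID(),
        role: "user",
        parts: [{ type: "text", text: "Please continue your previous answer." }],
      },
    ]);
  }
}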

To see where chat recovery fits into the bigger picture of long-running agents, see Long-running agents: Recovering interrupted LLM streams. For the underlying fiber API, see Durable Execution.

Client API

useAgentChat

React hook that connects to an AIChatAgent over WebSocket. It wraps the AI SDK's useChat with a native WebSocket transport.

JavaScript


import { useAgent } from "agents/react";

import { useAgentChat } from "@cloudflare/ai-chat/react";


function Chat() {

  const agent = useAgent({ agent: "ChatAgent" });

  const {

    messages,

    sendMessage,

    clearHistory,

    addToolOutput,

    addToolApprovalResponse,

    setMessages,

    status,

  } = useAgentChat({ agent });


  // ...

}



TypeScript


import { useAgent } from "agents/react";

import { useAgentChat } from "@cloudflare/ai-chat/react";


function Chat() {

  const agent = useAgent({ agent: "ChatAgent" });

  const {

    messages,

    sendMessage,

    clearHistory,

    addToolOutput,

    addToolApprovalResponse,

    setMessages,

    status,

  } = useAgentChat({ agent });


  // ...

}


Options

  • agent (ReturnType<typeof useAgent>, required): the agent connection from useAgent
  • onToolCall (({ toolCall, addToolOutput }) => void): handles client-side tool execution
  • autoContinueAfterToolResult (boolean, default true): automatically continue the conversation after client tool results and approvals
  • resume (boolean, default true): enable automatic stream resumption on reconnect
  • body (object | () => object): custom data attached to every request
  • prepareSendMessagesRequest ((options) => { body?, headers? }): advanced per-request customization
  • getInitialMessages ((options) => Promise<UIMessage[]>, or null): custom initial message loader. Set it to null to skip the HTTP request entirely (useful when you provide messages directly)

Return values

  • messages (UIMessage[]): the current conversation messages
  • sendMessage ((message) => void): send a message
  • clearHistory (() => void): clear the conversation (client and server)
  • addToolOutput (({ toolCallId, output }) => void): provide output for a client-side tool
  • addToolApprovalResponse (({ id, approved }) => void): approve or reject a tool awaiting approval
  • setMessages ((messages | updater) => void): set messages directly (synced to the server)
  • status (string): "idle", "submitted", "streaming", or "error"

Tools

AIChatAgent supports three tool patterns, all built with the AI SDK's tool() function:

  • Server-side: runs on the server (automatic); for API calls, database queries, computation
  • Client-side: runs in the browser (via onToolCall); for geolocation, clipboard, camera, local storage
  • Approval: runs on the server after the user approves; for payments, deletions, external actions

Server-side tools

Tools with an execute function run automatically on the server:

JavaScript


import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";

import { z } from "zod";

export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

      tools: {

        getWeather: tool({

          description: "Get weather for a city",

          inputSchema: z.object({ city: z.string() }),

          execute: async ({ city }) => {

            const data = await fetchWeather(city);

            return { temperature: data.temp, condition: data.condition };

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";

import { z } from "zod";

export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: await convertToModelMessages(this.messages),

      tools: {

        getWeather: tool({

          description: "Get weather for a city",

          inputSchema: z.object({ city: z.string() }),

          execute: async ({ city }) => {

            const data = await fetchWeather(city);

            return { temperature: data.temp, condition: data.condition };

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}


Client-side tools

Define a tool without execute on the server, then handle it on the client with onToolCall. Use this for tools that need browser APIs.

Server:

JavaScript


tools: {

  getLocation: tool({

    description: "Get the user's location from the browser",

    inputSchema: z.object({}),

    // No execute — the client handles it

  }),

}


TypeScript


tools: {

  getLocation: tool({

    description: "Get the user's location from the browser",

    inputSchema: z.object({}),

    // No execute — the client handles it

  }),

}


Client:

JavaScript


const { messages, sendMessage } = useAgentChat({

  agent,

  onToolCall: async ({ toolCall, addToolOutput }) => {

    if (toolCall.toolName === "getLocation") {

      const pos = await new Promise((resolve, reject) =>

        navigator.geolocation.getCurrentPosition(resolve, reject),

      );

      addToolOutput({

        toolCallId: toolCall.toolCallId,

        output: { lat: pos.coords.latitude, lng: pos.coords.longitude },

      });

    }

  },

});



TypeScript


const { messages, sendMessage } = useAgentChat({

  agent,

  onToolCall: async ({ toolCall, addToolOutput }) => {

    if (toolCall.toolName === "getLocation") {

      const pos = await new Promise((resolve, reject) =>

        navigator.geolocation.getCurrentPosition(resolve, reject),

      );

      addToolOutput({

        toolCallId: toolCall.toolCallId,

        output: { lat: pos.coords.latitude, lng: pos.coords.longitude },

      });

    }

  },

});


When the LLM calls getLocation, the stream pauses. Your onToolCall callback fires, your code provides the output, and the conversation continues.

Tool approval (human-in-the-loop)

For tools that need user confirmation before executing, use needsApproval:

Server:

JavaScript


tools: {

  processPayment: tool({

    description: "Process a payment",

    inputSchema: z.object({

      amount: z.number(),

      recipient: z.string(),

    }),

    needsApproval: async ({ amount }) => amount > 100,

    execute: async ({ amount, recipient }) => charge(amount, recipient),

  }),

}



TypeScript


tools: {

  processPayment: tool({

    description: "Process a payment",

    inputSchema: z.object({

      amount: z.number(),

      recipient: z.string(),

    }),

    needsApproval: async ({ amount }) => amount > 100,

    execute: async ({ amount, recipient }) => charge(amount, recipient),

  }),

}


Client:

JavaScript


const { messages, addToolApprovalResponse } = useAgentChat({ agent });


// Render pending approvals from message parts

{

  messages.map((msg) =>

    msg.parts

      .filter(

        (part) => part.type === "tool" && part.state === "approval-required",

      )

      .map((part) => (

        <div key={part.toolCallId}>

          <p>Approve {part.toolName}?</p>

          <button

            onClick={() =>

              addToolApprovalResponse({

                id: part.toolCallId,

                approved: true,

              })

            }

          >

            Approve

          </button>

          <button

            onClick={() =>

              addToolApprovalResponse({

                id: part.toolCallId,

                approved: false,

              })

            }

          >

            Reject

          </button>

        </div>

      )),

  );

}



TypeScript


const { messages, addToolApprovalResponse } = useAgentChat({ agent });


// Render pending approvals from message parts

{

  messages.map((msg) =>

    msg.parts

      .filter(

        (part) => part.type === "tool" && part.state === "approval-required",

      )

      .map((part) => (

        <div key={part.toolCallId}>

          <p>Approve {part.toolName}?</p>

          <button

            onClick={() =>

              addToolApprovalResponse({

                id: part.toolCallId,

                approved: true,

              })

            }

          >

            Approve

          </button>

          <button

            onClick={() =>

              addToolApprovalResponse({

                id: part.toolCallId,

                approved: false,

              })

            }

          >

            Reject

          </button>

        </div>

      )),

  );

}


Custom rejection messages with addToolOutput

When the user rejects a tool, addToolApprovalResponse({ id, approved: false }) sets the tool state to output-denied with a generic message. To give the LLM a more specific reason for the rejection, use addToolOutput with state: "output-error" instead:

JavaScript


const { addToolOutput } = useAgentChat({ agent });


// Reject with a custom error message

addToolOutput({

  toolCallId: part.toolCallId,

  state: "output-error",

  errorText: "User declined: insufficient budget for this quarter",

});


TypeScript


const { addToolOutput } = useAgentChat({ agent });


// Reject with a custom error message

addToolOutput({

  toolCallId: part.toolCallId,

  state: "output-error",

  errorText: "User declined: insufficient budget for this quarter",

});


This sends the LLM a tool_result containing your custom error text, letting it respond sensibly (for example, by suggesting an alternative or asking a clarifying question).

addToolApprovalResponse (with approved: false) continues the conversation automatically when autoContinueAfterToolResult is enabled (the default). addToolOutput with state: "output-error" does not auto-continue; call sendMessage() afterwards if you want the LLM to respond to the error, as in the sketch below.
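A small sketch combining the two: reject with context, then explicitly prompt the model to react:

TypeScript

// Reject the tool call with a specific reason...
addToolOutput({
  toolCallId: part.toolCallId,
  state: "output-error",
  errorText: "User declined: insufficient budget for this quarter",
});

// ...then start a turn so the LLM can respond to the rejection
sendMessage({ text: "I declined that. Can you suggest a cheaper option?" });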

See Human-in-the-loop for more patterns.

Custom request data

Include custom data with every chat request via the body option:

JavaScript


const { messages, sendMessage } = useAgentChat({

  agent,

  body: {

    timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,

    userId: currentUser.id,

  },

});


TypeScript


const { messages, sendMessage } = useAgentChat({

  agent,

  body: {

    timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,

    userId: currentUser.id,

  },

});


For dynamic values, use a function:

JavaScript


body: () => ({

  token: getAuthToken(),

  timestamp: Date.now(),

});


TypeScript


body: () => ({

  token: getAuthToken(),

  timestamp: Date.now(),

});


Access the fields on the server:

JavaScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage(_onFinish, options) {

    const { timezone, userId } = options?.body ?? {};

    // ...

  }

}


TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage(_onFinish, options) {

    const { timezone, userId } = options?.body ?? {};

    // ...

  }

}


For more advanced per-request customization (custom headers, per-request bodies), use prepareSendMessagesRequest:

JavaScript


const { messages, sendMessage } = useAgentChat({

  agent,

  prepareSendMessagesRequest: async ({ messages, trigger }) => ({

    headers: { Authorization: `Bearer ${await getToken()}` },

    body: { requestedAt: Date.now() },

  }),

});


TypeScript


const { messages, sendMessage } = useAgentChat({

  agent,

  prepareSendMessagesRequest: async ({ messages, trigger }) => ({

    headers: { Authorization: `Bearer ${await getToken()}` },

    body: { requestedAt: Date.now() },

  }),

});


Data parts

Data parts let you attach typed JSON alongside text in messages: progress indicators, source citations, token usage, or any structured data your UI needs.

Writing data parts (server)

Use createUIMessageStream with writer.write() to send data parts from the server:

JavaScript


import {

  streamText,

  convertToModelMessages,

  createUIMessageStream,

  createUIMessageStreamResponse,

} from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const stream = createUIMessageStream({

      execute: async ({ writer }) => {

        const result = streamText({

          model: workersai("@cf/zai-org/glm-4.7-flash"),

          messages: await convertToModelMessages(this.messages),

        });


        // Merge the LLM stream

        writer.merge(result.toUIMessageStream());


        // Write a data part — persisted to message.parts

        writer.write({

          type: "data-sources",

          id: "src-1",

          data: { query: "agents", status: "searching", results: [] },

        });


        // Later: update the same part in-place (same type + id)

        writer.write({

          type: "data-sources",

          id: "src-1",

          data: {

            query: "agents",

            status: "found",

            results: ["Agents SDK docs", "Durable Objects guide"],

          },

        });

      },

    });


    return createUIMessageStreamResponse({ stream });

  }

}



TypeScript


import {

  streamText,

  convertToModelMessages,

  createUIMessageStream,

  createUIMessageStreamResponse,

} from "ai";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const workersai = createWorkersAI({ binding: this.env.AI });


    const stream = createUIMessageStream({

      execute: async ({ writer }) => {

        const result = streamText({

          model: workersai("@cf/zai-org/glm-4.7-flash"),

          messages: await convertToModelMessages(this.messages),

        });


        // Merge the LLM stream

        writer.merge(result.toUIMessageStream());


        // Write a data part — persisted to message.parts

        writer.write({

          type: "data-sources",

          id: "src-1",

          data: { query: "agents", status: "searching", results: [] },

        });


        // Later: update the same part in-place (same type + id)

        writer.write({

          type: "data-sources",

          id: "src-1",

          data: {

            query: "agents",

            status: "found",

            results: ["Agents SDK docs", "Durable Objects guide"],

          },

        });

      },

    });


    return createUIMessageStreamResponse({ stream });

  }

}


Three patterns

  • Update in place: write again with the same type + id; persisted; use for progressive status (searching → found)
  • Append: write with no id, or a different id; persisted; use for log entries or multiple citations
  • Transient: write with transient: true; never persisted and never enters message.parts; use for ephemeral status (thinking indicators)

Transient parts are broadcast to connected clients in real time but are never persisted to SQLite and never appear in message.parts. Consume them with the onData callback.

Reading data parts (client)

Non-transient data parts appear in message.parts. Type them with the UIMessage generic:

JavaScript


import { useAgentChat } from "@cloudflare/ai-chat/react";

const { messages } = useAgentChat({ agent });


// Typed access — no casts needed

for (const msg of messages) {

  for (const part of msg.parts) {

    if (part.type === "data-sources") {

      console.log(part.data.results); // string[]

    }

  }

}



TypeScript


import { useAgentChat } from "@cloudflare/ai-chat/react";

import type { UIMessage } from "ai";


type ChatMessage = UIMessage<

  unknown,

  {

    sources: { query: string; status: string; results: string[] };

    usage: { model: string; inputTokens: number; outputTokens: number };

  }

>;


const { messages } = useAgentChat<unknown, ChatMessage>({ agent });


// Typed access — no casts needed

for (const msg of messages) {

  for (const part of msg.parts) {

    if (part.type === "data-sources") {

      console.log(part.data.results); // string[]

    }

  }

}


Handling transient parts with onData

Transient data parts never appear in message.parts. Use the onData callback instead:

JavaScript


const [thinking, setThinking] = useState(false);


const { messages } = useAgentChat({

  agent,

  onData(part) {

    if (part.type === "data-thinking") {

      setThinking(true);

    }

  },

});



TypeScript


const [thinking, setThinking] = useState(false);


const { messages } = useAgentChat<unknown, ChatMessage>({

  agent,

  onData(part) {

    if (part.type === "data-thinking") {

      setThinking(true);

    }

  },

});


On the server, write transient parts with transient: true:

JavaScript


writer.write({

  transient: true,

  type: "data-thinking",

  data: { model: "glm-4.7-flash", startedAt: new Date().toISOString() },

});


TypeScript


writer.write({

  transient: true,

  type: "data-thinking",

  data: { model: "glm-4.7-flash", startedAt: new Date().toISOString() },

});


onData fires on every path: new messages, stream resumption, and cross-tab broadcasts.

Resumable streaming

When a client disconnects and reconnects, streams resume automatically. No configuration is needed; it works out of the box.

While a stream is active:

  1. Every chunk is cached to SQLite as it is generated
  2. If the client disconnects, the server keeps generating and caching
  3. When the client reconnects, it receives all cached chunks and then continues with the live stream

Disable resumption with resume: false:

JavaScript


const { messages } = useAgentChat({ agent, resume: false });


TypeScript


const { messages } = useAgentChat({ agent, resume: false });


Storage management

Row size protection

SQLite rows have a 2 MB maximum size. When a message approaches that limit (for example, because a tool returned a very large output), AIChatAgent compacts the message automatically:

  1. Tool output compaction: oversized tool outputs are replaced with an LLM-friendly summary telling the model it can suggest re-running the tool
  2. Text truncation: if the message is still too large after tool compaction, text parts are truncated and a note is appended

Compacted messages carry metadata.compactedToolOutputs so clients can detect them and degrade gracefully.
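For example, a hedged client-side sketch that surfaces the flag (the exact metadata shape beyond the compactedToolOutputs key is an assumption):

TypeScript

{messages.map((msg) => {
  const meta = msg.metadata as { compactedToolOutputs?: boolean } | undefined;
  return meta?.compactedToolOutputs ? (
    // Tell the user part of this message was compacted for storage
    <em key={msg.id}>Some tool output was truncated to fit storage limits.</em>
  ) : null;
})}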

Controlling LLM context vs. storage

Storage (maxPersistedMessages) and LLM context are independent concerns:

  • How many messages SQLite stores: maxPersistedMessages (persistence)
  • What the model actually sees: pruneMessages() (LLM context)
  • Row size limits: automatic compaction (individual messages)

JavaScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: pruneMessages({

        // LLM context limit

        messages: await convertToModelMessages(this.messages),

        reasoning: "before-last-message",

        toolCalls: "before-last-2-messages",

      }),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: workersai("@cf/zai-org/glm-4.7-flash"),

      messages: pruneMessages({

        // LLM context limit

        messages: await convertToModelMessages(this.messages),

        reasoning: "before-last-message",

        toolCalls: "before-last-2-messages",

      }),

    });


    return result.toUIMessageStreamResponse();

  }

}


Using different AI providers

AIChatAgent works with any AI SDK-compatible provider. The server code decides which model to use; the client needs no changes.

Workers AI (Cloudflare)

JavaScript


import { createWorkersAI } from "workers-ai-provider";


const workersai = createWorkersAI({ binding: this.env.AI });

const result = streamText({

  model: workersai("@cf/zai-org/glm-4.7-flash"),

  messages: await convertToModelMessages(this.messages),

});


TypeScript


import { createWorkersAI } from "workers-ai-provider";


const workersai = createWorkersAI({ binding: this.env.AI });

const result = streamText({

  model: workersai("@cf/zai-org/glm-4.7-flash"),

  messages: await convertToModelMessages(this.messages),

});


OpenAI

JavaScript


import { createOpenAI } from "@ai-sdk/openai";


const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });

const result = streamText({

  model: openai.chat("gpt-4o"),

  messages: await convertToModelMessages(this.messages),

});


TypeScript


import { createOpenAI } from "@ai-sdk/openai";


const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });

const result = streamText({

  model: openai.chat("gpt-4o"),

  messages: await convertToModelMessages(this.messages),

});


Anthropic

JavaScript


import { createAnthropic } from "@ai-sdk/anthropic";


const anthropic = createAnthropic({ apiKey: this.env.ANTHROPIC_API_KEY });

const result = streamText({

  model: anthropic("claude-sonnet-4-20250514"),

  messages: await convertToModelMessages(this.messages),

});


TypeScript


import { createAnthropic } from "@ai-sdk/anthropic";


const anthropic = createAnthropic({ apiKey: this.env.ANTHROPIC_API_KEY });

const result = streamText({

  model: anthropic("claude-sonnet-4-20250514"),

  messages: await convertToModelMessages(this.messages),

});


Advanced patterns

Because onChatMessage gives you full control over the streamText call, you can use any AI SDK feature directly. These patterns work out of the box; no special AIChatAgent configuration is required.

Dynamic model and tool control

Use prepareStep ↗ to switch models, available tools, or system prompts between steps of a multi-step agent loop:

JavaScript


import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";

import { z } from "zod";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: cheapModel, // Default model for simple steps

      messages: await convertToModelMessages(this.messages),

      tools: {

        search: searchTool,

        analyze: analyzeTool,

        summarize: summarizeTool,

      },

      stopWhen: stepCountIs(10),

      prepareStep: async ({ stepNumber, messages }) => {

        // Phase 1: Search (steps 0-2)

        if (stepNumber <= 2) {

          return {

            activeTools: ["search"],

            toolChoice: "required", // Force tool use

          };

        }


        // Phase 2: Analyze with a stronger model (steps 3-5)

        if (stepNumber <= 5) {

          return {

            model: expensiveModel,

            activeTools: ["analyze"],

          };

        }


        // Phase 3: Summarize

        return { activeTools: ["summarize"] };

      },

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";

import { z } from "zod";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: cheapModel, // Default model for simple steps

      messages: await convertToModelMessages(this.messages),

      tools: {

        search: searchTool,

        analyze: analyzeTool,

        summarize: summarizeTool,

      },

      stopWhen: stepCountIs(10),

      prepareStep: async ({ stepNumber, messages }) => {

        // Phase 1: Search (steps 0-2)

        if (stepNumber <= 2) {

          return {

            activeTools: ["search"],

            toolChoice: "required", // Force tool use

          };

        }


        // Phase 2: Analyze with a stronger model (steps 3-5)

        if (stepNumber <= 5) {

          return {

            model: expensiveModel,

            activeTools: ["analyze"],

          };

        }


        // Phase 3: Summarize

        return { activeTools: ["summarize"] };

      },

    });


    return result.toUIMessageStreamResponse();

  }

}


prepareStep runs before each step and can return overrides for model, activeTools, toolChoice, system, and messages. Use it to:

  • Switch models: cheap models for simple steps, upgrading for reasoning
  • Gate tools by phase: restrict which tools are available at each step
  • Manage context: prune or transform messages to stay within token limits
  • Force tool calls: require a specific tool with toolChoice: { type: "tool", toolName: "search" }

Language model middleware

Use wrapLanguageModel ↗ to add guardrails, RAG, caching, or logging without touching your chat logic:

JavaScript


import { streamText, convertToModelMessages, wrapLanguageModel } from "ai";

const guardrailMiddleware = {

  wrapGenerate: async ({ doGenerate }) => {

    const { text, ...rest } = await doGenerate();

    // Filter PII or sensitive content from the response

    const cleaned = text?.replace(/\b\d{3}-\d{2}-\d{4}\b/g, "[REDACTED]");

    return { text: cleaned, ...rest };

  },

};


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const model = wrapLanguageModel({

      model: baseModel,

      middleware: [guardrailMiddleware],

    });


    const result = streamText({

      model,

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import { streamText, convertToModelMessages, wrapLanguageModel } from "ai";

import type { LanguageModelV3Middleware } from "@ai-sdk/provider";


const guardrailMiddleware: LanguageModelV3Middleware = {

  wrapGenerate: async ({ doGenerate }) => {

    const { text, ...rest } = await doGenerate();

    // Filter PII or sensitive content from the response

    const cleaned = text?.replace(/\b\d{3}-\d{2}-\d{4}\b/g, "[REDACTED]");

    return { text: cleaned, ...rest };

  },

};


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const model = wrapLanguageModel({

      model: baseModel,

      middleware: [guardrailMiddleware],

    });


    const result = streamText({

      model,

      messages: await convertToModelMessages(this.messages),

    });


    return result.toUIMessageStreamResponse();

  }

}


The AI SDK ships several built-in middlewares:

  • extractReasoningMiddleware: exposes the chain of thought from models like DeepSeek R1
  • defaultSettingsMiddleware: applies default temperature, max tokens, and so on
  • simulateStreamingMiddleware: adds streaming to non-streaming models

Multiple middlewares compose in order: middleware: [first, second] is equivalent to first(second(model)).
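For instance, a sketch composing two of the built-in middlewares (baseModel stands in for any provider model):

TypeScript

import {
  wrapLanguageModel,
  extractReasoningMiddleware,
  defaultSettingsMiddleware,
} from "ai";

// [defaults, extractReasoning] applies as defaults(extractReasoning(model)):
// the settings middleware is outermost, the reasoning extractor wraps the model
const model = wrapLanguageModel({
  model: baseModel,
  middleware: [
    defaultSettingsMiddleware({ settings: { temperature: 0.3 } }),
    extractReasoningMiddleware({ tagName: "think" }),
  ],
});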

Structured output

Use generateObject ↗ inside tools for structured data extraction:

JavaScript


import {

  streamText,

  generateObject,

  convertToModelMessages,

  tool,

  stepCountIs,

} from "ai";

import { z } from "zod";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: myModel,

      messages: await convertToModelMessages(this.messages),

      tools: {

        extractContactInfo: tool({

          description:

            "Extract structured contact information from the conversation",

          inputSchema: z.object({

            text: z.string().describe("The text to extract contact info from"),

          }),

          execute: async ({ text }) => {

            const { object } = await generateObject({

              model: myModel,

              schema: z.object({

                name: z.string(),

                email: z.string().email(),

                phone: z.string().optional(),

              }),

              prompt: `Extract contact information from: ${text}`,

            });

            return object;

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import {

  streamText,

  generateObject,

  convertToModelMessages,

  tool,

  stepCountIs,

} from "ai";

import { z } from "zod";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: myModel,

      messages: await convertToModelMessages(this.messages),

      tools: {

        extractContactInfo: tool({

          description:

            "Extract structured contact information from the conversation",

          inputSchema: z.object({

            text: z.string().describe("The text to extract contact info from"),

          }),

          execute: async ({ text }) => {

            const { object } = await generateObject({

              model: myModel,

              schema: z.object({

                name: z.string(),

                email: z.string().email(),

                phone: z.string().optional(),

              }),

              prompt: `Extract contact information from: ${text}`,

            });

            return object;

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}


Sub-agent delegation

Note

This section covers in-process sub-agents built on the AI SDK's ToolLoopAgent. For Durable Object sub-agents with isolated storage and typed RPC, see Sub-agents. For streaming a full LLM turn from a sub-agent, see Think: Sub-agent RPC.

Tools can delegate work to focused sub-calls with their own context. Define a reusable agent with ToolLoopAgent ↗, then invoke it from a tool's execute:

JavaScript


import {

  ToolLoopAgent,

  streamText,

  convertToModelMessages,

  tool,

  stepCountIs,

} from "ai";

import { z } from "zod";


// Define a reusable research agent with its own tools and instructions

const researchAgent = new ToolLoopAgent({

  model: researchModel,

  instructions: "You are a research assistant. Be thorough and cite sources.",

  tools: { webSearch: webSearchTool },

  stopWhen: stepCountIs(10),

});


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: orchestratorModel,

      messages: await convertToModelMessages(this.messages),

      tools: {

        deepResearch: tool({

          description: "Research a topic in depth",

          inputSchema: z.object({

            topic: z.string().describe("The topic to research"),

          }),

          execute: async ({ topic }) => {

            const { text } = await researchAgent.generate({

              prompt: topic,

            });

            return { summary: text };

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}



TypeScript


import {

  ToolLoopAgent,

  streamText,

  convertToModelMessages,

  tool,

  stepCountIs,

} from "ai";

import { z } from "zod";


// Define a reusable research agent with its own tools and instructions

const researchAgent = new ToolLoopAgent({

  model: researchModel,

  instructions: "You are a research assistant. Be thorough and cite sources.",

  tools: { webSearch: webSearchTool },

  stopWhen: stepCountIs(10),

});


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    const result = streamText({

      model: orchestratorModel,

      messages: await convertToModelMessages(this.messages),

      tools: {

        deepResearch: tool({

          description: "Research a topic in depth",

          inputSchema: z.object({

            topic: z.string().describe("The topic to research"),

          }),

          execute: async ({ topic }) => {

            const { text } = await researchAgent.generate({

              prompt: topic,

            });

            return { summary: text };

          },

        }),

      },

      stopWhen: stepCountIs(5),

    });


    return result.toUIMessageStreamResponse();

  }

}


The research agent runs in its own context; its token budget is fully separate from the orchestrator's. Only the final summary flows back to the parent model.

Note

ToolLoopAgent works best as a sub-agent, not as a replacement for streamText inside onChatMessage. The main onChatMessage has direct access to this.env, this.messages, and options.body, which a pre-configured ToolLoopAgent instance cannot reference.

Streaming progress with preliminary results

By default, a tool part shows as loading until execute returns. With an async generator (async function*), you can stream progress updates to the client while the tool is still working:

JavaScript


deepResearch: tool({

  description: "Research a topic in depth",

  inputSchema: z.object({

    topic: z.string().describe("The topic to research"),

  }),

  async *execute({ topic }) {

    // Preliminary result — the client sees "searching" immediately

    yield { status: "searching", topic, summary: undefined };


    const { text } = await researchAgent.generate({ prompt: topic });


    // Final result — sent to the model for its next step

    yield { status: "done", topic, summary: text };

  },

});



TypeScript


deepResearch: tool({

  description: "Research a topic in depth",

  inputSchema: z.object({

    topic: z.string().describe("The topic to research"),

  }),

  async *execute({ topic }) {

    // Preliminary result — the client sees "searching" immediately

    yield { status: "searching", topic, summary: undefined };


    const { text } = await researchAgent.generate({ prompt: topic });


    // Final result — sent to the model for its next step

    yield { status: "done", topic, summary: text };

  },

});


Each yield updates the tool part on the client in real time (with preliminary: true). The last yielded value becomes the final output the model sees.

This pattern is useful when you:

  • Need to explore large amounts of information that would bloat the main context
  • Want to show live progress for long-running tools
  • Want to run independent research in parallel (multiple tool calls execute concurrently)
  • Need different models or system prompts for different subtasks

See the AI SDK Agents docs ↗, Subagents ↗, and Preliminary Tool Results ↗ for more.

Multi-client synchronization

When multiple clients connect to the same agent instance, messages are broadcast to every connection automatically. If one client sends a message, all other connected clients receive the updated message list.


Client A ──── sendMessage("Hello") ────▶ AIChatAgent

                                              │

                                        persist + stream

                                              │

Client A ◀── CF_AGENT_USE_CHAT_RESPONSE ──────┤

Client B ◀── CF_AGENT_CHAT_MESSAGES ──────────┘


The requesting client receives the streaming response. All other clients receive the final messages via the CF_AGENT_CHAT_MESSAGES broadcast.

API reference

Exports

  • @cloudflare/ai-chat: AIChatAgent, createToolsFromClientSchemas, ChatRecoveryContext, ChatRecoveryOptions
  • @cloudflare/ai-chat/react: useAgentChat
  • @cloudflare/ai-chat/types: MessageType, OutgoingMessage, IncomingMessage

WebSocket protocol

The chat protocol uses typed JSON messages over WebSocket:

  • CF_AGENT_USE_CHAT_REQUEST (Client → Server): send a chat message
  • CF_AGENT_USE_CHAT_RESPONSE (Server → Client): stream response chunks
  • CF_AGENT_CHAT_MESSAGES (Server → Client): broadcast updated messages
  • CF_AGENT_CHAT_CLEAR (bidirectional): clear the conversation
  • CF_AGENT_CHAT_REQUEST_CANCEL (Client → Server): cancel an in-flight stream
  • CF_AGENT_TOOL_RESULT (Client → Server): submit tool output
  • CF_AGENT_TOOL_APPROVAL (Client → Server): approve or reject a tool
  • CF_AGENT_MESSAGE_UPDATED (Server → Client): notify that a message was updated
  • CF_AGENT_STREAM_RESUMING (Server → Client): notify that a stream is resuming
  • CF_AGENT_STREAM_RESUME_REQUEST (Client → Server): ask whether a stream can be resumed

Deprecated APIs

The following APIs are deprecated and log console warnings when used. They will be removed in a future release.

  • addToolResult({ toolCallId, result }): use addToolOutput({ toolCallId, output }) instead; renamed to match AI SDK terminology
  • createToolsFromClientSchemas(): client tools now register automatically; manual schema conversion is no longer needed
  • extractClientToolSchemas(): client tools now register automatically; schemas are sent along with tool results
  • detectToolsRequiringConfirmation(): use needsApproval on the tool definition; approval is now per-tool rather than a global filter
  • the tools option of useAgentChat: define tools inside onChatMessage on the server; all tool definitions belong on the server
  • the toolsRequiringConfirmation option: use needsApproval on each tool; per-tool approval replaces the global list

If you are upgrading from an earlier version, replace each deprecated call with its listed replacement. Deprecated APIs still work, but they will be removed in a future major version.
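For example, the first rename is mechanical:

TypeScript

// Before (deprecated): logs a console warning
addToolResult({ toolCallId: toolCall.toolCallId, result: { ok: true } });

// After: same behavior, AI SDK-aligned naming
addToolOutput({ toolCallId: toolCall.toolCallId, output: { ok: true } });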

Next steps

Client SDK: the useAgent hook and the AgentClient class.

Human-in-the-loop: approval flows and human intervention patterns.

Build a chat agent: a step-by-step tutorial for building your first chat agent.

Durable execution: runFiber(), stash(), and crash recovery for long-running tasks.

Long-running agents: lifecycles, recovery patterns, and per-provider strategies.
Long-running agents 生命周期、恢复模式以及按 provider 的策略。