Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

自主响应

在没有人工操作的情况下,从服务端发送消息并触发 LLM 响应。可用于定时跟进、队列处理、邮件触发的响应,以及自主的 Agent workflow。

概览

在典型的聊天流程中,用户发送消息,Agent 响应。但 Agent 经常需要主动行动——一个定时提醒触发了、一个 webhook 到达、一个 workflow 完成、或者 Agent 在审视自己的响应后决定继续。

关键原语:

原语角色
saveMessages注入一条消息并触发 LLM——sendMessage 在服务端的等价物
persistMessages存储消息但不触发响应——用于静默注入上下文
onChatResponse在任何响应完成时作出反应,包括你没有发起的那些
isServerStreaming客户端标志:当存在服务端发起的流时为 true

saveMessages vs persistMessages

saveMessages 将消息持久化到 SQLite 触发 onChatMessage 以产生新的 LLM 响应。它是可 await 的——返回后,LLM 已经响应,消息已被持久化。

persistMessages 存储消息并广播给已连接的客户端,但 触发模型回合。当你想把上下文(例如系统消息或后台数据)注入到对话中而不启动响应时使用它。

何时使用 saveMessagesonChatResponse

当你控制触发时机时,使用 saveMessages —— schedule 回调、webhook、邮件 handler,或任何由你决定何时注入消息的方法。

当你需要响应那些不是你触发的响应时,使用 onChatResponse —— 由用户发起的消息、工具批准后的自动续写,或框架代你运行的任何回合。

waitUntilStable

在从 schedule 回调、webhook、邮件 handler 或其他非 chat 入口读取 this.messages 或调用 saveMessages 之前,务必先调用 waitUntilStable()

waitUntilStable() 等待对话完全稳定:

  • 没有进行中的 LLM 流
  • 没有挂起的客户端工具交互(用户尚未提供的工具结果或批准)
  • 没有排队的续写回合

它在稳定时返回 true,如果在挂起的交互被解决前超时则返回 false。如果没有任何挂起内容,会立即返回。

JavaScript


const stable = await this.waitUntilStable({ timeout: 30_000 });

if (!stable) {

  // The conversation is blocked on a user interaction or an in-flight

  // stream that did not complete within 30 seconds.

  console.warn("Conversation not stable, skipping server-driven message");

  return;

}

// Safe to read this.messages and call saveMessages.


TypeScript


const stable = await this.waitUntilStable({ timeout: 30_000 });

if (!stable) {

  // The conversation is blocked on a user interaction or an in-flight

  // stream that did not complete within 30 seconds.

  console.warn("Conversation not stable, skipping server-driven message");

  return;

}

// Safe to read this.messages and call saveMessages.


如果没有这个守卫,你可能会读取陈旧的消息,或与进行中的流发生重叠。

触发模式

Cron schedule

一个每日摘要 Agent,每天早晨汇总活动。Cron schedule 默认是幂等的,因此在 onStart 中调用 schedule() 是安全的——在 Durable Object 重启后不会创建副本。

JavaScript


import { AIChatAgent } from "@cloudflare/ai-chat";


export class DigestAgent extends AIChatAgent {

  async onChatMessage() {

    // ... your LLM call

  }


  async onStart() {

    await this.schedule("0 9 * * *", "dailyDigest");

  }


  async dailyDigest() {

    const stable = await this.waitUntilStable({ timeout: 30_000 });

    if (!stable) {

      console.warn("Conversation not stable, skipping daily digest");

      return;

    }


    await this.saveMessages((messages) => [

      ...messages,

      {

        id: crypto.randomUUID(),

        role: "user",

        parts: [

          {

            type: "text",

            text: "Summarize what happened since your last digest.",

          },

        ],

        createdAt: new Date(),

      },

    ]);

    // At this point the LLM has responded and the message is persisted.

  }

}


Explain Code

TypeScript


import { AIChatAgent } from "@cloudflare/ai-chat";


export class DigestAgent extends AIChatAgent {

  async onChatMessage() {

    // ... your LLM call

  }


  async onStart() {

    await this.schedule("0 9 * * *", "dailyDigest");

  }


  async dailyDigest() {

    const stable = await this.waitUntilStable({ timeout: 30_000 });

    if (!stable) {

      console.warn("Conversation not stable, skipping daily digest");

      return;

    }


    await this.saveMessages((messages) => [

      ...messages,

      {

        id: crypto.randomUUID(),

        role: "user",

        parts: [

          {

            type: "text",

            text: "Summarize what happened since your last digest.",

          },

        ],

        createdAt: new Date(),

      },

    ]);

    // At this point the LLM has responded and the message is persisted.

  }

}


Explain Code

saveMessages 的函数形式 — saveMessages((messages) => [...]) —— 在执行时读取最新的持久化消息。这避免了多个调用排队时(例如频繁到达的 webhook)出现陈旧的基线。关于 schedule() 与 cron 语法的更多内容,请参阅 调度任务

处理队列

当你控制触发时机时,简单的循环是最清晰的模式:

TypeScript


async processQueue() {

  for (const task of this.taskQueue) {

    const stable = await this.waitUntilStable({ timeout: 30_000 });

    if (!stable) {

      console.warn("Conversation not stable, stopping queue processing");

      break;

    }


    await this.saveMessages((messages) => [

      ...messages,

      {

        id: crypto.randomUUID(),

        role: "user",

        parts: [{ type: "text", text: task }],

        createdAt: new Date(),

      },

    ]);

    // LLM has responded. this.messages is updated. Next iteration.

  }

  this.taskQueue = [];

}


Explain Code

不需要特殊 hook —— saveMessages 在整个回合完成后返回。

由邮件触发

TypeScript


async onEmail(email: AgentEmail) {

  const stable = await this.waitUntilStable({ timeout: 30_000 });

  if (!stable) {

    console.warn("Conversation not stable, cannot process email");

    return;

  }


  const subject = email.headers.get("subject") ?? "(no subject)";

  const body = await new Response(email.raw).text();


  await this.saveMessages((messages) => [

    ...messages,

    {

      id: crypto.randomUUID(),

      role: "user",

      parts: [

        {

          type: "text",

          text: `Email from ${email.from}: ${subject}\n\n${body}`,

        },

      ],

      createdAt: new Date(),

    },

  ]);

}


Explain Code

由 webhook 触发

TypeScript


async onRequest(request: Request): Promise<Response> {

  const url = new URL(request.url);


  if (url.pathname.endsWith("/webhook") && request.method === "POST") {

    const stable = await this.waitUntilStable({ timeout: 30_000 });

    if (!stable) {

      return new Response("Agent is busy", { status: 503 });

    }


    const payload = await request.json();

    try {

      await this.saveMessages((messages) => [

        ...messages,

        {

          id: crypto.randomUUID(),

          role: "user",

          parts: [

            {

              type: "text",

              text: `Webhook event: ${JSON.stringify(payload)}`,

            },

          ],

          createdAt: new Date(),

        },

      ]);

      return new Response("ok");

    } catch (error) {

      console.error("Failed to process webhook:", error);

      return new Response("Internal error", { status: 500 });

    }

  }


  return super.onRequest(request);

}


Explain Code

注入上下文但不触发响应

使用 persistMessages 添加 LLM 在下一次回合中将看到的消息,但当下不开启回合:

TypeScript


async addBackgroundContext(data: string) {

  const stable = await this.waitUntilStable({ timeout: 30_000 });

  if (!stable) return;


  await this.persistMessages([

    ...this.messages,

    {

      id: crypto.randomUUID(),

      role: "user",

      parts: [{ type: "text", text: `[Background context]: ${data}` }],

      createdAt: new Date(),

    },

  ]);

  // Message is stored and broadcast to clients, but no LLM call happens.

}


Explain Code

响应你没有发起的响应

onChatResponse每个 完成的回合后触发——用户发起的消息、saveMessages 调用,以及自动续写。无论响应是如何被触发的,当你需要观察或对其做出反应时,使用它。

广播状态

JavaScript


import { AIChatAgent } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    // ... your LLM call

  }


  async onChatResponse(result) {

    if (result.status === "completed") {

      this.broadcast(JSON.stringify({ streaming: false }));

    }

  }

}


Explain Code

TypeScript


import { AIChatAgent, type ChatResponseResult } from "@cloudflare/ai-chat";


export class ChatAgent extends AIChatAgent {

  async onChatMessage() {

    // ... your LLM call

  }


  protected async onChatResponse(result: ChatResponseResult) {

    if (result.status === "completed") {

      this.broadcast(JSON.stringify({ streaming: false }));

    }

  }

}


Explain Code

分析

TypeScript


protected async onChatResponse(result: ChatResponseResult) {

  try {

    await fetch("https://analytics.example.com/event", {

      method: "POST",

      body: JSON.stringify({

        requestId: result.requestId,

        status: result.status,

        continuation: result.continuation,

      }),

    });

  } catch (error) {

    console.error("Analytics reporting failed:", error);

  }

}


Explain Code

链式推理

Agent 可以审视自己的响应,并决定是否继续。这同样适用于由用户发起的消息——你无法预测用户会问什么,但你可以对 Agent 说了什么作出反应。

TypeScript


protected async onChatResponse(result: ChatResponseResult) {

  if (result.status !== "completed") return;


  const lastText = result.message.parts

    .filter((p) => p.type === "text")

    .map((p) => p.text)

    .join("");


  if (lastText.includes("[NEEDS_MORE_RESEARCH]")) {

    await this.saveMessages((messages) => [

      ...messages,

      {

        id: crypto.randomUUID(),

        role: "user",

        parts: [{ type: "text", text: "Continue your research." }],

        createdAt: new Date(),

      },

    ]);

  }

}


Explain Code

当从 onChatResponse 内部调用 saveMessages 时,内部回合会运行至完成,然后 saveMessages 返回。当前的 onChatResponse 调用返回后,框架会再次为内部响应触发 onChatResponse。这一过程会一直持续到没有更多排队的工作为止。框架不会嵌套 onChatResponse 调用——结果是顺序排空的。

反应式队列处理

当队列项可以由外部事件(用户消息、webhook)在任意时刻添加时,onChatResponse 让你在每次响应后都能排空队列,无论是谁触发的:

TypeScript


protected async onChatResponse(result: ChatResponseResult) {

  if (result.status === "completed" && this.taskQueue.length > 0) {

    const next = this.taskQueue.shift()!;

    await this.saveMessages((messages) => [

      ...messages,

      {

        id: crypto.randomUUID(),

        role: "user",

        parts: [{ type: "text", text: next }],

        createdAt: new Date(),

      },

    ]);

  }

}


Explain Code

ChatResponseResult 字段

字段类型描述
messageUIMessage最终的 assistant 消息
requestIdstring该回合的唯一 ID
continuationboolean如果是自动续写则为 true
status“completed” | “error”“aborted”该回合如何结束
errorstring | undefined当 status 为 “error” 时的错误详情

客户端:检测服务端发起的流

当服务端通过 saveMessages 触发流时,AI SDK 的 status 仍为 "ready",因为客户端并未发起请求。useAgentChat hook 提供了两个额外的标志来处理这种情况:

标志跟踪的内容
statusAI SDK 生命周期:“submitted”、“streaming”、“ready”、“error”——仅适用于客户端发起的请求
isServerStreaming当存在服务端发起的流时为 true
isStreaming当客户端或服务端流处于活动状态时为 true——使用它作为通用指示器

对于大多数 UI 关注点(禁用发送按钮、显示加载指示器),使用 isStreaming。仅当你需要区分用户发起与服务端发起的流时(例如显示“Agent 正在后台工作…“这样的不同提示),才使用 isServerStreaming


import { useAgent } from "agents/react";

import { useAgentChat } from "@cloudflare/ai-chat/react";


function Chat() {

  const agent = useAgent({ agent: "ChatAgent" });

  const { messages, sendMessage, isStreaming, isServerStreaming } =

    useAgentChat({ agent });


  return (

    <div>

      {messages.map((m) => (

        <div key={m.id}>{/* render message */}</div>

      ))}


      {isServerStreaming && <div>Agent is working in the background...</div>}

      {!isServerStreaming && isStreaming && <div>Agent is responding...</div>}


      <form

        onSubmit={(e) => {

          e.preventDefault();

          const input = e.currentTarget.elements.namedItem(

            "input",

          ) as HTMLInputElement;

          sendMessage({ text: input.value });

          input.value = "";

        }}

      >

        <input name="input" placeholder="Type a message..." />

        <button type="submit" disabled={isStreaming}>

          Send

        </button>

      </form>

    </div>

  );

}


Explain Code

当用户空闲时,服务端驱动的响应到来,已连接的客户端会实时看到新消息出现。在流运行期间,isStreaming 标志会从 false 变为 true 再变回 false,因此发送按钮等 UI 元素会自动禁用并重新启用。

messageConcurrency 的交互

AIChatAgent 上的 messageConcurrency 设置控制重叠的用户提交如何处理("queue""latest""merge""drop""debounce")。该设置仅适用于 sendMessage() —— 来自客户端的、由用户发起的消息。

saveMessages() 始终使用串行(排队)行为,而无视 messageConcurrency 设置。这意味着服务端驱动的消息绝不会被丢弃、合并或防抖——它们总是排队并按顺序执行。

与其他 Agent 原语结合

原语如何结合
schedule()调度一个调用 saveMessages 的回调——参见上面的 cron 示例
queue()将一个调用 saveMessages 的方法入队以延迟处理
runWorkflow()启动一个 Workflow;使用 AgentWorkflow.agent RPC 调用一个触发 saveMessages 的方法
onEmail()将邮件内容转换为聊天消息并调用 saveMessages
onRequest()处理 webhook 并调用 saveMessages
this.broadcast()从 onChatResponse 广播自定义状态

重要提示

  • saveMessages 是可 await 的。 返回后,LLM 已响应,消息已被持久化。当你控制触发时机时使用它。
  • 使用 saveMessages 的函数形式。 saveMessages((messages) => [...messages, newMsg]) 在执行时读取最新的持久化消息,避免多个调用排队时出现陈旧基线。
  • persistMessages 不会触发响应。 用它来静默注入上下文或系统消息。
  • onChatResponse 用于响应你没有发起的回合。 用于由用户发起的消息、自动续写,或任何你没有自己调用 saveMessages 的回合。
  • onChatResponse 不会嵌套。 当从 onChatResponse 内部调用 saveMessages 时,内部回合完成后,onChatResponse 会再次按顺序触发——而非递归。
  • 消息在 onChatResponse 触发之前就被持久化了。 如果 Durable Object 在 hook 期间被驱逐,对话在 SQLite 中是安全的——只有 hook 回调丢失。
  • 在注入前调用 waitUntilStable() 在 schedule 回调、webhook 或其他非 chat 入口中始终调用它,以避免与进行中的流或挂起的工具交互重叠。
  • 客户端在 onChatResponse 运行前看到完成的响应。 服务端 hook 不会延迟客户端。
  • messageConcurrency 不影响 saveMessages 服务端驱动的消息总是排队并按顺序执行。

后续步骤

聊天 Agent AIChatAgent、saveMessages、persistMessages 和 onChatResponse 的完整 API 参考。

调度任务 Agent 回调的延迟、cron 与间隔调度。

Webhooks 接收 webhook 事件并将它们路由到 Agent 实例。

邮件路由 在 Agent 中处理入站邮件。