自主响应
在没有人工操作的情况下,从服务端发送消息并触发 LLM 响应。可用于定时跟进、队列处理、邮件触发的响应,以及自主的 Agent workflow。
概览
在典型的聊天流程中,用户发送消息,Agent 响应。但 Agent 经常需要主动行动——一个定时提醒触发了、一个 webhook 到达、一个 workflow 完成、或者 Agent 在审视自己的响应后决定继续。
关键原语:
| 原语 | 角色 |
|---|---|
| saveMessages | 注入一条消息并触发 LLM——sendMessage 在服务端的等价物 |
| persistMessages | 存储消息但不触发响应——用于静默注入上下文 |
| onChatResponse | 在任何响应完成时作出反应,包括你没有发起的那些 |
| isServerStreaming | 客户端标志:当存在服务端发起的流时为 true |
saveMessages vs persistMessages
saveMessages 将消息持久化到 SQLite 并 触发 onChatMessage 以产生新的 LLM 响应。它是可 await 的——返回后,LLM 已经响应,消息已被持久化。
persistMessages 存储消息并广播给已连接的客户端,但 不 触发模型回合。当你想把上下文(例如系统消息或后台数据)注入到对话中而不启动响应时使用它。
何时使用 saveMessages 与 onChatResponse
当你控制触发时机时,使用 saveMessages —— schedule 回调、webhook、邮件 handler,或任何由你决定何时注入消息的方法。
当你需要响应那些不是你触发的响应时,使用 onChatResponse —— 由用户发起的消息、工具批准后的自动续写,或框架代你运行的任何回合。
waitUntilStable
在从 schedule 回调、webhook、邮件 handler 或其他非 chat 入口读取 this.messages 或调用 saveMessages 之前,务必先调用 waitUntilStable()。
waitUntilStable() 等待对话完全稳定:
- 没有进行中的 LLM 流
- 没有挂起的客户端工具交互(用户尚未提供的工具结果或批准)
- 没有排队的续写回合
它在稳定时返回 true,如果在挂起的交互被解决前超时则返回 false。如果没有任何挂起内容,会立即返回。
JavaScript
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
// The conversation is blocked on a user interaction or an in-flight
// stream that did not complete within 30 seconds.
console.warn("Conversation not stable, skipping server-driven message");
return;
}
// Safe to read this.messages and call saveMessages.
TypeScript
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
// The conversation is blocked on a user interaction or an in-flight
// stream that did not complete within 30 seconds.
console.warn("Conversation not stable, skipping server-driven message");
return;
}
// Safe to read this.messages and call saveMessages.
如果没有这个守卫,你可能会读取陈旧的消息,或与进行中的流发生重叠。
触发模式
Cron schedule
一个每日摘要 Agent,每天早晨汇总活动。Cron schedule 默认是幂等的,因此在 onStart 中调用 schedule() 是安全的——在 Durable Object 重启后不会创建副本。
JavaScript
import { AIChatAgent } from "@cloudflare/ai-chat";
export class DigestAgent extends AIChatAgent {
async onChatMessage() {
// ... your LLM call
}
async onStart() {
await this.schedule("0 9 * * *", "dailyDigest");
}
async dailyDigest() {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
console.warn("Conversation not stable, skipping daily digest");
return;
}
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [
{
type: "text",
text: "Summarize what happened since your last digest.",
},
],
createdAt: new Date(),
},
]);
// At this point the LLM has responded and the message is persisted.
}
}
Explain Code
TypeScript
import { AIChatAgent } from "@cloudflare/ai-chat";
export class DigestAgent extends AIChatAgent {
async onChatMessage() {
// ... your LLM call
}
async onStart() {
await this.schedule("0 9 * * *", "dailyDigest");
}
async dailyDigest() {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
console.warn("Conversation not stable, skipping daily digest");
return;
}
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [
{
type: "text",
text: "Summarize what happened since your last digest.",
},
],
createdAt: new Date(),
},
]);
// At this point the LLM has responded and the message is persisted.
}
}
Explain Code
saveMessages 的函数形式 — saveMessages((messages) => [...]) —— 在执行时读取最新的持久化消息。这避免了多个调用排队时(例如频繁到达的 webhook)出现陈旧的基线。关于 schedule() 与 cron 语法的更多内容,请参阅 调度任务。
处理队列
当你控制触发时机时,简单的循环是最清晰的模式:
TypeScript
async processQueue() {
for (const task of this.taskQueue) {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
console.warn("Conversation not stable, stopping queue processing");
break;
}
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: task }],
createdAt: new Date(),
},
]);
// LLM has responded. this.messages is updated. Next iteration.
}
this.taskQueue = [];
}
Explain Code
不需要特殊 hook —— saveMessages 在整个回合完成后返回。
由邮件触发
TypeScript
async onEmail(email: AgentEmail) {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
console.warn("Conversation not stable, cannot process email");
return;
}
const subject = email.headers.get("subject") ?? "(no subject)";
const body = await new Response(email.raw).text();
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [
{
type: "text",
text: `Email from ${email.from}: ${subject}\n\n${body}`,
},
],
createdAt: new Date(),
},
]);
}
Explain Code
由 webhook 触发
TypeScript
async onRequest(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname.endsWith("/webhook") && request.method === "POST") {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) {
return new Response("Agent is busy", { status: 503 });
}
const payload = await request.json();
try {
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [
{
type: "text",
text: `Webhook event: ${JSON.stringify(payload)}`,
},
],
createdAt: new Date(),
},
]);
return new Response("ok");
} catch (error) {
console.error("Failed to process webhook:", error);
return new Response("Internal error", { status: 500 });
}
}
return super.onRequest(request);
}
Explain Code
注入上下文但不触发响应
使用 persistMessages 添加 LLM 在下一次回合中将看到的消息,但当下不开启回合:
TypeScript
async addBackgroundContext(data: string) {
const stable = await this.waitUntilStable({ timeout: 30_000 });
if (!stable) return;
await this.persistMessages([
...this.messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: `[Background context]: ${data}` }],
createdAt: new Date(),
},
]);
// Message is stored and broadcast to clients, but no LLM call happens.
}
Explain Code
响应你没有发起的响应
onChatResponse 在 每个 完成的回合后触发——用户发起的消息、saveMessages 调用,以及自动续写。无论响应是如何被触发的,当你需要观察或对其做出反应时,使用它。
广播状态
JavaScript
import { AIChatAgent } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
// ... your LLM call
}
async onChatResponse(result) {
if (result.status === "completed") {
this.broadcast(JSON.stringify({ streaming: false }));
}
}
}
Explain Code
TypeScript
import { AIChatAgent, type ChatResponseResult } from "@cloudflare/ai-chat";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
// ... your LLM call
}
protected async onChatResponse(result: ChatResponseResult) {
if (result.status === "completed") {
this.broadcast(JSON.stringify({ streaming: false }));
}
}
}
Explain Code
分析
TypeScript
protected async onChatResponse(result: ChatResponseResult) {
try {
await fetch("https://analytics.example.com/event", {
method: "POST",
body: JSON.stringify({
requestId: result.requestId,
status: result.status,
continuation: result.continuation,
}),
});
} catch (error) {
console.error("Analytics reporting failed:", error);
}
}
Explain Code
链式推理
Agent 可以审视自己的响应,并决定是否继续。这同样适用于由用户发起的消息——你无法预测用户会问什么,但你可以对 Agent 说了什么作出反应。
TypeScript
protected async onChatResponse(result: ChatResponseResult) {
if (result.status !== "completed") return;
const lastText = result.message.parts
.filter((p) => p.type === "text")
.map((p) => p.text)
.join("");
if (lastText.includes("[NEEDS_MORE_RESEARCH]")) {
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: "Continue your research." }],
createdAt: new Date(),
},
]);
}
}
Explain Code
当从 onChatResponse 内部调用 saveMessages 时,内部回合会运行至完成,然后 saveMessages 返回。当前的 onChatResponse 调用返回后,框架会再次为内部响应触发 onChatResponse。这一过程会一直持续到没有更多排队的工作为止。框架不会嵌套 onChatResponse 调用——结果是顺序排空的。
反应式队列处理
当队列项可以由外部事件(用户消息、webhook)在任意时刻添加时,onChatResponse 让你在每次响应后都能排空队列,无论是谁触发的:
TypeScript
protected async onChatResponse(result: ChatResponseResult) {
if (result.status === "completed" && this.taskQueue.length > 0) {
const next = this.taskQueue.shift()!;
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: next }],
createdAt: new Date(),
},
]);
}
}
Explain Code
ChatResponseResult 字段
| 字段 | 类型 | 描述 | |
|---|---|---|---|
| message | UIMessage | 最终的 assistant 消息 | |
| requestId | string | 该回合的唯一 ID | |
| continuation | boolean | 如果是自动续写则为 true | |
| status | “completed” | “error” | “aborted” | 该回合如何结束 |
| error | string | undefined | 当 status 为 “error” 时的错误详情 |
客户端:检测服务端发起的流
当服务端通过 saveMessages 触发流时,AI SDK 的 status 仍为 "ready",因为客户端并未发起请求。useAgentChat hook 提供了两个额外的标志来处理这种情况:
| 标志 | 跟踪的内容 |
|---|---|
| status | AI SDK 生命周期:“submitted”、“streaming”、“ready”、“error”——仅适用于客户端发起的请求 |
| isServerStreaming | 当存在服务端发起的流时为 true |
| isStreaming | 当客户端或服务端流处于活动状态时为 true——使用它作为通用指示器 |
对于大多数 UI 关注点(禁用发送按钮、显示加载指示器),使用 isStreaming。仅当你需要区分用户发起与服务端发起的流时(例如显示“Agent 正在后台工作…“这样的不同提示),才使用 isServerStreaming。
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const { messages, sendMessage, isStreaming, isServerStreaming } =
useAgentChat({ agent });
return (
<div>
{messages.map((m) => (
<div key={m.id}>{/* render message */}</div>
))}
{isServerStreaming && <div>Agent is working in the background...</div>}
{!isServerStreaming && isStreaming && <div>Agent is responding...</div>}
<form
onSubmit={(e) => {
e.preventDefault();
const input = e.currentTarget.elements.namedItem(
"input",
) as HTMLInputElement;
sendMessage({ text: input.value });
input.value = "";
}}
>
<input name="input" placeholder="Type a message..." />
<button type="submit" disabled={isStreaming}>
Send
</button>
</form>
</div>
);
}
Explain Code
当用户空闲时,服务端驱动的响应到来,已连接的客户端会实时看到新消息出现。在流运行期间,isStreaming 标志会从 false 变为 true 再变回 false,因此发送按钮等 UI 元素会自动禁用并重新启用。
与 messageConcurrency 的交互
AIChatAgent 上的 messageConcurrency 设置控制重叠的用户提交如何处理("queue"、"latest"、"merge"、"drop"、"debounce")。该设置仅适用于 sendMessage() —— 来自客户端的、由用户发起的消息。
saveMessages() 始终使用串行(排队)行为,而无视 messageConcurrency 设置。这意味着服务端驱动的消息绝不会被丢弃、合并或防抖——它们总是排队并按顺序执行。
与其他 Agent 原语结合
| 原语 | 如何结合 |
|---|---|
| schedule() | 调度一个调用 saveMessages 的回调——参见上面的 cron 示例 |
| queue() | 将一个调用 saveMessages 的方法入队以延迟处理 |
| runWorkflow() | 启动一个 Workflow;使用 AgentWorkflow.agent RPC 调用一个触发 saveMessages 的方法 |
| onEmail() | 将邮件内容转换为聊天消息并调用 saveMessages |
| onRequest() | 处理 webhook 并调用 saveMessages |
| this.broadcast() | 从 onChatResponse 广播自定义状态 |
重要提示
saveMessages是可 await 的。 返回后,LLM 已响应,消息已被持久化。当你控制触发时机时使用它。- 使用
saveMessages的函数形式。saveMessages((messages) => [...messages, newMsg])在执行时读取最新的持久化消息,避免多个调用排队时出现陈旧基线。 persistMessages不会触发响应。 用它来静默注入上下文或系统消息。onChatResponse用于响应你没有发起的回合。 用于由用户发起的消息、自动续写,或任何你没有自己调用saveMessages的回合。onChatResponse不会嵌套。 当从onChatResponse内部调用saveMessages时,内部回合完成后,onChatResponse会再次按顺序触发——而非递归。- 消息在
onChatResponse触发之前就被持久化了。 如果 Durable Object 在 hook 期间被驱逐,对话在 SQLite 中是安全的——只有 hook 回调丢失。 - 在注入前调用
waitUntilStable()。 在 schedule 回调、webhook 或其他非 chat 入口中始终调用它,以避免与进行中的流或挂起的工具交互重叠。 - 客户端在
onChatResponse运行前看到完成的响应。 服务端 hook 不会延迟客户端。 messageConcurrency不影响saveMessages。 服务端驱动的消息总是排队并按顺序执行。
后续步骤
聊天 Agent AIChatAgent、saveMessages、persistMessages 和 onChatResponse 的完整 API 参考。
调度任务 Agent 回调的延迟、cron 与间隔调度。
Webhooks 接收 webhook 事件并将它们路由到 Agent 实例。
邮件路由 在 Agent 中处理入站邮件。