存储与同步状态
Agent 提供内置的状态管理,支持自动持久化以及在所有连接的客户端之间实时同步。
概览
Agent 中的状态:
- 持久化 - 自动保存到 SQLite,在重启和休眠后依然存在
- 已同步 - 变化会即时广播到所有连接的 WebSocket 客户端
- 双向 - 服务端和客户端都可以更新状态
- 类型安全 - 使用泛型完整支持 TypeScript
- 立即一致 - 读取你自己的写入
- 线程安全 - 并发更新是安全的
- 快速 - 状态与 Agent 在同一处运行
Agent 的状态存储在每个独立 Agent 实例内部嵌入的 SQL 数据库中。你可以通过更高层的 this.setState API(推荐)与之交互,这会同步状态并在状态变化时触发事件;或者直接使用 this.sql 查询数据库。
State vs Props
State 是持久化的、在重启后存活并跨客户端同步的数据。Props 是 Agent 实例化时传入的一次性初始化参数 - 用 props 来配置不需要持久化的内容。
JavaScript
import { Agent } from "agents";
export class GameAgent extends Agent {
// Default state for new agents
initialState = {
players: [],
score: 0,
status: "waiting",
};
// React to state changes
onStateChanged(state, source) {
if (source !== "server" && state.players.length >= 2) {
// Client added a player, start the game
this.setState({ ...state, status: "playing" });
}
}
addPlayer(name) {
this.setState({
...this.state,
players: [...this.state.players, name],
});
}
}
Explain Code
TypeScript
import { Agent } from "agents";
type GameState = {
players: string[];
score: number;
status: "waiting" | "playing" | "finished";
};
export class GameAgent extends Agent<Env, GameState> {
// Default state for new agents
initialState: GameState = {
players: [],
score: 0,
status: "waiting",
};
// React to state changes
onStateChanged(state: GameState, source: Connection | "server") {
if (source !== "server" && state.players.length >= 2) {
// Client added a player, start the game
this.setState({ ...state, status: "playing" });
}
}
addPlayer(name: string) {
this.setState({
...this.state,
players: [...this.state.players, name],
});
}
}
Explain Code
定义初始状态
使用 initialState 属性为新 Agent 实例定义默认值:
JavaScript
export class ChatAgent extends Agent {
initialState = {
messages: [],
settings: { theme: "dark", notifications: true },
lastActive: null,
};
}
TypeScript
type State = {
messages: Message[];
settings: UserSettings;
lastActive: string | null;
};
export class ChatAgent extends Agent<Env, State> {
initialState: State = {
messages: [],
settings: { theme: "dark", notifications: true },
lastActive: null,
};
}
Explain Code
类型安全
Agent 的第二个泛型参数定义你的状态类型:
JavaScript
// State is fully typed
export class MyAgent extends Agent {
initialState = { count: 0 };
increment() {
// TypeScript knows this.state is MyState
this.setState({ count: this.state.count + 1 });
}
}
TypeScript
// State is fully typed
export class MyAgent extends Agent<Env, MyState> {
initialState: MyState = { count: 0 };
increment() {
// TypeScript knows this.state is MyState
this.setState({ count: this.state.count + 1 });
}
}
初始状态何时生效
初始状态在首次访问时延迟应用,而不是在每次唤醒时:
- 新 Agent - 使用并持久化
initialState - 已存在的 Agent - 从 SQLite 加载已持久化的状态
- 未定义
initialState-this.state为undefined
JavaScript
class MyAgent extends Agent {
initialState = { count: 0 };
async onStart() {
// Safe to access - returns initialState if new, or persisted state
console.log("Current count:", this.state.count);
}
}
TypeScript
class MyAgent extends Agent<Env, { count: number }> {
initialState = { count: 0 };
async onStart() {
// Safe to access - returns initialState if new, or persisted state
console.log("Current count:", this.state.count);
}
}
读取状态
通过 this.state getter 访问当前状态:
JavaScript
class MyAgent extends Agent {
async onRequest(request) {
// Read current state
const { players, status } = this.state;
if (status === "waiting" && players.length < 2) {
return new Response("Waiting for players...");
}
return Response.json(this.state);
}
}
Explain Code
TypeScript
class MyAgent extends Agent<
Env,
{ players: string[]; status: "waiting" | "playing" | "finished" }
> {
async onRequest(request: Request) {
// Read current state
const { players, status } = this.state;
if (status === "waiting" && players.length < 2) {
return new Response("Waiting for players...");
}
return Response.json(this.state);
}
}
Explain Code
未定义的状态
如果你不定义 initialState,this.state 会返回 undefined:
JavaScript
export class MinimalAgent extends Agent {
// No initialState defined
async onConnect(connection) {
if (!this.state) {
// First time - initialize state
this.setState({ initialized: true });
}
}
}
Explain Code
TypeScript
export class MinimalAgent extends Agent {
// No initialState defined
async onConnect(connection: Connection) {
if (!this.state) {
// First time - initialize state
this.setState({ initialized: true });
}
}
}
Explain Code
更新状态
使用 setState() 更新状态。它会:
- 保存到 SQLite(持久化)
- 广播到所有连接的客户端(排除那些 shouldSendProtocolMessages 返回
false的连接) - 触发
onStateChanged()(在广播之后;尽力而为)
JavaScript
// Replace entire state
this.setState({
players: ["Alice", "Bob"],
score: 0,
status: "playing",
});
// Update specific fields (spread existing state)
this.setState({
...this.state,
score: this.state.score + 10,
});
Explain Code
TypeScript
// Replace entire state
this.setState({
players: ["Alice", "Bob"],
score: 0,
status: "playing",
});
// Update specific fields (spread existing state)
this.setState({
...this.state,
score: this.state.score + 10,
});
Explain Code
状态必须可序列化
状态以 JSON 形式存储,因此必须可序列化:
JavaScript
// Good - plain objects, arrays, primitives
this.setState({
items: ["a", "b", "c"],
count: 42,
active: true,
metadata: { key: "value" },
});
// Bad - functions, classes, circular references
// Functions do not serialize
// Dates become strings, lose methods
// Circular references fail
// For dates, use ISO strings
this.setState({
createdAt: new Date().toISOString(),
});
Explain Code
TypeScript
// Good - plain objects, arrays, primitives
this.setState({
items: ["a", "b", "c"],
count: 42,
active: true,
metadata: { key: "value" },
});
// Bad - functions, classes, circular references
// Functions do not serialize
// Dates become strings, lose methods
// Circular references fail
// For dates, use ISO strings
this.setState({
createdAt: new Date().toISOString(),
});
Explain Code
响应状态变化
重写 onStateChanged() 以在状态变化时作出反应(通知/副作用):
JavaScript
class MyAgent extends Agent {
onStateChanged(state, source) {
console.log("State updated:", state);
console.log("Updated by:", source === "server" ? "server" : source.id);
}
}
TypeScript
class MyAgent extends Agent<Env, GameState> {
onStateChanged(state: GameState, source: Connection | "server") {
console.log("State updated:", state);
console.log("Updated by:", source === "server" ? "server" : source.id);
}
}
source 参数
source 表明是谁触发了更新:
| 值 | 含义 |
|---|---|
| “server” | Agent 调用了 setState() |
| Connection | 一个客户端通过 WebSocket 推送了状态 |
这对以下情况很有用:
-
避免无限循环(不要响应自己的更新)
-
校验客户端输入
-
仅在客户端动作时触发副作用
JavaScript
class MyAgent extends Agent {
onStateChanged(state, source) {
// Ignore server-initiated updates
if (source === "server") return;
// A client updated state - validate and process
const connection = source;
console.log(`Client ${connection.id} updated state`);
// Maybe trigger something based on the change
if (state.status === "submitted") {
this.processSubmission(state);
}
}
}
Explain Code
TypeScript
class MyAgent extends Agent<
Env,
{ status: "waiting" | "playing" | "finished" }
> {
onStateChanged(state: GameState, source: Connection | "server") {
// Ignore server-initiated updates
if (source === "server") return;
// A client updated state - validate and process
const connection = source;
console.log(`Client ${connection.id} updated state`);
// Maybe trigger something based on the change
if (state.status === "submitted") {
this.processSubmission(state);
}
}
}
Explain Code
常见模式:由客户端驱动的动作
JavaScript
class MyAgent extends Agent {
onStateChanged(state, source) {
if (source === "server") return;
// Client added a message
const lastMessage = state.messages[state.messages.length - 1];
if (lastMessage && !lastMessage.processed) {
// Process and update
this.setState({
...state,
messages: state.messages.map((m) =>
m.id === lastMessage.id ? { ...m, processed: true } : m,
),
});
}
}
}
Explain Code
TypeScript
class MyAgent extends Agent<Env, { messages: Message[] }> {
onStateChanged(state: State, source: Connection | "server") {
if (source === "server") return;
// Client added a message
const lastMessage = state.messages[state.messages.length - 1];
if (lastMessage && !lastMessage.processed) {
// Process and update
this.setState({
...state,
messages: state.messages.map((m) =>
m.id === lastMessage.id ? { ...m, processed: true } : m,
),
});
}
}
}
Explain Code
校验状态更新
如果你想校验或拒绝状态更新,重写 validateStateChange():
-
在持久化和广播之前运行
-
必须是同步的
-
抛出异常会中止更新
JavaScript
class MyAgent extends Agent {
validateStateChange(nextState, source) {
// Example: reject negative scores
if (nextState.score < 0) {
throw new Error("score cannot be negative");
}
// Example: only allow certain status transitions
if (this.state.status === "finished" && nextState.status !== "finished") {
throw new Error("Cannot restart a finished game");
}
}
}
Explain Code
TypeScript
class MyAgent extends Agent<Env, GameState> {
validateStateChange(nextState: GameState, source: Connection | "server") {
// Example: reject negative scores
if (nextState.score < 0) {
throw new Error("score cannot be negative");
}
// Example: only allow certain status transitions
if (this.state.status === "finished" && nextState.status !== "finished") {
throw new Error("Cannot restart a finished game");
}
}
}
Explain Code
Note
onStateChanged() 不适合用于校验;它是通知 hook,不应阻塞广播。请使用 validateStateChange() 进行校验。
客户端状态同步
状态会自动与连接的客户端同步。
React (useAgent)
JavaScript
import { useAgent } from "agents/react";
function GameUI() {
const agent = useAgent({
agent: "game-agent",
name: "room-123",
onStateUpdate: (state, source) => {
console.log("State updated:", state);
},
});
// Push state to agent
const addPlayer = (name) => {
agent.setState({
...agent.state,
players: [...agent.state.players, name],
});
};
return <div>Players: {agent.state?.players.join(", ")}</div>;
}
Explain Code
TypeScript
import { useAgent } from "agents/react";
function GameUI() {
const agent = useAgent({
agent: "game-agent",
name: "room-123",
onStateUpdate: (state, source) => {
console.log("State updated:", state);
}
});
// Push state to agent
const addPlayer = (name: string) => {
agent.setState({
...agent.state,
players: [...agent.state.players, name]
});
};
return <div>Players: {agent.state?.players.join(", ")}</div>;
}
Explain Code
原生 JS (AgentClient)
JavaScript
import { AgentClient } from "agents/client";
const client = new AgentClient({
agent: "game-agent",
name: "room-123",
onStateUpdate: (state) => {
document.getElementById("score").textContent = state.score;
},
});
// Push state update
client.setState({ ...client.state, score: 100 });
Explain Code
TypeScript
import { AgentClient } from "agents/client";
const client = new AgentClient({
agent: "game-agent",
name: "room-123",
onStateUpdate: (state) => {
document.getElementById("score").textContent = state.score;
},
});
// Push state update
client.setState({ ...client.state, score: 100 });
Explain Code
状态流向
flowchart TD
subgraph Agent
S[“this.state
(persisted in SQLite)”]
end
subgraph Clients
C1[“Client 1”]
C2[“Client 2”]
C3[“Client 3”]
end
C1 & C2 & C3 –>|setState| S
S –>|broadcast via WebSocket| C1 & C2 & C3
来自 Workflows 的状态
使用 Workflows 时,你可以从 workflow 步骤中更新 Agent 状态:
JavaScript
// In your workflow
class MyWorkflow extends Workflow {
async run(event, step) {
// Replace entire state
await step.updateAgentState({ status: "processing", progress: 0 });
// Merge partial updates (preserves other fields)
await step.mergeAgentState({ progress: 50 });
// Reset to initialState
await step.resetAgentState();
return result;
}
}
Explain Code
TypeScript
// In your workflow
class MyWorkflow extends Workflow<Env> {
async run(event: AgentWorkflowEvent, step: AgentWorkflowStep) {
// Replace entire state
await step.updateAgentState({ status: "processing", progress: 0 });
// Merge partial updates (preserves other fields)
await step.mergeAgentState({ progress: 50 });
// Reset to initialState
await step.resetAgentState();
return result;
}
}
Explain Code
这些是持久化操作 - 即使 workflow 重试它们也会持久存在。
SQL API
每个独立的 Agent 实例都有自己的 SQL (SQLite) 数据库,运行在与 Agent 自身相同的上下文中。这意味着在 Agent 内插入或查询数据基本是零延迟的:Agent 不必跨越大洲或全球去访问自己的数据。
你可以在 Agent 的任意方法中通过 this.sql 访问 SQL API。SQL API 接受模板字面量:
JavaScript
export class MyAgent extends Agent {
async onRequest(request) {
let userId = new URL(request.url).searchParams.get("userId");
// 'users' is just an example here: you can create arbitrary tables and define your own schemas
// within each Agent's database using SQL (SQLite syntax).
let [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;
return Response.json(user);
}
}
Explain Code
TypeScript
export class MyAgent extends Agent {
async onRequest(request: Request) {
let userId = new URL(request.url).searchParams.get("userId");
// 'users' is just an example here: you can create arbitrary tables and define your own schemas
// within each Agent's database using SQL (SQLite syntax).
let [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;
return Response.json(user);
}
}
Explain Code
你也可以为查询提供 TypeScript 类型参数,该参数会被用于推断结果类型:
JavaScript
export class MyAgent extends Agent {
async onRequest(request) {
let userId = new URL(request.url).searchParams.get("userId");
// Supply the type parameter to the query when calling this.sql
// This assumes the results returns one or more User rows with "id", "name", and "email" columns
const [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;
return Response.json(user);
}
}
TypeScript
type User = {
id: string;
name: string;
email: string;
};
export class MyAgent extends Agent {
async onRequest(request: Request) {
let userId = new URL(request.url).searchParams.get("userId");
// Supply the type parameter to the query when calling this.sql
// This assumes the results returns one or more User rows with "id", "name", and "email" columns
const [user] = this.sql<User>`SELECT * FROM users WHERE id = ${userId}`;
return Response.json(user);
}
}
Explain Code
不需要指定数组类型(User[] 或 Array<User>),因为 this.sql 总是返回指定类型的数组。
Note
提供类型参数不会校验结果是否符合你的类型定义。如果需要校验传入事件,我们建议使用诸如 zod ↗ 的库或你自己的校验逻辑。
暴露给 Agent 的 SQL API 与 Durable Objects 内 的类似。你可以在 Agent 的数据库上使用相同的 SQL 查询。像在 Durable Objects 或 D1 中一样,创建表并查询数据。
最佳实践
让状态保持小巧
状态在每次变化时都会广播给所有客户端。对于大型数据:
TypeScript
// Bad - storing large arrays in state
initialState = {
allMessages: [] // Could grow to thousands of items
};
// Good - store in SQL, keep state light
initialState = {
messageCount: 0,
lastMessageId: null
};
// Query SQL for full data
async getMessages(limit = 50) {
return this.sql`SELECT * FROM messages ORDER BY created_at DESC LIMIT ${limit}`;
}
Explain Code
乐观更新
为了响应迅速的 UI,立即更新客户端状态:
JavaScript
// Client-side
function sendMessage(text) {
const optimisticMessage = {
id: crypto.randomUUID(),
text,
pending: true,
};
// Update immediately
agent.setState({
...agent.state,
messages: [...agent.state.messages, optimisticMessage],
});
// Server will confirm/update
}
// Server-side
class MyAgent extends Agent {
onStateChanged(state, source) {
if (source === "server") return;
const pendingMessages = state.messages.filter((m) => m.pending);
for (const msg of pendingMessages) {
// Validate and confirm
this.setState({
...state,
messages: state.messages.map((m) =>
m.id === msg.id ? { ...m, pending: false, timestamp: Date.now() } : m,
),
});
}
}
}
Explain Code
TypeScript
// Client-side
function sendMessage(text: string) {
const optimisticMessage = {
id: crypto.randomUUID(),
text,
pending: true,
};
// Update immediately
agent.setState({
...agent.state,
messages: [...agent.state.messages, optimisticMessage],
});
// Server will confirm/update
}
// Server-side
class MyAgent extends Agent<Env, { messages: Message[] }> {
onStateChanged(state: GameState, source: Connection | "server") {
if (source === "server") return;
const pendingMessages = state.messages.filter((m) => m.pending);
for (const msg of pendingMessages) {
// Validate and confirm
this.setState({
...state,
messages: state.messages.map((m) =>
m.id === msg.id ? { ...m, pending: false, timestamp: Date.now() } : m,
),
});
}
}
}
Explain Code
State vs SQL
| 用 State 存储 | 用 SQL 存储 |
|---|---|
| UI 状态(loading、选中项) | 历史数据 |
| 实时计数器 | 大型集合 |
| 当前会话数据 | 关系数据 |
| 配置 | 可查询的数据 |
JavaScript
export class ChatAgent extends Agent {
// State: current UI state
initialState = {
typing: [],
unreadCount: 0,
activeUsers: [],
};
// SQL: message history
async getMessages(limit = 100) {
return this.sql`
SELECT * FROM messages
ORDER BY created_at DESC
LIMIT ${limit}
`;
}
async saveMessage(message) {
this.sql`
INSERT INTO messages (id, text, user_id, created_at)
VALUES (${message.id}, ${message.text}, ${message.userId}, ${Date.now()})
`;
// Update state for real-time UI
this.setState({
...this.state,
unreadCount: this.state.unreadCount + 1,
});
}
}
Explain Code
TypeScript
export class ChatAgent extends Agent {
// State: current UI state
initialState = {
typing: [],
unreadCount: 0,
activeUsers: [],
};
// SQL: message history
async getMessages(limit = 100) {
return this.sql`
SELECT * FROM messages
ORDER BY created_at DESC
LIMIT ${limit}
`;
}
async saveMessage(message: Message) {
this.sql`
INSERT INTO messages (id, text, user_id, created_at)
VALUES (${message.id}, ${message.text}, ${message.userId}, ${Date.now()})
`;
// Update state for real-time UI
this.setState({
...this.state,
unreadCount: this.state.unreadCount + 1,
});
}
}
Explain Code
避免无限循环
要小心,不要在响应自己的更新时再触发状态更新:
TypeScript
// Bad - infinite loop
onStateChanged(state: State) {
this.setState({ ...state, lastUpdated: Date.now() });
}
// Good - check source
onStateChanged(state: State, source: Connection | "server") {
if (source === "server") return; // Do not react to own updates
this.setState({ ...state, lastUpdated: Date.now() });
}
Explain Code
把 Agent 状态用作模型上下文
你可以将 Agent 中的 state 与 SQL API,与它调用 AI 模型 的能力结合起来,把历史上下文包含进对模型的提示词中。现代大型语言模型 (LLM) 通常具有非常大的上下文窗口(可达数百万 token),允许你直接把相关上下文拉进提示词。
例如,你可以使用 Agent 的内置 SQL 数据库拉取历史,带着它去查询模型,然后再把回复追加到该历史中,以备下一次模型调用使用:
JavaScript
export class ReasoningAgent extends Agent {
async callReasoningModel(prompt) {
let result = this
.sql`SELECT * FROM history WHERE user = ${prompt.userId} ORDER BY timestamp DESC LIMIT 1000`;
let context = [];
for (const row of result) {
context.push(row.entry);
}
const systemPrompt = prompt.system || "You are a helpful assistant.";
const userPrompt = `${prompt.user}\n\nUser history:\n${context.join("\n")}`;
try {
const response = await this.env.AI.run("@cf/zai-org/glm-4.7-flash", {
messages: [
{ role: "system", content: systemPrompt },
{ role: "user", content: userPrompt },
],
});
// Store the response in history
this
.sql`INSERT INTO history (timestamp, user, entry) VALUES (${new Date()}, ${prompt.userId}, ${response.response})`;
return response.response;
} catch (error) {
console.error("Error calling reasoning model:", error);
throw error;
}
}
}
Explain Code
TypeScript
interface Env {
AI: Ai;
}
export class ReasoningAgent extends Agent<Env> {
async callReasoningModel(prompt: Prompt) {
let result = this
.sql<History>`SELECT * FROM history WHERE user = ${prompt.userId} ORDER BY timestamp DESC LIMIT 1000`;
let context = [];
for (const row of result) {
context.push(row.entry);
}
const systemPrompt = prompt.system || "You are a helpful assistant.";
const userPrompt = `${prompt.user}\n\nUser history:\n${context.join("\n")}`;
try {
const response = await this.env.AI.run("@cf/zai-org/glm-4.7-flash", {
messages: [
{ role: "system", content: systemPrompt },
{ role: "user", content: userPrompt },
],
});
// Store the response in history
this
.sql`INSERT INTO history (timestamp, user, entry) VALUES (${new Date()}, ${prompt.userId}, ${response.response})`;
return response.response;
} catch (error) {
console.error("Error calling reasoning model:", error);
throw error;
}
}
}
Explain Code
之所以可行,是因为每个 Agent 实例都有自己的数据库,而存储在该数据库中的状态对该 Agent 是私有的:无论它是代表某个用户、某个房间或频道,还是某个深度研究工具行事。默认情况下,你不必管理竞争或访问中央化数据库来检索和存储状态。
API 参考
属性
| 属性 | 类型 | 描述 |
|---|---|---|
| state | State | 当前状态(getter) |
| initialState | State | 新 Agent 的默认状态 |
方法
| 方法 | 签名 | 描述 |
|---|---|---|
| setState | (state: State) => void | 更新状态、持久化并广播 |
| onStateChanged | (state: State, source: Connection | “server”) => void | 状态变化时调用 |
| validateStateChange | (nextState: State, source: Connection | “server”) => void | 在持久化前进行校验(抛出异常即拒绝) |
Workflow 步骤方法
| 方法 | 描述 |
|---|---|
| step.updateAgentState(state) | 从 workflow 替换 Agent 状态 |
| step.mergeAgentState(partial) | 从 workflow 合并部分状态 |
| step.resetAgentState() | 从 workflow 重置为 initialState |
后续步骤
Agents API Agents SDK 的完整 API 参考。
构建一个聊天 Agent 构建并部署一个 AI 聊天 Agent。
WebSockets 构建带实时数据流的交互式 Agent。
运行 Workflows 在你的 Agent 中编排异步 workflow。