Store and sync state

Agents provide built-in state management with automatic persistence and real-time synchronization across all connected clients.

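Overview

State in an Agent is:

  • Persistent - automatically saved to SQLite, survives restarts and hibernation
  • Synchronized - changes are broadcast immediately to all connected WebSocket clients
  • Bidirectional - both the server and clients can update state
  • Type-safe - full TypeScript support through generics
  • Immediately consistent - read your own writes
  • Thread-safe - concurrent updates are safe
  • Fast - state is colocated with the Agent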

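An Agent's state is stored in a SQL database embedded within each individual Agent instance. You can interact with it through the higher-level this.setState API (recommended), which syncs state and triggers events on state changes, or by querying the database directly with this.sql.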

State vs Props

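State is persistent data that survives restarts and syncs across clients. Props are one-time initialization arguments passed when an Agent is instantiated - use props to configure things that do not need to persist.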

JavaScript


import { Agent } from "agents";


export class GameAgent extends Agent {

  // Default state for new agents

  initialState = {

    players: [],

    score: 0,

    status: "waiting",

  };


  // React to state changes

  onStateChanged(state, source) {

    if (source !== "server" && state.players.length >= 2) {

      // Client added a player, start the game

      this.setState({ ...state, status: "playing" });

    }

  }


  addPlayer(name) {

    this.setState({

      ...this.state,

      players: [...this.state.players, name],

    });

  }

}


TypeScript


import { Agent } from "agents";


type GameState = {

  players: string[];

  score: number;

  status: "waiting" | "playing" | "finished";

};


export class GameAgent extends Agent<Env, GameState> {

  // Default state for new agents

  initialState: GameState = {

    players: [],

    score: 0,

    status: "waiting",

  };


  // React to state changes

  onStateChanged(state: GameState, source: Connection | "server") {

    if (source !== "server" && state.players.length >= 2) {

      // Client added a player, start the game

      this.setState({ ...state, status: "playing" });

    }

  }


  addPlayer(name: string) {

    this.setState({

      ...this.state,

      players: [...this.state.players, name],

    });

  }

}


Defining initial state

Use the initialState property to define default values for new Agent instances:

JavaScript


export class ChatAgent extends Agent {

  initialState = {

    messages: [],

    settings: { theme: "dark", notifications: true },

    lastActive: null,

  };

}


TypeScript


type State = {

  messages: Message[];

  settings: UserSettings;

  lastActive: string | null;

};


export class ChatAgent extends Agent<Env, State> {

  initialState: State = {

    messages: [],

    settings: { theme: "dark", notifications: true },

    lastActive: null,

  };

}


Type safety

The second generic parameter of Agent defines your state type:

JavaScript


// State is fully typed

export class MyAgent extends Agent {

  initialState = { count: 0 };


  increment() {

    // TypeScript knows this.state is MyState

    this.setState({ count: this.state.count + 1 });

  }

}


TypeScript


// State is fully typed

type MyState = { count: number };


export class MyAgent extends Agent<Env, MyState> {

  initialState: MyState = { count: 0 };


  increment() {

    // TypeScript knows this.state is MyState

    this.setState({ count: this.state.count + 1 });

  }

}


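When initial state applies

Initial state is applied lazily on first access, not on every wake:

  1. New Agent - initialState is used and persisted
  2. Existing Agent - the persisted state is loaded from SQLite
  3. No initialState defined - this.state is undefined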

JavaScript


class MyAgent extends Agent {

  initialState = { count: 0 };

  async onStart() {

    // Safe to access - returns initialState if new, or persisted state

    console.log("Current count:", this.state.count);

  }

}


TypeScript


class MyAgent extends Agent<Env, { count: number }> {

  initialState = { count: 0 };

  async onStart() {

    // Safe to access - returns initialState if new, or persisted state

    console.log("Current count:", this.state.count);

  }

}
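
The three cases can be modeled without the SDK. The following is a minimal sketch, assuming an in-memory Map as a stand-in for the Agent's SQLite storage; LazyState and its members are hypothetical names for illustration and are not part of the Agents SDK.

```typescript
// Hypothetical model of lazy initial state; not the Agents SDK implementation.
// A Map stands in for the Agent's SQLite storage.
class LazyState<S> {
  private storage: Map<string, string>;
  private initialState: S | undefined;
  private cached: S | undefined;
  private loaded = false;

  constructor(storage: Map<string, string>, initialState?: S) {
    this.storage = storage;
    this.initialState = initialState;
  }

  get state(): S | undefined {
    if (!this.loaded) {
      const row = this.storage.get("state");
      if (row !== undefined) {
        // Existing agent: persisted state wins over initialState
        this.cached = JSON.parse(row) as S;
      } else if (this.initialState !== undefined) {
        // New agent: use initialState and persist it
        this.cached = this.initialState;
        this.storage.set("state", JSON.stringify(this.initialState));
      }
      // Nothing persisted and no initialState: state stays undefined
      this.loaded = true;
    }
    return this.cached;
  }
}

const freshStorage = new Map<string, string>();
const fresh = new LazyState(freshStorage, { count: 0 });
console.log(fresh.state); // new agent: initialState

const existingStorage = new Map([["state", JSON.stringify({ count: 7 })]]);
const existing = new LazyState(existingStorage, { count: 0 });
console.log(existing.state); // existing agent: persisted state

const bare = new LazyState<{ count: number }>(new Map());
console.log(bare.state); // no initialState: undefined
```

The point of the sketch is the precedence: persisted state is checked first, so changing initialState in code does not affect Agents that already have stored state.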


Reading state

Access the current state through the this.state getter:

JavaScript


class MyAgent extends Agent {

  async onRequest(request) {

    // Read current state

    const { players, status } = this.state;


    if (status === "waiting" && players.length < 2) {

      return new Response("Waiting for players...");

    }


    return Response.json(this.state);

  }

}


TypeScript


class MyAgent extends Agent<

  Env,

  { players: string[]; status: "waiting" | "playing" | "finished" }

> {

  async onRequest(request: Request) {

    // Read current state

    const { players, status } = this.state;


    if (status === "waiting" && players.length < 2) {

      return new Response("Waiting for players...");

    }


    return Response.json(this.state);

  }

}


Undefined state

If you do not define initialState, this.state returns undefined:

JavaScript


export class MinimalAgent extends Agent {

  // No initialState defined


  async onConnect(connection) {

    if (!this.state) {

      // First time - initialize state

      this.setState({ initialized: true });

    }

  }

}


TypeScript


export class MinimalAgent extends Agent {

  // No initialState defined


  async onConnect(connection: Connection) {

    if (!this.state) {

      // First time - initialize state

      this.setState({ initialized: true });

    }

  }

}


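Updating state

Use setState() to update state. It:

  1. Saves to SQLite (persistence)
  2. Broadcasts to all connected clients (excluding connections where shouldSendProtocolMessages returns false)
  3. Triggers onStateChanged() (after the broadcast; best-effort)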

JavaScript


// Replace entire state

this.setState({

  players: ["Alice", "Bob"],

  score: 0,

  status: "playing",

});


// Update specific fields (spread existing state)

this.setState({

  ...this.state,

  score: this.state.score + 10,

});


TypeScript


// Replace entire state

this.setState({

  players: ["Alice", "Bob"],

  score: 0,

  status: "playing",

});


// Update specific fields (spread existing state)

this.setState({

  ...this.state,

  score: this.state.score + 10,

});
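
To make the ordering concrete, here is a minimal sketch of the three steps, assuming a Map in place of SQLite and plain arrays in place of WebSocket connections. MiniAgent, Client, and wantsProtocolMessages are hypothetical names, and reading "best-effort" as "a failing hook does not undo the update" is an assumption of this sketch, not a statement about the SDK internals.

```typescript
// Hypothetical model of the setState() sequence; not SDK code.
type Client = { id: string; wantsProtocolMessages: boolean; inbox: string[] };

class MiniAgent<S> {
  state: S | undefined;
  storage = new Map<string, string>(); // stand-in for SQLite
  clients: Client[] = [];
  log: string[] = [];

  setState(next: S): void {
    // 1. Persist
    this.state = next;
    this.storage.set("state", JSON.stringify(next));
    this.log.push("persist");

    // 2. Broadcast, skipping clients that opted out of protocol messages
    for (const client of this.clients) {
      if (client.wantsProtocolMessages) {
        client.inbox.push(JSON.stringify(next));
      }
    }
    this.log.push("broadcast");

    // 3. Notify; in this sketch a throwing hook does not undo steps 1 and 2
    try {
      this.onStateChanged(next);
    } catch {
      this.log.push("hook failed");
    }
  }

  onStateChanged(_state: S): void {
    this.log.push("onStateChanged");
  }
}

const agent = new MiniAgent<{ score: number }>();
const alice: Client = { id: "alice", wantsProtocolMessages: true, inbox: [] };
const quiet: Client = { id: "quiet", wantsProtocolMessages: false, inbox: [] };
agent.clients.push(alice, quiet);

agent.setState({ score: 10 });
console.log(agent.log); // ["persist", "broadcast", "onStateChanged"]
console.log(alice.inbox.length, quiet.inbox.length); // 1 0
```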


State must be serializable

State is stored as JSON, so it must be serializable:

JavaScript


// Good - plain objects, arrays, primitives

this.setState({

  items: ["a", "b", "c"],

  count: 42,

  active: true,

  metadata: { key: "value" },

});


// Bad - functions, classes, circular references

// Functions do not serialize

// Dates become strings, lose methods

// Circular references fail


// For dates, use ISO strings

this.setState({

  createdAt: new Date().toISOString(),

});


TypeScript


// Good - plain objects, arrays, primitives

this.setState({

  items: ["a", "b", "c"],

  count: 42,

  active: true,

  metadata: { key: "value" },

});


// Bad - functions, classes, circular references

// Functions do not serialize

// Dates become strings, lose methods

// Circular references fail


// For dates, use ISO strings

this.setState({

  createdAt: new Date().toISOString(),

});
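
The effects listed in the comments can be reproduced with a plain JSON round trip. This sketch uses only standard JSON.stringify and JSON.parse behavior; that the Agent's storage round-trips values the same way is an assumption based on state being stored as JSON.

```typescript
// What a JSON round trip does to values that are not plain data.
const original = {
  createdAt: new Date(0),
  greet: () => "hi",
  count: 42,
};

const restored = JSON.parse(JSON.stringify(original));

console.log(typeof restored.createdAt); // "string"
console.log(restored.createdAt); // "1970-01-01T00:00:00.000Z"
console.log("greet" in restored); // false: functions are dropped
console.log(restored.count); // 42

// Circular references cannot be serialized at all
const node: { self?: unknown } = {};
node.self = node;

let circularError = "";
try {
  JSON.stringify(node);
} catch (error) {
  circularError = error instanceof Error ? error.name : String(error);
}
console.log(circularError); // "TypeError"
```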


Responding to state changes

Override onStateChanged() to react when state changes (notifications and side effects):

JavaScript


class MyAgent extends Agent {

  onStateChanged(state, source) {

    console.log("State updated:", state);

    console.log("Updated by:", source === "server" ? "server" : source.id);

  }

}


TypeScript


class MyAgent extends Agent<Env, GameState> {

  onStateChanged(state: GameState, source: Connection | "server") {

    console.log("State updated:", state);

    console.log("Updated by:", source === "server" ? "server" : source.id);

  }

}


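The source parameter

source indicates who triggered the update:

| Value | Meaning |
| --- | --- |
| "server" | The Agent called setState() |
| Connection | A client pushed state over WebSocket |

This is useful for:

  • Avoiding infinite loops (do not react to your own updates)
  • Validating client input
  • Triggering side effects only on client actions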

JavaScript


class MyAgent extends Agent {

  onStateChanged(state, source) {

    // Ignore server-initiated updates

    if (source === "server") return;


    // A client updated state - validate and process

    const connection = source;

    console.log(`Client ${connection.id} updated state`);


    // Maybe trigger something based on the change

    if (state.status === "submitted") {

      this.processSubmission(state);

    }

  }

}


TypeScript


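// "submitted" is part of the union so the check below type-checks

type SubmissionState = {

  status: "waiting" | "playing" | "submitted" | "finished";

};


class MyAgent extends Agent<Env, SubmissionState> {

  onStateChanged(state: SubmissionState, source: Connection | "server") {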

    // Ignore server-initiated updates

    if (source === "server") return;


    // A client updated state - validate and process

    const connection = source;

    console.log(`Client ${connection.id} updated state`);


    // Maybe trigger something based on the change

    if (state.status === "submitted") {

      this.processSubmission(state);

    }

  }

}


Common pattern: client-driven actions

JavaScript


class MyAgent extends Agent {

  onStateChanged(state, source) {

    if (source === "server") return;


    // Client added a message

    const lastMessage = state.messages[state.messages.length - 1];

    if (lastMessage && !lastMessage.processed) {

      // Process and update

      this.setState({

        ...state,

        messages: state.messages.map((m) =>

          m.id === lastMessage.id ? { ...m, processed: true } : m,

        ),

      });

    }

  }

}


TypeScript


class MyAgent extends Agent<Env, { messages: Message[] }> {

  onStateChanged(state: { messages: Message[] }, source: Connection | "server") {

    if (source === "server") return;


    // Client added a message

    const lastMessage = state.messages[state.messages.length - 1];

    if (lastMessage && !lastMessage.processed) {

      // Process and update

      this.setState({

        ...state,

        messages: state.messages.map((m) =>

          m.id === lastMessage.id ? { ...m, processed: true } : m,

        ),

      });

    }

  }

}


Validating state updates

To validate or reject state updates, override validateStateChange():

JavaScript


class MyAgent extends Agent {

  validateStateChange(nextState, source) {

    // Example: reject negative scores

    if (nextState.score < 0) {

      throw new Error("score cannot be negative");

    }


    // Example: only allow certain status transitions

    if (this.state.status === "finished" && nextState.status !== "finished") {

      throw new Error("Cannot restart a finished game");

    }

  }

}


TypeScript


class MyAgent extends Agent<Env, GameState> {

  validateStateChange(nextState: GameState, source: Connection | "server") {

    // Example: reject negative scores

    if (nextState.score < 0) {

      throw new Error("score cannot be negative");

    }


    // Example: only allow certain status transitions

    if (this.state.status === "finished" && nextState.status !== "finished") {

      throw new Error("Cannot restart a finished game");

    }

  }

}


Note

onStateChanged() is not suited to validation; it is a notification hook and should not block the broadcast. Use validateStateChange() for validation.
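
The difference between the two hooks comes down to when they run. The following is a minimal sketch, assuming validation runs before anything is stored or broadcast; ValidatedStore and Game are hypothetical names and the class is a plain-object model, not the SDK.

```typescript
// Hypothetical model: validation runs before anything is stored or broadcast.
type Game = { score: number };

class ValidatedStore {
  state: Game = { score: 0 };
  broadcasts: Game[] = [];

  validateStateChange(next: Game): void {
    if (next.score < 0) throw new Error("score cannot be negative");
  }

  setState(next: Game): void {
    this.validateStateChange(next); // throwing here aborts the update
    this.state = next;
    this.broadcasts.push(next);
  }
}

const store = new ValidatedStore();
store.setState({ score: 5 });

let rejected = false;
try {
  store.setState({ score: -1 });
} catch {
  rejected = true;
}

console.log(rejected); // true
console.log(store.state); // { score: 5 }
console.log(store.broadcasts.length); // 1
```

Because the rejected update never reaches the store, no client sees the invalid value; a check placed in a post-broadcast hook could not offer that.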

Client-side state sync

State syncs automatically with connected clients.

React (useAgent)

JavaScript


import { useAgent } from "agents/react";


function GameUI() {

  const agent = useAgent({

    agent: "game-agent",

    name: "room-123",

    onStateUpdate: (state, source) => {

      console.log("State updated:", state);

    },

  });


  // Push state to agent

  const addPlayer = (name) => {

    agent.setState({

      ...agent.state,

      players: [...agent.state.players, name],

    });

  };


  return <div>Players: {agent.state?.players.join(", ")}</div>;

}


TypeScript


import { useAgent } from "agents/react";


function GameUI() {

  const agent = useAgent({

    agent: "game-agent",

    name: "room-123",

    onStateUpdate: (state, source) => {

      console.log("State updated:", state);

    }

  });


  // Push state to agent

  const addPlayer = (name: string) => {

    agent.setState({

      ...agent.state,

      players: [...agent.state.players, name]

    });

  };


  return <div>Players: {agent.state?.players.join(", ")}</div>;

}


Vanilla JS (AgentClient)

JavaScript


import { AgentClient } from "agents/client";


const client = new AgentClient({

  agent: "game-agent",

  name: "room-123",

  onStateUpdate: (state) => {

    document.getElementById("score").textContent = state.score;

  },

});


// Push state update

client.setState({ ...client.state, score: 100 });


TypeScript


import { AgentClient } from "agents/client";


const client = new AgentClient({

  agent: "game-agent",

  name: "room-123",

  onStateUpdate: (state) => {

    document.getElementById("score").textContent = state.score;

  },

});


// Push state update

client.setState({ ...client.state, score: 100 });


State flow

flowchart TD
  subgraph Agent
    S["this.state (persisted in SQLite)"]
  end
  subgraph Clients
    C1["Client 1"]
    C2["Client 2"]
    C3["Client 3"]
  end
  C1 & C2 & C3 -->|setState| S
  S -->|broadcast via WebSocket| C1 & C2 & C3

State from Workflows

When using Workflows, you can update Agent state from workflow steps:

JavaScript


// In your workflow

class MyWorkflow extends Workflow {

  async run(event, step) {

    // Replace entire state

    await step.updateAgentState({ status: "processing", progress: 0 });


    // Merge partial updates (preserves other fields)

    await step.mergeAgentState({ progress: 50 });


    // Reset to initialState

    await step.resetAgentState();


    return result;

  }

}


TypeScript


// In your workflow

class MyWorkflow extends Workflow<Env> {

  async run(event: AgentWorkflowEvent, step: AgentWorkflowStep) {

    // Replace entire state

    await step.updateAgentState({ status: "processing", progress: 0 });


    // Merge partial updates (preserves other fields)

    await step.mergeAgentState({ progress: 50 });


    // Reset to initialState

    await step.resetAgentState();


    return result;

  }

}


These are durable operations - they persist even if the workflow retries.
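
The difference between replacing, merging, and resetting is easiest to see on plain objects. This is a minimal sketch with hypothetical helper names (replaceState, mergeState, resetState) and does not call the Workflows API; treating the merge as a shallow spread is an assumption.

```typescript
// Hypothetical plain-object model of the three operations; not the Workflows API.
type JobState = { status: string; progress: number; owner?: string };

const initialJobState: JobState = { status: "idle", progress: 0 };

function replaceState(_current: JobState, next: JobState): JobState {
  return next; // previous fields are discarded
}

function mergeState(current: JobState, partial: Partial<JobState>): JobState {
  return { ...current, ...partial }; // untouched fields survive (shallow merge)
}

function resetState(): JobState {
  return { ...initialJobState };
}

const start: JobState = { status: "queued", progress: 0, owner: "alice" };

const afterReplace = replaceState(start, { status: "processing", progress: 0 });
console.log(afterReplace.owner); // undefined: owner was not in the new state

const afterMerge = mergeState(afterReplace, { progress: 50 });
console.log(afterMerge); // { status: "processing", progress: 50 }

const afterReset = resetState();
console.log(afterReset); // { status: "idle", progress: 0 }
```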

SQL API

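Every individual Agent instance has its own SQL (SQLite) database that runs within the same context as the Agent itself. This means that inserting or querying data within your Agent is effectively zero-latency: the Agent does not have to round-trip across a continent or the world to access its own data.

You can access the SQL API within any method on an Agent via this.sql. The SQL API accepts template literals: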

JavaScript


export class MyAgent extends Agent {

  async onRequest(request) {

    let userId = new URL(request.url).searchParams.get("userId");


    // 'users' is just an example here: you can create arbitrary tables and define your own schemas

    // within each Agent's database using SQL (SQLite syntax).

    let [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;

    return Response.json(user);

  }

}


TypeScript


export class MyAgent extends Agent {

  async onRequest(request: Request) {

    let userId = new URL(request.url).searchParams.get("userId");


    // 'users' is just an example here: you can create arbitrary tables and define your own schemas

    // within each Agent's database using SQL (SQLite syntax).

    let [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;

    return Response.json(user);

  }

}


You can also supply a TypeScript type argument to the query, which is used to infer the type of the result:

JavaScript


export class MyAgent extends Agent {

  async onRequest(request) {

    let userId = new URL(request.url).searchParams.get("userId");

    // Supply the type parameter to the query when calling this.sql

    // This assumes the results returns one or more User rows with "id", "name", and "email" columns

    const [user] = this.sql`SELECT * FROM users WHERE id = ${userId}`;

    return Response.json(user);

  }

}


TypeScript


type User = {

  id: string;

  name: string;

  email: string;

};


export class MyAgent extends Agent {

  async onRequest(request: Request) {

    let userId = new URL(request.url).searchParams.get("userId");

    // Supply the type parameter to the query when calling this.sql

    // This assumes the results returns one or more User rows with "id", "name", and "email" columns

    const [user] = this.sql<User>`SELECT * FROM users WHERE id = ${userId}`;

    return Response.json(user);

  }

}


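You do not need to specify an array type (User[] or Array<User>), because this.sql always returns an array of the specified type.

Note

Providing a type parameter does not validate that the result matches your type definition. If you need to validate incoming events, we recommend a library such as zod or your own validation logic.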
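
As an illustration of "your own validation logic", here is a minimal sketch of a hand-written type guard. The rows array is a stand-in for query results, and isUser is a hypothetical helper; a schema library such as zod could take its place.

```typescript
type User = { id: string; name: string; email: string };

// Hand-written runtime check; the type parameter alone performs no such check.
function isUser(row: unknown): row is User {
  if (typeof row !== "object" || row === null) return false;
  const candidate = row as Record<string, unknown>;
  return (
    typeof candidate["id"] === "string" &&
    typeof candidate["name"] === "string" &&
    typeof candidate["email"] === "string"
  );
}

// Stand-in for rows returned by a query; the second row is missing "email".
const rows: unknown[] = [
  { id: "1", name: "Ada", email: "ada@example.com" },
  { id: "2", name: "Bob" },
];

const users: User[] = rows.filter(isUser);
console.log(users.length); // 1
```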

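The SQL API exposed to an Agent is similar to the one within Durable Objects. You can use the same SQL queries with the Agent's database. Create tables and query data, just as you would with Durable Objects or D1.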

Best practices

Keep state small

State is broadcast to all clients on every change. For large data:

TypeScript


// Bad - storing large arrays in state

initialState = {

  allMessages: [] // Could grow to thousands of items

};


// Good - store in SQL, keep state light

initialState = {

  messageCount: 0,

  lastMessageId: null

};


// Query SQL for full data

async getMessages(limit = 50) {

  return this.sql`SELECT * FROM messages ORDER BY created_at DESC LIMIT ${limit}`;

}


Optimistic updates

For a responsive UI, update client state immediately:

JavaScript


// Client-side

function sendMessage(text) {

  const optimisticMessage = {

    id: crypto.randomUUID(),

    text,

    pending: true,

  };


  // Update immediately

  agent.setState({

    ...agent.state,

    messages: [...agent.state.messages, optimisticMessage],

  });


  // Server will confirm/update

}


// Server-side

class MyAgent extends Agent {

  onStateChanged(state, source) {

    if (source === "server") return;


    // Nothing to confirm

    if (!state.messages.some((m) => m.pending)) return;


    // Validate and confirm every pending message in one update;

    // calling setState() once per message would overwrite earlier confirmations

    this.setState({

      ...state,

      messages: state.messages.map((m) =>

        m.pending ? { ...m, pending: false, timestamp: Date.now() } : m,

      ),

    });

  }

}


TypeScript


// Client-side

function sendMessage(text: string) {

  const optimisticMessage = {

    id: crypto.randomUUID(),

    text,

    pending: true,

  };


  // Update immediately

  agent.setState({

    ...agent.state,

    messages: [...agent.state.messages, optimisticMessage],

  });


  // Server will confirm/update

}


// Server-side

class MyAgent extends Agent<Env, { messages: Message[] }> {

  onStateChanged(state: { messages: Message[] }, source: Connection | "server") {

    if (source === "server") return;


    // Nothing to confirm

    if (!state.messages.some((m) => m.pending)) return;


    // Validate and confirm every pending message in one update;

    // calling setState() once per message would overwrite earlier confirmations

    this.setState({

      ...state,

      messages: state.messages.map((m) =>

        m.pending ? { ...m, pending: false, timestamp: Date.now() } : m,

      ),

    });

  }

}


State vs SQL

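| Use State for | Use SQL for |
| --- | --- |
| UI state (loading, selected items) | Historical data |
| Real-time counters | Large collections |
| Current session data | Relational data |
| Configuration | Queryable data |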

JavaScript


export class ChatAgent extends Agent {

  // State: current UI state

  initialState = {

    typing: [],

    unreadCount: 0,

    activeUsers: [],

  };


  // SQL: message history

  async getMessages(limit = 100) {

    return this.sql`

      SELECT * FROM messages

      ORDER BY created_at DESC

      LIMIT ${limit}

    `;

  }


  async saveMessage(message) {

    this.sql`

      INSERT INTO messages (id, text, user_id, created_at)

      VALUES (${message.id}, ${message.text}, ${message.userId}, ${Date.now()})

    `;

    // Update state for real-time UI

    this.setState({

      ...this.state,

      unreadCount: this.state.unreadCount + 1,

    });

  }

}


TypeScript


export class ChatAgent extends Agent {

  // State: current UI state

  initialState = {

    typing: [],

    unreadCount: 0,

    activeUsers: [],

  };


  // SQL: message history

  async getMessages(limit = 100) {

    return this.sql`

      SELECT * FROM messages

      ORDER BY created_at DESC

      LIMIT ${limit}

    `;

  }


  async saveMessage(message: Message) {

    this.sql`

      INSERT INTO messages (id, text, user_id, created_at)

      VALUES (${message.id}, ${message.text}, ${message.userId}, ${Date.now()})

    `;

    // Update state for real-time UI

    this.setState({

      ...this.state,

      unreadCount: this.state.unreadCount + 1,

    });

  }

}


Avoid infinite loops

Be careful not to trigger a state update in response to your own update:

TypeScript


// Bad - infinite loop

onStateChanged(state: State) {

  this.setState({ ...state, lastUpdated: Date.now() });

}


// Good - check source

onStateChanged(state: State, source: Connection | "server") {

  if (source === "server") return; // Do not react to own updates

  this.setState({ ...state, lastUpdated: Date.now() });

}
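
The effect of the source check can be traced in a small model. This is a minimal sketch with hypothetical names (LoopDemo, applyUpdate) that mimics the hook being called for both client and server updates; it is not SDK code.

```typescript
// Hypothetical model of the feedback loop; not SDK code.
type Source = "server" | { id: string };
type Doc = { text: string; lastUpdated?: number };

class LoopDemo {
  state: Doc = { text: "" };
  hookCalls = 0;

  // Runs for client pushes and for server-side updates alike
  applyUpdate(next: Doc, source: Source): void {
    this.state = next;
    this.onStateChanged(next, source);
  }

  onStateChanged(state: Doc, source: Source): void {
    this.hookCalls += 1;
    if (source === "server") return; // guard: ignore our own update
    this.applyUpdate({ ...state, lastUpdated: 1 }, "server");
  }
}

const demo = new LoopDemo();
demo.applyUpdate({ text: "hello" }, { id: "client-1" });

console.log(demo.hookCalls); // 2: one client update, one guarded server update
console.log(demo.state); // { text: "hello", lastUpdated: 1 }
```

Without the guard, the server-side update inside the hook would call the hook again with no exit condition, and the recursion would never terminate.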


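Use Agent state as model context

You can combine the state and SQL APIs in your Agent with its ability to call AI models to include historical context in your prompts to a model. Modern large language models (LLMs) often have very large context windows (up to millions of tokens), which lets you pull relevant context into your prompt directly.

For example, you can use an Agent's built-in SQL database to pull history, query a model with it, and append the reply to that history for the next model call: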

JavaScript


export class ReasoningAgent extends Agent {

  async callReasoningModel(prompt) {

    let result = this

      .sql`SELECT * FROM history WHERE user = ${prompt.userId} ORDER BY timestamp DESC LIMIT 1000`;

    let context = [];

    for (const row of result) {

      context.push(row.entry);

    }


    const systemPrompt = prompt.system || "You are a helpful assistant.";

    const userPrompt = `${prompt.user}\n\nUser history:\n${context.join("\n")}`;


    try {

      const response = await this.env.AI.run("@cf/zai-org/glm-4.7-flash", {

        messages: [

          { role: "system", content: systemPrompt },

          { role: "user", content: userPrompt },

        ],

      });


      // Store the response in history

      this

        .sql`INSERT INTO history (timestamp, user, entry) VALUES (${new Date().toISOString()}, ${prompt.userId}, ${response.response})`;


      return response.response;

    } catch (error) {

      console.error("Error calling reasoning model:", error);

      throw error;

    }

  }

}


TypeScript


interface Env {

  AI: Ai;

}


export class ReasoningAgent extends Agent<Env> {

  async callReasoningModel(prompt: Prompt) {

    let result = this

      .sql<History>`SELECT * FROM history WHERE user = ${prompt.userId} ORDER BY timestamp DESC LIMIT 1000`;

    let context = [];

    for (const row of result) {

      context.push(row.entry);

    }


    const systemPrompt = prompt.system || "You are a helpful assistant.";

    const userPrompt = `${prompt.user}\n\nUser history:\n${context.join("\n")}`;


    try {

      const response = await this.env.AI.run("@cf/zai-org/glm-4.7-flash", {

        messages: [

          { role: "system", content: systemPrompt },

          { role: "user", content: userPrompt },

        ],

      });


      // Store the response in history

      this

        .sql`INSERT INTO history (timestamp, user, entry) VALUES (${new Date().toISOString()}, ${prompt.userId}, ${response.response})`;


      return response.response;

    } catch (error) {

      console.error("Error calling reasoning model:", error);

      throw error;

    }

  }

}


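This works because each instance of an Agent has its own database, and the state stored in that database is private to that Agent: whether it is acting on behalf of a single user, a room or channel, or a deep research tool. By default, you do not have to manage contention or reach out to a centralized database to retrieve and store state.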

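API reference

Properties

| Property | Type | Description |
| --- | --- | --- |
| state | State | Current state (getter) |
| initialState | State | Default state for new Agents |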

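Methods

| Method | Signature | Description |
| --- | --- | --- |
| setState | (state: State) => void | Update, persist, and broadcast state |
| onStateChanged | (state: State, source: Connection \| "server") => void | Called when state changes |
| validateStateChange | (nextState: State, source: Connection \| "server") => void | Validate before persistence (throw to reject) |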

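Workflow step methods

| Method | Description |
| --- | --- |
| step.updateAgentState(state) | Replace Agent state from a workflow |
| step.mergeAgentState(partial) | Merge partial state from a workflow |
| step.resetAgentState() | Reset to initialState from a workflow |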

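Next steps

Agents API: Complete API reference for the Agents SDK.

Build a chat agent: Build and deploy an AI chat agent.

WebSockets: Build interactive agents with real-time data streaming.

Run Workflows: Orchestrate asynchronous workflows from your agent.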