Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Workflows

什么是 Workflows?

Cloudflare Workflows 为需要在故障中存活、自动重试、并能等待外部事件的任务提供了持久化的多步骤执行能力。与 Agent 集成时,Workflow 处理长时间运行的后台处理,而 Agent 负责实时通信。

Agent 与 Workflows 对比

Agent 与 Workflows 的强项互补:

能力AgentWorkflows
执行模型可无限期运行运行至完成
实时通信WebSocket、HTTP 流式传输不支持
状态持久化内置 SQL 数据库步骤级持久化
失败处理应用层定义自动重试与恢复
外部事件直接处理暂停并等待事件
用户交互直接(聊天、UI)通过 Agent 回调

Agent 可以循环、分支、直接与用户交互。Workflow 顺序执行步骤,具备投递保证,并可以暂停数天等待审批或外部数据。

何时使用哪一个

仅使用 Agent 的场景:

  • 聊天与消息应用
  • 快速的 API 调用与响应
  • 实时协作功能
  • 30 秒以内的任务

Agent 配合 Workflow 的场景:

  • 数据处理流水线
  • 报告生成
  • human-in-the-loop 审批流程
  • 需要投递保证的任务
  • 需要重试的多步骤操作

仅使用 Workflow 的场景:

  • 不论是否需要用户审批的后台 job
  • 定时数据同步
  • 事件驱动的处理流水线

Agent 与 Workflow 如何通信

AgentWorkflow 类(从 agents/workflows 导入)在 Workflow 与发起它的 Agent 之间提供双向通信。

Workflow 到 Agent

Workflow 可以通过多种机制与 Agent 通信:

  • RPC 调用:通过 this.agent 直接调用 Agent 方法,具有完整的类型安全性

  • 进度上报:通过 this.reportProgress() 发送进度更新,触发 Agent 回调

  • 状态更新:通过 step.updateAgentState()step.mergeAgentState() 修改 Agent 状态,会广播给已连接的客户端

  • 客户端广播:通过 this.broadcastToClients() 给所有 WebSocket 客户端发送消息

  • JavaScript

  • TypeScript

JavaScript


// Inside a workflow's run() method

await this.agent.updateTaskStatus(taskId, "processing"); // RPC call

await this.reportProgress({ step: "process", percent: 0.5 }); // Progress (non-durable)

this.broadcastToClients({ type: "update", taskId }); // Broadcast (non-durable)

await step.mergeAgentState({ taskProgress: 0.5 }); // State update (durable)


TypeScript


// Inside a workflow's run() method

await this.agent.updateTaskStatus(taskId, "processing"); // RPC call

await this.reportProgress({ step: "process", percent: 0.5 }); // Progress (non-durable)

this.broadcastToClients({ type: "update", taskId }); // Broadcast (non-durable)

await step.mergeAgentState({ taskProgress: 0.5 }); // State update (durable)


Agent 到 Workflow

Agent 可以这样与正在运行的 Workflow 交互:

  • 启动 workflow:用 runWorkflow() 启动新的 workflow 实例
  • 发送事件:用 sendWorkflowEvent() 派发事件
  • 审批/拒绝:用 approveWorkflow() / rejectWorkflow() 响应审批请求
  • workflow 控制:暂停、恢复、终止或重启 workflow
  • 状态查询:用 getWorkflow() / getWorkflows() 检查 workflow 进度

持久化与非持久化操作

理解持久化是高效使用 workflow 的关键:

非持久化(可能在重试时重复执行)

这些操作轻量,适合频繁更新,但在 workflow 重试时可能多次执行:

  • this.reportProgress() — 进度上报
  • this.broadcastToClients() — WebSocket 广播
  • 直接对 this.agent 的 RPC 调用

持久化(幂等,不会重复)

这些操作使用 step 参数,保证只执行一次:

  • step.do() — 执行持久化步骤
  • step.reportComplete() / step.reportError() — 完成上报
  • step.sendEvent() — 自定义事件
  • step.updateAgentState() / step.mergeAgentState() — 状态同步

持久化保证

Workflow 通过基于 step 的执行提供持久化保证:

  1. 步骤完成是永久的 — 步骤一旦完成,即使 workflow 重启也不会重新执行
  2. 自动重试 — 失败的步骤会按可配置的退避策略重试
  3. 事件持久化 — Workflow 可以等待事件长达一年
  4. 状态恢复 — Workflow 状态可在基础设施故障中存活

这种持久化模型使 workflow 非常适合需要保留部分完成状态的任务,例如多阶段数据处理或跨多个系统的事务。

Workflow 跟踪

当 Agent 通过 runWorkflow() 启动 workflow 时,该 workflow 会自动登记到 Agent 的内部数据库中。这能实现:

  • 通过 ID、名称或元数据按游标分页查询 workflow 状态
  • 通过生命周期回调(onWorkflowProgressonWorkflowCompleteonWorkflowError)监控进度
  • workflow 控制:暂停、恢复、终止、重启
  • deleteWorkflow() / deleteWorkflows() 清理已完成的 workflow 记录
  • 通过元数据将 workflow 与用户或会话关联

常见模式

带进度的后台处理

Agent 收到一个请求,启动一个 Workflow 处理重活,并随着 Workflow 执行每一步,把进度更新广播给已连接的客户端。

JavaScript


// Workflow reports progress after each item

for (let i = 0; i < items.length; i++) {

  await step.do(`process-${i}`, async () => processItem(items[i]));

  await this.reportProgress({

    step: `process-${i}`,

    percent: (i + 1) / items.length,

    message: `Processed ${i + 1}/${items.length}`,

  });

}


TypeScript


// Workflow reports progress after each item

for (let i = 0; i < items.length; i++) {

  await step.do(`process-${i}`, async () => processItem(items[i]));

  await this.reportProgress({

    step: `process-${i}`,

    percent: (i + 1) / items.length,

    message: `Processed ${i + 1}/${items.length}`,

  });

}


Human-in-the-loop 审批

一个 Workflow 准备好请求,通过 waitForApproval() 暂停等待审批,Agent 提供 UI 让用户通过 approveWorkflow() / rejectWorkflow() 进行审批或拒绝。Workflow 根据决定恢复执行,或者抛出 WorkflowRejectedError

健壮的外部 API 调用

Workflow 把外部 API 调用包在带重试逻辑的持久化步骤中。如果 API 失败或 workflow 重启,已完成的调用不会重复执行,失败的调用会自动重试。

JavaScript


const result = await step.do(

  "call-api",

  {

    retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },

    timeout: "5 minutes",

  },

  async () => {

    const response = await fetch("https://api.example.com/process");

    if (!response.ok) throw new Error(`API error: ${response.status}`);

    return response.json();

  },

);


TypeScript


const result = await step.do(

  "call-api",

  {

    retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },

    timeout: "5 minutes",

  },

  async () => {

    const response = await fetch("https://api.example.com/process");

    if (!response.ok) throw new Error(`API error: ${response.status}`);

    return response.json();

  },

);


状态同步

Workflow 在关键里程碑使用 step.updateAgentState()step.mergeAgentState() 更新 Agent 状态。这些状态变更会广播给所有已连接的客户端,让 UI 无需轮询即可保持同步。

相关资源

Run Workflows API agent workflow 的实现细节。

Cloudflare Workflows Workflow 的基础与文档。

Human-in-the-loop 审批流程与人工介入。