Workflows
什么是 Workflows?
Cloudflare Workflows 为需要在故障中存活、自动重试、并能等待外部事件的任务提供了持久化的多步骤执行能力。与 Agent 集成时,Workflow 处理长时间运行的后台处理,而 Agent 负责实时通信。
Agent 与 Workflows 对比
Agent 与 Workflows 的强项互补:
| 能力 | Agent | Workflows |
|---|---|---|
| 执行模型 | 可无限期运行 | 运行至完成 |
| 实时通信 | WebSocket、HTTP 流式传输 | 不支持 |
| 状态持久化 | 内置 SQL 数据库 | 步骤级持久化 |
| 失败处理 | 应用层定义 | 自动重试与恢复 |
| 外部事件 | 直接处理 | 暂停并等待事件 |
| 用户交互 | 直接(聊天、UI) | 通过 Agent 回调 |
Agent 可以循环、分支、直接与用户交互。Workflow 顺序执行步骤,具备投递保证,并可以暂停数天等待审批或外部数据。
何时使用哪一个
仅使用 Agent 的场景:
- 聊天与消息应用
- 快速的 API 调用与响应
- 实时协作功能
- 30 秒以内的任务
Agent 配合 Workflow 的场景:
- 数据处理流水线
- 报告生成
- human-in-the-loop 审批流程
- 需要投递保证的任务
- 需要重试的多步骤操作
仅使用 Workflow 的场景:
- 不论是否需要用户审批的后台 job
- 定时数据同步
- 事件驱动的处理流水线
Agent 与 Workflow 如何通信
AgentWorkflow 类(从 agents/workflows 导入)在 Workflow 与发起它的 Agent 之间提供双向通信。
Workflow 到 Agent
Workflow 可以通过多种机制与 Agent 通信:
-
RPC 调用:通过
this.agent直接调用 Agent 方法,具有完整的类型安全性 -
进度上报:通过
this.reportProgress()发送进度更新,触发 Agent 回调 -
状态更新:通过
step.updateAgentState()或step.mergeAgentState()修改 Agent 状态,会广播给已连接的客户端 -
客户端广播:通过
this.broadcastToClients()给所有 WebSocket 客户端发送消息
JavaScript
// Inside a workflow's run() method
await this.agent.updateTaskStatus(taskId, "processing"); // RPC call
await this.reportProgress({ step: "process", percent: 0.5 }); // Progress (non-durable)
this.broadcastToClients({ type: "update", taskId }); // Broadcast (non-durable)
await step.mergeAgentState({ taskProgress: 0.5 }); // State update (durable)
TypeScript
// Inside a workflow's run() method
await this.agent.updateTaskStatus(taskId, "processing"); // RPC call
await this.reportProgress({ step: "process", percent: 0.5 }); // Progress (non-durable)
this.broadcastToClients({ type: "update", taskId }); // Broadcast (non-durable)
await step.mergeAgentState({ taskProgress: 0.5 }); // State update (durable)
Agent 到 Workflow
Agent 可以这样与正在运行的 Workflow 交互:
- 启动 workflow:用
runWorkflow()启动新的 workflow 实例 - 发送事件:用
sendWorkflowEvent()派发事件 - 审批/拒绝:用
approveWorkflow()/rejectWorkflow()响应审批请求 - workflow 控制:暂停、恢复、终止或重启 workflow
- 状态查询:用
getWorkflow()/getWorkflows()检查 workflow 进度
持久化与非持久化操作
理解持久化是高效使用 workflow 的关键:
非持久化(可能在重试时重复执行)
这些操作轻量,适合频繁更新,但在 workflow 重试时可能多次执行:
this.reportProgress()— 进度上报this.broadcastToClients()— WebSocket 广播- 直接对
this.agent的 RPC 调用
持久化(幂等,不会重复)
这些操作使用 step 参数,保证只执行一次:
step.do()— 执行持久化步骤step.reportComplete()/step.reportError()— 完成上报step.sendEvent()— 自定义事件step.updateAgentState()/step.mergeAgentState()— 状态同步
持久化保证
Workflow 通过基于 step 的执行提供持久化保证:
- 步骤完成是永久的 — 步骤一旦完成,即使 workflow 重启也不会重新执行
- 自动重试 — 失败的步骤会按可配置的退避策略重试
- 事件持久化 — Workflow 可以等待事件长达一年
- 状态恢复 — Workflow 状态可在基础设施故障中存活
这种持久化模型使 workflow 非常适合需要保留部分完成状态的任务,例如多阶段数据处理或跨多个系统的事务。
Workflow 跟踪
当 Agent 通过 runWorkflow() 启动 workflow 时,该 workflow 会自动登记到 Agent 的内部数据库中。这能实现:
- 通过 ID、名称或元数据按游标分页查询 workflow 状态
- 通过生命周期回调(
onWorkflowProgress、onWorkflowComplete、onWorkflowError)监控进度 - workflow 控制:暂停、恢复、终止、重启
- 用
deleteWorkflow()/deleteWorkflows()清理已完成的 workflow 记录 - 通过元数据将 workflow 与用户或会话关联
常见模式
带进度的后台处理
Agent 收到一个请求,启动一个 Workflow 处理重活,并随着 Workflow 执行每一步,把进度更新广播给已连接的客户端。
JavaScript
// Workflow reports progress after each item
for (let i = 0; i < items.length; i++) {
await step.do(`process-${i}`, async () => processItem(items[i]));
await this.reportProgress({
step: `process-${i}`,
percent: (i + 1) / items.length,
message: `Processed ${i + 1}/${items.length}`,
});
}
TypeScript
// Workflow reports progress after each item
for (let i = 0; i < items.length; i++) {
await step.do(`process-${i}`, async () => processItem(items[i]));
await this.reportProgress({
step: `process-${i}`,
percent: (i + 1) / items.length,
message: `Processed ${i + 1}/${items.length}`,
});
}
Human-in-the-loop 审批
一个 Workflow 准备好请求,通过 waitForApproval() 暂停等待审批,Agent 提供 UI 让用户通过 approveWorkflow() / rejectWorkflow() 进行审批或拒绝。Workflow 根据决定恢复执行,或者抛出 WorkflowRejectedError。
健壮的外部 API 调用
Workflow 把外部 API 调用包在带重试逻辑的持久化步骤中。如果 API 失败或 workflow 重启,已完成的调用不会重复执行,失败的调用会自动重试。
JavaScript
const result = await step.do(
"call-api",
{
retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },
timeout: "5 minutes",
},
async () => {
const response = await fetch("https://api.example.com/process");
if (!response.ok) throw new Error(`API error: ${response.status}`);
return response.json();
},
);
TypeScript
const result = await step.do(
"call-api",
{
retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },
timeout: "5 minutes",
},
async () => {
const response = await fetch("https://api.example.com/process");
if (!response.ok) throw new Error(`API error: ${response.status}`);
return response.json();
},
);
状态同步
Workflow 在关键里程碑使用 step.updateAgentState() 或 step.mergeAgentState() 更新 Agent 状态。这些状态变更会广播给所有已连接的客户端,让 UI 无需轮询即可保持同步。
相关资源
Run Workflows API agent workflow 的实现细节。
Cloudflare Workflows Workflow 的基础与文档。
Human-in-the-loop 审批流程与人工介入。