Run Workflows
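Integrate Cloudflare Workflows with Agents so that the Agent handles real-time communication while the Workflow handles durable, multi-step background processing.
Agents vs. Workflows
Agents excel at real-time communication and state management. Workflows excel at durable execution, with automatic retries, failure recovery, and the ability to wait for external events.
For chat, message handling, and quick API calls, an Agent alone is enough. For long-running tasks (over 30 seconds), multi-step pipelines, and human approval flows, use an Agent together with a Workflow.
Quick start
1. Define a Workflow
Extend AgentWorkflow to get typed access to the originating Agent: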
JavaScript
import { AgentWorkflow } from "agents/workflows";
export class ProcessingWorkflow extends AgentWorkflow {
async run(event, step) {
const params = event.payload;
const result = await step.do("process-data", async () => {
return processData(params.data);
});
// Non-durable: progress reporting (may repeat on retry)
await this.reportProgress({
step: "process",
status: "complete",
percent: 0.5,
});
// Broadcast to connected WebSocket clients
this.broadcastToClients({ type: "update", taskId: params.taskId });
await step.do("save-results", async () => {
// Call Agent methods via RPC
await this.agent.saveResult(params.taskId, result);
});
// Durable: idempotent, won't repeat on retry
await step.reportComplete(result);
return result;
}
}
TypeScript
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
import type { MyAgent } from "./agent";
type TaskParams = { taskId: string; data: string };
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const params = event.payload;
const result = await step.do("process-data", async () => {
return processData(params.data);
});
// Non-durable: progress reporting (may repeat on retry)
await this.reportProgress({
step: "process",
status: "complete",
percent: 0.5,
});
// Broadcast to connected WebSocket clients
this.broadcastToClients({ type: "update", taskId: params.taskId });
await step.do("save-results", async () => {
// Call Agent methods via RPC
await this.agent.saveResult(params.taskId, result);
});
// Durable: idempotent, won't repeat on retry
await step.reportComplete(result);
return result;
}
}
2. Start a Workflow from an Agent
Use runWorkflow() to start and track a workflow:
JavaScript
import { Agent } from "agents";
export class MyAgent extends Agent {
async startTask(taskId, data) {
const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
taskId,
data,
});
return { instanceId };
}
async onWorkflowProgress(workflowName, instanceId, progress) {
this.broadcast(JSON.stringify({ type: "workflow-progress", progress }));
}
async onWorkflowComplete(workflowName, instanceId, result) {
console.log(`Workflow completed:`, result);
}
async saveResult(taskId, result) {
this.sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
}
}
TypeScript
import { Agent } from "agents";
export class MyAgent extends Agent {
async startTask(taskId: string, data: string) {
const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
taskId,
data,
});
return { instanceId };
}
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown,
) {
this.broadcast(JSON.stringify({ type: "workflow-progress", progress }));
}
async onWorkflowComplete(
workflowName: string,
instanceId: string,
result?: unknown,
) {
console.log(`Workflow completed:`, result);
}
async saveResult(taskId: string, result: unknown) {
this.sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
}
}
3. Configure Wrangler
JSONC
{
"name": "my-app",
"main": "src/index.ts",
// Set this to today's date
"compatibility_date": "2026-04-29",
"durable_objects": {
"bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }],
},
"workflows": [
{
"name": "processing-workflow",
"binding": "PROCESSING_WORKFLOW",
"class_name": "ProcessingWorkflow",
},
],
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }],
}
TOML
name = "my-app"
main = "src/index.ts"
# Set this to today's date
compatibility_date = "2026-04-29"
[[durable_objects.bindings]]
name = "MY_AGENT"
class_name = "MyAgent"
[[workflows]]
name = "processing-workflow"
binding = "PROCESSING_WORKFLOW"
class_name = "ProcessingWorkflow"
[[migrations]]
tag = "v1"
new_sqlite_classes = [ "MyAgent" ]
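AgentWorkflow class
Base class for Workflows that integrate with Agents.
Type parameters
| Parameter | Description |
|---|---|
| AgentType | The Agent class type, used for typed RPC |
| Params | Parameters passed to the workflow |
| ProgressType | Type used for progress reporting (defaults to DefaultProgress) |
| Env | Environment type (defaults to Cloudflare.Env) |
Properties
| Property | Type | Description |
|---|---|---|
| agent | Stub | Typed stub for calling Agent methods |
| instanceId | string | Workflow instance ID |
| workflowName | string | Workflow binding name |
| env | Env | Environment bindings |
Instance methods (non-durable)
These methods may run again on retry. Use them for lightweight, frequent updates.
reportProgress(progress)
Report progress to the Agent. This triggers the onWorkflowProgress callback.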
JavaScript
await this.reportProgress({
step: "processing",
status: "running",
percent: 0.5,
});
TypeScript
await this.reportProgress({
step: "processing",
status: "running",
percent: 0.5,
});
broadcastToClients(message)
Broadcast a message to all WebSocket clients connected to the Agent.
JavaScript
this.broadcastToClients({ type: "update", data: result });
TypeScript
this.broadcastToClients({ type: "update", data: result });
waitForApproval(step, options?)
Wait for an approval event. Throws WorkflowRejectedError if the workflow is rejected.
JavaScript
const approval = await this.waitForApproval(step, {
timeout: "7 days",
});
TypeScript
const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
timeout: "7 days",
});
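Step methods (durable)
These methods are idempotent and will not repeat on retry. Use them for state changes that must persist.
| Method | Description |
|---|---|
| step.reportComplete(result?) | Report successful completion |
| step.reportError(error) | Report an error |
| step.sendEvent(event) | Send a custom event to the Agent |
| step.updateAgentState(state) | Replace the Agent state (broadcasts to clients) |
| step.mergeAgentState(partial) | Merge into the Agent state (broadcasts to clients) |
| step.resetAgentState() | Reset the Agent state to initialState |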
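To make the durable versus non-durable distinction concrete, here is a toy, synchronous model of step memoization. It is a sketch only: ToyStep, runOnce, and callCounts are hypothetical names, and the real Workflows runtime is asynchronous and persists step results rather than caching them in memory.

```typescript
// Toy, synchronous model of step memoization. This is NOT the Workflows runtime;
// it only illustrates why durable steps do not repeat when run() is replayed.
class ToyStep {
  private cache = new Map<string, unknown>();

  // Run fn once per step name; later calls return the stored result.
  do<T>(stepName: string, fn: () => T): T {
    if (this.cache.has(stepName)) {
      return this.cache.get(stepName) as T;
    }
    const value = fn();
    this.cache.set(stepName, value);
    return value;
  }
}

const callCounts = { durable: 0, nonDurable: 0 };

// Hypothetical workflow body: one call outside a step, one inside.
function runOnce(toyStep: ToyStep): number {
  callCounts.nonDurable += 1; // like reportProgress(): runs on every attempt
  return toyStep.do("save-results", () => {
    callCounts.durable += 1; // like a step method: runs once, then is replayed
    return 42;
  });
}

const toyStep = new ToyStep();
const firstResult = runOnce(toyStep); // first attempt
const secondResult = runOnce(toyStep); // simulated retry with the same step state
```

In this model the code outside the step runs on both attempts, while the step body runs once and its stored result is returned on the retry. That is the reason to keep side effects that must not repeat inside step methods.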
DefaultProgress type
TypeScript
type DefaultProgress = {
step?: string;
status?: "pending" | "running" | "complete" | "error";
message?: string;
percent?: number;
[key: string]: unknown;
};
Workflow methods on the Agent
Methods available on the Agent class for managing Workflows.
runWorkflow(workflowName, params, options?)
Start a workflow instance and track it in the Agent database.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| workflowName | string | Workflow binding name in env |
| params | object | Parameters passed to the workflow |
| options.id | string | Custom workflow ID (auto-generated if not provided) |
| options.metadata | object | Metadata for querying (not passed to the workflow) |
| options.agentBinding | string | Agent binding name (auto-detected if not provided) |
Returns: Promise<string>, the workflow instance ID.
JavaScript
const instanceId = await this.runWorkflow(
"MY_WORKFLOW",
{ taskId: "123" },
{
metadata: { userId: "user-456", priority: "high" },
},
);
TypeScript
const instanceId = await this.runWorkflow(
"MY_WORKFLOW",
{ taskId: "123" },
{
metadata: { userId: "user-456", priority: "high" },
},
);
sendWorkflowEvent(workflowName, instanceId, event)
Send an event to a running workflow.
JavaScript
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});
TypeScript
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});
getWorkflowStatus(workflowName, instanceId)
Get the status of a workflow and update its tracking record.
JavaScript
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// { status: 'running', output: null, error: null }
TypeScript
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// { status: 'running', output: null, error: null }
getWorkflow(instanceId)
Get a tracked workflow by ID.
JavaScript
const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }
TypeScript
const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }
getWorkflows(criteria?)
Query tracked workflows with cursor-based pagination. Returns a WorkflowPage containing the list of workflows, the total count, and a cursor for the next page.
JavaScript
// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });
// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
metadata: { userId: "user-456" },
});
// Pagination with cursor
const page1 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursor
if (page1.nextCursor) {
const page2 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
cursor: page1.nextCursor,
});
}
TypeScript
// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });
// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
metadata: { userId: "user-456" },
});
// Pagination with cursor
const page1 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursor
if (page1.nextCursor) {
const page2 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
cursor: page1.nextCursor,
});
}
WorkflowPage type:
TypeScript
type WorkflowPage = {
workflows: WorkflowInfo[];
total: number; // Total matching workflows
nextCursor: string | null; // null when no more pages
};
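To walk every page rather than only the first two, follow nextCursor until it is null. The sketch below shows that loop against an in-memory stand-in. fakeGetWorkflows, collectAll, and the "Sketch" types are hypothetical, and the stringified-offset cursor is an assumption made for the example; treat the real cursor as opaque.

```typescript
// Minimal stand-ins for the tracked-workflow types; only the fields used here.
type WorkflowInfoSketch = { instanceId: string; status: string };
type WorkflowPageSketch = {
  workflows: WorkflowInfoSketch[];
  total: number;
  nextCursor: string | null;
};
type PageQuery = { limit: number; cursor?: string | undefined };

// Hypothetical in-memory source standing in for this.getWorkflows().
const trackedRows: WorkflowInfoSketch[] = Array.from({ length: 45 }, (_, i) => ({
  instanceId: `wf-${i}`,
  status: "complete",
}));

// Assumption: the cursor is a stringified offset. The real format is opaque.
function fakeGetWorkflows(query: PageQuery): WorkflowPageSketch {
  const start = query.cursor ? Number(query.cursor) : 0;
  const end = Math.min(start + query.limit, trackedRows.length);
  return {
    workflows: trackedRows.slice(start, end),
    total: trackedRows.length,
    nextCursor: end < trackedRows.length ? String(end) : null,
  };
}

// Follow nextCursor until it is null, collecting every page.
function collectAll(
  getPage: (query: PageQuery) => WorkflowPageSketch,
  limit: number,
): WorkflowInfoSketch[] {
  const all: WorkflowInfoSketch[] = [];
  let cursor: string | undefined;
  do {
    const page = getPage({ limit, cursor });
    all.push(...page.workflows);
    cursor = page.nextCursor ?? undefined;
  } while (cursor !== undefined);
  return all;
}

const allWorkflows = collectAll(fakeGetWorkflows, 20);
```

Inside an Agent, the same loop could pass a wrapper around this.getWorkflows() as getPage, keeping the status and orderBy criteria identical on every call so the cursor stays valid.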
deleteWorkflow(instanceId)
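Delete the tracking record for a single workflow instance. Returns true if the record was deleted, or false if it was not found.
deleteWorkflows(criteria?)
Delete the tracking records of workflow instances that match the given criteria.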
JavaScript
// Delete completed workflow instances older than 7 days
this.deleteWorkflows({
status: "complete",
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
// Delete all errored and terminated workflows
this.deleteWorkflows({
status: ["errored", "terminated"],
});
TypeScript
// Delete completed workflow instances older than 7 days
this.deleteWorkflows({
status: "complete",
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
// Delete all errored and terminated workflows
this.deleteWorkflows({
status: ["errored", "terminated"],
});
terminateWorkflow(instanceId)
Immediately terminate a running workflow and set its status to "terminated".
JavaScript
await this.terminateWorkflow(instanceId);
TypeScript
await this.terminateWorkflow(instanceId);
Note
terminate() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.
pauseWorkflow(instanceId)
Pause a running workflow. It can be resumed later with resumeWorkflow().
JavaScript
await this.pauseWorkflow(instanceId);
TypeScript
await this.pauseWorkflow(instanceId);
Note
pause() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.
resumeWorkflow(instanceId)
Resume a paused workflow.
JavaScript
await this.resumeWorkflow(instanceId);
TypeScript
await this.resumeWorkflow(instanceId);
Note
resume() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.
restartWorkflow(instanceId, options?)
Restart a workflow instance from the beginning, using the same ID.
JavaScript
// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);
// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });
TypeScript
// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);
// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });
Note
restart() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.
approveWorkflow(instanceId, options?)
Approve a waiting workflow. Use it together with waitForApproval() in the workflow.
JavaScript
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
TypeScript
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
rejectWorkflow(instanceId, options?)
Reject a waiting workflow, which causes waitForApproval() to throw WorkflowRejectedError.
JavaScript
await this.rejectWorkflow(instanceId, { reason: "Request denied" });
TypeScript
await this.rejectWorkflow(instanceId, { reason: "Request denied" });
migrateWorkflowBinding(oldName, newName)
Migrate tracked workflows after renaming a workflow binding.
JavaScript
class MyAgent extends Agent {
async onStart() {
this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW");
}
}
TypeScript
class MyAgent extends Agent {
async onStart() {
this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW");
}
}
Lifecycle callbacks
Override these methods in your Agent to handle workflow events:
| Callback | Parameters | Description |
|---|---|---|
| onWorkflowProgress | workflowName, instanceId, progress | Called when the workflow reports progress |
| onWorkflowComplete | workflowName, instanceId, result? | Called when the workflow completes |
| onWorkflowError | workflowName, instanceId, error | Called when the workflow errors |
| onWorkflowEvent | workflowName, instanceId, event | Called when the workflow sends an event |
| onWorkflowCallback | callback: WorkflowCallback | Called for every callback type |
JavaScript
class MyAgent extends Agent {
async onWorkflowProgress(workflowName, instanceId, progress) {
this.broadcast(
JSON.stringify({ type: "progress", workflowName, instanceId, progress }),
);
}
async onWorkflowComplete(workflowName, instanceId, result) {
console.log(`${workflowName}/${instanceId} completed`);
}
async onWorkflowError(workflowName, instanceId, error) {
console.error(`${workflowName}/${instanceId} failed:`, error);
}
}
TypeScript
class MyAgent extends Agent {
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown,
) {
this.broadcast(
JSON.stringify({ type: "progress", workflowName, instanceId, progress }),
);
}
async onWorkflowComplete(
workflowName: string,
instanceId: string,
result?: unknown,
) {
console.log(`${workflowName}/${instanceId} completed`);
}
async onWorkflowError(
workflowName: string,
instanceId: string,
error: string,
) {
console.error(`${workflowName}/${instanceId} failed:`, error);
}
}
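Workflow tracking
Workflows started with runWorkflow() are automatically tracked in the Agent's internal database. You can query, filter, and manage them with the methods described above (getWorkflow(), getWorkflows(), deleteWorkflow(), and so on).
Status values
| Status | Description |
|---|---|
| queued | Waiting to start |
| running | Currently executing |
| paused | Paused by the user |
| waiting | Waiting for an event |
| complete | Finished successfully |
| errored | Failed with an error |
| terminated | Manually terminated |
Use the metadata option in runWorkflow() to store queryable information (such as a user ID or task type), then filter on it later with getWorkflows().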
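The status values and metadata filtering can be modeled in a few lines. This is a sketch under stated assumptions: WorkflowStatus mirrors the table above, while isTerminal, filterByMetadata, and TrackedSketch are hypothetical helpers, and treating complete, errored, and terminated as final is a convention chosen for the example (a workflow can still be restarted).

```typescript
// Status values copied from the table above.
type WorkflowStatus =
  | "queued"
  | "running"
  | "paused"
  | "waiting"
  | "complete"
  | "errored"
  | "terminated";

// Assumption: these three are treated as final for cleanup and UI purposes.
const TERMINAL_STATUSES: WorkflowStatus[] = ["complete", "errored", "terminated"];

function isTerminal(status: WorkflowStatus): boolean {
  return TERMINAL_STATUSES.indexOf(status) !== -1;
}

// Hypothetical tracked record carrying metadata set via runWorkflow() options.
type TrackedSketch = {
  instanceId: string;
  status: WorkflowStatus;
  metadata: Record<string, string>;
};

// Keep records whose metadata contains every key/value pair in the filter.
function filterByMetadata(
  records: TrackedSketch[],
  filter: Record<string, string>,
): TrackedSketch[] {
  return records.filter((record) =>
    Object.keys(filter).every((key) => record.metadata[key] === filter[key]),
  );
}

const sampleRecords: TrackedSketch[] = [
  { instanceId: "a", status: "running", metadata: { userId: "user-456", priority: "high" } },
  { instanceId: "b", status: "complete", metadata: { userId: "user-456", priority: "low" } },
  { instanceId: "c", status: "errored", metadata: { userId: "user-789", priority: "high" } },
];

const forUser = filterByMetadata(sampleRecords, { userId: "user-456" });
const finishedForUser = forUser.filter((record) => isTerminal(record.status));
```

In an Agent you would normally let getWorkflows({ metadata, status }) do this filtering; the helper only shows the matching rule assumed here, where every filter key must match exactly.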
Examples
Human-in-the-loop approval
JavaScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
export class ApprovalWorkflow extends AgentWorkflow {
async run(event, step) {
const request = await step.do("prepare", async () => {
return { ...event.payload, preparedAt: Date.now() };
});
await this.reportProgress({
step: "approval",
status: "pending",
message: "Awaiting approval",
});
// Throws WorkflowRejectedError if rejected
const approval = await this.waitForApproval(step, {
timeout: "7 days",
});
console.log("Approved by:", approval?.approvedBy);
const result = await step.do("execute", async () => {
return executeRequest(request);
});
await step.reportComplete(result);
return result;
}
}
class MyAgent extends Agent {
async handleApproval(instanceId, userId) {
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
}
async handleRejection(instanceId, reason) {
await this.rejectWorkflow(instanceId, { reason });
}
}
TypeScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> {
async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) {
const request = await step.do("prepare", async () => {
return { ...event.payload, preparedAt: Date.now() };
});
await this.reportProgress({
step: "approval",
status: "pending",
message: "Awaiting approval",
});
// Throws WorkflowRejectedError if rejected
const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
timeout: "7 days",
});
console.log("Approved by:", approval?.approvedBy);
const result = await step.do("execute", async () => {
return executeRequest(request);
});
await step.reportComplete(result);
return result;
}
}
class MyAgent extends Agent {
async handleApproval(instanceId: string, userId: string) {
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
}
async handleRejection(instanceId: string, reason: string) {
await this.rejectWorkflow(instanceId, { reason });
}
}
Retry with backoff
JavaScript
import { AgentWorkflow } from "agents/workflows";
export class ResilientWorkflow extends AgentWorkflow {
async run(event, step) {
const result = await step.do(
"call-api",
{
retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },
timeout: "5 minutes",
},
async () => {
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(event.payload),
});
if (!response.ok) throw new Error(`API error: ${response.status}`);
return response.json();
},
);
await step.reportComplete(result);
return result;
}
}
TypeScript
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class ResilientWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const result = await step.do(
"call-api",
{
retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },
timeout: "5 minutes",
},
async () => {
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(event.payload),
});
if (!response.ok) throw new Error(`API error: ${response.status}`);
return response.json();
},
);
await step.reportComplete(result);
return result;
}
}
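To get a feel for how long a step configured like this might wait between attempts, the sketch below computes a delay schedule. The formulas are assumptions (constant, linear growth, and doubling per attempt) and are not taken from the Workflows runtime, which may compute delays differently. retryDelays is a hypothetical helper, and the constant and linear variants are included only for comparison.

```typescript
type Backoff = "constant" | "linear" | "exponential";

// Assumed schedules (illustrative, not the platform's documented behavior):
//   constant:    base
//   linear:      base * attempt
//   exponential: base * 2^(attempt - 1)
function retryDelays(limit: number, baseMs: number, backoff: Backoff): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= limit; attempt++) {
    if (backoff === "constant") {
      delays.push(baseMs);
    } else if (backoff === "linear") {
      delays.push(baseMs * attempt);
    } else {
      delays.push(baseMs * Math.pow(2, attempt - 1));
    }
  }
  return delays;
}

// Mirrors retries: { limit: 5, delay: "10 seconds", backoff: "exponential" }.
const exponentialSchedule = retryDelays(5, 10000, "exponential");
// Under the doubling assumption: [10000, 20000, 40000, 80000, 160000]

// Total time spent waiting if every retry is used.
const worstCaseWaitMs = exponentialSchedule.reduce((sum, delay) => sum + delay, 0);
```

Under these assumptions the five retries add a little over five minutes of waiting in the worst case, on top of each attempt's own timeout. An estimate like this helps when choosing limit and delay values.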
State synchronization
Workflows can durably update Agent state through step. Each update is automatically broadcast to all connected clients:
JavaScript
import { AgentWorkflow } from "agents/workflows";
export class StatefulWorkflow extends AgentWorkflow {
async run(event, step) {
// Replace entire state (durable, broadcasts to clients)
await step.updateAgentState({
currentTask: {
id: event.payload.taskId,
status: "processing",
startedAt: Date.now(),
},
});
const result = await step.do("process", async () =>
processTask(event.payload),
);
// Merge partial state (durable, keeps existing fields)
await step.mergeAgentState({
currentTask: { status: "complete", result, completedAt: Date.now() },
});
await step.reportComplete(result);
return result;
}
}
TypeScript
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
export class StatefulWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
// Replace entire state (durable, broadcasts to clients)
await step.updateAgentState({
currentTask: {
id: event.payload.taskId,
status: "processing",
startedAt: Date.now(),
},
});
const result = await step.do("process", async () =>
processTask(event.payload),
);
// Merge partial state (durable, keeps existing fields)
await step.mergeAgentState({
currentTask: { status: "complete", result, completedAt: Date.now() },
});
await step.reportComplete(result);
return result;
}
}
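The difference between replacing and merging state is easy to get wrong with nested objects. The sketch below contrasts the two, assuming the merge is shallow like an object spread; this page does not state the merge depth, so verify the behavior before relying on it. replaceState, shallowMerge, and TaskState are hypothetical names.

```typescript
type TaskState = {
  currentTask?: { id?: string; status?: string; startedAt?: number };
  settings?: { theme: string };
};

// Replace: the new object becomes the whole state.
function replaceState(_previous: TaskState, next: TaskState): TaskState {
  return next;
}

// Shallow merge (assumed): top-level keys are overwritten, untouched keys are kept.
function shallowMerge(previous: TaskState, partial: TaskState): TaskState {
  return { ...previous, ...partial };
}

const stateBefore: TaskState = {
  currentTask: { id: "t1", status: "processing", startedAt: 1 },
  settings: { theme: "dark" },
};

const replacedState = replaceState(stateBefore, { currentTask: { status: "complete" } });
const mergedState = shallowMerge(stateBefore, { currentTask: { status: "complete" } });

// To keep nested fields under a shallow merge, spread the previous nested value.
const carefullyMerged = shallowMerge(stateBefore, {
  currentTask: { ...stateBefore.currentTask, status: "complete" },
});
```

Here replacedState loses settings entirely, mergedState keeps settings but drops currentTask.id because the nested object is swapped as a whole, and carefullyMerged keeps both.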
Custom progress types
Define a custom progress type for a specific domain:
JavaScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
// Workflow that reports domain-specific progress fields.
// In TypeScript, the progress shape is the 3rd type parameter of AgentWorkflow.
export class ETLWorkflow extends AgentWorkflow {
async run(event, step) {
await this.reportProgress({
stage: "extract",
recordsProcessed: 0,
totalRecords: 1000,
currentTable: "users",
});
// ... processing
}
}
// Agent receives typed progress
class MyAgent extends Agent {
async onWorkflowProgress(workflowName, instanceId, progress) {
const p = progress;
console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
}
}
TypeScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
// Custom progress type for data pipeline
type PipelineProgress = {
stage: "extract" | "transform" | "load";
recordsProcessed: number;
totalRecords: number;
currentTable?: string;
};
// Workflow with custom progress type (3rd type parameter)
export class ETLWorkflow extends AgentWorkflow<
MyAgent,
ETLParams,
PipelineProgress
> {
async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) {
await this.reportProgress({
stage: "extract",
recordsProcessed: 0,
totalRecords: 1000,
currentTable: "users",
});
// ... processing
}
}
// Agent receives typed progress
class MyAgent extends Agent {
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown,
) {
const p = progress as PipelineProgress;
console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
}
}
Cleanup strategy
The internal cf_agents_workflows table can grow without bound, so consider implementing a retention policy:
JavaScript
class MyAgent extends Agent {
// Option 1: Delete on completion
async onWorkflowComplete(workflowName, instanceId, result) {
// Process result first, then delete
this.deleteWorkflow(instanceId);
}
// Option 2: Scheduled cleanup (keep recent history)
async cleanupOldWorkflows() {
this.deleteWorkflows({
status: ["complete", "errored"],
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
}
// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed
}
TypeScript
class MyAgent extends Agent {
// Option 1: Delete on completion
async onWorkflowComplete(
workflowName: string,
instanceId: string,
result?: unknown,
) {
// Process result first, then delete
this.deleteWorkflow(instanceId);
}
// Option 2: Scheduled cleanup (keep recent history)
async cleanupOldWorkflows() {
this.deleteWorkflows({
status: ["complete", "errored"],
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
});
}
// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed
}
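Before wiring a retention policy to real deletes, it can help to check which records a status plus createdBefore rule would select. The sketch below models that rule on plain data. selectExpired and TrackedRow are hypothetical, and the strict "older than the cutoff" comparison is an assumption about how createdBefore behaves.

```typescript
type TrackedRow = { instanceId: string; status: string; createdAt: Date };

const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;

// Return the IDs that a rule like { status: [...], createdBefore: cutoff } would select.
function selectExpired(
  rows: TrackedRow[],
  statuses: string[],
  now: Date,
  maxAgeMs: number,
): string[] {
  const cutoff = now.getTime() - maxAgeMs;
  return rows
    .filter(
      (row) =>
        statuses.indexOf(row.status) !== -1 && row.createdAt.getTime() < cutoff,
    )
    .map((row) => row.instanceId);
}

// Fixed clock so the example is deterministic.
const fixedNow = new Date("2026-01-15T00:00:00Z");

const sampleRows: TrackedRow[] = [
  { instanceId: "a", status: "complete", createdAt: new Date("2026-01-01T00:00:00Z") },
  { instanceId: "b", status: "complete", createdAt: new Date("2026-01-14T00:00:00Z") },
  { instanceId: "c", status: "running", createdAt: new Date("2025-12-01T00:00:00Z") },
  { instanceId: "d", status: "errored", createdAt: new Date("2026-01-02T00:00:00Z") },
];

const expiredIds = selectExpired(
  sampleRows,
  ["complete", "errored"],
  fixedNow,
  SEVEN_DAYS_MS,
);
```

With this sample data the rule selects "a" and "d": "b" is too recent, and "c" is old but still running, so a policy limited to finished statuses leaves it alone.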
Bidirectional communication
Workflow to Agent
JavaScript
// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({ step: "process", percent: 0.5 });
this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });
TypeScript
// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({ step: "process", percent: 0.5 });
this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });
Agent to Workflow
JavaScript
// Send event to waiting workflow
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});
// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
await this.rejectWorkflow(instanceId, { reason: "Request denied" });
TypeScript
// Send event to waiting workflow
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" },
});
// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId },
});
await this.rejectWorkflow(instanceId, { reason: "Request denied" });
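Best practices
- Keep workflows focused: one workflow per logical task
- Use meaningful step names: they help with debugging and observability
- Report progress regularly: keep users informed
- Handle errors gracefully: call reportError() before throwing
- Clean up completed workflows: implement a retention policy for the tracking table
- Handle workflow binding renames: use migrateWorkflowBinding() when renaming a workflow binding in wrangler.jsonc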
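Limits
| Limit | Value |
|---|---|
| Maximum steps | 10,000 per workflow by default, configurable up to 25,000 |
| State size | 10 MB per workflow |
| Event wait time | Up to 1 year |
| Step execution time | 30 minutes per step |
Workflows cannot open WebSocket connections directly. Use broadcastToClients() to communicate with connected clients through the Agent.
Related resources
Workflows documentation: learn the fundamentals of Cloudflare Workflows.
Store and sync state: persist and synchronize Agent state.
Schedule tasks: time-based task execution.
Human-in-the-loop: approval flows and human intervention patterns.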