Run Workflows

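Integrate Cloudflare Workflows with Agents to run durable, multi-step background processing while the Agent handles real-time communication.

Agents vs. Workflows

Agents excel at real-time communication and state management. Workflows excel at durable execution, with automatic retries, failure recovery, and the ability to wait for external events.

Use an Agent alone for chat, message handling, and quick API calls. Use Agent + Workflow for long-running tasks (over 30 seconds), multi-step pipelines, and human approval flows.

Quick start

1. Define a Workflow

Extend AgentWorkflow to get typed access to the originating Agent: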

JavaScript


import { AgentWorkflow } from "agents/workflows";

export class ProcessingWorkflow extends AgentWorkflow {

  async run(event, step) {

    const params = event.payload;


    const result = await step.do("process-data", async () => {

      return processData(params.data);

    });


    // Non-durable: progress reporting (may repeat on retry)

    await this.reportProgress({

      step: "process",

      status: "complete",

      percent: 0.5,

    });


    // Broadcast to connected WebSocket clients

    this.broadcastToClients({ type: "update", taskId: params.taskId });


    await step.do("save-results", async () => {

      // Call Agent methods via RPC

      await this.agent.saveResult(params.taskId, result);

    });


    // Durable: idempotent, won't repeat on retry

    await step.reportComplete(result);

    return result;

  }

}


TypeScript


import { AgentWorkflow } from "agents/workflows";

import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";

import type { MyAgent } from "./agent";


type TaskParams = { taskId: string; data: string };


export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {

  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {

    const params = event.payload;


    const result = await step.do("process-data", async () => {

      return processData(params.data);

    });


    // Non-durable: progress reporting (may repeat on retry)

    await this.reportProgress({

      step: "process",

      status: "complete",

      percent: 0.5,

    });


    // Broadcast to connected WebSocket clients

    this.broadcastToClients({ type: "update", taskId: params.taskId });


    await step.do("save-results", async () => {

      // Call Agent methods via RPC

      await this.agent.saveResult(params.taskId, result);

    });


    // Durable: idempotent, won't repeat on retry

    await step.reportComplete(result);

    return result;

  }

}


2. Start a Workflow from an Agent

Use runWorkflow() to start and track a workflow:

JavaScript


import { Agent } from "agents";


export class MyAgent extends Agent {

  async startTask(taskId, data) {

    const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {

      taskId,

      data,

    });

    return { instanceId };

  }


  async onWorkflowProgress(workflowName, instanceId, progress) {

    this.broadcast(JSON.stringify({ type: "workflow-progress", progress }));

  }


  async onWorkflowComplete(workflowName, instanceId, result) {

    console.log(`Workflow completed:`, result);

  }


  async saveResult(taskId, result) {

    this

      .sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;

  }

}


TypeScript


import { Agent } from "agents";


export class MyAgent extends Agent {

  async startTask(taskId: string, data: string) {

    const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {

      taskId,

      data,

    });

    return { instanceId };

  }


  async onWorkflowProgress(

    workflowName: string,

    instanceId: string,

    progress: unknown,

  ) {

    this.broadcast(JSON.stringify({ type: "workflow-progress", progress }));

  }


  async onWorkflowComplete(

    workflowName: string,

    instanceId: string,

    result?: unknown,

  ) {

    console.log(`Workflow completed:`, result);

  }


  async saveResult(taskId: string, result: unknown) {

    this

      .sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;

  }

}


3. Configure Wrangler

JSONC


{

  "name": "my-app",

  "main": "src/index.ts",

  // Set this to today's date

  "compatibility_date": "2026-04-29",

  "durable_objects": {

    "bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }],

  },

  "workflows": [

    {

      "name": "processing-workflow",

      "binding": "PROCESSING_WORKFLOW",

      "class_name": "ProcessingWorkflow",

    },

  ],

  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }],

}


TOML


name = "my-app"

main = "src/index.ts"

# Set this to today's date

compatibility_date = "2026-04-29"


[[durable_objects.bindings]]

name = "MY_AGENT"

class_name = "MyAgent"


[[workflows]]

name = "processing-workflow"

binding = "PROCESSING_WORKFLOW"

class_name = "ProcessingWorkflow"


[[migrations]]

tag = "v1"

new_sqlite_classes = [ "MyAgent" ]


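AgentWorkflow class

Base class for Workflows that integrate with Agents.

Type parameters

| Parameter | Description |
| --- | --- |
| AgentType | The Agent class type, used for typed RPC |
| Params | Parameters passed to the workflow |
| ProgressType | Type used for progress reporting (defaults to DefaultProgress) |
| Env | Environment type (defaults to Cloudflare.Env) |

Properties

| Property | Type | Description |
| --- | --- | --- |
| agent | Stub | Typed stub for calling Agent methods |
| instanceId | string | Workflow instance ID |
| workflowName | string | Workflow binding name |
| env | Env | Environment bindings |

Instance methods (non-durable)

These methods may run again on retry. Use them for lightweight, frequent updates.

reportProgress(progress)

Report progress to the Agent. Triggers the onWorkflowProgress callback.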

JavaScript


await this.reportProgress({

  step: "processing",

  status: "running",

  percent: 0.5,

});


TypeScript


await this.reportProgress({

  step: "processing",

  status: "running",

  percent: 0.5,

});


broadcastToClients(message)

Broadcast a message to all WebSocket clients connected to the Agent.

JavaScript


this.broadcastToClients({ type: "update", data: result });


TypeScript


this.broadcastToClients({ type: "update", data: result });


waitForApproval(step, options?)

Wait for an approval event. Throws WorkflowRejectedError if the workflow is rejected.

JavaScript


const approval = await this.waitForApproval(step, {

  timeout: "7 days",

});


TypeScript


const approval = await this.waitForApproval<{ approvedBy: string }>(step, {

  timeout: "7 days",

});
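
Because a rejection surfaces as a thrown WorkflowRejectedError, a workflow that wants to record the rejection rather than fail outright can catch it. The sketch below shows only that control flow, under stated assumptions: it defines a local stand-in for the error class (its reason field is hypothetical) and takes a synchronous stand-in for waitForApproval() so that it runs on its own. The real method is asynchronous and the real error class is provided by the library. settleApproval and ApprovalOutcome are illustrative names, not part of the API.

```typescript
// Local stand-in for the library's WorkflowRejectedError.
// Assumption: the `reason` field is hypothetical and exists only for this sketch.
class WorkflowRejectedError extends Error {
  readonly reason: string;

  constructor(reason: string) {
    super(`Workflow rejected: ${reason}`);
    // Keep instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "WorkflowRejectedError";
    this.reason = reason;
  }
}

type ApprovalOutcome =
  | { status: "approved"; approvedBy: string }
  | { status: "rejected"; reason: string };

// Hypothetical helper: turns "approval or thrown rejection" into a plain value.
// `wait` stands in for a call such as this.waitForApproval(step, { timeout: "7 days" }).
function settleApproval(wait: () => { approvedBy: string }): ApprovalOutcome {
  try {
    const approval = wait();
    return { status: "approved", approvedBy: approval.approvedBy };
  } catch (err) {
    if (err instanceof WorkflowRejectedError) {
      return { status: "rejected", reason: err.reason };
    }
    throw err; // anything that is not a rejection still propagates
  }
}

const approvedOutcome = settleApproval(() => ({ approvedBy: "user-456" }));
const rejectedOutcome = settleApproval(() => {
  throw new WorkflowRejectedError("Request denied");
});
```

Returning a tagged outcome keeps the "rejected" path as ordinary data, so later steps can branch on it instead of relying on exception handling spread across the workflow.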


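Step methods (durable)

These methods are idempotent and do not run again on retry. Use them for state changes that must persist.

| Method | Description |
| --- | --- |
| step.reportComplete(result?) | Report successful completion |
| step.reportError(error) | Report an error |
| step.sendEvent(event) | Send a custom event to the Agent |
| step.updateAgentState(state) | Replace the Agent state (broadcasts to clients) |
| step.mergeAgentState(partial) | Merge into the Agent state (broadcasts to clients) |
| step.resetAgentState() | Reset the Agent state to initialState |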
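
The difference between durable step methods and non-durable instance methods comes down to whether the work is recorded under a step name. The toy model below is an illustration only: it is not how Cloudflare Workflows is implemented, and it is synchronous for brevity. It records each named step's result so that a replay after a failure reuses the result, while code outside any step runs again. ToyStepRunner, runToyWorkflow, and the counters are hypothetical names.

```typescript
// Toy model: remembers the result of each named step.
class ToyStepRunner {
  private readonly completed = new Map<string, unknown>();

  // Run `fn` at most once per step name; later calls return the recorded result.
  do<T>(name: string, fn: () => T): T {
    if (this.completed.has(name)) {
      return this.completed.get(name) as T;
    }
    const result = fn();
    this.completed.set(name, result);
    return result;
  }
}

let chargeCalls = 0; // side effect performed inside a step
let progressCalls = 0; // side effect performed outside any step
let failOnce = true;

function runToyWorkflow(step: ToyStepRunner): string {
  const receipt = step.do("charge", () => {
    chargeCalls += 1;
    return "receipt-1";
  });

  progressCalls += 1; // non-durable: repeats every time the run is replayed

  if (failOnce) {
    failOnce = false;
    throw new Error("transient failure after the first step");
  }
  return step.do("finish", () => `done:${receipt}`);
}

const toyStep = new ToyStepRunner();
let finalOutput: string;
try {
  finalOutput = runToyWorkflow(toyStep);
} catch {
  // Replay with the same recorded step results.
  finalOutput = runToyWorkflow(toyStep);
}
```

After the replay, the "charge" step has run once while the progress counter has been incremented twice, which is why anything that must not be duplicated belongs in a step.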

DefaultProgress type

TypeScript


type DefaultProgress = {

  step?: string;

  status?: "pending" | "running" | "complete" | "error";

  message?: string;

  percent?: number;

  [key: string]: unknown;

};


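Workflow methods on the Agent

Methods available on the Agent class for managing Workflows.

runWorkflow(workflowName, params, options?)

Start a workflow instance and track it in the Agent database.

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| workflowName | string | Workflow binding name in env |
| params | object | Parameters passed to the workflow |
| options.id | string | Custom workflow ID (auto-generated if omitted) |
| options.metadata | object | Metadata for querying (not passed to the workflow) |
| options.agentBinding | string | Agent binding name (auto-detected if omitted) |

Returns: Promise<string>, the workflow instance ID.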

JavaScript


const instanceId = await this.runWorkflow(

  "MY_WORKFLOW",

  { taskId: "123" },

  {

    metadata: { userId: "user-456", priority: "high" },

  },

);


TypeScript


const instanceId = await this.runWorkflow(

  "MY_WORKFLOW",

  { taskId: "123" },

  {

    metadata: { userId: "user-456", priority: "high" },

  },

);


sendWorkflowEvent(workflowName, instanceId, event)

Send an event to a running workflow.

JavaScript


await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {

  type: "custom-event",

  payload: { action: "proceed" },

});


TypeScript


await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {

  type: "custom-event",

  payload: { action: "proceed" },

});


getWorkflowStatus(workflowName, instanceId)

Get the status of a workflow and update its tracking record.

JavaScript


const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);

// { status: 'running', output: null, error: null }


TypeScript


const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);

// { status: 'running', output: null, error: null }


getWorkflow(instanceId)

Get a tracked workflow by ID.

JavaScript


const workflow = this.getWorkflow(instanceId);

// { instanceId, workflowName, status, metadata, error, createdAt, ... }


TypeScript


const workflow = this.getWorkflow(instanceId);

// { instanceId, workflowName, status, metadata, error, createdAt, ... }


getWorkflows(criteria?)

Query tracked workflows with cursor-based pagination. Returns a WorkflowPage containing the list of workflows, the total count, and a cursor for the next page.

JavaScript


// Get running workflows (default limit is 50, max is 100)

const { workflows, total } = this.getWorkflows({ status: "running" });


// Filter by metadata

const { workflows: userWorkflows } = this.getWorkflows({

  metadata: { userId: "user-456" },

});


// Pagination with cursor

const page1 = this.getWorkflows({

  status: ["complete", "errored"],

  limit: 20,

  orderBy: "desc",

});


console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);


// Get next page using cursor

if (page1.nextCursor) {

  const page2 = this.getWorkflows({

    status: ["complete", "errored"],

    limit: 20,

    orderBy: "desc",

    cursor: page1.nextCursor,

  });

}


TypeScript


// Get running workflows (default limit is 50, max is 100)

const { workflows, total } = this.getWorkflows({ status: "running" });


// Filter by metadata

const { workflows: userWorkflows } = this.getWorkflows({

  metadata: { userId: "user-456" },

});


// Pagination with cursor

const page1 = this.getWorkflows({

  status: ["complete", "errored"],

  limit: 20,

  orderBy: "desc",

});


console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);


// Get next page using cursor

if (page1.nextCursor) {

  const page2 = this.getWorkflows({

    status: ["complete", "errored"],

    limit: 20,

    orderBy: "desc",

    cursor: page1.nextCursor,

  });

}


The WorkflowPage type:

TypeScript


type WorkflowPage = {

  workflows: WorkflowInfo[];

  total: number; // Total matching workflows

  nextCursor: string | null; // null when no more pages

};
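
Since nextCursor is null on the last page, fetching every match is a loop that follows the cursor until it runs out. The sketch below shows one way to write that loop against any function with the same page shape. collectAllPages and PageQuery are hypothetical names, WorkflowInfo is trimmed to two fields, and fakeGetWorkflows is an in-memory stand-in whose cursor is simply an array offset (the real cursor format is not specified here and should be treated as opaque).

```typescript
// Shapes mirror the WorkflowPage type; WorkflowInfo is trimmed for the sketch.
type WorkflowInfo = { instanceId: string; status: string };
type WorkflowPage = {
  workflows: WorkflowInfo[];
  total: number;
  nextCursor: string | null;
};
type PageQuery = { limit: number; cursor?: string | undefined };

// Hypothetical helper: follows nextCursor until it is null.
function collectAllPages(
  getPage: (query: PageQuery) => WorkflowPage,
  limit = 20,
): WorkflowInfo[] {
  const all: WorkflowInfo[] = [];
  let cursor: string | undefined;
  do {
    const page = getPage({ limit, cursor });
    all.push(...page.workflows);
    cursor = page.nextCursor ?? undefined;
  } while (cursor !== undefined);
  return all;
}

// In-memory stand-in for this.getWorkflows(); the cursor is an array offset here.
const trackedRecords: WorkflowInfo[] = Array.from({ length: 45 }, (_, i) => ({
  instanceId: `wf-${i}`,
  status: "complete",
}));

function fakeGetWorkflows({ limit, cursor }: PageQuery): WorkflowPage {
  const start = cursor ? Number(cursor) : 0;
  const end = Math.min(start + limit, trackedRecords.length);
  return {
    workflows: trackedRecords.slice(start, end),
    total: trackedRecords.length,
    nextCursor: end < trackedRecords.length ? String(end) : null,
  };
}

const everything = collectAllPages(fakeGetWorkflows, 20);
```

Inside an Agent, the same helper could be called with a closure such as (query) => this.getWorkflows({ status: "complete", ...query }), keeping the filter identical across pages as the pagination example above does.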


deleteWorkflow(instanceId)

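Delete the tracking record of a single workflow instance. Returns true if the record was deleted and false if it was not found.

deleteWorkflows(criteria?)

Delete the tracking records of workflow instances that match the given criteria.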

JavaScript


// Delete completed workflow instances older than 7 days

this.deleteWorkflows({

  status: "complete",

  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),

});


// Delete all errored and terminated workflows

this.deleteWorkflows({

  status: ["errored", "terminated"],

});


TypeScript


// Delete completed workflow instances older than 7 days

this.deleteWorkflows({

  status: "complete",

  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),

});


// Delete all errored and terminated workflows

this.deleteWorkflows({

  status: ["errored", "terminated"],

});


terminateWorkflow(instanceId)

Terminate a running workflow immediately and set its status to "terminated".

JavaScript


await this.terminateWorkflow(instanceId);


TypeScript


await this.terminateWorkflow(instanceId);


Note

terminate() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.

pauseWorkflow(instanceId)

Pause a running workflow. It can be resumed later with resumeWorkflow().

JavaScript


await this.pauseWorkflow(instanceId);


TypeScript


await this.pauseWorkflow(instanceId);


Note

pause() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.

resumeWorkflow(instanceId)

Resume a paused workflow.

JavaScript


await this.resumeWorkflow(instanceId);


TypeScript


await this.resumeWorkflow(instanceId);


Note

resume() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.

restartWorkflow(instanceId, options?)

Restart a workflow instance from the beginning, keeping the same ID.

JavaScript


// Reset tracking (default) - clears timestamps and error fields

await this.restartWorkflow(instanceId);


// Preserve original timestamps

await this.restartWorkflow(instanceId, { resetTracking: false });


TypeScript


// Reset tracking (default) - clears timestamps and error fields

await this.restartWorkflow(instanceId);


// Preserve original timestamps

await this.restartWorkflow(instanceId, { resetTracking: false });


Note

restart() is not yet supported in local development with wrangler dev. It works once deployed to Cloudflare.

approveWorkflow(instanceId, options?)

Approve a waiting workflow. Use it together with waitForApproval() in the workflow.

JavaScript


await this.approveWorkflow(instanceId, {

  reason: "Approved by admin",

  metadata: { approvedBy: userId },

});


TypeScript


await this.approveWorkflow(instanceId, {

  reason: "Approved by admin",

  metadata: { approvedBy: userId },

});


rejectWorkflow(instanceId, options?)

Reject a waiting workflow, which causes waitForApproval() to throw WorkflowRejectedError.

JavaScript


await this.rejectWorkflow(instanceId, { reason: "Request denied" });


TypeScript


await this.rejectWorkflow(instanceId, { reason: "Request denied" });


migrateWorkflowBinding(oldName, newName)

Migrate tracked workflows after a workflow binding has been renamed.

JavaScript


class MyAgent extends Agent {

  async onStart() {

    this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW");

  }

}


TypeScript


class MyAgent extends Agent {

  async onStart() {

    this.migrateWorkflowBinding("OLD_WORKFLOW", "NEW_WORKFLOW");

  }

}


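Lifecycle callbacks

Override these methods in your Agent to handle workflow events:

| Callback | Parameters | Description |
| --- | --- | --- |
| onWorkflowProgress | workflowName, instanceId, progress | Called when a workflow reports progress |
| onWorkflowComplete | workflowName, instanceId, result? | Called when a workflow completes |
| onWorkflowError | workflowName, instanceId, error | Called when a workflow errors |
| onWorkflowEvent | workflowName, instanceId, event | Called when a workflow sends an event |
| onWorkflowCallback | callback: WorkflowCallback | Called for every callback type |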

JavaScript


class MyAgent extends Agent {

  async onWorkflowProgress(workflowName, instanceId, progress) {

    this.broadcast(

      JSON.stringify({ type: "progress", workflowName, instanceId, progress }),

    );

  }


  async onWorkflowComplete(workflowName, instanceId, result) {

    console.log(`${workflowName}/${instanceId} completed`);

  }


  async onWorkflowError(workflowName, instanceId, error) {

    console.error(`${workflowName}/${instanceId} failed:`, error);

  }

}


TypeScript


class MyAgent extends Agent {

  async onWorkflowProgress(

    workflowName: string,

    instanceId: string,

    progress: unknown,

  ) {

    this.broadcast(

      JSON.stringify({ type: "progress", workflowName, instanceId, progress }),

    );

  }


  async onWorkflowComplete(

    workflowName: string,

    instanceId: string,

    result?: unknown,

  ) {

    console.log(`${workflowName}/${instanceId} completed`);

  }


  async onWorkflowError(

    workflowName: string,

    instanceId: string,

    error: string,

  ) {

    console.error(`${workflowName}/${instanceId} failed:`, error);

  }

}


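Workflow tracking

Workflows started with runWorkflow() are tracked automatically in the Agent's internal database. You can query, filter, and manage them with the methods described above (getWorkflow(), getWorkflows(), deleteWorkflow(), and so on).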

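Status values

| Status | Description |
| --- | --- |
| queued | Waiting to start |
| running | Currently executing |
| paused | Paused by the user |
| waiting | Waiting for an event |
| complete | Finished successfully |
| errored | Failed with an error |
| terminated | Manually terminated |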
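
When application code branches on these values, a union type keeps typos out and makes it easy to separate instances that are still in flight from those that have stopped. The sketch below assumes, without confirmation from the source, that complete, errored, and terminated are the statuses in which an instance no longer runs (unless it is restarted). hasStopped and countByStatus are hypothetical helpers.

```typescript
// Status values copied from the table above.
type WorkflowStatus =
  | "queued"
  | "running"
  | "paused"
  | "waiting"
  | "complete"
  | "errored"
  | "terminated";

// Assumption: these statuses mean the instance has stopped running,
// unless it is later restarted with restartWorkflow().
const STOPPED_STATUSES: ReadonlySet<WorkflowStatus> = new Set<WorkflowStatus>([
  "complete",
  "errored",
  "terminated",
]);

function hasStopped(status: WorkflowStatus): boolean {
  return STOPPED_STATUSES.has(status);
}

// Hypothetical helper: count tracked workflows per status, e.g. for a dashboard.
function countByStatus(
  statuses: readonly WorkflowStatus[],
): Map<WorkflowStatus, number> {
  const counts = new Map<WorkflowStatus, number>();
  for (const status of statuses) {
    counts.set(status, (counts.get(status) ?? 0) + 1);
  }
  return counts;
}

const sampleStatuses: WorkflowStatus[] = [
  "running",
  "complete",
  "waiting",
  "complete",
  "errored",
];
const stillActive = sampleStatuses.filter((s) => !hasStopped(s));
const statusCounts = countByStatus(sampleStatuses);
```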

Use the metadata option of runWorkflow() to store queryable information (such as a user ID or task type), then filter on it later with getWorkflows().

Examples

Human-in-the-loop approval

JavaScript


import { Agent } from "agents";

import { AgentWorkflow } from "agents/workflows";

export class ApprovalWorkflow extends AgentWorkflow {

  async run(event, step) {

    const request = await step.do("prepare", async () => {

      return { ...event.payload, preparedAt: Date.now() };

    });


    await this.reportProgress({

      step: "approval",

      status: "pending",

      message: "Awaiting approval",

    });


    // Throws WorkflowRejectedError if rejected

    const approval = await this.waitForApproval(step, {

      timeout: "7 days",

    });


    console.log("Approved by:", approval?.approvedBy);


    const result = await step.do("execute", async () => {

      return executeRequest(request);

    });


    await step.reportComplete(result);

    return result;

  }

}


class MyAgent extends Agent {

  async handleApproval(instanceId, userId) {

    await this.approveWorkflow(instanceId, {

      reason: "Approved by admin",

      metadata: { approvedBy: userId },

    });

  }


  async handleRejection(instanceId, reason) {

    await this.rejectWorkflow(instanceId, { reason });

  }

}


TypeScript


import { Agent } from "agents";

import { AgentWorkflow } from "agents/workflows";

import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";


export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> {

  async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) {

    const request = await step.do("prepare", async () => {

      return { ...event.payload, preparedAt: Date.now() };

    });


    await this.reportProgress({

      step: "approval",

      status: "pending",

      message: "Awaiting approval",

    });


    // Throws WorkflowRejectedError if rejected

    const approval = await this.waitForApproval<{ approvedBy: string }>(step, {

      timeout: "7 days",

    });


    console.log("Approved by:", approval?.approvedBy);


    const result = await step.do("execute", async () => {

      return executeRequest(request);

    });


    await step.reportComplete(result);

    return result;

  }

}


class MyAgent extends Agent {

  async handleApproval(instanceId: string, userId: string) {

    await this.approveWorkflow(instanceId, {

      reason: "Approved by admin",

      metadata: { approvedBy: userId },

    });

  }


  async handleRejection(instanceId: string, reason: string) {

    await this.rejectWorkflow(instanceId, { reason });

  }

}


Retry with backoff

JavaScript


import { AgentWorkflow } from "agents/workflows";

export class ResilientWorkflow extends AgentWorkflow {

  async run(event, step) {

    const result = await step.do(

      "call-api",

      {

        retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },

        timeout: "5 minutes",

      },

      async () => {

        const response = await fetch("https://api.example.com/process", {

          method: "POST",

          body: JSON.stringify(event.payload),

        });

        if (!response.ok) throw new Error(`API error: ${response.status}`);

        return response.json();

      },

    );


    await step.reportComplete(result);

    return result;

  }

}


TypeScript


import { AgentWorkflow } from "agents/workflows";

import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";


export class ResilientWorkflow extends AgentWorkflow<MyAgent, TaskParams> {

  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {

    const result = await step.do(

      "call-api",

      {

        retries: { limit: 5, delay: "10 seconds", backoff: "exponential" },

        timeout: "5 minutes",

      },

      async () => {

        const response = await fetch("https://api.example.com/process", {

          method: "POST",

          body: JSON.stringify(event.payload),

        });

        if (!response.ok) throw new Error(`API error: ${response.status}`);

        return response.json();

      },

    );


    await step.reportComplete(result);

    return result;

  }

}
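
To get a feel for what a retry configuration like the one above implies, it can help to tabulate the waits between attempts. The sketch below uses a generic exponential schedule in which the wait doubles after every failed attempt. That doubling rule is an assumption for illustration: the platform's actual schedule (for example any jitter or cap) is not described here and may differ. exponentialDelays is a hypothetical helper.

```typescript
// Assumption: the wait doubles after every failed attempt, starting from the
// base delay. The platform's real schedule may differ.
function exponentialDelays(baseSeconds: number, retryLimit: number): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < retryLimit; retry += 1) {
    delays.push(baseSeconds * 2 ** retry);
  }
  return delays;
}

// Mirrors retries: { limit: 5, delay: "10 seconds", backoff: "exponential" }
const retryDelays = exponentialDelays(10, 5);
const totalWaitSeconds = retryDelays.reduce((sum, d) => sum + d, 0);
// retryDelays → [10, 20, 40, 80, 160]; totalWaitSeconds → 310
```

Under this assumption the step would spend a little over five minutes waiting between attempts before giving up, in addition to the time taken by the attempts themselves.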


State synchronization

A Workflow can durably update the Agent state through step, and the change is broadcast automatically to all connected clients:

JavaScript


import { AgentWorkflow } from "agents/workflows";

export class StatefulWorkflow extends AgentWorkflow {

  async run(event, step) {

    // Replace entire state (durable, broadcasts to clients)

    await step.updateAgentState({

      currentTask: {

        id: event.payload.taskId,

        status: "processing",

        startedAt: Date.now(),

      },

    });


    const result = await step.do("process", async () =>

      processTask(event.payload),

    );


    // Merge partial state (durable, keeps existing fields)

    await step.mergeAgentState({

      currentTask: { status: "complete", result, completedAt: Date.now() },

    });


    await step.reportComplete(result);

    return result;

  }

}


TypeScript


import { AgentWorkflow } from "agents/workflows";

import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";


export class StatefulWorkflow extends AgentWorkflow<MyAgent, TaskParams> {

  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {

    // Replace entire state (durable, broadcasts to clients)

    await step.updateAgentState({

      currentTask: {

        id: event.payload.taskId,

        status: "processing",

        startedAt: Date.now(),

      },

    });


    const result = await step.do("process", async () =>

      processTask(event.payload),

    );


    // Merge partial state (durable, keeps existing fields)

    await step.mergeAgentState({

      currentTask: { status: "complete", result, completedAt: Date.now() },

    });


    await step.reportComplete(result);

    return result;

  }

}


Custom progress types

Define a custom progress type for domain-specific reporting:

JavaScript


import { Agent } from "agents";

import { AgentWorkflow } from "agents/workflows";

// Workflow that reports a domain-specific progress shape

// (in TypeScript, the shape is declared as the 3rd type parameter)

export class ETLWorkflow extends AgentWorkflow {

  async run(event, step) {

    await this.reportProgress({

      stage: "extract",

      recordsProcessed: 0,

      totalRecords: 1000,

      currentTable: "users",

    });


    // ... processing

  }

}


// Agent receives typed progress

class MyAgent extends Agent {

  async onWorkflowProgress(workflowName, instanceId, progress) {

    const p = progress;

    console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);

  }

}


TypeScript


import { Agent } from "agents";

import { AgentWorkflow } from "agents/workflows";

import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";


// Custom progress type for data pipeline

type PipelineProgress = {

  stage: "extract" | "transform" | "load";

  recordsProcessed: number;

  totalRecords: number;

  currentTable?: string;

};


// Workflow with custom progress type (3rd type parameter)

export class ETLWorkflow extends AgentWorkflow<

  MyAgent,

  ETLParams,

  PipelineProgress

> {

  async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) {

    await this.reportProgress({

      stage: "extract",

      recordsProcessed: 0,

      totalRecords: 1000,

      currentTable: "users",

    });


    // ... processing

  }

}


// Agent receives typed progress

class MyAgent extends Agent {

  async onWorkflowProgress(

    workflowName: string,

    instanceId: string,

    progress: unknown,

  ) {

    const p = progress as PipelineProgress;

    console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);

  }

}


Cleanup strategy

The internal cf_agents_workflows table can grow without bound, so implement a retention policy:

JavaScript


class MyAgent extends Agent {

  // Option 1: Delete on completion

  async onWorkflowComplete(workflowName, instanceId, result) {

    // Process result first, then delete

    this.deleteWorkflow(instanceId);

  }


  // Option 2: Scheduled cleanup (keep recent history)

  async cleanupOldWorkflows() {

    this.deleteWorkflows({

      status: ["complete", "errored"],

      createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),

    });

  }


  // Option 3: Keep all history for compliance/auditing

  // Don't call deleteWorkflows() - query historical data as needed

}


TypeScript


class MyAgent extends Agent {

  // Option 1: Delete on completion

  async onWorkflowComplete(

    workflowName: string,

    instanceId: string,

    result?: unknown,

  ) {

    // Process result first, then delete

    this.deleteWorkflow(instanceId);

  }


  // Option 2: Scheduled cleanup (keep recent history)

  async cleanupOldWorkflows() {

    this.deleteWorkflows({

      status: ["complete", "errored"],

      createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),

    });

  }


  // Option 3: Keep all history for compliance/auditing

  // Don't call deleteWorkflows() - query historical data as needed

}
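
The inline expression Date.now() - 7 * 24 * 60 * 60 * 1000 is easy to mistype when the retention period changes. A small helper can make the intent explicit and testable. The sketch below is one possible shape; retentionCutoff and DAY_MS are hypothetical names, and the optional now parameter exists only so the result can be checked against a fixed date.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: the Date before which tracking records are considered expired.
function retentionCutoff(days: number, now: Date = new Date()): Date {
  if (!Number.isFinite(days) || days < 0) {
    throw new RangeError("days must be a non-negative number");
  }
  return new Date(now.getTime() - days * DAY_MS);
}

// Fixed reference date so the result is reproducible.
const sevenDayCutoff = retentionCutoff(7, new Date("2026-04-29T00:00:00Z"));
// sevenDayCutoff.toISOString() → "2026-04-22T00:00:00.000Z"
```

In the scheduled cleanup option above, the value could then be passed as createdBefore: retentionCutoff(7).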


Bidirectional communication

Workflow to Agent

JavaScript


// Direct RPC call (typed)

await this.agent.updateTaskStatus(taskId, "processing");

const data = await this.agent.getData(taskId);


// Non-durable callbacks (may repeat on retry, use for frequent updates)

await this.reportProgress({ step: "process", percent: 0.5 });

this.broadcastToClients({ type: "update", data });


// Durable callbacks via step (idempotent, won't repeat on retry)

await step.reportComplete(result);

await step.reportError("Something went wrong");

await step.sendEvent({ type: "custom", data: {} });


// Durable state synchronization via step (broadcasts to clients)

await step.updateAgentState({ status: "processing" });

await step.mergeAgentState({ progress: 0.5 });


TypeScript


// Direct RPC call (typed)

await this.agent.updateTaskStatus(taskId, "processing");

const data = await this.agent.getData(taskId);


// Non-durable callbacks (may repeat on retry, use for frequent updates)

await this.reportProgress({ step: "process", percent: 0.5 });

this.broadcastToClients({ type: "update", data });


// Durable callbacks via step (idempotent, won't repeat on retry)

await step.reportComplete(result);

await step.reportError("Something went wrong");

await step.sendEvent({ type: "custom", data: {} });


// Durable state synchronization via step (broadcasts to clients)

await step.updateAgentState({ status: "processing" });

await step.mergeAgentState({ progress: 0.5 });


Agent to Workflow

JavaScript


// Send event to waiting workflow

await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {

  type: "custom-event",

  payload: { action: "proceed" },

});


// Approve/reject workflows using convenience methods

await this.approveWorkflow(instanceId, {

  reason: "Approved by admin",

  metadata: { approvedBy: userId },

});


await this.rejectWorkflow(instanceId, { reason: "Request denied" });


TypeScript


// Send event to waiting workflow

await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {

  type: "custom-event",

  payload: { action: "proceed" },

});


// Approve/reject workflows using convenience methods

await this.approveWorkflow(instanceId, {

  reason: "Approved by admin",

  metadata: { approvedBy: userId },

});


await this.rejectWorkflow(instanceId, { reason: "Request denied" });


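Best practices

  1. Keep workflows focused: one workflow per logical task
  2. Use meaningful step names: they help with debugging and observability
  3. Report progress regularly: keep users informed
  4. Handle errors gracefully: call reportError() before throwing
  5. Clean up completed workflows: implement a retention policy for the tracking table
  6. Handle workflow binding renames: use migrateWorkflowBinding() when you rename a workflow binding in wrangler.jsonc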

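Limits

| Limit | Value |
| --- | --- |
| Maximum steps | 10,000 per workflow by default, configurable up to 25,000 |
| State size | 10 MB per workflow |
| Event wait time | Up to 1 year |
| Step execution time | 30 minutes per step |

Workflows cannot open WebSocket connections directly. Use broadcastToClients() to reach connected clients through the Agent.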
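
The step limit matters most when a workflow fans out over a large data set with one step per batch. As a rough planning aid, the sketch below computes the smallest batch size that keeps the number of batch steps within a step budget. minBatchSize and its reservedSteps parameter (steps set aside for setup, reporting, and so on) are hypothetical; only the 10,000 default comes from the table above.

```typescript
const DEFAULT_MAX_STEPS = 10_000; // default step limit from the limits table

// Hypothetical helper: smallest batch size such that
// ceil(totalItems / batchSize) <= maxSteps - reservedSteps.
function minBatchSize(
  totalItems: number,
  reservedSteps = 0,
  maxSteps = DEFAULT_MAX_STEPS,
): number {
  const available = maxSteps - reservedSteps;
  if (available <= 0) {
    throw new RangeError("no steps left for batches");
  }
  return Math.max(1, Math.ceil(totalItems / available));
}

// One million items, keeping 100 steps in reserve for other work.
const batchSize = minBatchSize(1_000_000, 100);
const batchSteps = Math.ceil(1_000_000 / batchSize);
// batchSize → 102; batchSteps → 9804
```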

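Related resources

Workflows documentation: learn the fundamentals of Cloudflare Workflows.

Store and sync state: persist and synchronize Agent state.

Schedule tasks: time-based task execution.

Human-in-the-loop: approval flows and human intervention patterns.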