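Human-in-the-loop patterns

Human-in-the-loop (HITL) patterns let an agent pause execution and wait for a human to approve, confirm, or provide input before it continues. This is essential for compliance, safety, and oversight in agentic systems.

Why human-in-the-loop?

  • Compliance: Regulations may require human approval for certain actions
  • Safety: High-stakes actions (payments, deletions, external communications) need oversight
  • Quality: Human review catches errors the AI misses
  • Trust: Users are more confident when they can approve critical actions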

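Common use cases

Use case             Examples
Financial approvals  Expense reports, payment processing
Content moderation   Publishing, sending email
Data operations      Bulk deletions, exports
AI tool execution    Confirming tool calls before they run
Access control       Granting permissions, role changes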

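Choosing a pattern

Cloudflare provides two main patterns for human-in-the-loop:

Pattern            Best for                                                            Key API
Workflow approval  Multi-step processes, durable approval gates                        waitForApproval()
MCP elicitation    MCP servers requesting structured user input during a tool call     elicitInput()

Decision guide:

  • Use Workflow approval when you need a durable, multi-step approval process that can wait hours, days, or weeks
  • Use MCP elicitation when you are building an MCP server that needs to request additional structured input from the user during tool execution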

Workflow-based approval

For durable, multi-step processes, use the waitForApproval() method with Cloudflare Workflows. The workflow pauses until a human approves or rejects.

Basic pattern

JavaScript


import { AgentWorkflow } from "agents/workflows";

export class ExpenseWorkflow extends AgentWorkflow {
  async run(event, step) {
    const expense = event.payload;

    // Step 1: Validate the expense
    const validated = await step.do("validate", async () => {
      if (expense.amount <= 0) {
        throw new Error("Invalid expense amount");
      }
      return { ...expense, validatedAt: Date.now() };
    });

    // Step 2: Report that we are waiting for approval
    await this.reportProgress({
      step: "approval",
      status: "pending",
      message: `Awaiting approval for $${expense.amount}`,
    });

    // Step 3: Wait for human approval (pauses the workflow)
    const approval = await this.waitForApproval(step, {
      timeout: "7 days",
    });

    console.log(`Approved by: ${approval?.approvedBy}`);

    // Step 4: Process the approved expense
    const result = await step.do("process", async () => {
      return { expenseId: crypto.randomUUID(), ...validated };
    });

    await step.reportComplete(result);
    return result;
  }
}


TypeScript


import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";

// ExpenseAgent is the agent class shown in the next section;
// import its type from wherever you define it.

type ExpenseParams = {
  amount: number;
  description: string;
  requestedBy: string;
};

export class ExpenseWorkflow extends AgentWorkflow<
  ExpenseAgent,
  ExpenseParams
> {
  async run(event: AgentWorkflowEvent<ExpenseParams>, step: AgentWorkflowStep) {
    const expense = event.payload;

    // Step 1: Validate the expense
    const validated = await step.do("validate", async () => {
      if (expense.amount <= 0) {
        throw new Error("Invalid expense amount");
      }
      return { ...expense, validatedAt: Date.now() };
    });

    // Step 2: Report that we are waiting for approval
    await this.reportProgress({
      step: "approval",
      status: "pending",
      message: `Awaiting approval for $${expense.amount}`,
    });

    // Step 3: Wait for human approval (pauses the workflow)
    const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
      timeout: "7 days",
    });

    console.log(`Approved by: ${approval?.approvedBy}`);

    // Step 4: Process the approved expense
    const result = await step.do("process", async () => {
      return { expenseId: crypto.randomUUID(), ...validated };
    });

    await step.reportComplete(result);
    return result;
  }
}


Agent methods for approval

The agent provides methods to approve or reject a waiting workflow:

JavaScript


import { Agent, callable } from "agents";

export class ExpenseAgent extends Agent {
  initialState = {
    pendingApprovals: [],
  };

  // Approve a waiting workflow
  @callable()
  async approve(workflowId, approvedBy) {
    await this.approveWorkflow(workflowId, {
      reason: "Expense approved",
      metadata: { approvedBy, approvedAt: Date.now() },
    });

    // Update state to reflect approval
    this.setState({
      ...this.state,
      pendingApprovals: this.state.pendingApprovals.filter(
        (p) => p.workflowId !== workflowId,
      ),
    });
  }

  // Reject a waiting workflow
  @callable()
  async reject(workflowId, reason) {
    await this.rejectWorkflow(workflowId, { reason });

    this.setState({
      ...this.state,
      pendingApprovals: this.state.pendingApprovals.filter(
        (p) => p.workflowId !== workflowId,
      ),
    });
  }

  // Track workflow progress to update pending approvals
  async onWorkflowProgress(workflowName, workflowId, progress) {
    const p = progress;

    if (p.step === "approval" && p.status === "pending") {
      // Add to pending approvals list for UI display
      this.setState({
        ...this.state,
        pendingApprovals: [
          ...this.state.pendingApprovals,
          {
            workflowId,
            amount: 0, // Would come from workflow params
            description: p.message || "",
            requestedBy: "user",
            requestedAt: Date.now(),
          },
        ],
      });
    }
  }
}


TypeScript


import { Agent, callable } from "agents";

type PendingApproval = {
  workflowId: string;
  amount: number;
  description: string;
  requestedBy: string;
  requestedAt: number;
};

type ExpenseState = {
  pendingApprovals: PendingApproval[];
};

export class ExpenseAgent extends Agent<Env, ExpenseState> {
  initialState: ExpenseState = {
    pendingApprovals: [],
  };

  // Approve a waiting workflow
  @callable()
  async approve(workflowId: string, approvedBy: string): Promise<void> {
    await this.approveWorkflow(workflowId, {
      reason: "Expense approved",
      metadata: { approvedBy, approvedAt: Date.now() },
    });

    // Update state to reflect approval
    this.setState({
      ...this.state,
      pendingApprovals: this.state.pendingApprovals.filter(
        (p) => p.workflowId !== workflowId,
      ),
    });
  }

  // Reject a waiting workflow
  @callable()
  async reject(workflowId: string, reason: string): Promise<void> {
    await this.rejectWorkflow(workflowId, { reason });

    this.setState({
      ...this.state,
      pendingApprovals: this.state.pendingApprovals.filter(
        (p) => p.workflowId !== workflowId,
      ),
    });
  }

  // Track workflow progress to update pending approvals
  async onWorkflowProgress(
    workflowName: string,
    workflowId: string,
    progress: unknown,
  ): Promise<void> {
    const p = progress as { step: string; status: string; message?: string };

    if (p.step === "approval" && p.status === "pending") {
      // Add to pending approvals list for UI display
      this.setState({
        ...this.state,
        pendingApprovals: [
          ...this.state.pendingApprovals,
          {
            workflowId,
            amount: 0, // Would come from workflow params
            description: p.message || "",
            requestedBy: "user",
            requestedAt: Date.now(),
          },
        ],
      });
    }
  }
}
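The progress handler above appends an entry every time a pending approval is reported. If the same report arrives more than once, the list could end up with duplicates. The following is a minimal sketch of pure helpers that keep the list keyed by workflowId. The names upsertPending and removePending are hypothetical and not part of the Agents SDK:

```typescript
type PendingApproval = {
  workflowId: string;
  amount: number;
  description: string;
  requestedBy: string;
  requestedAt: number;
};

// Hypothetical helper: add an entry, replacing any existing entry
// that has the same workflowId.
function upsertPending(
  list: PendingApproval[],
  item: PendingApproval,
): PendingApproval[] {
  return [...list.filter((p) => p.workflowId !== item.workflowId), item];
}

// Hypothetical helper: drop the entry once the workflow is approved or rejected.
function removePending(
  list: PendingApproval[],
  workflowId: string,
): PendingApproval[] {
  return list.filter((p) => p.workflowId !== workflowId);
}

// Example: reporting the same workflow twice keeps a single entry.
const first: PendingApproval = {
  workflowId: "wf-1",
  amount: 120,
  description: "Taxi",
  requestedBy: "alice",
  requestedAt: 1,
};
const once = upsertPending([], first);
const twice = upsertPending(once, { ...first, requestedAt: 2 });
const cleared = removePending(twice, "wf-1");
```

Inside the agent, the results of these helpers would be passed to this.setState() in place of the inline spread and filter expressions.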


Timeout handling

Set a timeout so the workflow does not wait indefinitely:

JavaScript


const approval = await this.waitForApproval(step, {
  timeout: "7 days", // Also supports: "1 hour", "30 minutes", etc.
});

if (!approval) {
  // Timeout expired - escalate or auto-reject
  await step.reportError("Approval timeout - escalating to manager");
  throw new Error("Approval timeout");
}


TypeScript


const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
  timeout: "7 days", // Also supports: "1 hour", "30 minutes", etc.
});

if (!approval) {
  // Timeout expired - escalate or auto-reject
  await step.reportError("Approval timeout - escalating to manager");
  throw new Error("Approval timeout");
}
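An approval UI may want to show when a pending request expires. The sketch below converts a timeout string of the kind used above into a deadline. It assumes durations always look like "<number> <unit>" (for example "7 days" or "30 minutes"). durationToMs and approvalDeadline are hypothetical helpers written for this illustration, not SDK functions, and the platform may accept formats this parser does not:

```typescript
// Milliseconds per supported unit (assumption: only these units are needed).
const UNIT_MS: Record<string, number> = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  week: 7 * 24 * 60 * 60 * 1000,
};

// Hypothetical helper: parse "7 days", "1 hour", "30 minutes" into milliseconds.
function durationToMs(duration: string): number {
  const match = /^(\d+)\s+(second|minute|hour|day|week)s?$/.exec(
    duration.trim(),
  );
  if (!match) throw new Error(`Unsupported duration: ${duration}`);
  const [, count, unit] = match;
  const unitMs = unit ? UNIT_MS[unit] : undefined;
  if (!count || unitMs === undefined) {
    throw new Error(`Unsupported duration: ${duration}`);
  }
  return Number(count) * unitMs;
}

// Hypothetical helper: the moment a request made at `requestedAt` times out.
function approvalDeadline(requestedAt: number, timeout: string): Date {
  return new Date(requestedAt + durationToMs(timeout));
}

const deadline = approvalDeadline(Date.UTC(2025, 0, 1), "7 days");
```

Storing the deadline alongside each pending approval lets the UI show a countdown without querying the workflow.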


Escalation with scheduling

Use schedule() to set up escalation reminders:

JavaScript


import { Agent, callable } from "agents";

class ExpenseAgent extends Agent {
  @callable()
  async submitForApproval(expense) {
    // Start the approval workflow
    const workflowId = await this.runWorkflow("EXPENSE_WORKFLOW", expense);

    // Schedule a reminder 4 hours from now.
    // Pass a Date for an absolute time; a bare number is treated as a delay in seconds.
    await this.schedule(
      new Date(Date.now() + 4 * 60 * 60 * 1000),
      "sendReminder",
      { workflowId },
    );

    // Schedule escalation 24 hours from now
    await this.schedule(
      new Date(Date.now() + 24 * 60 * 60 * 1000),
      "escalateApproval",
      { workflowId },
    );

    return workflowId;
  }

  async sendReminder(payload) {
    const workflow = this.getWorkflow(payload.workflowId);
    if (workflow?.status === "waiting") {
      // Send reminder notification
      console.log("Reminder: approval still pending");
    }
  }

  async escalateApproval(payload) {
    const workflow = this.getWorkflow(payload.workflowId);
    if (workflow?.status === "waiting") {
      // Escalate to manager
      console.log("Escalating to manager");
    }
  }
}


TypeScript


import { Agent, callable } from "agents";

class ExpenseAgent extends Agent<Env, ExpenseState> {
  @callable()
  async submitForApproval(expense: ExpenseParams): Promise<string> {
    // Start the approval workflow
    const workflowId = await this.runWorkflow("EXPENSE_WORKFLOW", expense);

    // Schedule a reminder 4 hours from now.
    // Pass a Date for an absolute time; a bare number is treated as a delay in seconds.
    await this.schedule(
      new Date(Date.now() + 4 * 60 * 60 * 1000),
      "sendReminder",
      { workflowId },
    );

    // Schedule escalation 24 hours from now
    await this.schedule(
      new Date(Date.now() + 24 * 60 * 60 * 1000),
      "escalateApproval",
      { workflowId },
    );

    return workflowId;
  }

  async sendReminder(payload: { workflowId: string }) {
    const workflow = this.getWorkflow(payload.workflowId);
    if (workflow?.status === "waiting") {
      // Send reminder notification
      console.log("Reminder: approval still pending");
    }
  }

  async escalateApproval(payload: { workflowId: string }) {
    const workflow = this.getWorkflow(payload.workflowId);
    if (workflow?.status === "waiting") {
      // Escalate to manager
      console.log("Escalating to manager");
    }
  }
}
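When there are more than two escalation steps, it can help to describe them as data and compute the trigger times in one place. The sketch below assumes a simple policy expressed in hours after submission. EscalationStep, DEFAULT_POLICY, and planEscalations are hypothetical names introduced for illustration:

```typescript
// One escalation step: which agent method to call and how long after submission.
type EscalationStep = {
  afterHours: number;
  callback: "sendReminder" | "escalateApproval";
};

// Assumed policy mirroring the example above: remind at 4 hours, escalate at 24.
const DEFAULT_POLICY: EscalationStep[] = [
  { afterHours: 4, callback: "sendReminder" },
  { afterHours: 24, callback: "escalateApproval" },
];

// Hypothetical helper: turn a policy into absolute trigger times.
function planEscalations(
  submittedAt: number,
  policy: EscalationStep[] = DEFAULT_POLICY,
): Array<{ when: Date; callback: EscalationStep["callback"] }> {
  return policy.map((s) => ({
    when: new Date(submittedAt + s.afterHours * 60 * 60 * 1000),
    callback: s.callback,
  }));
}

const plan = planEscalations(Date.UTC(2025, 0, 1));
// Inside submitForApproval, each entry could then be scheduled:
//   for (const { when, callback } of plan) {
//     await this.schedule(when, callback, { workflowId });
//   }
```

Keeping the policy as data makes it straightforward to vary escalation timing by expense amount or team without touching the scheduling code.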


Audit trail with SQL

Use this.sql to maintain an immutable audit log:

JavaScript


import { Agent, callable } from "agents";

class ExpenseAgent extends Agent {
  async onStart() {
    // Create audit table
    this.sql`
      CREATE TABLE IF NOT EXISTS approval_audit (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        workflow_id TEXT NOT NULL,
        decision TEXT NOT NULL CHECK(decision IN ('approved', 'rejected')),
        decided_by TEXT NOT NULL,
        decided_at INTEGER NOT NULL,
        reason TEXT
      )
    `;
  }

  @callable()
  async approve(workflowId, userId, reason) {
    // Record the decision in SQL (immutable audit log)
    this.sql`
      INSERT INTO approval_audit (workflow_id, decision, decided_by, decided_at, reason)
      VALUES (${workflowId}, 'approved', ${userId}, ${Date.now()}, ${reason || null})
    `;

    // Process the approval
    await this.approveWorkflow(workflowId, {
      reason: reason || "Approved",
      metadata: { approvedBy: userId },
    });
  }
}


TypeScript


import { Agent, callable } from "agents";

class ExpenseAgent extends Agent<Env, ExpenseState> {
  async onStart() {
    // Create audit table
    this.sql`
      CREATE TABLE IF NOT EXISTS approval_audit (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        workflow_id TEXT NOT NULL,
        decision TEXT NOT NULL CHECK(decision IN ('approved', 'rejected')),
        decided_by TEXT NOT NULL,
        decided_at INTEGER NOT NULL,
        reason TEXT
      )
    `;
  }

  @callable()
  async approve(
    workflowId: string,
    userId: string,
    reason?: string,
  ): Promise<void> {
    // Record the decision in SQL (immutable audit log)
    this.sql`
      INSERT INTO approval_audit (workflow_id, decision, decided_by, decided_at, reason)
      VALUES (${workflowId}, 'approved', ${userId}, ${Date.now()}, ${reason || null})
    `;

    // Process the approval
    await this.approveWorkflow(workflowId, {
      reason: reason || "Approved",
      metadata: { approvedBy: userId },
    });
  }
}
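The table's CHECK and NOT NULL constraints reject bad rows at insert time, but validating earlier gives callers a clearer error. Below is a minimal sketch of a pure function that builds a row matching the approval_audit columns. AuditRow and toAuditRow are hypothetical names for illustration:

```typescript
type Decision = "approved" | "rejected";

// Mirrors the approval_audit columns above (id is assigned by SQLite).
type AuditRow = {
  workflowId: string;
  decision: Decision;
  decidedBy: string;
  decidedAt: number;
  reason: string | null;
};

function isDecision(value: string): value is Decision {
  return value === "approved" || value === "rejected";
}

// Hypothetical helper: validate input and normalise it into an audit row.
function toAuditRow(
  input: { workflowId: string; decision: string; decidedBy: string; reason?: string },
  now: number = Date.now(),
): AuditRow {
  if (!isDecision(input.decision)) {
    throw new Error(`Invalid decision: ${input.decision}`);
  }
  if (!input.workflowId.trim() || !input.decidedBy.trim()) {
    throw new Error("workflowId and decidedBy are required");
  }
  return {
    workflowId: input.workflowId,
    decision: input.decision,
    decidedBy: input.decidedBy,
    decidedAt: now,
    reason: input.reason?.trim() || null, // empty reasons are stored as NULL
  };
}

const row = toAuditRow(
  { workflowId: "wf-1", decision: "approved", decidedBy: "admin" },
  1000,
);
```

The resulting fields can be interpolated into the INSERT statement shown above in place of the raw arguments.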


Configuration

JSONC


{
  "name": "expense-approval",
  "main": "src/index.ts",
  // Set this to today's date
  "compatibility_date": "2026-04-29",
  "compatibility_flags": ["nodejs_compat"],
  "durable_objects": {
    "bindings": [{ "name": "EXPENSE_AGENT", "class_name": "ExpenseAgent" }],
  },
  "workflows": [
    {
      "name": "expense-workflow",
      "binding": "EXPENSE_WORKFLOW",
      "class_name": "ExpenseWorkflow",
    },
  ],
  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["ExpenseAgent"] }],
}


TOML


name = "expense-approval"
main = "src/index.ts"
# Set this to today's date
compatibility_date = "2026-04-29"
compatibility_flags = [ "nodejs_compat" ]

[[durable_objects.bindings]]
name = "EXPENSE_AGENT"
class_name = "ExpenseAgent"

[[workflows]]
name = "expense-workflow"
binding = "EXPENSE_WORKFLOW"
class_name = "ExpenseWorkflow"

[[migrations]]
tag = "v1"
new_sqlite_classes = [ "ExpenseAgent" ]


MCP elicitation

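When you build an MCP server with McpAgent, you can use elicitation to request additional input from the user during tool execution. The MCP client renders a form based on the JSON Schema you provide and returns the user's response.

Basic pattern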

JavaScript


import { McpAgent } from "agents/mcp";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

export class CounterMCP extends McpAgent {
  server = new McpServer({
    name: "counter-server",
    version: "1.0.0",
  });

  initialState = { counter: 0 };

  async init() {
    this.server.tool(
      "increase-counter",
      "Increase the counter by a user-specified amount",
      { confirm: z.boolean().describe("Do you want to increase the counter?") },
      async ({ confirm }, extra) => {
        if (!confirm) {
          return { content: [{ type: "text", text: "Cancelled." }] };
        }

        // Request additional input from the user
        const userInput = await this.server.server.elicitInput(
          {
            message: "By how much do you want to increase the counter?",
            requestedSchema: {
              type: "object",
              properties: {
                amount: {
                  type: "number",
                  title: "Amount",
                  description: "The amount to increase the counter by",
                },
              },
              required: ["amount"],
            },
          },
          { relatedRequestId: extra.requestId },
        );

        // Check if user accepted or cancelled
        if (userInput.action !== "accept" || !userInput.content) {
          return { content: [{ type: "text", text: "Cancelled." }] };
        }

        // Use the input
        const amount = Number(userInput.content.amount);
        this.setState({
          ...this.state,
          counter: this.state.counter + amount,
        });

        return {
          content: [
            {
              type: "text",
              text: `Counter increased by ${amount}, now at ${this.state.counter}`,
            },
          ],
        };
      },
    );
  }
}


TypeScript


import { McpAgent } from "agents/mcp";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

type State = { counter: number };

export class CounterMCP extends McpAgent<Env, State, {}> {
  server = new McpServer({
    name: "counter-server",
    version: "1.0.0",
  });

  initialState: State = { counter: 0 };

  async init() {
    this.server.tool(
      "increase-counter",
      "Increase the counter by a user-specified amount",
      { confirm: z.boolean().describe("Do you want to increase the counter?") },
      async ({ confirm }, extra) => {
        if (!confirm) {
          return { content: [{ type: "text", text: "Cancelled." }] };
        }

        // Request additional input from the user
        const userInput = await this.server.server.elicitInput(
          {
            message: "By how much do you want to increase the counter?",
            requestedSchema: {
              type: "object",
              properties: {
                amount: {
                  type: "number",
                  title: "Amount",
                  description: "The amount to increase the counter by",
                },
              },
              required: ["amount"],
            },
          },
          { relatedRequestId: extra.requestId },
        );

        // Check if user accepted or cancelled
        if (userInput.action !== "accept" || !userInput.content) {
          return { content: [{ type: "text", text: "Cancelled." }] };
        }

        // Use the input
        const amount = Number(userInput.content.amount);
        this.setState({
          ...this.state,
          counter: this.state.counter + amount,
        });

        return {
          content: [
            {
              type: "text",
              text: `Counter increased by ${amount}, now at ${this.state.counter}`,
            },
          ],
        };
      },
    );
  }
}
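The example converts the elicited value with Number(), which yields NaN if the client returns something non-numeric. The sketch below checks the value before it is added to the counter. It assumes that only positive, finite amounts make sense here. ElicitResult is a simplified stand-in for the SDK's result type, and readAmount is a hypothetical helper:

```typescript
// Simplified shape of the fields used above (an assumption, not the SDK type).
type ElicitResult = {
  action: "accept" | "decline" | "cancel";
  content?: Record<string, unknown>;
};

// Hypothetical helper: return a usable amount, or null if the user
// declined, cancelled, or supplied a value that is not a positive number.
function readAmount(result: ElicitResult): number | null {
  if (result.action !== "accept" || !result.content) return null;

  const raw = result.content["amount"];
  const amount =
    typeof raw === "number" ? raw : typeof raw === "string" ? Number(raw) : NaN;

  return Number.isFinite(amount) && amount > 0 ? amount : null;
}

const accepted = readAmount({ action: "accept", content: { amount: 5 } });
const garbled = readAmount({ action: "accept", content: { amount: "abc" } });
const declined = readAmount({ action: "decline" });
```

In the tool handler, a null result would return the same "Cancelled." response instead of calling this.setState().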


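Elicitation vs. workflow approval

Dimension  MCP elicitation                    Workflow approval
Context    Tool execution in an MCP server    Multi-step workflow processes
Duration   Immediate (within the tool call)   Can wait hours, days, or weeks
UI         JSON Schema-based form             Custom UI driven by agent state
State      MCP session state                  Durable workflow state
Use case   Interactive input during a tool    Approval gates in pipelines

Building approval UIs

Pending approvals list

Use the agent's state to display pending approvals in your UI: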


import { useAgent } from "agents/react";

function PendingApprovals() {
  const { state, agent } = useAgent({
    agent: "expense-agent",
    name: "main",
  });

  if (!state?.pendingApprovals?.length) {
    return <p>No pending approvals</p>;
  }

  return (
    <div className="approval-list">
      {state.pendingApprovals.map((item) => (
        <div key={item.workflowId} className="approval-card">
          <h3>${item.amount}</h3>
          <p>{item.description}</p>
          <p>Requested by {item.requestedBy}</p>

          <div className="actions">
            <button
              onClick={() => agent.stub.approve(item.workflowId, "admin")}
            >
              Approve
            </button>
            <button
              onClick={() => agent.stub.reject(item.workflowId, "Declined")}
            >
              Reject
            </button>
          </div>
        </div>
      ))}
    </div>
  );
}


Multi-approver pattern

For sensitive operations that require multiple approvers:

JavaScript


import { Agent, callable } from "agents";

class MultiApprovalAgent extends Agent {
  initialState = { pendingMultiApprovals: [] };

  @callable()
  async approveMulti(workflowId, userId) {
    const approval = this.state.pendingMultiApprovals.find(
      (p) => p.workflowId === workflowId,
    );
    if (!approval) throw new Error("Approval not found");

    // Check if user already approved
    if (approval.currentApprovals.some((a) => a.userId === userId)) {
      throw new Error("Already approved by this user");
    }

    // Add this user's approval without mutating existing state
    const currentApprovals = [
      ...approval.currentApprovals,
      { userId, approvedAt: Date.now() },
    ];
    const approved = currentApprovals.length >= approval.requiredApprovals;

    if (approved) {
      // Enough approvals: resume the waiting workflow
      await this.approveWorkflow(workflowId, {
        metadata: { approvers: currentApprovals },
      });
    }

    // Persist the change: drop the entry once approved, otherwise store the new approval
    this.setState({
      ...this.state,
      pendingMultiApprovals: approved
        ? this.state.pendingMultiApprovals.filter(
            (p) => p.workflowId !== workflowId,
          )
        : this.state.pendingMultiApprovals.map((p) =>
            p.workflowId === workflowId ? { ...p, currentApprovals } : p,
          ),
    });

    return approved; // false means more approvals are still needed
  }
}


TypeScript


import { Agent, callable } from "agents";

type MultiApproval = {
  workflowId: string;
  requiredApprovals: number;
  currentApprovals: Array<{ userId: string; approvedAt: number }>;
  rejections: Array<{ userId: string; rejectedAt: number; reason: string }>;
};

type State = {
  pendingMultiApprovals: MultiApproval[];
};

class MultiApprovalAgent extends Agent<Env, State> {
  initialState: State = { pendingMultiApprovals: [] };

  @callable()
  async approveMulti(workflowId: string, userId: string): Promise<boolean> {
    const approval = this.state.pendingMultiApprovals.find(
      (p) => p.workflowId === workflowId,
    );
    if (!approval) throw new Error("Approval not found");

    // Check if user already approved
    if (approval.currentApprovals.some((a) => a.userId === userId)) {
      throw new Error("Already approved by this user");
    }

    // Add this user's approval without mutating existing state
    const currentApprovals = [
      ...approval.currentApprovals,
      { userId, approvedAt: Date.now() },
    ];
    const approved = currentApprovals.length >= approval.requiredApprovals;

    if (approved) {
      // Enough approvals: resume the waiting workflow
      await this.approveWorkflow(workflowId, {
        metadata: { approvers: currentApprovals },
      });
    }

    // Persist the change: drop the entry once approved, otherwise store the new approval
    this.setState({
      ...this.state,
      pendingMultiApprovals: approved
        ? this.state.pendingMultiApprovals.filter(
            (p) => p.workflowId !== workflowId,
          )
        : this.state.pendingMultiApprovals.map((p) =>
            p.workflowId === workflowId ? { ...p, currentApprovals } : p,
          ),
    });

    return approved; // false means more approvals are still needed
  }
}
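The MultiApproval type includes a rejections field that the example never uses. The sketch below shows one way the quorum logic could account for it, written as pure functions so it can be tested without an agent. It assumes a policy in which a single rejection vetoes the request; other policies (for example, majority vote) are equally plausible. addApproval, addRejection, and outcome are hypothetical names:

```typescript
type MultiApproval = {
  workflowId: string;
  requiredApprovals: number;
  currentApprovals: Array<{ userId: string; approvedAt: number }>;
  rejections: Array<{ userId: string; rejectedAt: number; reason: string }>;
};

type Outcome = "pending" | "approved" | "rejected";

// Return a new record with the user's approval added (no mutation).
function addApproval(a: MultiApproval, userId: string, now: number): MultiApproval {
  if (a.currentApprovals.some((x) => x.userId === userId)) {
    throw new Error("Already approved by this user");
  }
  return {
    ...a,
    currentApprovals: [...a.currentApprovals, { userId, approvedAt: now }],
  };
}

// Return a new record with the user's rejection added (no mutation).
function addRejection(
  a: MultiApproval,
  userId: string,
  reason: string,
  now: number,
): MultiApproval {
  return {
    ...a,
    rejections: [...a.rejections, { userId, rejectedAt: now, reason }],
  };
}

// Assumed policy: any rejection vetoes; otherwise wait for the required count.
function outcome(a: MultiApproval): Outcome {
  if (a.rejections.length > 0) return "rejected";
  return a.currentApprovals.length >= a.requiredApprovals ? "approved" : "pending";
}

const start: MultiApproval = {
  workflowId: "wf-1",
  requiredApprovals: 2,
  currentApprovals: [],
  rejections: [],
};
const afterOne = addApproval(start, "alice", 1);
const afterTwo = addApproval(afterOne, "bob", 2);
```

An agent method could call outcome() after each decision and then invoke approveWorkflow() or rejectWorkflow() only when the result is no longer "pending".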


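Best practices

  1. Define clear approval criteria: only require confirmation for actions with real consequences (payments, emails, data changes)
  2. Provide detailed context: show users exactly what the action will do, including all of its arguments
  3. Set timeouts: use schedule() to escalate or auto-reject after a reasonable period
  4. Maintain an audit trail: use this.sql to record every approval decision for compliance
  5. Handle connection drops: store pending approvals in agent state so they survive disconnects and reconnects
  6. Degrade gracefully: provide fallback behavior when an approval is rejected

Next steps

  • Run Workflows: complete API reference for waitForApproval().
  • MCP servers: build MCP agents with elicitation.
  • Email notifications: send notifications for pending approvals.
  • Schedule tasks: implement approval timeouts with schedule.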