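Human-in-the-loop patterns
Human-in-the-loop (HITL) patterns let an agent pause execution and wait for a human to approve, confirm, or provide input before it continues. This is essential for compliance, safety, and oversight in agentic systems.
Why human-in-the-loop?
- Compliance: regulations may require human sign-off for certain operations
- Safety: high-risk operations (payments, deletions, external communications) need oversight
- Quality: human review catches errors the AI misses
- Trust: users are more confident when they can approve critical operations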
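Common use cases
| Use case | Example |
|---|---|
| Financial approvals | Expense reports, payment processing |
| Content moderation | Publishing, sending email |
| Data operations | Bulk deletes, exports |
| AI tool execution | Confirming a tool call before it runs |
| Access control | Granting permissions, role changes |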
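Choosing a pattern
Cloudflare provides two main patterns for human-in-the-loop:
| Pattern | Best for | Key API |
|---|---|---|
| Workflow approval | Multi-step processes, durable approval gates | waitForApproval() |
| MCP elicitation | MCP servers that request structured input from the user during a call | elicitInput() |
Decision guide:
- Use Workflow approval when you need a durable, multi-step approval process that can wait hours, days, or weeks
- Use MCP elicitation when you are building an MCP server that needs to request additional structured input from the user during tool execution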
Workflow-based approval
For durable, multi-step processes, use Cloudflare Workflows with the waitForApproval() method. The workflow pauses until someone approves or rejects it.
Basic pattern
JavaScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
export class ExpenseWorkflow extends AgentWorkflow {
async run(event, step) {
const expense = event.payload;
// Step 1: Validate the expense
const validated = await step.do("validate", async () => {
if (expense.amount <= 0) {
throw new Error("Invalid expense amount");
}
return { ...expense, validatedAt: Date.now() };
});
// Step 2: Report that we are waiting for approval
await this.reportProgress({
step: "approval",
status: "pending",
message: `Awaiting approval for $${expense.amount}`,
});
// Step 3: Wait for human approval (pauses the workflow)
const approval = await this.waitForApproval(step, {
timeout: "7 days",
});
console.log(`Approved by: ${approval?.approvedBy}`);
// Step 4: Process the approved expense
const result = await step.do("process", async () => {
return { expenseId: crypto.randomUUID(), ...validated };
});
await step.reportComplete(result);
return result;
}
}
TypeScript
import { Agent } from "agents";
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
type ExpenseParams = {
amount: number;
description: string;
requestedBy: string;
};
export class ExpenseWorkflow extends AgentWorkflow<
ExpenseAgent,
ExpenseParams
> {
async run(event: AgentWorkflowEvent<ExpenseParams>, step: AgentWorkflowStep) {
const expense = event.payload;
// Step 1: Validate the expense
const validated = await step.do("validate", async () => {
if (expense.amount <= 0) {
throw new Error("Invalid expense amount");
}
return { ...expense, validatedAt: Date.now() };
});
// Step 2: Report that we are waiting for approval
await this.reportProgress({
step: "approval",
status: "pending",
message: `Awaiting approval for $${expense.amount}`,
});
// Step 3: Wait for human approval (pauses the workflow)
const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
timeout: "7 days",
});
console.log(`Approved by: ${approval?.approvedBy}`);
// Step 4: Process the approved expense
const result = await step.do("process", async () => {
return { expenseId: crypto.randomUUID(), ...validated };
});
await step.reportComplete(result);
return result;
}
}
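The pause-and-resume behavior described above can be modeled as a small state machine. The sketch below is only an illustration of the semantics (pending until approved, rejected, or timed out); it is not how Workflows implements waitForApproval(), and the names GateState, GateEvent, openGate, and advance are hypothetical.

```typescript
// Illustrative model of an approval gate. All names here are hypothetical.
type GateState<T> =
  | { status: "pending"; deadline: number }
  | { status: "approved"; payload: T }
  | { status: "rejected"; reason: string }
  | { status: "timed_out" };

type GateEvent<T> =
  | { type: "approve"; payload: T; at: number }
  | { type: "reject"; reason: string; at: number }
  | { type: "tick"; at: number };

// Open a gate that expires timeoutMs after `now` (both in milliseconds).
function openGate<T>(now: number, timeoutMs: number): GateState<T> {
  return { status: "pending", deadline: now + timeoutMs };
}

// Apply one event to the gate and return the next state.
function advance<T>(state: GateState<T>, event: GateEvent<T>): GateState<T> {
  // Terminal states ignore further events: a decision is final.
  if (state.status !== "pending") return state;
  // Any event arriving at or after the deadline resolves the gate as timed out.
  if (event.at >= state.deadline) return { status: "timed_out" };
  switch (event.type) {
    case "approve":
      return { status: "approved", payload: event.payload };
    case "reject":
      return { status: "rejected", reason: event.reason };
    case "tick":
      return state; // Still waiting
  }
}
```

Treating the decision as final mirrors the approval flow in the example: once a workflow has been approved, a late rejection has nothing left to act on.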
Agent methods for approval
The agent provides methods to approve or reject a waiting workflow:
JavaScript
import { Agent, callable } from "agents";
export class ExpenseAgent extends Agent {
initialState = {
pendingApprovals: [],
};
// Approve a waiting workflow
@callable()
async approve(workflowId, approvedBy) {
await this.approveWorkflow(workflowId, {
reason: "Expense approved",
metadata: { approvedBy, approvedAt: Date.now() },
});
// Update state to reflect approval
this.setState({
...this.state,
pendingApprovals: this.state.pendingApprovals.filter(
(p) => p.workflowId !== workflowId,
),
});
}
// Reject a waiting workflow
@callable()
async reject(workflowId, reason) {
await this.rejectWorkflow(workflowId, { reason });
this.setState({
...this.state,
pendingApprovals: this.state.pendingApprovals.filter(
(p) => p.workflowId !== workflowId,
),
});
}
// Track workflow progress to update pending approvals
async onWorkflowProgress(workflowName, workflowId, progress) {
const p = progress;
if (p.step === "approval" && p.status === "pending") {
// Add to pending approvals list for UI display
this.setState({
...this.state,
pendingApprovals: [
...this.state.pendingApprovals,
{
workflowId,
amount: 0, // Would come from workflow params
description: p.message || "",
requestedBy: "user",
requestedAt: Date.now(),
},
],
});
}
}
}
TypeScript
import { Agent, callable } from "agents";
type PendingApproval = {
workflowId: string;
amount: number;
description: string;
requestedBy: string;
requestedAt: number;
};
type ExpenseState = {
pendingApprovals: PendingApproval[];
};
export class ExpenseAgent extends Agent<Env, ExpenseState> {
initialState: ExpenseState = {
pendingApprovals: [],
};
// Approve a waiting workflow
@callable()
async approve(workflowId: string, approvedBy: string): Promise<void> {
await this.approveWorkflow(workflowId, {
reason: "Expense approved",
metadata: { approvedBy, approvedAt: Date.now() },
});
// Update state to reflect approval
this.setState({
...this.state,
pendingApprovals: this.state.pendingApprovals.filter(
(p) => p.workflowId !== workflowId,
),
});
}
// Reject a waiting workflow
@callable()
async reject(workflowId: string, reason: string): Promise<void> {
await this.rejectWorkflow(workflowId, { reason });
this.setState({
...this.state,
pendingApprovals: this.state.pendingApprovals.filter(
(p) => p.workflowId !== workflowId,
),
});
}
// Track workflow progress to update pending approvals
async onWorkflowProgress(
workflowName: string,
workflowId: string,
progress: unknown,
): Promise<void> {
const p = progress as { step: string; status: string; message?: string };
if (p.step === "approval" && p.status === "pending") {
// Add to pending approvals list for UI display
this.setState({
...this.state,
pendingApprovals: [
...this.state.pendingApprovals,
{
workflowId,
amount: 0, // Would come from workflow params
description: p.message || "",
requestedBy: "user",
requestedAt: Date.now(),
},
],
});
}
}
}
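The list updates in the agent above can be factored into pure helpers. The sketch below assumes that a progress report for the same workflow could arrive more than once (for example after a retry), so adding an entry is made idempotent. The helper names addPending and removePending are hypothetical and not part of the agents package.

```typescript
// Same shape as the PendingApproval type used by the agent above.
type PendingApproval = {
  workflowId: string;
  amount: number;
  description: string;
  requestedBy: string;
  requestedAt: number;
};

// Return a new list containing `item`, unless an entry for the same
// workflow already exists. The input list is never mutated.
function addPending(
  list: readonly PendingApproval[],
  item: PendingApproval,
): PendingApproval[] {
  if (list.some((p) => p.workflowId === item.workflowId)) {
    return [...list];
  }
  return [...list, item];
}

// Return a new list without the entry for `workflowId`.
function removePending(
  list: readonly PendingApproval[],
  workflowId: string,
): PendingApproval[] {
  return list.filter((p) => p.workflowId !== workflowId);
}
```

Inside the agent, the result would be passed to setState, for example `this.setState({ ...this.state, pendingApprovals: addPending(this.state.pendingApprovals, item) })`.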
Timeout handling
Set a timeout so that the workflow does not wait indefinitely:
JavaScript
const approval = await this.waitForApproval(step, {
timeout: "7 days", // Also supports: "1 hour", "30 minutes", etc.
});
if (!approval) {
// Timeout expired - escalate or auto-reject
await step.reportError("Approval timeout - escalating to manager");
throw new Error("Approval timeout");
}
TypeScript
const approval = await this.waitForApproval<{ approvedBy: string }>(step, {
timeout: "7 days", // Also supports: "1 hour", "30 minutes", etc.
});
if (!approval) {
// Timeout expired - escalate or auto-reject
await step.reportError("Approval timeout - escalating to manager");
throw new Error("Approval timeout");
}
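If reminders or escalations should be derived from the same timeout string, it helps to convert that string to milliseconds. The following is a minimal sketch of such a converter. It is a hypothetical helper, not the parser Workflows uses, and it assumes only the simple "<number> <unit>" forms that appear in this guide.

```typescript
// Milliseconds per unit for the simple "<number> <unit>" forms.
const UNIT_MS = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  week: 7 * 24 * 60 * 60 * 1000,
} as const;

type Unit = keyof typeof UNIT_MS;

// Hypothetical helper: convert strings such as "7 days" or "1 hour"
// to milliseconds. Throws on anything it does not recognize.
function durationToMs(text: string): number {
  const match = /^(\d+)\s+(second|minute|hour|day|week)s?$/.exec(text.trim());
  if (!match) {
    throw new Error(`Unsupported duration: "${text}"`);
  }
  const amount = Number(match[1]);
  const unit = match[2] as Unit;
  return amount * UNIT_MS[unit];
}
```

For example, a reminder halfway through the approval window could use `durationToMs("7 days") / 2` as its delay, so the reminder moves with the timeout instead of being hard-coded separately.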
Escalation with scheduling
Use schedule() to set up reminders and escalation:
JavaScript
import { Agent, callable } from "agents";
class ExpenseAgent extends Agent {
@callable()
async submitForApproval(expense) {
// Start the approval workflow
const workflowId = await this.runWorkflow("EXPENSE_WORKFLOW", expense);
// Schedule reminder after 4 hours
// (pass a Date for an absolute time; a bare number is treated as a delay in seconds)
await this.schedule(new Date(Date.now() + 4 * 60 * 60 * 1000), "sendReminder", {
workflowId,
});
// Schedule escalation after 24 hours
await this.schedule(
new Date(Date.now() + 24 * 60 * 60 * 1000),
"escalateApproval",
{ workflowId },
);
return workflowId;
}
async sendReminder(payload) {
const workflow = this.getWorkflow(payload.workflowId);
if (workflow?.status === "waiting") {
// Send reminder notification
console.log("Reminder: approval still pending");
}
}
async escalateApproval(payload) {
const workflow = this.getWorkflow(payload.workflowId);
if (workflow?.status === "waiting") {
// Escalate to manager
console.log("Escalating to manager");
}
}
}
TypeScript
import { Agent, callable } from "agents";
class ExpenseAgent extends Agent<Env, ExpenseState> {
@callable()
async submitForApproval(expense: ExpenseParams): Promise<string> {
// Start the approval workflow
const workflowId = await this.runWorkflow("EXPENSE_WORKFLOW", expense);
// Schedule reminder after 4 hours
// (pass a Date for an absolute time; a bare number is treated as a delay in seconds)
await this.schedule(new Date(Date.now() + 4 * 60 * 60 * 1000), "sendReminder", {
workflowId,
});
// Schedule escalation after 24 hours
await this.schedule(
new Date(Date.now() + 24 * 60 * 60 * 1000),
"escalateApproval",
{ workflowId },
);
return workflowId;
}
async sendReminder(payload: { workflowId: string }) {
const workflow = this.getWorkflow(payload.workflowId);
if (workflow?.status === "waiting") {
// Send reminder notification
console.log("Reminder: approval still pending");
}
}
async escalateApproval(payload: { workflowId: string }) {
const workflow = this.getWorkflow(payload.workflowId);
if (workflow?.status === "waiting") {
// Escalate to manager
console.log("Escalating to manager");
}
}
}
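The reminder and escalation times can be computed in one place so that their ordering is checked before anything is scheduled. This is a minimal sketch under stated assumptions: EscalationPolicy, PlannedTask, planEscalation, and shouldNotify are hypothetical names, and the callback names match the methods in the example above.

```typescript
// Hypothetical policy: delays are measured from the submission time.
interface EscalationPolicy {
  reminderAfterMs: number;
  escalateAfterMs: number;
}

interface PlannedTask {
  runAt: Date;
  callback: "sendReminder" | "escalateApproval";
}

// Compute absolute run times for the reminder and the escalation.
function planEscalation(
  submittedAt: number,
  policy: EscalationPolicy,
): PlannedTask[] {
  if (policy.reminderAfterMs >= policy.escalateAfterMs) {
    throw new Error("The reminder must be scheduled before the escalation");
  }
  return [
    {
      runAt: new Date(submittedAt + policy.reminderAfterMs),
      callback: "sendReminder",
    },
    {
      runAt: new Date(submittedAt + policy.escalateAfterMs),
      callback: "escalateApproval",
    },
  ];
}

// A scheduled callback should act only if the workflow is still waiting,
// because the approval may have arrived before the task fired.
function shouldNotify(status: string | undefined): boolean {
  return status === "waiting";
}
```

Each planned task could then be handed to the agent, for example `await this.schedule(task.runAt, task.callback, { workflowId })`.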
Audit log with SQL
Use this.sql to maintain an append-only audit log of approval decisions:
JavaScript
import { Agent, callable } from "agents";
class ExpenseAgent extends Agent {
async onStart() {
// Create audit table
this.sql`
CREATE TABLE IF NOT EXISTS approval_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL,
decision TEXT NOT NULL CHECK(decision IN ('approved', 'rejected')),
decided_by TEXT NOT NULL,
decided_at INTEGER NOT NULL,
reason TEXT
)
`;
}
@callable()
async approve(workflowId, userId, reason) {
// Record the decision in SQL (immutable audit log)
this.sql`
INSERT INTO approval_audit (workflow_id, decision, decided_by, decided_at, reason)
VALUES (${workflowId}, 'approved', ${userId}, ${Date.now()}, ${reason || null})
`;
// Process the approval
await this.approveWorkflow(workflowId, {
reason: reason || "Approved",
metadata: { approvedBy: userId },
});
}
}
TypeScript
import { Agent, callable } from "agents";
class ExpenseAgent extends Agent<Env, ExpenseState> {
async onStart() {
// Create audit table
this.sql`
CREATE TABLE IF NOT EXISTS approval_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL,
decision TEXT NOT NULL CHECK(decision IN ('approved', 'rejected')),
decided_by TEXT NOT NULL,
decided_at INTEGER NOT NULL,
reason TEXT
)
`;
}
@callable()
async approve(
workflowId: string,
userId: string,
reason?: string,
): Promise<void> {
// Record the decision in SQL (immutable audit log)
this.sql`
INSERT INTO approval_audit (workflow_id, decision, decided_by, decided_at, reason)
VALUES (${workflowId}, 'approved', ${userId}, ${Date.now()}, ${reason || null})
`;
// Process the approval
await this.approveWorkflow(workflowId, {
reason: reason || "Approved",
metadata: { approvedBy: userId },
});
}
}
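The example above records approvals only; a complete log also needs a row for each rejection. One way to keep both paths consistent is to build the row values with a shared helper before running the INSERT. The sketch below is an assumption about how that could look: buildAuditRow and AuditRow are hypothetical names, and the field names follow the approval_audit columns.

```typescript
type Decision = "approved" | "rejected";

// Mirrors the columns of the approval_audit table (except the auto id).
interface AuditRow {
  workflow_id: string;
  decision: Decision;
  decided_by: string;
  decided_at: number;
  reason: string | null;
}

// Hypothetical helper: validate the inputs and return a frozen row
// whose values can be passed to the INSERT statement.
function buildAuditRow(
  workflowId: string,
  decision: Decision,
  decidedBy: string,
  decidedAt: number,
  reason?: string,
): Readonly<AuditRow> {
  if (workflowId.trim() === "") throw new Error("workflowId is required");
  if (decidedBy.trim() === "") throw new Error("decidedBy is required");
  const trimmed = reason?.trim();
  return Object.freeze({
    workflow_id: workflowId,
    decision,
    decided_by: decidedBy,
    decided_at: decidedAt,
    // Store NULL rather than an empty string when no reason was given
    reason: trimmed ? trimmed : null,
  });
}
```

A reject method would call `buildAuditRow(workflowId, "rejected", userId, Date.now(), reason)` and insert the resulting values the same way the approve method does.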
Configuration
JSONC
{
"name": "expense-approval",
"main": "src/index.ts",
// Set this to today's date
"compatibility_date": "2026-04-29",
"compatibility_flags": ["nodejs_compat"],
"durable_objects": {
"bindings": [{ "name": "EXPENSE_AGENT", "class_name": "ExpenseAgent" }],
},
"workflows": [
{
"name": "expense-workflow",
"binding": "EXPENSE_WORKFLOW",
"class_name": "ExpenseWorkflow",
},
],
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["ExpenseAgent"] }],
}
TOML
name = "expense-approval"
main = "src/index.ts"
# Set this to today's date
compatibility_date = "2026-04-29"
compatibility_flags = [ "nodejs_compat" ]
[[durable_objects.bindings]]
name = "EXPENSE_AGENT"
class_name = "ExpenseAgent"
[[workflows]]
name = "expense-workflow"
binding = "EXPENSE_WORKFLOW"
class_name = "ExpenseWorkflow"
[[migrations]]
tag = "v1"
new_sqlite_classes = [ "ExpenseAgent" ]
MCP elicitation
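When building an MCP server with McpAgent, you can use elicitation to request additional input from the user during tool execution. The MCP client renders a form from the JSON Schema you provide and returns the user's response to you.
Basic pattern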
JavaScript
import { McpAgent } from "agents/mcp";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
export class CounterMCP extends McpAgent {
server = new McpServer({
name: "counter-server",
version: "1.0.0",
});
initialState = { counter: 0 };
async init() {
this.server.tool(
"increase-counter",
"Increase the counter by a user-specified amount",
{ confirm: z.boolean().describe("Do you want to increase the counter?") },
async ({ confirm }, extra) => {
if (!confirm) {
return { content: [{ type: "text", text: "Cancelled." }] };
}
// Request additional input from the user
const userInput = await this.server.server.elicitInput(
{
message: "By how much do you want to increase the counter?",
requestedSchema: {
type: "object",
properties: {
amount: {
type: "number",
title: "Amount",
description: "The amount to increase the counter by",
},
},
required: ["amount"],
},
},
{ relatedRequestId: extra.requestId },
);
// Check if user accepted or cancelled
if (userInput.action !== "accept" || !userInput.content) {
return { content: [{ type: "text", text: "Cancelled." }] };
}
// Use the input
const amount = Number(userInput.content.amount);
this.setState({
...this.state,
counter: this.state.counter + amount,
});
return {
content: [
{
type: "text",
text: `Counter increased by ${amount}, now at ${this.state.counter}`,
},
],
};
},
);
}
}
TypeScript
import { McpAgent } from "agents/mcp";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
type State = { counter: number };
export class CounterMCP extends McpAgent<Env, State, {}> {
server = new McpServer({
name: "counter-server",
version: "1.0.0",
});
initialState: State = { counter: 0 };
async init() {
this.server.tool(
"increase-counter",
"Increase the counter by a user-specified amount",
{ confirm: z.boolean().describe("Do you want to increase the counter?") },
async ({ confirm }, extra) => {
if (!confirm) {
return { content: [{ type: "text", text: "Cancelled." }] };
}
// Request additional input from the user
const userInput = await this.server.server.elicitInput(
{
message: "By how much do you want to increase the counter?",
requestedSchema: {
type: "object",
properties: {
amount: {
type: "number",
title: "Amount",
description: "The amount to increase the counter by",
},
},
required: ["amount"],
},
},
{ relatedRequestId: extra.requestId },
);
// Check if user accepted or cancelled
if (userInput.action !== "accept" || !userInput.content) {
return { content: [{ type: "text", text: "Cancelled." }] };
}
// Use the input
const amount = Number(userInput.content.amount);
this.setState({
...this.state,
counter: this.state.counter + amount,
});
return {
content: [
{
type: "text",
text: `Counter increased by ${amount}, now at ${this.state.counter}`,
},
],
};
},
);
}
}
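The example converts the response with Number(), which yields NaN for non-numeric text and 0 for an empty string or null. A stricter reading of the response can be sketched as follows. ElicitResult is a local stand-in for the fields used in the example (action and content), and readAmount is a hypothetical helper, not part of the MCP SDK.

```typescript
// Local stand-in for the elicitation response fields used in the example.
type ElicitResult = {
  action: "accept" | "decline" | "cancel";
  content?: Record<string, unknown>;
};

type AmountOutcome =
  | { ok: true; amount: number }
  | { ok: false; message: string };

// Hypothetical helper: interpret the response and validate the amount.
function readAmount(result: ElicitResult): AmountOutcome {
  if (result.action === "decline") {
    return { ok: false, message: "Declined." };
  }
  if (result.action === "cancel" || !result.content) {
    return { ok: false, message: "Cancelled." };
  }
  const raw = result.content["amount"];
  // Accept numbers and non-empty numeric strings; everything else is invalid.
  const amount =
    typeof raw === "number"
      ? raw
      : typeof raw === "string" && raw.trim() !== ""
        ? Number(raw)
        : Number.NaN;
  if (!Number.isFinite(amount)) {
    return { ok: false, message: "Amount must be a number." };
  }
  return { ok: true, amount };
}
```

In the tool handler, a failed outcome would be returned as the text content, and only a successful one would reach setState.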
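Elicitation compared with workflow approval
| Aspect | MCP elicitation | Workflow approval |
|---|---|---|
| Context | Tool execution in an MCP server | Multi-step workflow processes |
| Duration | Immediate (within the tool call) | Can wait hours, days, or weeks |
| UI | Form rendered from a JSON Schema | Custom UI driven by agent state |
| State | MCP session state | Durable workflow state |
| Use case | Interactive input inside a tool | Approval gates in a pipeline |
Building an approval UI
Pending approvals list
Use the agent's state to display pending approvals in your UI: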
import { useAgent } from "agents/react";
function PendingApprovals() {
const { state, agent } = useAgent({
agent: "expense-agent",
name: "main",
});
if (!state?.pendingApprovals?.length) {
return <p>No pending approvals</p>;
}
return (
<div className="approval-list">
{state.pendingApprovals.map((item) => (
<div key={item.workflowId} className="approval-card">
<h3>${item.amount}</h3>
<p>{item.description}</p>
<p>Requested by {item.requestedBy}</p>
<div className="actions">
<button
onClick={() => agent.stub.approve(item.workflowId, "admin")}
>
Approve
</button>
<button
onClick={() => agent.stub.reject(item.workflowId, "Declined")}
>
Reject
</button>
</div>
</div>
))}
</div>
);
}
Multi-approver pattern
For sensitive operations that require approval from multiple people:
JavaScript
import { Agent, callable } from "agents";
class MultiApprovalAgent extends Agent {
@callable()
async approveMulti(workflowId, userId) {
const approval = this.state.pendingMultiApprovals.find(
(p) => p.workflowId === workflowId,
);
if (!approval) throw new Error("Approval not found");
// Check if user already approved
if (approval.currentApprovals.some((a) => a.userId === userId)) {
throw new Error("Already approved by this user");
}
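// Add this user's approval without mutating the existing state in place
const currentApprovals = [
...approval.currentApprovals,
{ userId, approvedAt: Date.now() },
];
const approved = currentApprovals.length >= approval.requiredApprovals;
if (approved) {
// Enough approvals collected: resume the waiting workflow
await this.approveWorkflow(workflowId, {
metadata: { approvers: currentApprovals },
});
}
// Persist the result: drop the entry once approved, otherwise store the new approval
this.setState({
...this.state,
pendingMultiApprovals: approved
? this.state.pendingMultiApprovals.filter(
(p) => p.workflowId !== workflowId,
)
: this.state.pendingMultiApprovals.map((p) =>
p.workflowId === workflowId ? { ...p, currentApprovals } : p,
),
});
return approved; // false: still waiting for more approvals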
}
}
TypeScript
import { Agent, callable } from "agents";
type MultiApproval = {
workflowId: string;
requiredApprovals: number;
currentApprovals: Array<{ userId: string; approvedAt: number }>;
rejections: Array<{ userId: string; rejectedAt: number; reason: string }>;
};
type State = {
pendingMultiApprovals: MultiApproval[];
};
class MultiApprovalAgent extends Agent<Env, State> {
@callable()
async approveMulti(workflowId: string, userId: string): Promise<boolean> {
const approval = this.state.pendingMultiApprovals.find(
(p) => p.workflowId === workflowId,
);
if (!approval) throw new Error("Approval not found");
// Check if user already approved
if (approval.currentApprovals.some((a) => a.userId === userId)) {
throw new Error("Already approved by this user");
}
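// Add this user's approval without mutating the existing state in place
const currentApprovals = [
...approval.currentApprovals,
{ userId, approvedAt: Date.now() },
];
const approved = currentApprovals.length >= approval.requiredApprovals;
if (approved) {
// Enough approvals collected: resume the waiting workflow
await this.approveWorkflow(workflowId, {
metadata: { approvers: currentApprovals },
});
}
// Persist the result: drop the entry once approved, otherwise store the new approval
this.setState({
...this.state,
pendingMultiApprovals: approved
? this.state.pendingMultiApprovals.filter(
(p) => p.workflowId !== workflowId,
)
: this.state.pendingMultiApprovals.map((p) =>
p.workflowId === workflowId ? { ...p, currentApprovals } : p,
),
});
return approved; // false: still waiting for more approvals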
}
}
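The MultiApproval type declares a rejections array that the example does not use yet. The sketch below shows one possible way to fold rejections into the decision, written as pure functions so it can be tested outside an agent. It assumes a single rejection vetoes the request, which is a policy choice made here and not something the example prescribes; verdict, hasVoted, recordApproval, and recordRejection are hypothetical names.

```typescript
type MultiApproval = {
  workflowId: string;
  requiredApprovals: number;
  currentApprovals: Array<{ userId: string; approvedAt: number }>;
  rejections: Array<{ userId: string; rejectedAt: number; reason: string }>;
};

type Verdict = "pending" | "approved" | "rejected";

// Assumption: any single rejection vetoes the request.
function verdict(a: MultiApproval): Verdict {
  if (a.rejections.length > 0) return "rejected";
  return a.currentApprovals.length >= a.requiredApprovals
    ? "approved"
    : "pending";
}

// True if the user has already approved or rejected this request.
function hasVoted(a: MultiApproval, userId: string): boolean {
  return (
    a.currentApprovals.some((v) => v.userId === userId) ||
    a.rejections.some((v) => v.userId === userId)
  );
}

// Return a copy with the user's approval added; the input is not mutated.
function recordApproval(
  a: MultiApproval,
  userId: string,
  now: number,
): MultiApproval {
  if (hasVoted(a, userId)) throw new Error("User has already voted");
  return {
    ...a,
    currentApprovals: [...a.currentApprovals, { userId, approvedAt: now }],
  };
}

// Return a copy with the user's rejection added; the input is not mutated.
function recordRejection(
  a: MultiApproval,
  userId: string,
  reason: string,
  now: number,
): MultiApproval {
  if (hasVoted(a, userId)) throw new Error("User has already voted");
  return {
    ...a,
    rejections: [...a.rejections, { userId, rejectedAt: now, reason }],
  };
}
```

An agent method could call recordApproval or recordRejection, store the result with setState, and then call approveWorkflow or rejectWorkflow depending on what verdict returns.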
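Best practices
- Define clear approval criteria: only require confirmation for operations with real consequences (payments, emails, data changes)
- Provide detailed context: show users exactly what the operation will do, including all parameters
- Set timeouts: use schedule() to escalate or auto-reject after a reasonable period
- Maintain an audit log: use this.sql to record every approval decision for compliance
- Handle disconnections: keep pending approvals in agent state so they are still there after a reconnect
- Degrade gracefully: provide fallback behavior when an approval is rejected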
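The first practice, clear approval criteria, can be written down as an explicit policy function so the rule is reviewable and testable. The following is a minimal sketch; the action kinds, the threshold fields, and the name requiresApproval are all hypothetical and would need to match your own operations.

```typescript
// Hypothetical set of operations an agent might perform.
type ActionKind = "payment" | "email" | "delete" | "export" | "read";

interface ProposedAction {
  kind: ActionKind;
  amount?: number; // For payments
  recordCount?: number; // For deletes and exports
}

// Hypothetical thresholds; the values are a business decision.
interface ApprovalPolicy {
  paymentThreshold: number;
  bulkThreshold: number;
}

// Decide whether a human must confirm the action. Missing details
// fail closed: if the size of the action is unknown, ask for approval.
function requiresApproval(
  action: ProposedAction,
  policy: ApprovalPolicy,
): boolean {
  switch (action.kind) {
    case "payment":
      return (
        action.amount === undefined || action.amount >= policy.paymentThreshold
      );
    case "email":
      return true; // External communication always needs confirmation here
    case "delete":
    case "export":
      return (
        action.recordCount === undefined ||
        action.recordCount >= policy.bulkThreshold
      );
    case "read":
      return false; // No side effects
  }
}
```

Keeping the rule in one function also makes it easy to log which policy version was applied alongside each audit entry.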
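Next steps
Run Workflows: complete API reference for waitForApproval().
MCP servers: build MCP agents with elicitation.
Email notifications: send notifications for pending approvals.
Schedule tasks: implement approval timeouts with schedule().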