Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Webhooks

接收来自外部服务的 webhook 事件,并把它们路由到专用的 agent 实例。每个 webhook 来源(仓库、客户、设备)都可以拥有自己的 agent,具备独立的状态、持久化存储以及实时客户端连接。

快速开始

JavaScript


import { Agent, getAgentByName, routeAgentRequest } from "agents";


// Agent that handles webhooks for a specific entity

export class WebhookAgent extends Agent {

  async onRequest(request) {

    if (request.method !== "POST") {

      return new Response("Method not allowed", { status: 405 });

    }


    // Verify the webhook signature

    const signature = request.headers.get("X-Hub-Signature-256");

    const body = await request.text();


    if (

      !(await this.verifySignature(body, signature, this.env.WEBHOOK_SECRET))

    ) {

      return new Response("Invalid signature", { status: 401 });

    }


    // Process the webhook payload

    const payload = JSON.parse(body);

    await this.processEvent(payload);


    return new Response("OK", { status: 200 });

  }


  async verifySignature(payload, signature, secret) {

    if (!signature) return false;


    const encoder = new TextEncoder();

    const key = await crypto.subtle.importKey(

      "raw",

      encoder.encode(secret),

      { name: "HMAC", hash: "SHA-256" },

      false,

      ["sign"],

    );


    const signatureBytes = await crypto.subtle.sign(

      "HMAC",

      key,

      encoder.encode(payload),

    );

    const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))

      .map((b) => b.toString(16).padStart(2, "0"))

      .join("")}`;


    return signature === expected;

  }


  async processEvent(payload) {

    // Store event, update state, trigger actions...

  }

}


// Route webhooks to the right agent instance

export default {

  async fetch(request, env) {

    const url = new URL(request.url);


    // Webhook endpoint: POST /webhooks/:entityId

    if (url.pathname.startsWith("/webhooks/") && request.method === "POST") {

      const entityId = url.pathname.split("/")[2];

      const agent = await getAgentByName(env.WebhookAgent, entityId);

      return agent.fetch(request);

    }


    // Default routing for WebSocket connections

    return (

      (await routeAgentRequest(request, env)) ||

      new Response("Not found", { status: 404 })

    );

  },

};


TypeScript


import { Agent, getAgentByName, routeAgentRequest } from "agents";


// Agent that handles webhooks for a specific entity

export class WebhookAgent extends Agent {

  async onRequest(request: Request): Promise<Response> {

    if (request.method !== "POST") {

      return new Response("Method not allowed", { status: 405 });

    }


    // Verify the webhook signature

    const signature = request.headers.get("X-Hub-Signature-256");

    const body = await request.text();


    if (

      !(await this.verifySignature(body, signature, this.env.WEBHOOK_SECRET))

    ) {

      return new Response("Invalid signature", { status: 401 });

    }


    // Process the webhook payload

    const payload = JSON.parse(body);

    await this.processEvent(payload);


    return new Response("OK", { status: 200 });

  }


  private async verifySignature(

    payload: string,

    signature: string | null,

    secret: string,

  ): Promise<boolean> {

    if (!signature) return false;


    const encoder = new TextEncoder();

    const key = await crypto.subtle.importKey(

      "raw",

      encoder.encode(secret),

      { name: "HMAC", hash: "SHA-256" },

      false,

      ["sign"],

    );


    const signatureBytes = await crypto.subtle.sign(

      "HMAC",

      key,

      encoder.encode(payload),

    );

    const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))

      .map((b) => b.toString(16).padStart(2, "0"))

      .join("")}`;


    return signature === expected;

  }


  private async processEvent(payload: unknown) {

    // Store event, update state, trigger actions...

  }

}


// Route webhooks to the right agent instance

export default {

  async fetch(request: Request, env: Env): Promise<Response> {

    const url = new URL(request.url);


    // Webhook endpoint: POST /webhooks/:entityId

    if (url.pathname.startsWith("/webhooks/") && request.method === "POST") {

      const entityId = url.pathname.split("/")[2];

      const agent = await getAgentByName(env.WebhookAgent, entityId);

      return agent.fetch(request);

    }


    // Default routing for WebSocket connections

    return (

      (await routeAgentRequest(request, env)) ||

      new Response("Not found", { status: 404 })

    );

  },

} satisfies ExportedHandler<Env>;


用例

Webhook 与 agent 结合,可以让每个外部实体拥有专属的、有状态的、隔离的 agent 实例。

开发者工具

用例说明
GitHub 仓库监控每个仓库一个 agent,跟踪 commit、PR、issue 和 star
CI/CD 流水线 Agent响应构建/部署事件,失败时通知,跟踪部署历史
Linear/Jira 跟踪器自动分流 issue,根据内容分配,跟踪解决时间

电商与支付

用例说明
Stripe 客户 Agent每个客户一个 agent,跟踪付款、订阅与争议
Shopify 订单 Agent从下单到履约的订单生命周期,带库存同步
支付对账把 webhook 事件与内部记录匹配,标记差异

通信与通知

用例说明
Twilio SMS/Voice由入站消息或来电触发的对话式 agent
Slack 机器人响应 slash 命令、按钮点击和交互式消息
邮件跟踪SendGrid/Mailgun 投递事件、退信处理、互动分析

物联网与基础设施

用例说明
设备遥测每台设备一个 agent,处理传感器数据流
告警聚合收集来自 PagerDuty、Datadog 或自建监控的告警
家居自动化响应 IFTTT/Zapier 触发器,带持久化状态

SaaS 集成

用例说明
CRM 同步Salesforce/HubSpot 联系人和商机变更
日历 AgentGoogle 日历事件通知与日程安排
表单提交Typeform、Tally 或自定义表单 webhook,带后续动作

把 webhook 路由到 agent

关键模式是从 webhook 中提取实体标识,然后用 getAgentByName() 路由到专属 agent 实例。

从 payload 中提取实体

大多数 webhook 在 payload 中包含标识符:

JavaScript


export default {

  async fetch(request, env) {

    if (request.method === "POST" && url.pathname === "/webhooks/github") {

      const payload = await request.clone().json();


      // Extract entity ID from payload

      const repoFullName = payload.repository?.full_name;

      if (!repoFullName) {

        return new Response("Missing repository", { status: 400 });

      }


      // Sanitize for use as agent name

      const agentName = repoFullName.toLowerCase().replace(/\//g, "-");


      // Route to dedicated agent

      const agent = await getAgentByName(env.RepoAgent, agentName);

      return agent.fetch(request);

    }

  },

};


TypeScript


export default {

  async fetch(request: Request, env: Env): Promise<Response> {

    if (request.method === "POST" && url.pathname === "/webhooks/github") {

      const payload = await request.clone().json();


      // Extract entity ID from payload

      const repoFullName = payload.repository?.full_name;

      if (!repoFullName) {

        return new Response("Missing repository", { status: 400 });

      }


      // Sanitize for use as agent name

      const agentName = repoFullName.toLowerCase().replace(/\//g, "-");


      // Route to dedicated agent

      const agent = await getAgentByName(env.RepoAgent, agentName);

      return agent.fetch(request);

    }

  },

} satisfies ExportedHandler<Env>;


从 URL 中提取实体

你也可以把实体 ID 直接放进 webhook URL:

JavaScript


// Webhook URL: https://your-worker.dev/webhooks/stripe/cus_123456

if (url.pathname.startsWith("/webhooks/stripe/")) {

  const customerId = url.pathname.split("/")[3]; // "cus_123456"

  const agent = await getAgentByName(env.StripeAgent, customerId);

  return agent.fetch(request);

}


TypeScript


// Webhook URL: https://your-worker.dev/webhooks/stripe/cus_123456

if (url.pathname.startsWith("/webhooks/stripe/")) {

  const customerId = url.pathname.split("/")[3]; // "cus_123456"

  const agent = await getAgentByName(env.StripeAgent, customerId);

  return agent.fetch(request);

}


从请求头中提取实体

部分服务把标识符放在请求头里:

JavaScript


// Slack sends workspace info in headers

const teamId = request.headers.get("X-Slack-Team-Id");

if (teamId) {

  const agent = await getAgentByName(env.SlackAgent, teamId);

  return agent.fetch(request);

}


TypeScript


// Slack sends workspace info in headers

const teamId = request.headers.get("X-Slack-Team-Id");

if (teamId) {

  const agent = await getAgentByName(env.SlackAgent, teamId);

  return agent.fetch(request);

}


签名校验

务必校验 webhook 签名,确保请求是可信的。多数服务商使用 HMAC-SHA256。

HMAC-SHA256 模式

JavaScript


async function verifySignature(payload, signature, secret) {

  if (!signature) return false;


  const encoder = new TextEncoder();

  const key = await crypto.subtle.importKey(

    "raw",

    encoder.encode(secret),

    { name: "HMAC", hash: "SHA-256" },

    false,

    ["sign"],

  );


  const signatureBytes = await crypto.subtle.sign(

    "HMAC",

    key,

    encoder.encode(payload),

  );


  const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))

    .map((b) => b.toString(16).padStart(2, "0"))

    .join("")}`;


  // Use timing-safe comparison in production

  return signature === expected;

}


TypeScript


async function verifySignature(

  payload: string,

  signature: string | null,

  secret: string,

): Promise<boolean> {

  if (!signature) return false;


  const encoder = new TextEncoder();

  const key = await crypto.subtle.importKey(

    "raw",

    encoder.encode(secret),

    { name: "HMAC", hash: "SHA-256" },

    false,

    ["sign"],

  );


  const signatureBytes = await crypto.subtle.sign(

    "HMAC",

    key,

    encoder.encode(payload),

  );


  const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))

    .map((b) => b.toString(16).padStart(2, "0"))

    .join("")}`;


  // Use timing-safe comparison in production

  return signature === expected;

}


各服务商的特定头部

服务商签名头算法
GitHubX-Hub-Signature-256HMAC-SHA256
StripeStripe-SignatureHMAC-SHA256(带时间戳)
TwilioX-Twilio-SignatureHMAC-SHA1
SlackX-Slack-SignatureHMAC-SHA256(带时间戳)
ShopifyX-Shopify-Hmac-Sha256HMAC-SHA256(base64)

处理 webhook

onRequest 处理函数

使用 onRequest() 处理进入 agent 的 webhook:

JavaScript


export class WebhookAgent extends Agent {

  async onRequest(request) {

    // 1. Validate method

    if (request.method !== "POST") {

      return new Response("Method not allowed", { status: 405 });

    }


    // 2. Get event type from headers

    const eventType = request.headers.get("X-Event-Type");


    // 3. Verify signature

    const signature = request.headers.get("X-Signature");

    const body = await request.text();


    if (!(await this.verifySignature(body, signature))) {

      return new Response("Invalid signature", { status: 401 });

    }


    // 4. Parse and process

    const payload = JSON.parse(body);

    await this.handleEvent(eventType, payload);


    // 5. Respond quickly

    return new Response("OK", { status: 200 });

  }


  async handleEvent(type, payload) {

    // Update state (broadcasts to connected clients)

    this.setState({

      ...this.state,

      lastEventType: type,

      lastEventTime: new Date().toISOString(),

    });


    // Store in SQL for history

    this

      .sql`INSERT INTO events (type, payload, timestamp) VALUES (${type}, ${JSON.stringify(payload)}, ${Date.now()})`;

  }

}


TypeScript


export class WebhookAgent extends Agent {

  async onRequest(request: Request): Promise<Response> {

    // 1. Validate method

    if (request.method !== "POST") {

      return new Response("Method not allowed", { status: 405 });

    }


    // 2. Get event type from headers

    const eventType = request.headers.get("X-Event-Type");


    // 3. Verify signature

    const signature = request.headers.get("X-Signature");

    const body = await request.text();


    if (!(await this.verifySignature(body, signature))) {

      return new Response("Invalid signature", { status: 401 });

    }


    // 4. Parse and process

    const payload = JSON.parse(body);

    await this.handleEvent(eventType, payload);


    // 5. Respond quickly

    return new Response("OK", { status: 200 });

  }


  private async handleEvent(type: string, payload: unknown) {

    // Update state (broadcasts to connected clients)

    this.setState({

      ...this.state,

      lastEventType: type,

      lastEventTime: new Date().toISOString(),

    });


    // Store in SQL for history

    this

      .sql`INSERT INTO events (type, payload, timestamp) VALUES (${type}, ${JSON.stringify(payload)}, ${Date.now()})`;

  }

}


存储 webhook 事件

使用 SQLite 持久化 webhook 事件,以便回放和查询历史。

事件表结构

JavaScript


class WebhookAgent extends Agent {

  async onStart() {

    this.sql`

      CREATE TABLE IF NOT EXISTS events (

        id TEXT PRIMARY KEY,

        type TEXT NOT NULL,

        action TEXT,

        title TEXT NOT NULL,

        description TEXT,

        url TEXT,

        actor TEXT,

        payload TEXT,

        timestamp TEXT NOT NULL

      )

    `;


    this.sql`

      CREATE INDEX IF NOT EXISTS idx_events_timestamp

      ON events(timestamp DESC)

    `;

  }

}


TypeScript


class WebhookAgent extends Agent {

  async onStart(): Promise<void> {

    this.sql`

      CREATE TABLE IF NOT EXISTS events (

        id TEXT PRIMARY KEY,

        type TEXT NOT NULL,

        action TEXT,

        title TEXT NOT NULL,

        description TEXT,

        url TEXT,

        actor TEXT,

        payload TEXT,

        timestamp TEXT NOT NULL

      )

    `;


    this.sql`

      CREATE INDEX IF NOT EXISTS idx_events_timestamp

      ON events(timestamp DESC)

    `;

  }

}


清理旧事件

仅保留近期事件,避免数据无限增长:

JavaScript


// Keep last 100 events

this.sql`

  DELETE FROM events WHERE id NOT IN (

    SELECT id FROM events ORDER BY timestamp DESC LIMIT 100

  )

`;


// Or delete events older than 30 days

this.sql`

  DELETE FROM events

  WHERE timestamp < datetime('now', '-30 days')

`;


TypeScript


// Keep last 100 events

this.sql`

  DELETE FROM events WHERE id NOT IN (

    SELECT id FROM events ORDER BY timestamp DESC LIMIT 100

  )

`;


// Or delete events older than 30 days

this.sql`

  DELETE FROM events

  WHERE timestamp < datetime('now', '-30 days')

`;


查询事件

JavaScript


import { Agent, callable } from "agents";


class WebhookAgent extends Agent {

  @callable()

  getEvents(limit = 20) {

    return [

      ...this.sql`

      SELECT * FROM events

      ORDER BY timestamp DESC

      LIMIT ${limit}

    `,

    ];

  }


  @callable()

  getEventsByType(type, limit = 20) {

    return [

      ...this.sql`

      SELECT * FROM events

      WHERE type = ${type}

      ORDER BY timestamp DESC

      LIMIT ${limit}

    `,

    ];

  }

}


TypeScript


import { Agent, callable } from "agents";


class WebhookAgent extends Agent {

  @callable()

  getEvents(limit = 20) {

    return [

      ...this.sql`

      SELECT * FROM events

      ORDER BY timestamp DESC

      LIMIT ${limit}

    `,

    ];

  }


  @callable()

  getEventsByType(type: string, limit = 20) {

    return [

      ...this.sql`

      SELECT * FROM events

      WHERE type = ${type}

      ORDER BY timestamp DESC

      LIMIT ${limit}

    `,

    ];

  }

}


实时广播

webhook 到达时,更新 agent 状态会自动广播给所有连接的 WebSocket 客户端。

JavaScript


class WebhookAgent extends Agent {

  async processWebhook(eventType, payload) {

    // Update state - this automatically broadcasts to all connected clients

    this.setState({

      ...this.state,

      stats: payload.stats,

      lastEvent: {

        type: eventType,

        timestamp: new Date().toISOString(),

      },

    });

  }

}


TypeScript


class WebhookAgent extends Agent {

  private async processWebhook(eventType: string, payload: WebhookPayload) {

    // Update state - this automatically broadcasts to all connected clients

    this.setState({

      ...this.state,

      stats: payload.stats,

      lastEvent: {

        type: eventType,

        timestamp: new Date().toISOString(),

      },

    });

  }

}


客户端代码:


import { useAgent } from "agents/react";


function Dashboard() {

  const [state, setState] = useState(null);


  const agent = useAgent({

    agent: "webhook-agent",

    name: "my-entity-id",

    onStateUpdate: (newState) => {

      setState(newState); // Automatically updates when webhooks arrive

    },

  });


  return <div>Last event: {state?.lastEvent?.type}</div>;

}


模式

事件去重

使用事件 ID 防止重复处理同一事件:

JavaScript


class WebhookAgent extends Agent {

  async handleEvent(eventId, payload) {

    // Check if already processed

    const existing = [

      ...this.sql`

      SELECT id FROM events WHERE id = ${eventId}

    `,

    ];


    if (existing.length > 0) {

      console.log(`Event ${eventId} already processed, skipping`);

      return;

    }


    // Process and store

    await this.processPayload(payload);

    this.sql`INSERT INTO events (id, ...) VALUES (${eventId}, ...)`;

  }

}


TypeScript


class WebhookAgent extends Agent {

  async handleEvent(eventId: string, payload: unknown) {

    // Check if already processed

    const existing = [

      ...this.sql`

      SELECT id FROM events WHERE id = ${eventId}

    `,

    ];


    if (existing.length > 0) {

      console.log(`Event ${eventId} already processed, skipping`);

      return;

    }


    // Process and store

    await this.processPayload(payload);

    this.sql`INSERT INTO events (id, ...) VALUES (${eventId}, ...)`;

  }

}


快速响应,异步处理

Webhook 提供方期望快速响应。重量级处理请用队列:

JavaScript


class WebhookAgent extends Agent {

  async onRequest(request) {

    const payload = await request.json();


    // Quick validation

    if (!this.isValid(payload)) {

      return new Response("Invalid", { status: 400 });

    }


    // Queue heavy processing

    await this.queue("processWebhook", payload);


    // Respond immediately

    return new Response("Accepted", { status: 202 });

  }


  async processWebhook(payload) {

    // Heavy processing happens here, after response sent

    await this.enrichData(payload);

    await this.notifyDownstream(payload);

    await this.updateAnalytics(payload);

  }

}


TypeScript


class WebhookAgent extends Agent {

  async onRequest(request: Request): Promise<Response> {

    const payload = await request.json();


    // Quick validation

    if (!this.isValid(payload)) {

      return new Response("Invalid", { status: 400 });

    }


    // Queue heavy processing

    await this.queue("processWebhook", payload);


    // Respond immediately

    return new Response("Accepted", { status: 202 });

  }


  async processWebhook(payload: WebhookPayload) {

    // Heavy processing happens here, after response sent

    await this.enrichData(payload);

    await this.notifyDownstream(payload);

    await this.updateAnalytics(payload);

  }

}


多服务商路由

在一个 Worker 中处理来自多个服务的 webhook:

JavaScript


export default {

  async fetch(request, env) {

    const url = new URL(request.url);


    if (request.method === "POST") {

      // GitHub webhooks

      if (url.pathname.startsWith("/webhooks/github/")) {

        const payload = await request.clone().json();

        const repoName = payload.repository?.full_name?.replace("/", "-");

        const agent = await getAgentByName(env.GitHubAgent, repoName);

        return agent.fetch(request);

      }


      // Stripe webhooks

      if (url.pathname.startsWith("/webhooks/stripe/")) {

        const payload = await request.clone().json();

        const customerId = payload.data?.object?.customer;

        const agent = await getAgentByName(env.StripeAgent, customerId);

        return agent.fetch(request);

      }


      // Slack webhooks

      if (url.pathname === "/webhooks/slack") {

        const teamId = request.headers.get("X-Slack-Team-Id");

        const agent = await getAgentByName(env.SlackAgent, teamId);

        return agent.fetch(request);

      }

    }


    return (

      (await routeAgentRequest(request, env)) ??

      new Response("Not found", { status: 404 })

    );

  },

};


TypeScript


export default {

  async fetch(request: Request, env: Env): Promise<Response> {

    const url = new URL(request.url);


    if (request.method === "POST") {

      // GitHub webhooks

      if (url.pathname.startsWith("/webhooks/github/")) {

        const payload = await request.clone().json();

        const repoName = payload.repository?.full_name?.replace("/", "-");

        const agent = await getAgentByName(env.GitHubAgent, repoName);

        return agent.fetch(request);

      }


      // Stripe webhooks

      if (url.pathname.startsWith("/webhooks/stripe/")) {

        const payload = await request.clone().json();

        const customerId = payload.data?.object?.customer;

        const agent = await getAgentByName(env.StripeAgent, customerId);

        return agent.fetch(request);

      }


      // Slack webhooks

      if (url.pathname === "/webhooks/slack") {

        const teamId = request.headers.get("X-Slack-Team-Id");

        const agent = await getAgentByName(env.SlackAgent, teamId);

        return agent.fetch(request);

      }

    }


    return (

      (await routeAgentRequest(request, env)) ??

      new Response("Not found", { status: 404 })

    );

  },

} satisfies ExportedHandler<Env>;


发送出站 webhook

Agent 也可以向外部服务发送 webhook:

JavaScript


export class NotificationAgent extends Agent {

  async notifySlack(message) {

    const response = await fetch(this.env.SLACK_WEBHOOK_URL, {

      method: "POST",

      headers: { "Content-Type": "application/json" },

      body: JSON.stringify({ text: message }),

    });


    if (!response.ok) {

      throw new Error(`Slack notification failed: ${response.status}`);

    }

  }


  async sendSignedWebhook(url, payload) {

    const body = JSON.stringify(payload);

    const signature = await this.sign(body, this.env.WEBHOOK_SECRET);


    await fetch(url, {

      method: "POST",

      headers: {

        "Content-Type": "application/json",

        "X-Signature": signature,

      },

      body,

    });

  }

}


TypeScript


export class NotificationAgent extends Agent {

  async notifySlack(message: string) {

    const response = await fetch(this.env.SLACK_WEBHOOK_URL, {

      method: "POST",

      headers: { "Content-Type": "application/json" },

      body: JSON.stringify({ text: message }),

    });


    if (!response.ok) {

      throw new Error(`Slack notification failed: ${response.status}`);

    }

  }


  async sendSignedWebhook(url: string, payload: unknown) {

    const body = JSON.stringify(payload);

    const signature = await this.sign(body, this.env.WEBHOOK_SECRET);


    await fetch(url, {

      method: "POST",

      headers: {

        "Content-Type": "application/json",

        "X-Signature": signature,

      },

      body,

    });

  }

}


安全最佳实践

  1. 始终校验签名 — 永远不要信任未经校验的 webhook。
  2. 使用环境 secret — 用 wrangler secret put 保存 secret,而不是写在代码里。
  3. 快速响应 — 在数秒内返回 200/202,避免重发。
  4. 校验 payload — 处理前检查必需字段。
  5. 记录拒绝事件 — 跟踪无效签名以便安全监控。
  6. 使用 HTTPS — webhook URL 务必使用 TLS。

JavaScript


// Store secrets securely

// wrangler secret put GITHUB_WEBHOOK_SECRET


// Access in agent

const secret = this.env.GITHUB_WEBHOOK_SECRET;


TypeScript


// Store secrets securely

// wrangler secret put GITHUB_WEBHOOK_SECRET


// Access in agent

const secret = this.env.GITHUB_WEBHOOK_SECRET;


常见 webhook 服务商

后续步骤

队列任务 后台任务处理。

邮件路由 在你的 agent 中处理入站邮件。

Agents API Agents SDK 的完整 API 参考。