Webhooks
接收来自外部服务的 webhook 事件,并把它们路由到专用的 agent 实例。每个 webhook 来源(仓库、客户、设备)都可以拥有自己的 agent,具备独立的状态、持久化存储以及实时客户端连接。
快速开始
JavaScript
import { Agent, getAgentByName, routeAgentRequest } from "agents";
// Agent that handles webhooks for a specific entity
export class WebhookAgent extends Agent {
async onRequest(request) {
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
// Verify the webhook signature
const signature = request.headers.get("X-Hub-Signature-256");
const body = await request.text();
if (
!(await this.verifySignature(body, signature, this.env.WEBHOOK_SECRET))
) {
return new Response("Invalid signature", { status: 401 });
}
// Process the webhook payload
const payload = JSON.parse(body);
await this.processEvent(payload);
return new Response("OK", { status: 200 });
}
async verifySignature(payload, signature, secret) {
if (!signature) return false;
const encoder = new TextEncoder();
const key = await crypto.subtle.importKey(
"raw",
encoder.encode(secret),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
);
const signatureBytes = await crypto.subtle.sign(
"HMAC",
key,
encoder.encode(payload),
);
const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))
.map((b) => b.toString(16).padStart(2, "0"))
.join("")}`;
return signature === expected;
}
async processEvent(payload) {
// Store event, update state, trigger actions...
}
}
// Route webhooks to the right agent instance
export default {
async fetch(request, env) {
const url = new URL(request.url);
// Webhook endpoint: POST /webhooks/:entityId
if (url.pathname.startsWith("/webhooks/") && request.method === "POST") {
const entityId = url.pathname.split("/")[2];
const agent = await getAgentByName(env.WebhookAgent, entityId);
return agent.fetch(request);
}
// Default routing for WebSocket connections
return (
(await routeAgentRequest(request, env)) ||
new Response("Not found", { status: 404 })
);
},
};
TypeScript
import { Agent, getAgentByName, routeAgentRequest } from "agents";
// Agent that handles webhooks for a specific entity
export class WebhookAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
// Verify the webhook signature
const signature = request.headers.get("X-Hub-Signature-256");
const body = await request.text();
if (
!(await this.verifySignature(body, signature, this.env.WEBHOOK_SECRET))
) {
return new Response("Invalid signature", { status: 401 });
}
// Process the webhook payload
const payload = JSON.parse(body);
await this.processEvent(payload);
return new Response("OK", { status: 200 });
}
private async verifySignature(
payload: string,
signature: string | null,
secret: string,
): Promise<boolean> {
if (!signature) return false;
const encoder = new TextEncoder();
const key = await crypto.subtle.importKey(
"raw",
encoder.encode(secret),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
);
const signatureBytes = await crypto.subtle.sign(
"HMAC",
key,
encoder.encode(payload),
);
const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))
.map((b) => b.toString(16).padStart(2, "0"))
.join("")}`;
return signature === expected;
}
private async processEvent(payload: unknown) {
// Store event, update state, trigger actions...
}
}
// Route webhooks to the right agent instance
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Webhook endpoint: POST /webhooks/:entityId
if (url.pathname.startsWith("/webhooks/") && request.method === "POST") {
const entityId = url.pathname.split("/")[2];
const agent = await getAgentByName(env.WebhookAgent, entityId);
return agent.fetch(request);
}
// Default routing for WebSocket connections
return (
(await routeAgentRequest(request, env)) ||
new Response("Not found", { status: 404 })
);
},
} satisfies ExportedHandler<Env>;
用例
Webhook 与 agent 结合,可以让每个外部实体拥有专属的、有状态的、隔离的 agent 实例。
开发者工具
| 用例 | 说明 |
|---|---|
| GitHub 仓库监控 | 每个仓库一个 agent,跟踪 commit、PR、issue 和 star |
| CI/CD 流水线 Agent | 响应构建/部署事件,失败时通知,跟踪部署历史 |
| Linear/Jira 跟踪器 | 自动分流 issue,根据内容分配,跟踪解决时间 |
电商与支付
| 用例 | 说明 |
|---|---|
| Stripe 客户 Agent | 每个客户一个 agent,跟踪付款、订阅与争议 |
| Shopify 订单 Agent | 从下单到履约的订单生命周期,带库存同步 |
| 支付对账 | 把 webhook 事件与内部记录匹配,标记差异 |
通信与通知
| 用例 | 说明 |
|---|---|
| Twilio SMS/Voice | 由入站消息或来电触发的对话式 agent |
| Slack 机器人 | 响应 slash 命令、按钮点击和交互式消息 |
| 邮件跟踪 | SendGrid/Mailgun 投递事件、退信处理、互动分析 |
物联网与基础设施
| 用例 | 说明 |
|---|---|
| 设备遥测 | 每台设备一个 agent,处理传感器数据流 |
| 告警聚合 | 收集来自 PagerDuty、Datadog 或自建监控的告警 |
| 家居自动化 | 响应 IFTTT/Zapier 触发器,带持久化状态 |
SaaS 集成
| 用例 | 说明 |
|---|---|
| CRM 同步 | Salesforce/HubSpot 联系人和商机变更 |
| 日历 Agent | Google 日历事件通知与日程安排 |
| 表单提交 | Typeform、Tally 或自定义表单 webhook,带后续动作 |
把 webhook 路由到 agent
关键模式是从 webhook 中提取实体标识,然后用 getAgentByName() 路由到专属 agent 实例。
从 payload 中提取实体
大多数 webhook 在 payload 中包含标识符:
JavaScript
export default {
async fetch(request, env) {
if (request.method === "POST" && url.pathname === "/webhooks/github") {
const payload = await request.clone().json();
// Extract entity ID from payload
const repoFullName = payload.repository?.full_name;
if (!repoFullName) {
return new Response("Missing repository", { status: 400 });
}
// Sanitize for use as agent name
const agentName = repoFullName.toLowerCase().replace(/\//g, "-");
// Route to dedicated agent
const agent = await getAgentByName(env.RepoAgent, agentName);
return agent.fetch(request);
}
},
};
TypeScript
export default {
async fetch(request: Request, env: Env): Promise<Response> {
if (request.method === "POST" && url.pathname === "/webhooks/github") {
const payload = await request.clone().json();
// Extract entity ID from payload
const repoFullName = payload.repository?.full_name;
if (!repoFullName) {
return new Response("Missing repository", { status: 400 });
}
// Sanitize for use as agent name
const agentName = repoFullName.toLowerCase().replace(/\//g, "-");
// Route to dedicated agent
const agent = await getAgentByName(env.RepoAgent, agentName);
return agent.fetch(request);
}
},
} satisfies ExportedHandler<Env>;
从 URL 中提取实体
你也可以把实体 ID 直接放进 webhook URL:
JavaScript
// Webhook URL: https://your-worker.dev/webhooks/stripe/cus_123456
if (url.pathname.startsWith("/webhooks/stripe/")) {
const customerId = url.pathname.split("/")[3]; // "cus_123456"
const agent = await getAgentByName(env.StripeAgent, customerId);
return agent.fetch(request);
}
TypeScript
// Webhook URL: https://your-worker.dev/webhooks/stripe/cus_123456
if (url.pathname.startsWith("/webhooks/stripe/")) {
const customerId = url.pathname.split("/")[3]; // "cus_123456"
const agent = await getAgentByName(env.StripeAgent, customerId);
return agent.fetch(request);
}
从请求头中提取实体
部分服务把标识符放在请求头里:
JavaScript
// Slack sends workspace info in headers
const teamId = request.headers.get("X-Slack-Team-Id");
if (teamId) {
const agent = await getAgentByName(env.SlackAgent, teamId);
return agent.fetch(request);
}
TypeScript
// Slack sends workspace info in headers
const teamId = request.headers.get("X-Slack-Team-Id");
if (teamId) {
const agent = await getAgentByName(env.SlackAgent, teamId);
return agent.fetch(request);
}
签名校验
务必校验 webhook 签名,确保请求是可信的。多数服务商使用 HMAC-SHA256。
HMAC-SHA256 模式
JavaScript
async function verifySignature(payload, signature, secret) {
if (!signature) return false;
const encoder = new TextEncoder();
const key = await crypto.subtle.importKey(
"raw",
encoder.encode(secret),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
);
const signatureBytes = await crypto.subtle.sign(
"HMAC",
key,
encoder.encode(payload),
);
const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))
.map((b) => b.toString(16).padStart(2, "0"))
.join("")}`;
// Use timing-safe comparison in production
return signature === expected;
}
TypeScript
async function verifySignature(
payload: string,
signature: string | null,
secret: string,
): Promise<boolean> {
if (!signature) return false;
const encoder = new TextEncoder();
const key = await crypto.subtle.importKey(
"raw",
encoder.encode(secret),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
);
const signatureBytes = await crypto.subtle.sign(
"HMAC",
key,
encoder.encode(payload),
);
const expected = `sha256=${Array.from(new Uint8Array(signatureBytes))
.map((b) => b.toString(16).padStart(2, "0"))
.join("")}`;
// Use timing-safe comparison in production
return signature === expected;
}
各服务商的特定头部
| 服务商 | 签名头 | 算法 |
|---|---|---|
| GitHub | X-Hub-Signature-256 | HMAC-SHA256 |
| Stripe | Stripe-Signature | HMAC-SHA256(带时间戳) |
| Twilio | X-Twilio-Signature | HMAC-SHA1 |
| Slack | X-Slack-Signature | HMAC-SHA256(带时间戳) |
| Shopify | X-Shopify-Hmac-Sha256 | HMAC-SHA256(base64) |
处理 webhook
onRequest 处理函数
使用 onRequest() 处理进入 agent 的 webhook:
JavaScript
export class WebhookAgent extends Agent {
async onRequest(request) {
// 1. Validate method
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
// 2. Get event type from headers
const eventType = request.headers.get("X-Event-Type");
// 3. Verify signature
const signature = request.headers.get("X-Signature");
const body = await request.text();
if (!(await this.verifySignature(body, signature))) {
return new Response("Invalid signature", { status: 401 });
}
// 4. Parse and process
const payload = JSON.parse(body);
await this.handleEvent(eventType, payload);
// 5. Respond quickly
return new Response("OK", { status: 200 });
}
async handleEvent(type, payload) {
// Update state (broadcasts to connected clients)
this.setState({
...this.state,
lastEventType: type,
lastEventTime: new Date().toISOString(),
});
// Store in SQL for history
this
.sql`INSERT INTO events (type, payload, timestamp) VALUES (${type}, ${JSON.stringify(payload)}, ${Date.now()})`;
}
}
TypeScript
export class WebhookAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
// 1. Validate method
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
// 2. Get event type from headers
const eventType = request.headers.get("X-Event-Type");
// 3. Verify signature
const signature = request.headers.get("X-Signature");
const body = await request.text();
if (!(await this.verifySignature(body, signature))) {
return new Response("Invalid signature", { status: 401 });
}
// 4. Parse and process
const payload = JSON.parse(body);
await this.handleEvent(eventType, payload);
// 5. Respond quickly
return new Response("OK", { status: 200 });
}
private async handleEvent(type: string, payload: unknown) {
// Update state (broadcasts to connected clients)
this.setState({
...this.state,
lastEventType: type,
lastEventTime: new Date().toISOString(),
});
// Store in SQL for history
this
.sql`INSERT INTO events (type, payload, timestamp) VALUES (${type}, ${JSON.stringify(payload)}, ${Date.now()})`;
}
}
存储 webhook 事件
使用 SQLite 持久化 webhook 事件,以便回放和查询历史。
事件表结构
JavaScript
class WebhookAgent extends Agent {
async onStart() {
this.sql`
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
action TEXT,
title TEXT NOT NULL,
description TEXT,
url TEXT,
actor TEXT,
payload TEXT,
timestamp TEXT NOT NULL
)
`;
this.sql`
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON events(timestamp DESC)
`;
}
}
TypeScript
class WebhookAgent extends Agent {
async onStart(): Promise<void> {
this.sql`
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
action TEXT,
title TEXT NOT NULL,
description TEXT,
url TEXT,
actor TEXT,
payload TEXT,
timestamp TEXT NOT NULL
)
`;
this.sql`
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON events(timestamp DESC)
`;
}
}
清理旧事件
仅保留近期事件,避免数据无限增长:
JavaScript
// Keep last 100 events
this.sql`
DELETE FROM events WHERE id NOT IN (
SELECT id FROM events ORDER BY timestamp DESC LIMIT 100
)
`;
// Or delete events older than 30 days
this.sql`
DELETE FROM events
WHERE timestamp < datetime('now', '-30 days')
`;
TypeScript
// Keep last 100 events
this.sql`
DELETE FROM events WHERE id NOT IN (
SELECT id FROM events ORDER BY timestamp DESC LIMIT 100
)
`;
// Or delete events older than 30 days
this.sql`
DELETE FROM events
WHERE timestamp < datetime('now', '-30 days')
`;
查询事件
JavaScript
import { Agent, callable } from "agents";
class WebhookAgent extends Agent {
@callable()
getEvents(limit = 20) {
return [
...this.sql`
SELECT * FROM events
ORDER BY timestamp DESC
LIMIT ${limit}
`,
];
}
@callable()
getEventsByType(type, limit = 20) {
return [
...this.sql`
SELECT * FROM events
WHERE type = ${type}
ORDER BY timestamp DESC
LIMIT ${limit}
`,
];
}
}
TypeScript
import { Agent, callable } from "agents";
class WebhookAgent extends Agent {
@callable()
getEvents(limit = 20) {
return [
...this.sql`
SELECT * FROM events
ORDER BY timestamp DESC
LIMIT ${limit}
`,
];
}
@callable()
getEventsByType(type: string, limit = 20) {
return [
...this.sql`
SELECT * FROM events
WHERE type = ${type}
ORDER BY timestamp DESC
LIMIT ${limit}
`,
];
}
}
实时广播
webhook 到达时,更新 agent 状态会自动广播给所有连接的 WebSocket 客户端。
JavaScript
class WebhookAgent extends Agent {
async processWebhook(eventType, payload) {
// Update state - this automatically broadcasts to all connected clients
this.setState({
...this.state,
stats: payload.stats,
lastEvent: {
type: eventType,
timestamp: new Date().toISOString(),
},
});
}
}
TypeScript
class WebhookAgent extends Agent {
private async processWebhook(eventType: string, payload: WebhookPayload) {
// Update state - this automatically broadcasts to all connected clients
this.setState({
...this.state,
stats: payload.stats,
lastEvent: {
type: eventType,
timestamp: new Date().toISOString(),
},
});
}
}
客户端代码:
import { useAgent } from "agents/react";
function Dashboard() {
const [state, setState] = useState(null);
const agent = useAgent({
agent: "webhook-agent",
name: "my-entity-id",
onStateUpdate: (newState) => {
setState(newState); // Automatically updates when webhooks arrive
},
});
return <div>Last event: {state?.lastEvent?.type}</div>;
}
模式
事件去重
使用事件 ID 防止重复处理同一事件:
JavaScript
class WebhookAgent extends Agent {
async handleEvent(eventId, payload) {
// Check if already processed
const existing = [
...this.sql`
SELECT id FROM events WHERE id = ${eventId}
`,
];
if (existing.length > 0) {
console.log(`Event ${eventId} already processed, skipping`);
return;
}
// Process and store
await this.processPayload(payload);
this.sql`INSERT INTO events (id, ...) VALUES (${eventId}, ...)`;
}
}
TypeScript
class WebhookAgent extends Agent {
async handleEvent(eventId: string, payload: unknown) {
// Check if already processed
const existing = [
...this.sql`
SELECT id FROM events WHERE id = ${eventId}
`,
];
if (existing.length > 0) {
console.log(`Event ${eventId} already processed, skipping`);
return;
}
// Process and store
await this.processPayload(payload);
this.sql`INSERT INTO events (id, ...) VALUES (${eventId}, ...)`;
}
}
快速响应,异步处理
Webhook 提供方期望快速响应。重量级处理请用队列:
JavaScript
class WebhookAgent extends Agent {
async onRequest(request) {
const payload = await request.json();
// Quick validation
if (!this.isValid(payload)) {
return new Response("Invalid", { status: 400 });
}
// Queue heavy processing
await this.queue("processWebhook", payload);
// Respond immediately
return new Response("Accepted", { status: 202 });
}
async processWebhook(payload) {
// Heavy processing happens here, after response sent
await this.enrichData(payload);
await this.notifyDownstream(payload);
await this.updateAnalytics(payload);
}
}
TypeScript
class WebhookAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
const payload = await request.json();
// Quick validation
if (!this.isValid(payload)) {
return new Response("Invalid", { status: 400 });
}
// Queue heavy processing
await this.queue("processWebhook", payload);
// Respond immediately
return new Response("Accepted", { status: 202 });
}
async processWebhook(payload: WebhookPayload) {
// Heavy processing happens here, after response sent
await this.enrichData(payload);
await this.notifyDownstream(payload);
await this.updateAnalytics(payload);
}
}
多服务商路由
在一个 Worker 中处理来自多个服务的 webhook:
JavaScript
export default {
async fetch(request, env) {
const url = new URL(request.url);
if (request.method === "POST") {
// GitHub webhooks
if (url.pathname.startsWith("/webhooks/github/")) {
const payload = await request.clone().json();
const repoName = payload.repository?.full_name?.replace("/", "-");
const agent = await getAgentByName(env.GitHubAgent, repoName);
return agent.fetch(request);
}
// Stripe webhooks
if (url.pathname.startsWith("/webhooks/stripe/")) {
const payload = await request.clone().json();
const customerId = payload.data?.object?.customer;
const agent = await getAgentByName(env.StripeAgent, customerId);
return agent.fetch(request);
}
// Slack webhooks
if (url.pathname === "/webhooks/slack") {
const teamId = request.headers.get("X-Slack-Team-Id");
const agent = await getAgentByName(env.SlackAgent, teamId);
return agent.fetch(request);
}
}
return (
(await routeAgentRequest(request, env)) ??
new Response("Not found", { status: 404 })
);
},
};
TypeScript
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
if (request.method === "POST") {
// GitHub webhooks
if (url.pathname.startsWith("/webhooks/github/")) {
const payload = await request.clone().json();
const repoName = payload.repository?.full_name?.replace("/", "-");
const agent = await getAgentByName(env.GitHubAgent, repoName);
return agent.fetch(request);
}
// Stripe webhooks
if (url.pathname.startsWith("/webhooks/stripe/")) {
const payload = await request.clone().json();
const customerId = payload.data?.object?.customer;
const agent = await getAgentByName(env.StripeAgent, customerId);
return agent.fetch(request);
}
// Slack webhooks
if (url.pathname === "/webhooks/slack") {
const teamId = request.headers.get("X-Slack-Team-Id");
const agent = await getAgentByName(env.SlackAgent, teamId);
return agent.fetch(request);
}
}
return (
(await routeAgentRequest(request, env)) ??
new Response("Not found", { status: 404 })
);
},
} satisfies ExportedHandler<Env>;
发送出站 webhook
Agent 也可以向外部服务发送 webhook:
JavaScript
export class NotificationAgent extends Agent {
async notifySlack(message) {
const response = await fetch(this.env.SLACK_WEBHOOK_URL, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: message }),
});
if (!response.ok) {
throw new Error(`Slack notification failed: ${response.status}`);
}
}
async sendSignedWebhook(url, payload) {
const body = JSON.stringify(payload);
const signature = await this.sign(body, this.env.WEBHOOK_SECRET);
await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Signature": signature,
},
body,
});
}
}
TypeScript
export class NotificationAgent extends Agent {
async notifySlack(message: string) {
const response = await fetch(this.env.SLACK_WEBHOOK_URL, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: message }),
});
if (!response.ok) {
throw new Error(`Slack notification failed: ${response.status}`);
}
}
async sendSignedWebhook(url: string, payload: unknown) {
const body = JSON.stringify(payload);
const signature = await this.sign(body, this.env.WEBHOOK_SECRET);
await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Signature": signature,
},
body,
});
}
}
安全最佳实践
- 始终校验签名 — 永远不要信任未经校验的 webhook。
- 使用环境 secret — 用
wrangler secret put保存 secret,而不是写在代码里。 - 快速响应 — 在数秒内返回 200/202,避免重发。
- 校验 payload — 处理前检查必需字段。
- 记录拒绝事件 — 跟踪无效签名以便安全监控。
- 使用 HTTPS — webhook URL 务必使用 TLS。
JavaScript
// Store secrets securely
// wrangler secret put GITHUB_WEBHOOK_SECRET
// Access in agent
const secret = this.env.GITHUB_WEBHOOK_SECRET;
TypeScript
// Store secrets securely
// wrangler secret put GITHUB_WEBHOOK_SECRET
// Access in agent
const secret = this.env.GITHUB_WEBHOOK_SECRET;
常见 webhook 服务商
| 服务商 | 文档 |
|---|---|
| GitHub | Webhook events and payloads ↗ |
| Stripe | Webhook signatures ↗ |
| Twilio | Validate webhook requests ↗ |
| Slack | Verifying requests ↗ |
| Shopify | Webhook verification ↗ |
| SendGrid | Event webhook ↗ |
| Linear | Webhooks ↗ |
后续步骤
队列任务 后台任务处理。
邮件路由 在你的 agent 中处理入站邮件。
Agents API Agents SDK 的完整 API 参考。