推送通知
从你的 Agent 发送浏览器推送通知——即使用户已关闭页面。结合 Agent 的持久化状态(用于存储推送订阅)、调度(用于定时投递)和 Web Push API ↗,你可以触达完全离线的用户。
工作原理
Browser Agent (Durable Object)
─────── ──────────────────────
1. Register service worker
2. Subscribe to push (VAPID key)
3. Send subscription to agent ──────► Store in this.state
4. Create reminder ─────────────────► this.schedule(delay, "sendReminder", payload)
... user closes tab ...
5. Alarm fires → sendReminder()
web-push sends encrypted payload
│
6. Service worker receives push ◄─────────────┘
7. showNotification()
Explain Code
Agent 在其状态中持久化存储推送订阅,并通过 this.schedule() 在合适的时间触发通知。当 alarm 触发时,Agent 使用 web-push ↗ 库调用推送服务端点。浏览器的 service worker 接收 push 事件并展示原生通知。
前置条件
生成 VAPID 密钥
Web Push 需要一对 VAPID(Voluntary Application Server Identification)密钥。生成方式:
Terminal window
npx web-push generate-vapid-keys
将密钥存储在 .env 文件中用于本地开发:
VAPID_PUBLIC_KEY=BGxK...
VAPID_PRIVATE_KEY=abc1...
VAPID_SUBJECT=mailto:[email protected]
对于生产环境,使用 wrangler secret put:
Terminal window
wrangler secret put VAPID_PUBLIC_KEY
wrangler secret put VAPID_PRIVATE_KEY
wrangler secret put VAPID_SUBJECT
创建 Agent
Agent 有三项职责:存储推送订阅、调度提醒,以及在 alarm 触发时发送通知。
JavaScript
import { Agent, callable, routeAgentRequest } from "agents";
import webpush from "web-push";
export class ReminderAgent extends Agent {
initialState = {
subscriptions: [],
reminders: [],
};
@callable()
getVapidPublicKey() {
return this.env.VAPID_PUBLIC_KEY;
}
@callable()
async subscribe(subscription) {
const exists = this.state.subscriptions.some(
(s) => s.endpoint === subscription.endpoint,
);
if (!exists) {
this.setState({
...this.state,
subscriptions: [...this.state.subscriptions, subscription],
});
}
return { ok: true };
}
@callable()
async unsubscribe(endpoint) {
this.setState({
...this.state,
subscriptions: this.state.subscriptions.filter(
(s) => s.endpoint !== endpoint,
),
});
return { ok: true };
}
@callable()
async createReminder(message, delaySeconds) {
const id = crypto.randomUUID();
const scheduledAt = Date.now() + delaySeconds * 1000;
const reminder = { id, message, scheduledAt, sent: false };
this.setState({
...this.state,
reminders: [...this.state.reminders, reminder],
});
await this.schedule(delaySeconds, "sendReminder", { id, message });
return reminder;
}
async sendReminder(payload) {
webpush.setVapidDetails(
this.env.VAPID_SUBJECT,
this.env.VAPID_PUBLIC_KEY,
this.env.VAPID_PRIVATE_KEY,
);
const deadEndpoints = [];
await Promise.all(
this.state.subscriptions.map(async (sub) => {
try {
await webpush.sendNotification(
sub,
JSON.stringify({
title: "Reminder",
body: payload.message,
tag: `reminder-${payload.id}`,
}),
);
} catch (err) {
const statusCode =
err instanceof webpush.WebPushError ? err.statusCode : 0;
if (statusCode === 404 || statusCode === 410) {
deadEndpoints.push(sub.endpoint);
}
}
}),
);
if (deadEndpoints.length > 0) {
this.setState({
...this.state,
subscriptions: this.state.subscriptions.filter(
(s) => !deadEndpoints.includes(s.endpoint),
),
});
}
this.setState({
...this.state,
reminders: this.state.reminders.map((r) =>
r.id === payload.id ? { ...r, sent: true } : r,
),
});
this.broadcast(
JSON.stringify({
type: "reminder_sent",
id: payload.id,
timestamp: Date.now(),
}),
);
}
}
export default {
async fetch(request, env) {
return (
(await routeAgentRequest(request, env)) ??
new Response("Not found", { status: 404 })
);
},
};
Explain Code
TypeScript
import { Agent, callable, routeAgentRequest } from "agents";
import webpush from "web-push";
type Subscription = {
endpoint: string;
expirationTime: number | null;
keys: {
p256dh: string;
auth: string;
};
};
type Reminder = {
id: string;
message: string;
scheduledAt: number;
sent: boolean;
};
type ReminderAgentState = {
subscriptions: Subscription[];
reminders: Reminder[];
};
export class ReminderAgent extends Agent<Env, ReminderAgentState> {
initialState: ReminderAgentState = {
subscriptions: [],
reminders: [],
};
@callable()
getVapidPublicKey(): string {
return this.env.VAPID_PUBLIC_KEY;
}
@callable()
async subscribe(subscription: Subscription): Promise<{ ok: boolean }> {
const exists = this.state.subscriptions.some(
(s) => s.endpoint === subscription.endpoint,
);
if (!exists) {
this.setState({
...this.state,
subscriptions: [...this.state.subscriptions, subscription],
});
}
return { ok: true };
}
@callable()
async unsubscribe(endpoint: string): Promise<{ ok: boolean }> {
this.setState({
...this.state,
subscriptions: this.state.subscriptions.filter(
(s) => s.endpoint !== endpoint,
),
});
return { ok: true };
}
@callable()
async createReminder(
message: string,
delaySeconds: number,
): Promise<Reminder> {
const id = crypto.randomUUID();
const scheduledAt = Date.now() + delaySeconds * 1000;
const reminder: Reminder = { id, message, scheduledAt, sent: false };
this.setState({
...this.state,
reminders: [...this.state.reminders, reminder],
});
await this.schedule(delaySeconds, "sendReminder", { id, message });
return reminder;
}
async sendReminder(payload: { id: string; message: string }) {
webpush.setVapidDetails(
this.env.VAPID_SUBJECT,
this.env.VAPID_PUBLIC_KEY,
this.env.VAPID_PRIVATE_KEY,
);
const deadEndpoints: string[] = [];
await Promise.all(
this.state.subscriptions.map(async (sub) => {
try {
await webpush.sendNotification(
sub,
JSON.stringify({
title: "Reminder",
body: payload.message,
tag: `reminder-${payload.id}`,
}),
);
} catch (err: unknown) {
const statusCode =
err instanceof webpush.WebPushError ? err.statusCode : 0;
if (statusCode === 404 || statusCode === 410) {
deadEndpoints.push(sub.endpoint);
}
}
}),
);
if (deadEndpoints.length > 0) {
this.setState({
...this.state,
subscriptions: this.state.subscriptions.filter(
(s) => !deadEndpoints.includes(s.endpoint),
),
});
}
this.setState({
...this.state,
reminders: this.state.reminders.map((r) =>
r.id === payload.id ? { ...r, sent: true } : r,
),
});
this.broadcast(
JSON.stringify({
type: "reminder_sent",
id: payload.id,
timestamp: Date.now(),
}),
);
}
}
export default {
async fetch(request: Request, env: Env) {
return (
(await routeAgentRequest(request, env)) ??
new Response("Not found", { status: 404 })
);
},
} satisfies ExportedHandler<Env>;
Explain Code
sendReminder 回调处理三件事:通过 web-push 库投递推送通知;清理失效的订阅(当订阅不再有效时,推送服务返回 404 或 410);并将事件广播给当前已连接的客户端,以便 UI 实时更新。
设置 service worker
service worker 在浏览器中运行,即使没有打开任何页面也能接收 push 事件。将该文件放在 public/sw.js,使其从域名根路径提供:
JavaScript
self.addEventListener("push", (event) => {
if (!event.data) return;
const data = event.data.json();
event.waitUntil(
self.registration.showNotification(data.title || "Notification", {
body: data.body || "",
icon: data.icon || "/favicon.ico",
tag: data.tag,
data: data.data,
}),
);
});
self.addEventListener("notificationclick", (event) => {
event.notification.close();
event.waitUntil(
self.clients.matchAll({ type: "window" }).then((windowClients) => {
for (const client of windowClients) {
if (
client.url.includes(self.location.origin) &&
"focus" in client
) {
return client.focus();
}
}
return self.clients.openWindow("/");
}),
);
});
Explain Code
push 事件 handler 解析 JSON payload 并显示原生通知。notificationclick handler 在用户点击通知时聚焦已存在的页面或打开新页面。
构建客户端
客户端需要:注册 service worker、请求通知权限、用 VAPID 公钥订阅 push,并把订阅发送给 Agent。
注册 service worker
JavaScript
useEffect(() => {
if (!("serviceWorker" in navigator) || !("PushManager" in window)) {
return;
}
navigator.serviceWorker.register("/sw.js");
}, []);
TypeScript
useEffect(() => {
if (!("serviceWorker" in navigator) || !("PushManager" in window)) {
return;
}
navigator.serviceWorker.register("/sw.js");
}, []);
订阅 push
从 Agent 获取 VAPID 公钥,然后通过 Push API 订阅:
JavaScript
function base64urlToUint8Array(base64url) {
const padded = base64url + "=".repeat((4 - (base64url.length % 4)) % 4);
const binary = atob(padded.replace(/-/g, "+").replace(/_/g, "/"));
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
return bytes;
}
async function subscribeToPush(agent) {
const permission = await Notification.requestPermission();
if (permission !== "granted") return;
const vapidPublicKey = await agent.call("getVapidPublicKey");
const reg = await navigator.serviceWorker.ready;
const subscription = await reg.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: base64urlToUint8Array(vapidPublicKey).buffer,
});
const subJson = subscription.toJSON();
await agent.call("subscribe", [
{
endpoint: subJson.endpoint,
expirationTime: subJson.expirationTime ?? null,
keys: subJson.keys,
},
]);
}
Explain Code
TypeScript
function base64urlToUint8Array(base64url: string): Uint8Array {
const padded = base64url + "=".repeat((4 - (base64url.length % 4)) % 4);
const binary = atob(padded.replace(/-/g, "+").replace(/_/g, "/"));
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
return bytes;
}
async function subscribeToPush(
agent: ReturnType<typeof useAgent>,
) {
const permission = await Notification.requestPermission();
if (permission !== "granted") return;
const vapidPublicKey = await agent.call("getVapidPublicKey");
const reg = await navigator.serviceWorker.ready;
const subscription = await reg.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: base64urlToUint8Array(vapidPublicKey).buffer,
});
const subJson = subscription.toJSON();
await agent.call("subscribe", [
{
endpoint: subJson.endpoint,
expirationTime: subJson.expirationTime ?? null,
keys: subJson.keys,
},
]);
}
Explain Code
创建提醒
订阅存储后,创建提醒只需一次 RPC 调用。Agent 处理调度与投递:
JavaScript
await agent.call("createReminder", ["Check the oven", 300]);
TypeScript
await agent.call("createReminder", ["Check the oven", 300]);
Agent 调度一个 300 秒(5 分钟)的 alarm。当它触发时,推送通知到达——即使用户在数分钟前已经关闭了页面。
配置
wrangler.jsonc
JSONC
{
"name": "push-notifications",
"compatibility_date": "2026-01-28",
"compatibility_flags": ["nodejs_compat"],
"main": "src/server.ts",
"durable_objects": {
"bindings": [
{ "name": "ReminderAgent", "class_name": "ReminderAgent" },
],
},
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["ReminderAgent"] }],
"assets": {
"not_found_handling": "single-page-application",
},
}
Explain Code
web-push 库需要 nodejs_compat 兼容性标志。
依赖
Terminal window
npm install agents web-push
生产环境注意事项
订阅过期
Push 订阅可能过期或被用户撤销。务必通过移除状态中失效的订阅来处理推送服务返回的 404 和 410 响应,如上面的 sendReminder 示例所示。
每用户 vs 共享 Agent
对大多数应用,推荐每个用户使用一个 Agent(以用户 ID 作为 Agent 名)。这样可以隔离每个用户的订阅与提醒。对于广播式通知(同一条消息发给多人),共享 Agent 可以存储所有订阅,但要注意随着订阅列表增长带来的状态体积问题。
把 push 与 WebSocket 广播结合
对当前已连接的客户端使用 this.broadcast()(即时,无需经过推送服务往返),对离线客户端使用 Web Push。上面的 sendReminder 示例两者都做了——已连接的客户端通过实时 WebSocket 消息接收,离线客户端通过推送通知接收。
多设备
单个用户可能从多个浏览器或设备订阅。Agent 分别存储每个订阅,sendReminder 会遍历所有订阅。每台设备各自收到推送通知。
失败重试
如果推送服务返回 5xx 错误(临时失败),你可以使用 this.schedule() 进行短延迟后的重试:
JavaScript
try {
await webpush.sendNotification(sub, payload);
} catch (err) {
const statusCode = err instanceof webpush.WebPushError ? err.statusCode : 0;
if (statusCode >= 500) {
await this.schedule(60, "retrySendNotification", {
endpoint: sub.endpoint,
payload,
});
}
}
Explain Code
TypeScript
try {
await webpush.sendNotification(sub, payload);
} catch (err: unknown) {
const statusCode =
err instanceof webpush.WebPushError ? err.statusCode : 0;
if (statusCode >= 500) {
await this.schedule(60, "retrySendNotification", {
endpoint: sub.endpoint,
payload,
});
}
}
Explain Code
后续步骤
调度任务 了解长时间运行操作的调度与 keepAlive。
存储与同步状态 管理 Agent 状态以存储订阅。
可调用方法 把 Agent 方法暴露为 RPC 端点。