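Push notifications

Send browser push notifications from your Agent, even after the user has closed the tab. By combining the Agent's persistent state (to store push subscriptions), scheduling (to deliver at a set time), and the Web Push API, you can reach users who are completely offline.

How it works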


Browser                              Agent (Durable Object)

───────                              ──────────────────────

1. Register service worker

2. Subscribe to push (VAPID key)

3. Send subscription to agent ──────► Store in this.state

4. Create reminder ─────────────────► this.schedule(delay, "sendReminder", payload)


   ... user closes tab ...


5.                                    Alarm fires → sendReminder()

                                      web-push sends encrypted payload

                                              │

6. Service worker receives push ◄─────────────┘

7. showNotification()


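The Agent persists push subscriptions in its state and uses this.schedule() to trigger notifications at the right time. When the alarm fires, the Agent uses the web-push library to call the push service endpoint. The browser's service worker receives the push event and shows a native notification.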

Prerequisites

Generate VAPID keys

Web Push requires a VAPID (Voluntary Application Server Identification) key pair. Generate one with:

Terminal window


npx web-push generate-vapid-keys


Store the keys in a .env file for local development:


VAPID_PUBLIC_KEY=BGxK...

VAPID_PRIVATE_KEY=abc1...

VAPID_SUBJECT=mailto:[email protected]


For production, use wrangler secret put:

Terminal window


wrangler secret put VAPID_PUBLIC_KEY

wrangler secret put VAPID_PRIVATE_KEY

wrangler secret put VAPID_SUBJECT
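A missing or mistyped secret usually only surfaces when the first notification fails to send, so it can help to check the three values up front. The sketch below is a hypothetical helper (checkVapidEnv is not part of the agents SDK or web-push). It assumes the keys are unpadded base64url strings as printed by the generator: a 65-byte public key (87 characters, starting with "B" because the first byte is 0x04) and a 32-byte private key (43 characters).

```typescript
// Hypothetical helper: a shape check only, it does not verify that the keys form a matching pair.
type VapidEnv = {
  VAPID_PUBLIC_KEY?: string;
  VAPID_PRIVATE_KEY?: string;
  VAPID_SUBJECT?: string;
};

const BASE64URL = /^[A-Za-z0-9_-]+$/;

function checkVapidEnv(env: VapidEnv): string[] {
  const problems: string[] = [];
  const publicKey = env.VAPID_PUBLIC_KEY ?? "";
  const privateKey = env.VAPID_PRIVATE_KEY ?? "";
  const subject = env.VAPID_SUBJECT ?? "";

  // 65 bytes (uncompressed P-256 point) encode to 87 unpadded base64url characters.
  // The leading 0x04 byte always encodes as "B".
  if (
    !BASE64URL.test(publicKey) ||
    publicKey.length !== 87 ||
    !publicKey.startsWith("B")
  ) {
    problems.push("VAPID_PUBLIC_KEY does not look like a 65-byte base64url key");
  }

  // 32 bytes encode to 43 unpadded base64url characters.
  if (!BASE64URL.test(privateKey) || privateKey.length !== 43) {
    problems.push("VAPID_PRIVATE_KEY does not look like a 32-byte base64url key");
  }

  // The subject identifies the sender as a mailto: address or an https: URL.
  if (!subject.startsWith("mailto:") && !subject.startsWith("https://")) {
    problems.push("VAPID_SUBJECT should start with mailto: or https://");
  }

  return problems;
}
```

One possible use is to call checkVapidEnv(this.env) at the top of sendReminder and log the returned problems before attempting delivery.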


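Create the Agent

The Agent has three responsibilities: storing push subscriptions, scheduling reminders, and sending notifications when the alarm fires.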

JavaScript


import { Agent, callable, routeAgentRequest } from "agents";

import webpush from "web-push";


export class ReminderAgent extends Agent {

  initialState = {

    subscriptions: [],

    reminders: [],

  };


  @callable()

  getVapidPublicKey() {

    return this.env.VAPID_PUBLIC_KEY;

  }


  @callable()

  async subscribe(subscription) {

    const exists = this.state.subscriptions.some(

      (s) => s.endpoint === subscription.endpoint,

    );

    if (!exists) {

      this.setState({

        ...this.state,

        subscriptions: [...this.state.subscriptions, subscription],

      });

    }

    return { ok: true };

  }


  @callable()

  async unsubscribe(endpoint) {

    this.setState({

      ...this.state,

      subscriptions: this.state.subscriptions.filter(

        (s) => s.endpoint !== endpoint,

      ),

    });

    return { ok: true };

  }


  @callable()

  async createReminder(message, delaySeconds) {

    const id = crypto.randomUUID();

    const scheduledAt = Date.now() + delaySeconds * 1000;

    const reminder = { id, message, scheduledAt, sent: false };


    this.setState({

      ...this.state,

      reminders: [...this.state.reminders, reminder],

    });


    await this.schedule(delaySeconds, "sendReminder", { id, message });

    return reminder;

  }


  async sendReminder(payload) {

    webpush.setVapidDetails(

      this.env.VAPID_SUBJECT,

      this.env.VAPID_PUBLIC_KEY,

      this.env.VAPID_PRIVATE_KEY,

    );


    const deadEndpoints = [];


    await Promise.all(

      this.state.subscriptions.map(async (sub) => {

        try {

          await webpush.sendNotification(

            sub,

            JSON.stringify({

              title: "Reminder",

              body: payload.message,

              tag: `reminder-${payload.id}`,

            }),

          );

        } catch (err) {

          const statusCode =

            err instanceof webpush.WebPushError ? err.statusCode : 0;

          if (statusCode === 404 || statusCode === 410) {

            deadEndpoints.push(sub.endpoint);

          }

        }

      }),

    );


    if (deadEndpoints.length > 0) {

      this.setState({

        ...this.state,

        subscriptions: this.state.subscriptions.filter(

          (s) => !deadEndpoints.includes(s.endpoint),

        ),

      });

    }


    this.setState({

      ...this.state,

      reminders: this.state.reminders.map((r) =>

        r.id === payload.id ? { ...r, sent: true } : r,

      ),

    });


    this.broadcast(

      JSON.stringify({

        type: "reminder_sent",

        id: payload.id,

        timestamp: Date.now(),

      }),

    );

  }

}


export default {

  async fetch(request, env) {

    return (

      (await routeAgentRequest(request, env)) ??

      new Response("Not found", { status: 404 })

    );

  },

};



TypeScript


import { Agent, callable, routeAgentRequest } from "agents";

import webpush from "web-push";


type Subscription = {

  endpoint: string;

  expirationTime: number | null;

  keys: {

    p256dh: string;

    auth: string;

  };

};


type Reminder = {

  id: string;

  message: string;

  scheduledAt: number;

  sent: boolean;

};


type ReminderAgentState = {

  subscriptions: Subscription[];

  reminders: Reminder[];

};


export class ReminderAgent extends Agent<Env, ReminderAgentState> {

  initialState: ReminderAgentState = {

    subscriptions: [],

    reminders: [],

  };


  @callable()

  getVapidPublicKey(): string {

    return this.env.VAPID_PUBLIC_KEY;

  }


  @callable()

  async subscribe(subscription: Subscription): Promise<{ ok: boolean }> {

    const exists = this.state.subscriptions.some(

      (s) => s.endpoint === subscription.endpoint,

    );

    if (!exists) {

      this.setState({

        ...this.state,

        subscriptions: [...this.state.subscriptions, subscription],

      });

    }

    return { ok: true };

  }


  @callable()

  async unsubscribe(endpoint: string): Promise<{ ok: boolean }> {

    this.setState({

      ...this.state,

      subscriptions: this.state.subscriptions.filter(

        (s) => s.endpoint !== endpoint,

      ),

    });

    return { ok: true };

  }


  @callable()

  async createReminder(

    message: string,

    delaySeconds: number,

  ): Promise<Reminder> {

    const id = crypto.randomUUID();

    const scheduledAt = Date.now() + delaySeconds * 1000;

    const reminder: Reminder = { id, message, scheduledAt, sent: false };


    this.setState({

      ...this.state,

      reminders: [...this.state.reminders, reminder],

    });


    await this.schedule(delaySeconds, "sendReminder", { id, message });

    return reminder;

  }


  async sendReminder(payload: { id: string; message: string }) {

    webpush.setVapidDetails(

      this.env.VAPID_SUBJECT,

      this.env.VAPID_PUBLIC_KEY,

      this.env.VAPID_PRIVATE_KEY,

    );


    const deadEndpoints: string[] = [];


    await Promise.all(

      this.state.subscriptions.map(async (sub) => {

        try {

          await webpush.sendNotification(

            sub,

            JSON.stringify({

              title: "Reminder",

              body: payload.message,

              tag: `reminder-${payload.id}`,

            }),

          );

        } catch (err: unknown) {

          const statusCode =

            err instanceof webpush.WebPushError ? err.statusCode : 0;

          if (statusCode === 404 || statusCode === 410) {

            deadEndpoints.push(sub.endpoint);

          }

        }

      }),

    );


    if (deadEndpoints.length > 0) {

      this.setState({

        ...this.state,

        subscriptions: this.state.subscriptions.filter(

          (s) => !deadEndpoints.includes(s.endpoint),

        ),

      });

    }


    this.setState({

      ...this.state,

      reminders: this.state.reminders.map((r) =>

        r.id === payload.id ? { ...r, sent: true } : r,

      ),

    });


    this.broadcast(

      JSON.stringify({

        type: "reminder_sent",

        id: payload.id,

        timestamp: Date.now(),

      }),

    );

  }

}


export default {

  async fetch(request: Request, env: Env) {

    return (

      (await routeAgentRequest(request, env)) ??

      new Response("Not found", { status: 404 })

    );

  },

} satisfies ExportedHandler<Env>;



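The sendReminder callback does three things: it delivers the push notification through the web-push library; it removes dead subscriptions (the push service returns 404 or 410 when a subscription is no longer valid); and it broadcasts the event to currently connected clients so the UI updates in real time.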
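The status-code handling inside the catch block can be pulled out into a small pure function, which makes it easier to test without a push service. This is a sketch only: classifyPushFailure and its return values are hypothetical names, and treating 429 as retryable is an assumption that goes beyond the 404/410 and 5xx cases this guide describes.

```typescript
// Hypothetical helper: maps a push service status code to a follow-up action.
type PushFailureAction = "remove" | "retry" | "ignore";

function classifyPushFailure(statusCode: number): PushFailureAction {
  // 404 and 410: the subscription no longer exists, so drop it from state.
  if (statusCode === 404 || statusCode === 410) return "remove";

  // 5xx: temporary failure on the push service side.
  // 429 (rate limited) is treated as retryable here by assumption.
  if (statusCode === 429 || statusCode >= 500) return "retry";

  // Anything else (including 0 for non-WebPushError failures) is left alone.
  return "ignore";
}
```

In sendReminder, the existing check `statusCode === 404 || statusCode === 410` would then become `classifyPushFailure(statusCode) === "remove"`.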

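Set up the service worker

The service worker runs in the browser and receives push events even when no page is open. Place the file at public/sw.js so that it is served from the root of your domain: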

JavaScript


self.addEventListener("push", (event) => {

  if (!event.data) return;


  const data = event.data.json();


  event.waitUntil(

    self.registration.showNotification(data.title || "Notification", {

      body: data.body || "",

      icon: data.icon || "/favicon.ico",

      tag: data.tag,

      data: data.data,

    }),

  );

});


self.addEventListener("notificationclick", (event) => {

  event.notification.close();


  event.waitUntil(

    self.clients.matchAll({ type: "window" }).then((windowClients) => {

      for (const client of windowClients) {

        if (

          client.url.includes(self.location.origin) &&

          "focus" in client

        ) {

          return client.focus();

        }

      }

      return self.clients.openWindow("/");

    }),

  );

});



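The push event handler parses the JSON payload and shows a native notification. The notificationclick handler focuses an existing page, or opens a new one, when the user clicks the notification.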
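The handler above calls event.data.json(), which throws if the payload is not valid JSON. If other senders might push plain text to the same subscription, a more defensive variant could parse the raw text itself. The function below is a sketch under that assumption; toNotificationContent is a hypothetical name, and the defaults simply mirror the ones used in the handler above.

```typescript
// Hypothetical helper: turns a raw push payload into showNotification() arguments.
type NotificationContent = {
  title: string;
  options: {
    body: string;
    icon: string;
    tag: string | undefined;
    data: unknown;
  };
};

function toNotificationContent(raw: string): NotificationContent {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Not JSON: show the raw text as the notification body.
    parsed = { body: raw };
  }

  // JSON scalars such as 42 or "hi" are also shown as body text.
  const fields = (
    typeof parsed === "object" && parsed !== null
      ? parsed
      : { body: String(parsed) }
  ) as Record<string, unknown>;

  return {
    title:
      typeof fields.title === "string" && fields.title !== ""
        ? fields.title
        : "Notification",
    options: {
      body: typeof fields.body === "string" ? fields.body : "",
      icon:
        typeof fields.icon === "string" && fields.icon !== ""
          ? fields.icon
          : "/favicon.ico",
      tag: typeof fields.tag === "string" ? fields.tag : undefined,
      data: fields.data,
    },
  };
}
```

Inside the push handler this would be fed with event.data.text(), and the resulting title and options passed to self.registration.showNotification().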

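Build the client

The client needs to register the service worker, request notification permission, subscribe to push with the VAPID public key, and send the subscription to the Agent.

Register the service worker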

JavaScript


useEffect(() => {

  if (!("serviceWorker" in navigator) || !("PushManager" in window)) {

    return;

  }

  navigator.serviceWorker.register("/sw.js");

}, []);


TypeScript


useEffect(() => {

  if (!("serviceWorker" in navigator) || !("PushManager" in window)) {

    return;

  }

  navigator.serviceWorker.register("/sw.js");

}, []);


Subscribe to push

Fetch the VAPID public key from the Agent, then subscribe through the Push API:

JavaScript


function base64urlToUint8Array(base64url) {

  const padded = base64url + "=".repeat((4 - (base64url.length % 4)) % 4);

  const binary = atob(padded.replace(/-/g, "+").replace(/_/g, "/"));

  const bytes = new Uint8Array(binary.length);

  for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);

  return bytes;

}


async function subscribeToPush(agent) {

  const permission = await Notification.requestPermission();

  if (permission !== "granted") return;


  const vapidPublicKey = await agent.call("getVapidPublicKey");

  const reg = await navigator.serviceWorker.ready;

  const subscription = await reg.pushManager.subscribe({

    userVisibleOnly: true,

    applicationServerKey: base64urlToUint8Array(vapidPublicKey).buffer,

  });


  const subJson = subscription.toJSON();

  await agent.call("subscribe", [

    {

      endpoint: subJson.endpoint,

      expirationTime: subJson.expirationTime ?? null,

      keys: subJson.keys,

    },

  ]);

}



TypeScript


function base64urlToUint8Array(base64url: string): Uint8Array {

  const padded = base64url + "=".repeat((4 - (base64url.length % 4)) % 4);

  const binary = atob(padded.replace(/-/g, "+").replace(/_/g, "/"));

  const bytes = new Uint8Array(binary.length);

  for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);

  return bytes;

}


async function subscribeToPush(

  agent: ReturnType<typeof useAgent>,

) {

  const permission = await Notification.requestPermission();

  if (permission !== "granted") return;


  const vapidPublicKey = await agent.call("getVapidPublicKey");

  const reg = await navigator.serviceWorker.ready;

  const subscription = await reg.pushManager.subscribe({

    userVisibleOnly: true,

    applicationServerKey: base64urlToUint8Array(vapidPublicKey).buffer,

  });


  const subJson = subscription.toJSON();

  await agent.call("subscribe", [

    {

      endpoint: subJson.endpoint,

      expirationTime: subJson.expirationTime ?? null,

      keys: subJson.keys,

    },

  ]);

}


Create a reminder

Once the subscription is stored, creating a reminder takes a single RPC call. The Agent handles scheduling and delivery:

JavaScript


await agent.call("createReminder", ["Check the oven", 300]);


TypeScript


await agent.call("createReminder", ["Check the oven", 300]);


The Agent schedules an alarm for 300 seconds (5 minutes). When it fires, the push notification arrives, even if the user closed the page minutes ago.

Configuration

wrangler.jsonc

JSONC


{

  "name": "push-notifications",

  "compatibility_date": "2026-01-28",

  "compatibility_flags": ["nodejs_compat"],

  "main": "src/server.ts",

  "durable_objects": {

    "bindings": [

      { "name": "ReminderAgent", "class_name": "ReminderAgent" },

    ],

  },

  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["ReminderAgent"] }],

  "assets": {

    "not_found_handling": "single-page-application",

  },

}


The web-push library requires the nodejs_compat compatibility flag.

Dependencies

Terminal window


npm install agents web-push


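Production considerations

Subscription expiry

Push subscriptions can expire or be revoked by the user. Always handle 404 and 410 responses from the push service by removing the dead subscription from state, as the sendReminder example above does.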
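The stored subscription also carries an expirationTime field. Many browsers report it as null, but when a value is present it can be used to skip subscriptions that are already past their expiry before attempting delivery. The following is a sketch, assuming expirationTime is a millisecond timestamp comparable to Date.now(); withoutExpired is a hypothetical helper, and the StoredSubscription type mirrors the Subscription type from the Agent example.

```typescript
// Mirrors the Subscription shape stored by the Agent example.
type StoredSubscription = {
  endpoint: string;
  expirationTime: number | null;
  keys: { p256dh: string; auth: string };
};

// Hypothetical helper: keeps subscriptions with no expiry or an expiry in the future.
function withoutExpired(
  subscriptions: StoredSubscription[],
  now: number = Date.now(),
): StoredSubscription[] {
  return subscriptions.filter(
    (s) => s.expirationTime === null || s.expirationTime > now,
  );
}
```

This complements, rather than replaces, the 404/410 handling: a push service can invalidate a subscription at any time regardless of the reported expiry.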

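Per-user vs. shared Agents

For most applications, use one Agent per user (with the user ID as the Agent name). This isolates each user's subscriptions and reminders. For broadcast-style notifications (the same message sent to many people), a shared Agent can store every subscription, but watch the state size as the subscription list grows.

Combine push with WebSocket broadcast

Use this.broadcast() for currently connected clients (immediate, with no round trip through the push service) and Web Push for offline clients. The sendReminder example above does both: connected clients receive a real-time WebSocket message, and offline clients receive a push notification.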
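On the connected client, the broadcast arrives as a JSON string with the shape sent by sendReminder: type "reminder_sent", an id, and a timestamp. A small parser like the one below can recognize it and ignore everything else on the socket. This is a sketch: parseReminderSent is a hypothetical helper, and how you obtain the raw message depends on the client library you use.

```typescript
// Shape of the message that sendReminder broadcasts.
type ReminderSent = {
  type: "reminder_sent";
  id: string;
  timestamp: number;
};

// Hypothetical helper: returns the parsed message, or null for anything else.
function parseReminderSent(raw: unknown): ReminderSent | null {
  if (typeof raw !== "string") return null;

  let message: unknown;
  try {
    message = JSON.parse(raw);
  } catch {
    return null; // Not JSON, so not one of our broadcasts.
  }

  if (typeof message !== "object" || message === null) return null;
  const fields = message as Record<string, unknown>;

  if (
    fields.type !== "reminder_sent" ||
    typeof fields.id !== "string" ||
    typeof fields.timestamp !== "number"
  ) {
    return null;
  }

  return { type: "reminder_sent", id: fields.id, timestamp: fields.timestamp };
}
```

Returning null rather than throwing keeps the handler tolerant of other messages that may share the same connection.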

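Multiple devices

A single user may subscribe from several browsers or devices. The Agent stores each subscription separately, and sendReminder iterates over all of them. Each device receives its own push notification.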
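The subscribe method in the Agent example skips an endpoint that is already stored. If a device re-sends its subscription, for example with fresh keys, a variant that replaces the stored entry may be preferable. The sketch below shows that alternative as a pure function; upsertSubscription is a hypothetical name, not something the agents SDK provides.

```typescript
// Mirrors the Subscription shape stored by the Agent example.
type DeviceSubscription = {
  endpoint: string;
  expirationTime: number | null;
  keys: { p256dh: string; auth: string };
};

// Hypothetical helper: one entry per endpoint, with the newest data winning.
function upsertSubscription(
  subscriptions: DeviceSubscription[],
  incoming: DeviceSubscription,
): DeviceSubscription[] {
  const index = subscriptions.findIndex(
    (s) => s.endpoint === incoming.endpoint,
  );

  // New device or browser: append.
  if (index === -1) return [...subscriptions, incoming];

  // Known endpoint: replace in place so ordering stays stable.
  return subscriptions.map((s, i) => (i === index ? incoming : s));
}
```

Inside subscribe, the result would be passed to this.setState() as the new subscriptions array.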

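Retry on failure

If the push service returns a 5xx error (a temporary failure), you can use this.schedule() to retry after a short delay. The retrySendNotification callback named below is not defined in the ReminderAgent example above, so you would need to add it as a method on the Agent: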

JavaScript


try {

  await webpush.sendNotification(sub, payload);

} catch (err) {

  const statusCode = err instanceof webpush.WebPushError ? err.statusCode : 0;

  if (statusCode >= 500) {

    await this.schedule(60, "retrySendNotification", {

      endpoint: sub.endpoint,

      payload,

    });

  }

}



TypeScript


try {

  await webpush.sendNotification(sub, payload);

} catch (err: unknown) {

  const statusCode =

    err instanceof webpush.WebPushError ? err.statusCode : 0;

  if (statusCode >= 500) {

    await this.schedule(60, "retrySendNotification", {

      endpoint: sub.endpoint,

      payload,

    });

  }

}
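A fixed 60-second delay retries forever at the same rate if the push service stays down. One common alternative is exponential backoff with a cap and a maximum number of attempts. The helper below is a sketch of that idea: retryDelaySeconds, its defaults, and the five-attempt limit are all assumptions, not values taken from the agents SDK or web-push.

```typescript
// Assumed limit: give up after this many retries.
const MAX_RETRY_ATTEMPTS = 5;

// Hypothetical helper: returns the delay before the next retry in seconds,
// or null when no further retry should be scheduled.
// `attempt` is the number of retries already made (0 for the first retry).
function retryDelaySeconds(
  attempt: number,
  baseSeconds: number = 60,
  maxSeconds: number = 3600,
): number | null {
  if (attempt >= MAX_RETRY_ATTEMPTS) return null;

  // Doubles each time: 60, 120, 240, ... capped at maxSeconds.
  return Math.min(baseSeconds * Math.pow(2, attempt), maxSeconds);
}
```

To use it, carry an attempt counter in the scheduled payload, pass the returned delay to this.schedule() in place of the literal 60, and stop scheduling when the helper returns null.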


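Next steps

Schedule tasks: learn about scheduling and keepAlive for long-running operations.

Store and sync state: manage Agent state to store subscriptions.

Callable methods: expose Agent methods as RPC endpoints.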