调度任务
调度任务以在未来运行——可以是几秒后、特定的日期/时间,或按周期性 cron 计划。被调度的任务在 Agent 重启后依然存在,并被持久化到 SQLite 中。
被调度的任务可以做用户请求或消息能做的所有事情:发起请求、查询数据库、发送邮件、读写状态。被调度任务可以调用 Agent 上的任何常规方法。
概览
调度系统支持四种模式:
| 模式 | 语法 | 用途 |
|---|---|---|
| Delayed | this.schedule(60, …) | 60 秒后运行 |
| Scheduled | this.schedule(new Date(…), …) | 在指定时间运行 |
| Cron | this.schedule(“0 8 * * *”, …) | 按周期性 cron 运行 |
| Interval | this.scheduleEvery(30, …) | 每 30 秒运行一次 |
底层上,调度使用 Durable Object alarms 在合适的时间唤醒 Agent。任务存储在 SQLite 表中并按顺序执行。
快速开始
JavaScript
import { Agent } from "agents";
export class ReminderAgent extends Agent {
async onRequest(request) {
const url = new URL(request.url);
// Schedule in 30 seconds
await this.schedule(30, "sendReminder", {
message: "Check your email",
});
// Schedule at specific time
await this.schedule(new Date("2025-02-01T09:00:00Z"), "sendReminder", {
message: "Monthly report due",
});
// Schedule recurring (every day at 8am)
await this.schedule("0 8 * * *", "dailyDigest", {
userId: url.searchParams.get("userId"),
});
return new Response("Scheduled!");
}
async sendReminder(payload) {
console.log(`Reminder: ${payload.message}`);
// Send notification, email, etc.
}
async dailyDigest(payload) {
console.log(`Sending daily digest to ${payload.userId}`);
// Generate and send digest
}
}
Explain Code
TypeScript
import { Agent } from "agents";
export class ReminderAgent extends Agent {
async onRequest(request: Request) {
const url = new URL(request.url);
// Schedule in 30 seconds
await this.schedule(30, "sendReminder", {
message: "Check your email",
});
// Schedule at specific time
await this.schedule(new Date("2025-02-01T09:00:00Z"), "sendReminder", {
message: "Monthly report due",
});
// Schedule recurring (every day at 8am)
await this.schedule("0 8 * * *", "dailyDigest", {
userId: url.searchParams.get("userId"),
});
return new Response("Scheduled!");
}
async sendReminder(payload: { message: string }) {
console.log(`Reminder: ${payload.message}`);
// Send notification, email, etc.
}
async dailyDigest(payload: { userId: string }) {
console.log(`Sending daily digest to ${payload.userId}`);
// Generate and send digest
}
}
Explain Code
调度模式
延迟执行
传一个数字来调度任务在延迟若干秒后运行:
JavaScript
// Run in 10 seconds
await this.schedule(10, "processTask", { taskId: "123" });
// Run in 5 minutes (300 seconds)
await this.schedule(300, "sendFollowUp", { email: "[email protected]" });
// Run in 1 hour
await this.schedule(3600, "checkStatus", { orderId: "abc" });
TypeScript
// Run in 10 seconds
await this.schedule(10, "processTask", { taskId: "123" });
// Run in 5 minutes (300 seconds)
await this.schedule(300, "sendFollowUp", { email: "[email protected]" });
// Run in 1 hour
await this.schedule(3600, "checkStatus", { orderId: "abc" });
适用场景:
- 对快速事件进行去抖
- 延迟通知(“你的购物车里还有商品”)
- 带退避的重试
- 限速
按时间调度
传一个 Date 对象来在指定时间调度任务:
JavaScript
// Run tomorrow at noon
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(12, 0, 0, 0);
await this.schedule(tomorrow, "sendReminder", { message: "Meeting time!" });
// Run at a specific timestamp
await this.schedule(new Date("2025-06-15T14:30:00Z"), "triggerEvent", {
eventId: "conference-2025",
});
// Run in 2 hours using Date math
const twoHoursFromNow = new Date(Date.now() + 2 * 60 * 60 * 1000);
await this.schedule(twoHoursFromNow, "checkIn", {});
Explain Code
TypeScript
// Run tomorrow at noon
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(12, 0, 0, 0);
await this.schedule(tomorrow, "sendReminder", { message: "Meeting time!" });
// Run at a specific timestamp
await this.schedule(new Date("2025-06-15T14:30:00Z"), "triggerEvent", {
eventId: "conference-2025",
});
// Run in 2 hours using Date math
const twoHoursFromNow = new Date(Date.now() + 2 * 60 * 60 * 1000);
await this.schedule(twoHoursFromNow, "checkIn", {});
Explain Code
适用场景:
- 预约提醒
- 截止日期通知
- 计划好的内容发布
- 基于时间的触发
周期性 (cron)
传入一个 cron 表达式字符串来周期性调度:
JavaScript
// Every day at 8:00 AM
await this.schedule("0 8 * * *", "dailyReport", {});
// Every hour
await this.schedule("0 * * * *", "hourlyCheck", {});
// Every Monday at 9:00 AM
await this.schedule("0 9 * * 1", "weeklySync", {});
// Every 15 minutes
await this.schedule("*/15 * * * *", "pollForUpdates", {});
// First day of every month at midnight
await this.schedule("0 0 1 * *", "monthlyCleanup", {});
Explain Code
TypeScript
// Every day at 8:00 AM
await this.schedule("0 8 * * *", "dailyReport", {});
// Every hour
await this.schedule("0 * * * *", "hourlyCheck", {});
// Every Monday at 9:00 AM
await this.schedule("0 9 * * 1", "weeklySync", {});
// Every 15 minutes
await this.schedule("*/15 * * * *", "pollForUpdates", {});
// First day of every month at midnight
await this.schedule("0 0 1 * *", "monthlyCleanup", {});
Explain Code
Cron 语法: minute hour day month weekday
| 字段 | 取值范围 | 特殊字符 |
|---|---|---|
| Minute | 0-59 | * , - / |
| Hour | 0-23 | * , - / |
| Day of Month | 1-31 | * , - / |
| Month | 1-12 | * , - / |
| Day of Week | 0-6 (0=Sunday) | * , - / |
常见模式:
JavaScript
"* * * * *"; // Every minute
"*/5 * * * *"; // Every 5 minutes
"0 * * * *"; // Every hour (on the hour)
"0 0 * * *"; // Every day at midnight
"0 8 * * 1-5"; // Weekdays at 8am
"0 0 * * 0"; // Every Sunday at midnight
"0 0 1 * *"; // First of every month
TypeScript
"* * * * *"; // Every minute
"*/5 * * * *"; // Every 5 minutes
"0 * * * *"; // Every hour (on the hour)
"0 0 * * *"; // Every day at midnight
"0 8 * * 1-5"; // Weekdays at 8am
"0 0 * * 0"; // Every Sunday at midnight
"0 0 1 * *"; // First of every month
适用场景:
- 每日/每周报表
- 周期性清理任务
- 轮询外部服务
- 健康检查
- 订阅续费
Cron schedule 默认是幂等的——以相同的 cron 表达式、回调和 payload 多次调用 schedule() 会返回已存在的 schedule,而不是创建副本。这使得在 onStart() 中设置 cron schedule 是安全的。
间隔 (Interval)
使用 scheduleEvery() 以固定的间隔(秒)运行任务。与 cron 不同,间隔支持亚分钟级精度和任意时长:
JavaScript
// Poll every 30 seconds
await this.scheduleEvery(30, "poll", { source: "api" });
// Health check every 45 seconds
await this.scheduleEvery(45, "healthCheck", {});
// Sync every 90 seconds (1.5 minutes - cannot be expressed in cron)
await this.scheduleEvery(90, "syncData", { destination: "warehouse" });
TypeScript
// Poll every 30 seconds
await this.scheduleEvery(30, "poll", { source: "api" });
// Health check every 45 seconds
await this.scheduleEvery(45, "healthCheck", {});
// Sync every 90 seconds (1.5 minutes - cannot be expressed in cron)
await this.scheduleEvery(90, "syncData", { destination: "warehouse" });
与 cron 的关键差异:
| 特性 | Cron | Interval |
|---|---|---|
| 最小精度 | 1 分钟 | 1 秒 |
| 任意间隔 | 否(必须符合 cron 模式) | 是 |
| 固定时间表 | 是(例如,“每天 8 点”) | 否(相对于启动时间) |
| 重叠预防 | 否 | 是(内置) |
幂等性:
scheduleEvery() 在回调名称、间隔和 payload 的组合上是幂等的——以相同参数多次调用不会创建重复 schedule。这使得在 onStart() 中调用是安全的,而 onStart() 在每次 Durable Object 唤醒时都会运行:
JavaScript
class MyAgent extends Agent {
async onStart() {
// Safe to call on every wake — only one schedule is created
await this.scheduleEvery(30, "poll", { source: "api" });
}
}
TypeScript
class MyAgent extends Agent {
async onStart() {
// Safe to call on every wake — only one schedule is created
await this.scheduleEvery(30, "poll", { source: "api" });
}
}
不同的间隔或 payload 会创建一个新的、独立的 schedule。
重叠预防:
如果回调耗时超过间隔,下一次执行会被跳过(而不是排队)。这避免了资源使用失控:
JavaScript
class PollingAgent extends Agent {
async poll() {
// If this takes 45 seconds and interval is 30 seconds,
// the next poll is skipped (with a warning logged)
const data = await slowExternalApi();
await this.processData(data);
}
}
// Set up 30-second interval
await this.scheduleEvery(30, "poll", {});
Explain Code
TypeScript
class PollingAgent extends Agent {
async poll() {
// If this takes 45 seconds and interval is 30 seconds,
// the next poll is skipped (with a warning logged)
const data = await slowExternalApi();
await this.processData(data);
}
}
// Set up 30-second interval
await this.scheduleEvery(30, "poll", {});
Explain Code
发生跳过时,你会在日志中看到一个警告:
Skipping interval schedule abc123: previous execution still running
错误容忍:
如果回调抛出错误,interval 仍会继续——只是该次执行失败:
JavaScript
class SyncAgent extends Agent {
async syncData() {
// Even if this throws, the interval keeps running
const response = await fetch("https://api.example.com/data");
if (!response.ok) throw new Error("Sync failed");
// ...
}
}
TypeScript
class SyncAgent extends Agent {
async syncData() {
// Even if this throws, the interval keeps running
const response = await fetch("https://api.example.com/data");
if (!response.ok) throw new Error("Sync failed");
// ...
}
}
适用场景:
- 亚分钟级轮询(每 10、30、45 秒)
- 无法映射到 cron 的间隔(每 90 秒、每 7 分钟)
- 精确控制的限速 API 轮询
- 实时数据同步
管理已调度的任务
获取一个 schedule
通过 ID 获取已调度的任务:
JavaScript
const schedule = this.getSchedule(scheduleId);
if (schedule) {
console.log(
`Task ${schedule.id} will run at ${new Date(schedule.time * 1000)}`,
);
console.log(`Callback: ${schedule.callback}`);
console.log(`Type: ${schedule.type}`); // "scheduled" | "delayed" | "cron" | "interval"
} else {
console.log("Schedule not found");
}
Explain Code
TypeScript
const schedule = this.getSchedule(scheduleId);
if (schedule) {
console.log(
`Task ${schedule.id} will run at ${new Date(schedule.time * 1000)}`,
);
console.log(`Callback: ${schedule.callback}`);
console.log(`Type: ${schedule.type}`); // "scheduled" | "delayed" | "cron" | "interval"
} else {
console.log("Schedule not found");
}
Explain Code
列出 schedule
带可选过滤条件查询已调度的任务:
JavaScript
// Get all scheduled tasks
const allSchedules = this.getSchedules();
// Get only cron jobs
const cronJobs = this.getSchedules({ type: "cron" });
// Get tasks in the next hour
const upcoming = this.getSchedules({
timeRange: {
start: new Date(),
end: new Date(Date.now() + 60 * 60 * 1000),
},
});
// Get a specific task by ID
const specific = this.getSchedules({ id: "abc123" });
// Combine filters
const upcomingCronJobs = this.getSchedules({
type: "cron",
timeRange: {
start: new Date(),
end: new Date(Date.now() + 24 * 60 * 60 * 1000),
},
});
Explain Code
TypeScript
// Get all scheduled tasks
const allSchedules = this.getSchedules();
// Get only cron jobs
const cronJobs = this.getSchedules({ type: "cron" });
// Get tasks in the next hour
const upcoming = this.getSchedules({
timeRange: {
start: new Date(),
end: new Date(Date.now() + 60 * 60 * 1000),
},
});
// Get a specific task by ID
const specific = this.getSchedules({ id: "abc123" });
// Combine filters
const upcomingCronJobs = this.getSchedules({
type: "cron",
timeRange: {
start: new Date(),
end: new Date(Date.now() + 24 * 60 * 60 * 1000),
},
});
Explain Code
取消一个 schedule
在执行前移除已调度的任务:
JavaScript
const cancelled = await this.cancelSchedule(scheduleId);
if (cancelled) {
console.log("Schedule cancelled successfully");
} else {
console.log("Schedule not found (may have already executed)");
}
TypeScript
const cancelled = await this.cancelSchedule(scheduleId);
if (cancelled) {
console.log("Schedule cancelled successfully");
} else {
console.log("Schedule not found (may have already executed)");
}
示例:可取消的提醒
JavaScript
class ReminderAgent extends Agent {
async setReminder(userId, message, delaySeconds) {
const schedule = await this.schedule(delaySeconds, "sendReminder", {
userId,
message,
});
// Store the schedule ID so user can cancel later
this.sql`
INSERT INTO user_reminders (user_id, schedule_id, message)
VALUES (${userId}, ${schedule.id}, ${message})
`;
return schedule.id;
}
async cancelReminder(scheduleId) {
const cancelled = await this.cancelSchedule(scheduleId);
if (cancelled) {
this.sql`DELETE FROM user_reminders WHERE schedule_id = ${scheduleId}`;
}
return cancelled;
}
async sendReminder(payload) {
// Send the reminder...
// Clean up the record
this.sql`DELETE FROM user_reminders WHERE user_id = ${payload.userId}`;
}
}
Explain Code
TypeScript
class ReminderAgent extends Agent {
async setReminder(userId: string, message: string, delaySeconds: number) {
const schedule = await this.schedule(delaySeconds, "sendReminder", {
userId,
message,
});
// Store the schedule ID so user can cancel later
this.sql`
INSERT INTO user_reminders (user_id, schedule_id, message)
VALUES (${userId}, ${schedule.id}, ${message})
`;
return schedule.id;
}
async cancelReminder(scheduleId: string) {
const cancelled = await this.cancelSchedule(scheduleId);
if (cancelled) {
this.sql`DELETE FROM user_reminders WHERE schedule_id = ${scheduleId}`;
}
return cancelled;
}
async sendReminder(payload: { userId: string; message: string }) {
// Send the reminder...
// Clean up the record
this.sql`DELETE FROM user_reminders WHERE user_id = ${payload.userId}`;
}
}
Explain Code
Schedule 对象
当你创建或获取一个 schedule 时,会得到一个 Schedule 对象:
TypeScript
type Schedule<T> = {
id: string; // Unique identifier
callback: string; // Method name to call
payload: T; // Data passed to the callback
time: number; // Unix timestamp (seconds) of next execution
} & (
| { type: "scheduled" } // One-time at specific date
| { type: "delayed"; delayInSeconds: number } // One-time after delay
| { type: "cron"; cron: string } // Recurring (cron expression)
| { type: "interval"; intervalSeconds: number } // Recurring (fixed interval)
);
Explain Code
示例:
JavaScript
const schedule = await this.schedule(60, "myTask", { foo: "bar" });
console.log(schedule);
// {
// id: "abc123xyz",
// callback: "myTask",
// payload: { foo: "bar" },
// time: 1706745600,
// type: "delayed",
// delayInSeconds: 60
// }
Explain Code
TypeScript
const schedule = await this.schedule(60, "myTask", { foo: "bar" });
console.log(schedule);
// {
// id: "abc123xyz",
// callback: "myTask",
// payload: { foo: "bar" },
// time: 1706745600,
// type: "delayed",
// delayInSeconds: 60
// }
Explain Code
模式
在回调中重新调度
对于动态的周期性 schedule,可在回调内部安排下一次运行:
JavaScript
class PollingAgent extends Agent {
async startPolling(intervalSeconds) {
await this.schedule(intervalSeconds, "poll", { interval: intervalSeconds });
}
async poll(payload) {
try {
const data = await fetch("https://api.example.com/updates");
await this.processUpdates(await data.json());
} catch (error) {
console.error("Polling failed:", error);
}
// Schedule the next poll (regardless of success/failure)
await this.schedule(payload.interval, "poll", payload);
}
async stopPolling() {
// Cancel all polling schedules
const schedules = this.getSchedules({ type: "delayed" });
for (const schedule of schedules) {
if (schedule.callback === "poll") {
await this.cancelSchedule(schedule.id);
}
}
}
}
Explain Code
TypeScript
class PollingAgent extends Agent {
async startPolling(intervalSeconds: number) {
await this.schedule(intervalSeconds, "poll", { interval: intervalSeconds });
}
async poll(payload: { interval: number }) {
try {
const data = await fetch("https://api.example.com/updates");
await this.processUpdates(await data.json());
} catch (error) {
console.error("Polling failed:", error);
}
// Schedule the next poll (regardless of success/failure)
await this.schedule(payload.interval, "poll", payload);
}
async stopPolling() {
// Cancel all polling schedules
const schedules = this.getSchedules({ type: "delayed" });
for (const schedule of schedules) {
if (schedule.callback === "poll") {
await this.cancelSchedule(schedule.id);
}
}
}
}
Explain Code
指数退避重试
JavaScript
class RetryAgent extends Agent {
async attemptTask(payload) {
try {
await this.doWork(payload.taskId);
console.log(
`Task ${payload.taskId} succeeded on attempt ${payload.attempt}`,
);
} catch (error) {
if (payload.attempt >= payload.maxAttempts) {
console.error(
`Task ${payload.taskId} failed after ${payload.maxAttempts} attempts`,
);
return;
}
// Exponential backoff: 2^attempt seconds (2s, 4s, 8s, 16s...)
const delaySeconds = Math.pow(2, payload.attempt);
await this.schedule(delaySeconds, "attemptTask", {
...payload,
attempt: payload.attempt + 1,
});
console.log(`Retrying task ${payload.taskId} in ${delaySeconds}s`);
}
}
async doWork(taskId) {
// Your actual work here
}
}
Explain Code
TypeScript
class RetryAgent extends Agent {
async attemptTask(payload: {
taskId: string;
attempt: number;
maxAttempts: number;
}) {
try {
await this.doWork(payload.taskId);
console.log(
`Task ${payload.taskId} succeeded on attempt ${payload.attempt}`,
);
} catch (error) {
if (payload.attempt >= payload.maxAttempts) {
console.error(
`Task ${payload.taskId} failed after ${payload.maxAttempts} attempts`,
);
return;
}
// Exponential backoff: 2^attempt seconds (2s, 4s, 8s, 16s...)
const delaySeconds = Math.pow(2, payload.attempt);
await this.schedule(delaySeconds, "attemptTask", {
...payload,
attempt: payload.attempt + 1,
});
console.log(`Retrying task ${payload.taskId} in ${delaySeconds}s`);
}
}
async doWork(taskId: string) {
// Your actual work here
}
}
Explain Code
自销毁的 Agent
你可以安全地在被调度的回调中调用 this.destroy():
JavaScript
class TemporaryAgent extends Agent {
async onStart() {
// Self-destruct in 24 hours
await this.schedule(24 * 60 * 60, "cleanup", {});
}
async cleanup() {
// Perform final cleanup
console.log("Agent lifetime expired, cleaning up...");
// This is safe to call from a scheduled callback
await this.destroy();
}
}
Explain Code
TypeScript
class TemporaryAgent extends Agent {
async onStart() {
// Self-destruct in 24 hours
await this.schedule(24 * 60 * 60, "cleanup", {});
}
async cleanup() {
// Perform final cleanup
console.log("Agent lifetime expired, cleaning up...");
// This is safe to call from a scheduled callback
await this.destroy();
}
}
Explain Code
Note
当从被调度的任务中调用 destroy() 时,Agents SDK 会延迟销毁,以确保被调度的回调成功完成。Agent 实例会在回调执行结束后立即被驱逐。
AI 辅助调度
SDK 包含使用 AI 解析自然语言调度请求的工具。
getSchedulePrompt()
返回一个用于将自然语言解析成调度参数的系统提示:
JavaScript
import { getSchedulePrompt, scheduleSchema } from "agents";
import { generateObject } from "ai";
import { openai } from "@ai-sdk/openai";
class SmartScheduler extends Agent {
async parseScheduleRequest(userInput) {
const result = await generateObject({
model: openai("gpt-4o"),
system: getSchedulePrompt({ date: new Date() }),
prompt: userInput,
schema: scheduleSchema,
});
return result.object;
}
async handleUserRequest(input) {
// Parse: "remind me to call mom tomorrow at 3pm"
const parsed = await this.parseScheduleRequest(input);
// parsed = {
// description: "call mom",
// when: {
// type: "scheduled",
// date: "2025-01-30T15:00:00Z"
// }
// }
if (parsed.when.type === "scheduled" && parsed.when.date) {
await this.schedule(new Date(parsed.when.date), "sendReminder", {
message: parsed.description,
});
} else if (parsed.when.type === "delayed" && parsed.when.delayInSeconds) {
await this.schedule(parsed.when.delayInSeconds, "sendReminder", {
message: parsed.description,
});
} else if (parsed.when.type === "cron" && parsed.when.cron) {
await this.schedule(parsed.when.cron, "sendReminder", {
message: parsed.description,
});
}
}
async sendReminder(payload) {
console.log(`Reminder: ${payload.message}`);
}
}
Explain Code
TypeScript
import { getSchedulePrompt, scheduleSchema } from "agents";
import { generateObject } from "ai";
import { openai } from "@ai-sdk/openai";
class SmartScheduler extends Agent {
async parseScheduleRequest(userInput: string) {
const result = await generateObject({
model: openai("gpt-4o"),
system: getSchedulePrompt({ date: new Date() }),
prompt: userInput,
schema: scheduleSchema,
});
return result.object;
}
async handleUserRequest(input: string) {
// Parse: "remind me to call mom tomorrow at 3pm"
const parsed = await this.parseScheduleRequest(input);
// parsed = {
// description: "call mom",
// when: {
// type: "scheduled",
// date: "2025-01-30T15:00:00Z"
// }
// }
if (parsed.when.type === "scheduled" && parsed.when.date) {
await this.schedule(new Date(parsed.when.date), "sendReminder", {
message: parsed.description,
});
} else if (parsed.when.type === "delayed" && parsed.when.delayInSeconds) {
await this.schedule(parsed.when.delayInSeconds, "sendReminder", {
message: parsed.description,
});
} else if (parsed.when.type === "cron" && parsed.when.cron) {
await this.schedule(parsed.when.cron, "sendReminder", {
message: parsed.description,
});
}
}
async sendReminder(payload: { message: string }) {
console.log(`Reminder: ${payload.message}`);
}
}
Explain Code
scheduleSchema
一个用于校验解析后的调度数据的 Zod schema。它在 when.type 上使用 discriminated union,因此每个变体只包含其需要的字段:
JavaScript
import { scheduleSchema } from "agents";
// The schema is a discriminated union:
// {
// description: string,
// when:
// | { type: "scheduled", date: string } // ISO 8601 date string
// | { type: "delayed", delayInSeconds: number }
// | { type: "cron", cron: string }
// | { type: "no-schedule" }
// }
Explain Code
TypeScript
import { scheduleSchema } from "agents";
// The schema is a discriminated union:
// {
// description: string,
// when:
// | { type: "scheduled", date: string } // ISO 8601 date string
// | { type: "delayed", delayInSeconds: number }
// | { type: "cron", cron: string }
// | { type: "no-schedule" }
// }
Explain Code
Note
日期返回的是 ISO 8601 字符串(而非 Date 对象),以兼容 Zod v3 与 v4 的 JSON schema 生成。
Scheduling vs Queue vs Workflows
| 特性 | Queue | Scheduling | Workflows |
|---|---|---|---|
| 何时 | 立即(FIFO) | 未来某个时间 | 未来某个时间 |
| 执行 | 顺序 | 在调度时间 | 多步 |
| 重试 | 内置 | 内置 | 自动 |
| 持久化 | SQLite | SQLite | Workflow 引擎 |
| 周期性 | 否 | 是(cron) | 否(请用 Scheduling) |
| 复杂逻辑 | 否 | 否 | 是 |
| 人工审批 | 否 | 否 | 是 |
何时使用 Queue:
- 你需要后台处理而不阻塞响应
- 任务应尽快运行但不需要阻塞
- 顺序很重要(FIFO)
何时使用 Scheduling:
- 任务需要在特定时间运行
- 你需要周期性任务(cron)
- 延迟执行(去抖、重试)
何时使用 Workflows:
- 有依赖关系的多步流程
- 自动重试且带退避
- 人在回路中的审批
- 长时间运行的任务(数分钟到数小时)
API 参考
schedule()
TypeScript
async schedule<T>(
when: Date | string | number,
callback: keyof this,
payload?: T,
options?: { retry?: RetryOptions; idempotent?: boolean }
): Promise<Schedule<T>>
调度一个任务以在未来执行。
参数:
when- 何时执行:number(延迟秒数)、Date(指定时间)或string(cron 表达式)callback- 要调用的方法名payload- 传给回调的数据(必须可 JSON 序列化)options.retry- 可选的重试配置。详情请参阅 Retriesoptions.idempotent- 按 callback + payload 去重。cron schedule 默认true,delayed 与基于 Date 的 schedule 默认false
返回: 一个包含任务详情的 Schedule 对象
幂等性:
Cron schedule 默认是幂等的——以相同的 callback、cron 表达式和 payload 多次调用 schedule("0 * * * *", "tick") 会返回已存在的 schedule,而不是创建副本。设置 idempotent: false 可覆盖此行为。
对于 delayed 与基于 Date 的 schedule,设置 idempotent: true 可启用同样的去重(按 callback + payload 匹配)。这在 onStart() 中调用 schedule() 时尤其有用,可以避免在 Durable Object 重启间累积重复行:
JavaScript
class MyAgent extends Agent {
async onStart() {
// Without idempotent: true, this creates a new row on every DO restart
await this.schedule(3600, "hourlyCleanup", {}, { idempotent: true });
}
}
TypeScript
class MyAgent extends Agent {
async onStart() {
// Without idempotent: true, this creates a new row on every DO restart
await this.schedule(3600, "hourlyCleanup", {}, { idempotent: true });
}
}
Warning
为不存在的方法设置回调的任务会抛出异常。请确保 callback 参数中指定的方法存在于你的 Agent 类上。
scheduleEvery()
TypeScript
async scheduleEvery<T>(
intervalSeconds: number,
callback: keyof this,
payload?: T,
options?: { retry?: RetryOptions }
): Promise<Schedule<T>>
以固定间隔重复运行任务。
参数:
intervalSeconds- 两次执行之间的秒数(必须大于 0)callback- 要调用的方法名payload- 传给回调的数据(必须可 JSON 序列化)options.retry- 可选的重试配置。详情请参阅 Retries。
返回: 一个 type: "interval" 的 Schedule 对象
行为:
- 第一次执行在
intervalSeconds后(不是立即) - 如果到下一次执行时间时回调仍在运行,则跳过(防重叠)
- 如果回调抛出错误,interval 仍继续
- 通过
cancelSchedule(id)取消整个 interval
getSchedule()
TypeScript
getSchedule<T>(id: string): Schedule<T> | undefined
按 ID 获取已调度的任务。未找到时返回 undefined。此方法是同步的。
getSchedules()
TypeScript
getSchedules<T>(criteria?: {
id?: string;
type?: "scheduled" | "delayed" | "cron" | "interval";
timeRange?: { start?: Date; end?: Date };
}): Schedule<T>[]
获取符合条件的已调度任务。此方法是同步的。
cancelSchedule()
TypeScript
async cancelSchedule(id: string): Promise<boolean>
取消一个已调度的任务。已取消返回 true,未找到返回 false。
keepAlive()
TypeScript
async keepAlive(): Promise<() => void>
通过创建 30 秒的心跳 schedule,防止 Durable Object 因不活动被驱逐。返回一个调用即可取消心跳的 disposer 函数。disposer 是幂等的——多次调用是安全的。
工作完成后务必调用 disposer——否则心跳会无限期持续。
JavaScript
const dispose = await this.keepAlive();
try {
// Long-running work that must not be interrupted
const result = await longRunningComputation();
await sendResults(result);
} finally {
dispose();
}
TypeScript
const dispose = await this.keepAlive();
try {
// Long-running work that must not be interrupted
const result = await longRunningComputation();
await sendResults(result);
} finally {
dispose();
}
keepAliveWhile()
TypeScript
async keepAliveWhile<T>(fn: () => Promise<T>): Promise<T>
在保持 Durable Object 活跃的同时运行一个 async 函数。心跳会在函数执行前自动开始,并在其完成时停止(无论成功或抛出)。返回该函数返回的值。
这是使用 keepAlive 的推荐方式——它保证清理。
JavaScript
const result = await this.keepAliveWhile(async () => {
const data = await longRunningComputation();
return data;
});
TypeScript
const result = await this.keepAliveWhile(async () => {
const data = await longRunningComputation();
return data;
});
让 Agent 保持活跃
Durable Object 在不活动一段时间后会被驱逐(通常 70-140 秒内没有传入请求、WebSocket 消息或 alarm)。在长时间运行的操作中——流式 LLM 响应、等待外部 API、运行多步计算——Agent 可能在中途被驱逐。
keepAlive() 通过创建 30 秒的心跳 schedule 来防止这种情况。内部心跳回调是 no-op——是 alarm 触发本身重置了不活动计时器。由于它使用调度系统:
- 心跳不会与你自己的 schedule 冲突(调度系统通过单一 alarm 槽位复用)
- 心跳如果你需要也会出现在
getSchedules()中 - 多次并发的
keepAlive()各自获得独立 schedule,互不干扰
多个并发调用者
每次 keepAlive() 调用都返回独立的 disposer:
JavaScript
const dispose1 = await this.keepAlive();
const dispose2 = await this.keepAlive();
// Both heartbeats are active
dispose1(); // Only cancels the first heartbeat
// Agent is still alive via dispose2's heartbeat
dispose2(); // Now the agent can go idle
TypeScript
const dispose1 = await this.keepAlive();
const dispose2 = await this.keepAlive();
// Both heartbeats are active
dispose1(); // Only cancels the first heartbeat
// Agent is still alive via dispose2's heartbeat
dispose2(); // Now the agent can go idle
AIChatAgent
AIChatAgent 在流式响应期间会自动调用 keepAlive()。使用 AIChatAgent 时,你不需要自己添加——每个 LLM 流默认都受到防止空闲驱逐的保护。
何时使用 keepAlive
| 场景 | 使用 keepAlive? |
|---|---|
| 通过 AIChatAgent 流式响应 LLM | 否——已内置 |
| 自定义 Agent 中的长时间计算 | 是 |
| 等待较慢的外部 API | 是 |
| 多步工具执行 | 是 |
| 短的请求-响应处理器 | 否——不需要 |
| 通过调度或 workflow 进行的后台工作 | 否——alarm 已经让 DO 保持活跃 |
Note
keepAlive() 标记为 @experimental,在不同版本之间可能会有变化。
限制
- 最大任务数: 受 SQLite 存储限制(每个任务一行)。实际上每个 Agent 数万级别。
- 任务大小: 每个任务(包括 payload)最多 2MB。
- 最小延迟: 0 秒(在下次 alarm tick 时运行)
- Cron 精度: 分钟级(不是秒)
- Interval 精度: 秒级
- Cron 任务: 执行后,自动重新调度到下一次发生时间
- Interval 任务: 执行后,重新调度到
now + intervalSeconds;如果仍在运行则跳过
后续步骤
推送通知 使用调度和 web-push 发送浏览器推送通知。
队列任务 立即执行的后台任务处理。
运行 Workflows 持久化的多步后台处理。
Agents API Agents SDK 的完整 API 参考。