任务队列
Agents SDK 提供了一套内置的队列系统,让你可以将任务安排为异步执行。它适用于后台处理、延迟操作以及不需要立即执行的工作负载管理。
概览
队列系统内建于基类 Agent。任务存储在一个 SQLite 表中,自动按 FIFO(先进先出)顺序处理。
QueueItem 类型
TypeScript
type QueueItem<T> = {
id: string; // Unique identifier for the queued task
payload: T; // Data to pass to the callback function
callback: keyof Agent; // Name of the method to call
created_at: number; // Timestamp when the task was created
retry?: RetryOptions; // Retry options for this task
};
核心方法
queue()
将任务加入队列以备后续执行。
TypeScript
async queue<T>(
callback: keyof this,
payload: T,
options?: { retry?: RetryOptions }
): Promise<string>
参数:
callback- 处理任务时要调用的方法名payload- 传给 callback 方法的数据options- 可选配置:retry- callback 执行的重试选项。如果 callback 抛出异常,会按指数退避重试。RetryOptions的细节请参考重试
返回值: 该队列任务的唯一 ID
示例:
JavaScript
class MyAgent extends Agent {
async processEmail(data) {
// Process the email
console.log(`Processing email: ${data.subject}`);
}
async onMessage(message) {
// Queue an email processing task
const taskId = await this.queue("processEmail", {
email: "[email protected]",
subject: "Welcome!",
});
console.log(`Queued task with ID: ${taskId}`);
}
}
TypeScript
class MyAgent extends Agent {
async processEmail(data: { email: string; subject: string }) {
// Process the email
console.log(`Processing email: ${data.subject}`);
}
async onMessage(message: string) {
// Queue an email processing task
const taskId = await this.queue("processEmail", {
email: "[email protected]",
subject: "Welcome!",
});
console.log(`Queued task with ID: ${taskId}`);
}
}
dequeue()
按 ID 从队列中移除指定任务。该方法是同步的。
TypeScript
dequeue(id: string): void
参数:
id- 要移除的任务 ID
示例:
JavaScript
// Remove a specific task
agent.dequeue("abc123def");
TypeScript
// Remove a specific task
agent.dequeue("abc123def");
dequeueAll()
移除队列中的所有任务。该方法是同步的。
TypeScript
dequeueAll(): void
示例:
JavaScript
// Clear the entire queue
agent.dequeueAll();
TypeScript
// Clear the entire queue
agent.dequeueAll();
dequeueAllByCallback()
移除所有匹配特定 callback 方法的任务。该方法是同步的。
TypeScript
dequeueAllByCallback(callback: string): void
参数:
callback- callback 方法的名称
示例:
JavaScript
// Remove all email processing tasks
agent.dequeueAllByCallback("processEmail");
TypeScript
// Remove all email processing tasks
agent.dequeueAllByCallback("processEmail");
getQueue()
按 ID 获取一个特定的队列任务。该方法是同步的。
TypeScript
getQueue<T>(id: string): QueueItem<T> | undefined
参数:
id- 要获取的任务 ID
返回值: 带有解析后 payload 的 QueueItem,如果未找到则返回 undefined
返回前 payload 会自动从 JSON 解析。
示例:
JavaScript
const task = agent.getQueue("abc123def");
if (task) {
console.log(`Task callback: ${task.callback}`);
console.log(`Task payload:`, task.payload);
}
TypeScript
const task = agent.getQueue("abc123def");
if (task) {
console.log(`Task callback: ${task.callback}`);
console.log(`Task payload:`, task.payload);
}
getQueues()
获取所有 payload 中匹配指定 key-value 对的队列任务。该方法是同步的。
TypeScript
getQueues<T>(key: string, value: string): QueueItem<T>[]
参数:
key- 在 payload 中按此 key 过滤value- 要匹配的值
返回值: 匹配的 QueueItem 对象数组
该方法会读取所有队列项,然后在内存中解析每个 payload 并检查指定 key 是否匹配 value。
示例:
JavaScript
// Find all tasks for a specific user
const userTasks = agent.getQueues("userId", "12345");
TypeScript
// Find all tasks for a specific user
const userTasks = agent.getQueues("userId", "12345");
队列处理工作流程
- 校验:调用
queue()时,该方法会校验 callback 是否作为 agent 上的函数存在。 - 自动处理:入队后,系统会自动尝试 flush 队列。
- FIFO 顺序:任务按创建顺序(
created_at时间戳)处理。 - 上下文保留:每个排队任务都使用相同的 agent 上下文(connection、request、email)运行。
- 自动出队:成功执行的任务会自动从队列中移除。
- 错误处理:如果执行时 callback 方法不存在,会记录错误并跳过该任务。
- 持久化:任务存储在
cf_agents_queuesSQL 表中,可在 agent 重启后保留。
队列回调方法
为排队任务定义回调方法时,必须遵循以下签名:
TypeScript
async callbackMethod(payload: unknown, queueItem: QueueItem): Promise<void>
示例:
JavaScript
class MyAgent extends Agent {
async sendNotification(payload, queueItem) {
console.log(`Processing task ${queueItem.id}`);
console.log(
`Sending notification to user ${payload.userId}: ${payload.message}`,
);
// Your notification logic here
await this.notificationService.send(payload.userId, payload.message);
}
async onUserSignup(userData) {
// Queue a welcome notification
await this.queue("sendNotification", {
userId: userData.id,
message: "Welcome to our platform!",
});
}
}
TypeScript
class MyAgent extends Agent {
async sendNotification(
payload: { userId: string; message: string },
queueItem: QueueItem<{ userId: string; message: string }>,
) {
console.log(`Processing task ${queueItem.id}`);
console.log(
`Sending notification to user ${payload.userId}: ${payload.message}`,
);
// Your notification logic here
await this.notificationService.send(payload.userId, payload.message);
}
async onUserSignup(userData: any) {
// Queue a welcome notification
await this.queue("sendNotification", {
userId: userData.id,
message: "Welcome to our platform!",
});
}
}
使用场景
后台处理
JavaScript
class DataProcessor extends Agent {
async processLargeDataset(data) {
const results = await this.heavyComputation(data.datasetId);
await this.notifyUser(data.userId, results);
}
async onDataUpload(uploadData) {
// Queue the processing instead of doing it synchronously
await this.queue("processLargeDataset", {
datasetId: uploadData.id,
userId: uploadData.userId,
});
return { message: "Data upload received, processing started" };
}
}
TypeScript
class DataProcessor extends Agent {
async processLargeDataset(data: { datasetId: string; userId: string }) {
const results = await this.heavyComputation(data.datasetId);
await this.notifyUser(data.userId, results);
}
async onDataUpload(uploadData: any) {
// Queue the processing instead of doing it synchronously
await this.queue("processLargeDataset", {
datasetId: uploadData.id,
userId: uploadData.userId,
});
return { message: "Data upload received, processing started" };
}
}
批处理
JavaScript
class BatchProcessor extends Agent {
async processBatch(data) {
for (const item of data.items) {
await this.processItem(item);
}
console.log(`Completed batch ${data.batchId}`);
}
async onLargeRequest(items) {
// Split large requests into smaller batches
const batchSize = 10;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await this.queue("processBatch", {
items: batch,
batchId: `batch-${i / batchSize + 1}`,
});
}
}
}
TypeScript
class BatchProcessor extends Agent {
async processBatch(data: { items: any[]; batchId: string }) {
for (const item of data.items) {
await this.processItem(item);
}
console.log(`Completed batch ${data.batchId}`);
}
async onLargeRequest(items: any[]) {
// Split large requests into smaller batches
const batchSize = 10;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await this.queue("processBatch", {
items: batch,
batchId: `batch-${i / batchSize + 1}`,
});
}
}
}
错误处理
使用内置的 retry 选项,而不是手动重新入队的逻辑。当 callback 抛出异常时,任务会按指数退避自动重试:
JavaScript
class RobustAgent extends Agent {
async reliableTask(payload, queueItem) {
console.log(`Processing task ${queueItem.id}`);
const response = await fetch(payload.url);
if (!response.ok) {
throw new Error(`Request failed: ${response.status}`);
}
}
async onMessage(connection, message) {
await this.queue(
"reliableTask",
{ url: "https://api.example.com/data" },
{
retry: {
maxAttempts: 5,
baseDelayMs: 500,
maxDelayMs: 10_000,
},
},
);
}
}
TypeScript
class RobustAgent extends Agent {
async reliableTask(payload: { url: string }, queueItem: QueueItem) {
console.log(`Processing task ${queueItem.id}`);
const response = await fetch(payload.url);
if (!response.ok) {
throw new Error(`Request failed: ${response.status}`);
}
}
async onMessage(connection: Connection, message: WSMessage) {
await this.queue(
"reliableTask",
{ url: "https://api.example.com/data" },
{
retry: {
maxAttempts: 5,
baseDelayMs: 500,
maxDelayMs: 10_000,
},
},
);
}
}
如果未提供 retry 选项,会使用 static options.retry 的类级默认值(3 次重试,100ms 基础延迟,3s 最大延迟)。完整细节请参考重试。
最佳实践
- 保持 payload 小巧:payload 会被 JSON 序列化并存储到数据库。
- 幂等操作:把 callback 方法设计成可安全重试的。
- 错误处理:在 callback 方法中加入合适的错误处理。
- 监控:用日志跟踪队列处理。
- 清理:必要时定期清理已完成或失败的任务。
与其他特性的集成
队列系统可以与 Agent SDK 的其他特性配合:
- 状态管理:在排队的回调中访问 agent 状态。
- 调度:与 schedule() 结合,实现基于时间的队列处理。
- 上下文:排队的任务保留原始请求上下文。
- 数据库:与其他 agent 数据共用同一个数据库。
限制
- 任务按顺序处理,不会并行。
- 没有优先级机制(只有 FIFO)。
- 队列处理在 agent 执行期间进行,而不是作为独立的后台 job。
队列(Queue)与调度(Schedule)对比
如果希望任务尽快按顺序执行,使用 队列;如果需要任务在特定时间或周期性运行,使用 调度。
| 特性 | 队列 | 调度 |
|---|---|---|
| 执行时机 | 立即(FIFO) | 指定时间或 cron |
| 使用场景 | 后台处理 | 延迟或周期性任务 |
| 存储 | cf_agents_queues 表 | cf_agents_schedules 表 |
下一步
Agents API Agents SDK 的完整 API 参考。
调度任务 基于 cron 与延迟的时间调度。
运行 Workflows 持久化的多步骤后台处理。