Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

任务队列

Agents SDK 提供了一套内置的队列系统,让你可以将任务安排为异步执行。它适用于后台处理、延迟操作以及不需要立即执行的工作负载管理。

概览

队列系统内建于基类 Agent。任务存储在一个 SQLite 表中,自动按 FIFO(先进先出)顺序处理。

QueueItem 类型

TypeScript


type QueueItem<T> = {

  id: string; // Unique identifier for the queued task

  payload: T; // Data to pass to the callback function

  callback: keyof Agent; // Name of the method to call

  created_at: number; // Timestamp when the task was created

  retry?: RetryOptions; // Retry options for this task

};


核心方法

queue()

将任务加入队列以备后续执行。

TypeScript


async queue<T>(

  callback: keyof this,

  payload: T,

  options?: { retry?: RetryOptions }

): Promise<string>


参数:

  • callback - 处理任务时要调用的方法名
  • payload - 传给 callback 方法的数据
  • options - 可选配置:
    • retry - callback 执行的重试选项。如果 callback 抛出异常,会按指数退避重试。RetryOptions 的细节请参考重试

返回值: 该队列任务的唯一 ID

示例:

JavaScript


class MyAgent extends Agent {

  async processEmail(data) {

    // Process the email

    console.log(`Processing email: ${data.subject}`);

  }


  async onMessage(message) {

    // Queue an email processing task

    const taskId = await this.queue("processEmail", {

      email: "[email protected]",

      subject: "Welcome!",

    });


    console.log(`Queued task with ID: ${taskId}`);

  }

}


TypeScript


class MyAgent extends Agent {

  async processEmail(data: { email: string; subject: string }) {

    // Process the email

    console.log(`Processing email: ${data.subject}`);

  }


  async onMessage(message: string) {

    // Queue an email processing task

    const taskId = await this.queue("processEmail", {

      email: "[email protected]",

      subject: "Welcome!",

    });


    console.log(`Queued task with ID: ${taskId}`);

  }

}


dequeue()

按 ID 从队列中移除指定任务。该方法是同步的。

TypeScript


dequeue(id: string): void


参数:

  • id - 要移除的任务 ID

示例:

JavaScript


// Remove a specific task

agent.dequeue("abc123def");


TypeScript


// Remove a specific task

agent.dequeue("abc123def");


dequeueAll()

移除队列中的所有任务。该方法是同步的。

TypeScript


dequeueAll(): void


示例:

JavaScript


// Clear the entire queue

agent.dequeueAll();


TypeScript


// Clear the entire queue

agent.dequeueAll();


dequeueAllByCallback()

移除所有匹配特定 callback 方法的任务。该方法是同步的。

TypeScript


dequeueAllByCallback(callback: string): void


参数:

  • callback - callback 方法的名称

示例:

JavaScript


// Remove all email processing tasks

agent.dequeueAllByCallback("processEmail");


TypeScript


// Remove all email processing tasks

agent.dequeueAllByCallback("processEmail");


getQueue()

按 ID 获取一个特定的队列任务。该方法是同步的。

TypeScript


getQueue<T>(id: string): QueueItem<T> | undefined


参数:

  • id - 要获取的任务 ID

返回值: 带有解析后 payload 的 QueueItem,如果未找到则返回 undefined

返回前 payload 会自动从 JSON 解析。

示例:

JavaScript


const task = agent.getQueue("abc123def");

if (task) {

  console.log(`Task callback: ${task.callback}`);

  console.log(`Task payload:`, task.payload);

}


TypeScript


const task = agent.getQueue("abc123def");

if (task) {

  console.log(`Task callback: ${task.callback}`);

  console.log(`Task payload:`, task.payload);

}


getQueues()

获取所有 payload 中匹配指定 key-value 对的队列任务。该方法是同步的。

TypeScript


getQueues<T>(key: string, value: string): QueueItem<T>[]


参数:

  • key - 在 payload 中按此 key 过滤
  • value - 要匹配的值

返回值: 匹配的 QueueItem 对象数组

该方法会读取所有队列项,然后在内存中解析每个 payload 并检查指定 key 是否匹配 value。

示例:

JavaScript


// Find all tasks for a specific user

const userTasks = agent.getQueues("userId", "12345");


TypeScript


// Find all tasks for a specific user

const userTasks = agent.getQueues("userId", "12345");


队列处理工作流程

  1. 校验:调用 queue() 时,该方法会校验 callback 是否作为 agent 上的函数存在。
  2. 自动处理:入队后,系统会自动尝试 flush 队列。
  3. FIFO 顺序:任务按创建顺序(created_at 时间戳)处理。
  4. 上下文保留:每个排队任务都使用相同的 agent 上下文(connection、request、email)运行。
  5. 自动出队:成功执行的任务会自动从队列中移除。
  6. 错误处理:如果执行时 callback 方法不存在,会记录错误并跳过该任务。
  7. 持久化:任务存储在 cf_agents_queues SQL 表中,可在 agent 重启后保留。

队列回调方法

为排队任务定义回调方法时,必须遵循以下签名:

TypeScript


async callbackMethod(payload: unknown, queueItem: QueueItem): Promise<void>


示例:

JavaScript


class MyAgent extends Agent {

  async sendNotification(payload, queueItem) {

    console.log(`Processing task ${queueItem.id}`);

    console.log(

      `Sending notification to user ${payload.userId}: ${payload.message}`,

    );


    // Your notification logic here

    await this.notificationService.send(payload.userId, payload.message);

  }


  async onUserSignup(userData) {

    // Queue a welcome notification

    await this.queue("sendNotification", {

      userId: userData.id,

      message: "Welcome to our platform!",

    });

  }

}


TypeScript


class MyAgent extends Agent {

  async sendNotification(

    payload: { userId: string; message: string },

    queueItem: QueueItem<{ userId: string; message: string }>,

  ) {

    console.log(`Processing task ${queueItem.id}`);

    console.log(

      `Sending notification to user ${payload.userId}: ${payload.message}`,

    );


    // Your notification logic here

    await this.notificationService.send(payload.userId, payload.message);

  }


  async onUserSignup(userData: any) {

    // Queue a welcome notification

    await this.queue("sendNotification", {

      userId: userData.id,

      message: "Welcome to our platform!",

    });

  }

}


使用场景

后台处理

JavaScript


class DataProcessor extends Agent {

  async processLargeDataset(data) {

    const results = await this.heavyComputation(data.datasetId);

    await this.notifyUser(data.userId, results);

  }


  async onDataUpload(uploadData) {

    // Queue the processing instead of doing it synchronously

    await this.queue("processLargeDataset", {

      datasetId: uploadData.id,

      userId: uploadData.userId,

    });


    return { message: "Data upload received, processing started" };

  }

}


TypeScript


class DataProcessor extends Agent {

  async processLargeDataset(data: { datasetId: string; userId: string }) {

    const results = await this.heavyComputation(data.datasetId);

    await this.notifyUser(data.userId, results);

  }


  async onDataUpload(uploadData: any) {

    // Queue the processing instead of doing it synchronously

    await this.queue("processLargeDataset", {

      datasetId: uploadData.id,

      userId: uploadData.userId,

    });


    return { message: "Data upload received, processing started" };

  }

}


批处理

JavaScript


class BatchProcessor extends Agent {

  async processBatch(data) {

    for (const item of data.items) {

      await this.processItem(item);

    }

    console.log(`Completed batch ${data.batchId}`);

  }


  async onLargeRequest(items) {

    // Split large requests into smaller batches

    const batchSize = 10;

    for (let i = 0; i < items.length; i += batchSize) {

      const batch = items.slice(i, i + batchSize);

      await this.queue("processBatch", {

        items: batch,

        batchId: `batch-${i / batchSize + 1}`,

      });

    }

  }

}


TypeScript


class BatchProcessor extends Agent {

  async processBatch(data: { items: any[]; batchId: string }) {

    for (const item of data.items) {

      await this.processItem(item);

    }

    console.log(`Completed batch ${data.batchId}`);

  }


  async onLargeRequest(items: any[]) {

    // Split large requests into smaller batches

    const batchSize = 10;

    for (let i = 0; i < items.length; i += batchSize) {

      const batch = items.slice(i, i + batchSize);

      await this.queue("processBatch", {

        items: batch,

        batchId: `batch-${i / batchSize + 1}`,

      });

    }

  }

}


错误处理

使用内置的 retry 选项,而不是手动重新入队的逻辑。当 callback 抛出异常时,任务会按指数退避自动重试:

JavaScript


class RobustAgent extends Agent {

  async reliableTask(payload, queueItem) {

    console.log(`Processing task ${queueItem.id}`);

    const response = await fetch(payload.url);

    if (!response.ok) {

      throw new Error(`Request failed: ${response.status}`);

    }

  }


  async onMessage(connection, message) {

    await this.queue(

      "reliableTask",

      { url: "https://api.example.com/data" },

      {

        retry: {

          maxAttempts: 5,

          baseDelayMs: 500,

          maxDelayMs: 10_000,

        },

      },

    );

  }

}


TypeScript


class RobustAgent extends Agent {

  async reliableTask(payload: { url: string }, queueItem: QueueItem) {

    console.log(`Processing task ${queueItem.id}`);

    const response = await fetch(payload.url);

    if (!response.ok) {

      throw new Error(`Request failed: ${response.status}`);

    }

  }


  async onMessage(connection: Connection, message: WSMessage) {

    await this.queue(

      "reliableTask",

      { url: "https://api.example.com/data" },

      {

        retry: {

          maxAttempts: 5,

          baseDelayMs: 500,

          maxDelayMs: 10_000,

        },

      },

    );

  }

}


如果未提供 retry 选项,会使用 static options.retry 的类级默认值(3 次重试,100ms 基础延迟,3s 最大延迟)。完整细节请参考重试

最佳实践

  1. 保持 payload 小巧:payload 会被 JSON 序列化并存储到数据库。
  2. 幂等操作:把 callback 方法设计成可安全重试的。
  3. 错误处理:在 callback 方法中加入合适的错误处理。
  4. 监控:用日志跟踪队列处理。
  5. 清理:必要时定期清理已完成或失败的任务。

与其他特性的集成

队列系统可以与 Agent SDK 的其他特性配合:

  • 状态管理:在排队的回调中访问 agent 状态。
  • 调度:与 schedule() 结合,实现基于时间的队列处理。
  • 上下文:排队的任务保留原始请求上下文。
  • 数据库:与其他 agent 数据共用同一个数据库。

限制

  • 任务按顺序处理,不会并行。
  • 没有优先级机制(只有 FIFO)。
  • 队列处理在 agent 执行期间进行,而不是作为独立的后台 job。

队列(Queue)与调度(Schedule)对比

如果希望任务尽快按顺序执行,使用 队列;如果需要任务在特定时间或周期性运行,使用 调度

特性队列调度
执行时机立即(FIFO)指定时间或 cron
使用场景后台处理延迟或周期性任务
存储cf_agents_queues 表cf_agents_schedules 表

下一步

Agents API Agents SDK 的完整 API 参考。

调度任务 基于 cron 与延迟的时间调度。

运行 Workflows 持久化的多步骤后台处理。