HTTP 与 Server-Sent Events
Agent 可以处理 HTTP 请求,并使用 Server-Sent Events (SSE) 流式返回响应。本页介绍 onRequest 方法和 SSE 的常见用法。
处理 HTTP 请求
定义 onRequest 方法来处理发往你 Agent 的 HTTP 请求:
JavaScript
import { Agent } from "agents";
export class APIAgent extends Agent {
async onRequest(request) {
const url = new URL(request.url);
// Route based on path
if (url.pathname.endsWith("/status")) {
return Response.json({ status: "ok", state: this.state });
}
if (url.pathname.endsWith("/action")) {
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const data = await request.json();
await this.processAction(data.action);
return Response.json({ success: true });
}
return new Response("Not found", { status: 404 });
}
async processAction(action) {
// Handle the action
}
}
Explain Code
TypeScript
import { Agent } from "agents";
export class APIAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
const url = new URL(request.url);
// Route based on path
if (url.pathname.endsWith("/status")) {
return Response.json({ status: "ok", state: this.state });
}
if (url.pathname.endsWith("/action")) {
if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const data = await request.json<{ action: string }>();
await this.processAction(data.action);
return Response.json({ success: true });
}
return new Response("Not found", { status: 404 });
}
async processAction(action: string) {
// Handle the action
}
}
Explain Code
Server-Sent Events (SSE)
SSE 让你能够通过一条长连 HTTP 连接持续地把数据推给客户端。这非常适合 AI 模型增量生成 token 的场景。
手写 SSE
使用 ReadableStream 手动构造 SSE 流:
JavaScript
export class StreamAgent extends Agent {
async onRequest(request) {
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// Send events
controller.enqueue(encoder.encode("data: Starting...\n\n"));
for (let i = 1; i <= 5; i++) {
await new Promise((r) => setTimeout(r, 500));
controller.enqueue(encoder.encode(`data: Step ${i} complete\n\n`));
}
controller.enqueue(encoder.encode("data: Done!\n\n"));
controller.close();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
}
Explain Code
TypeScript
export class StreamAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// Send events
controller.enqueue(encoder.encode("data: Starting...\n\n"));
for (let i = 1; i <= 5; i++) {
await new Promise((r) => setTimeout(r, 500));
controller.enqueue(encoder.encode(`data: Step ${i} complete\n\n`));
}
controller.enqueue(encoder.encode("data: Done!\n\n"));
controller.close();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
}
Explain Code
SSE 消息格式
SSE 消息遵循特定的格式:
data: your message here\n\n
你也可以包含事件类型和 ID:
event: update\n
id: 123\n
data: {"count": 42}\n\n
与 AI SDK 配合
AI SDK ↗ 内置了 SSE 流式输出能力:
JavaScript
import { Agent } from "agents";
import { streamText } from "ai";
import { createWorkersAI } from "workers-ai-provider";
export class ChatAgent extends Agent {
async onRequest(request) {
const { prompt } = await request.json();
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
prompt: prompt,
});
return result.toTextStreamResponse();
}
}
Explain Code
TypeScript
import { Agent } from "agents";
import { streamText } from "ai";
import { createWorkersAI } from "workers-ai-provider";
interface Env {
AI: Ai;
}
export class ChatAgent extends Agent<Env> {
async onRequest(request: Request): Promise<Response> {
const { prompt } = await request.json<{ prompt: string }>();
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/zai-org/glm-4.7-flash"),
prompt: prompt,
});
return result.toTextStreamResponse();
}
}
Explain Code
连接处理
SSE 连接可能会持续很久。要优雅地处理客户端断开:
-
持久化进度 —— 写入 agent state,让客户端可以续传
-
使用 agent 路由 —— 客户端可以 重新连接到同一个 agent 实例,不需要会话存储
-
没有超时限制 —— Cloudflare Workers 对 SSE 响应时长基本没有有效限制
JavaScript
export class ResumeAgent extends Agent {
async onRequest(request) {
const url = new URL(request.url);
const lastEventId = request.headers.get("Last-Event-ID");
if (lastEventId) {
// Client is resuming - send events after lastEventId
return this.resumeStream(lastEventId);
}
return this.startStream();
}
async startStream() {
// Start new stream, saving progress to this.state
}
async resumeStream(fromId) {
// Resume from saved state
}
}
Explain Code
TypeScript
export class ResumeAgent extends Agent {
async onRequest(request: Request): Promise<Response> {
const url = new URL(request.url);
const lastEventId = request.headers.get("Last-Event-ID");
if (lastEventId) {
// Client is resuming - send events after lastEventId
return this.resumeStream(lastEventId);
}
return this.startStream();
}
async startStream(): Promise<Response> {
// Start new stream, saving progress to this.state
}
async resumeStream(fromId: string): Promise<Response> {
// Resume from saved state
}
}
Explain Code
WebSockets vs SSE
| 特性 | WebSockets | SSE |
|---|---|---|
| 方向 | 双向 | 仅服务端 → 客户端 |
| 协议 | ws:// / wss:// | HTTP |
| 二进制数据 | 支持 | 不支持(仅文本) |
| 重连 | 手动 | 自动(浏览器内置) |
| 适用场景 | 交互式应用、聊天 | 流式响应、服务端推送通知 |
建议:交互式应用用 WebSocket;流式 AI 响应或服务端推送通知用 SSE。
WebSocket 相关文档参见 WebSockets。
下一步
WebSockets 双向实时通信。
State management 持久化流的进度和 agent 状态。
Build a chat agent 用 AI 聊天进行流式响应。