AI API 流式响应实战:SSE 协议从原理到落地
做 AI 应用,流式响应(Streaming)几乎是标配。没人愿意等 10 秒钟看一个空白屏幕,然后”砰”一下全部输出。
本文从 SSE 协议原理讲起,给出 Python 和 TypeScript 的完整代码,帮你快速实现流式输出。
什么是 SSE?
Server-Sent Events 是一种单向通信协议,服务器主动向客户端推送数据。和 WebSocket 不同,SSE 只需要普通的 HTTP 连接,实现更简单。
在 AI API 场景下,流程是这样的:
- 客户端发送请求,设置
stream: true - 服务器逐步返回内容片段(chunk)
- 每个 chunk 以
data:前缀通过 SSE 发送 - 生成结束时发送
data: [DONE]
OpenAI 协议流式实现
cURL 快速测试
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "写一首关于编程的诗"}],
"stream": true
}'
注意 -N 参数,它禁用 cURL 的输出缓冲,让你实时看到每个 chunk。
Python 实现
from openai import OpenAI
client = OpenAI(
base_url="https://api.ofox.ai/v1",
api_key="your-api-key"
)
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "写一首关于编程的诗"}],
stream=True
)
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
print(content, end="", flush=True)
关键点:flush=True 确保每个 token 立即输出,不会被 Python 的输出缓冲吃掉。
TypeScript 实现
import OpenAI from 'openai'
const client = new OpenAI({
baseURL: 'https://api.ofox.ai/v1',
apiKey: 'your-api-key'
})
const stream = await client.chat.completions.create({
model: 'openai/gpt-4o',
messages: [{ role: 'user', content: '写一首关于编程的诗' }],
stream: true
})
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content
if (content) process.stdout.write(content)
}
Anthropic 协议流式实现
如果你用 Claude 系列模型,可以走 Anthropic 原生协议,获得更完整的功能支持。
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="your-api-key"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "写一首关于编程的诗"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
流式 + 函数调用
流式响应和函数调用可以同时使用。当模型决定调用工具时,工具调用信息也会以 chunk 的形式返回:
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "今天北京天气怎么样?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
print(f"调用工具: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)
断线重连
网络不稳定时,流式连接可能中断。建议实现指数退避重连:
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt
print(f"\n连接中断,{wait}s 后重试...")
time.sleep(wait)
else:
raise e
最佳实践
- 始终设置超时 — 避免无限等待挂起
- 处理不完整的 chunk — 某些 chunk 可能没有 content 字段
- 前端用
flush— 确保内容即时显示给用户 - 实现重连 — 用指数退避策略处理网络波动
流式响应是提升 AI 应用体验的基础。掌握了 SSE 协议和错误处理,你的应用就能像 ChatGPT 一样逐字输出了。