AI API 流式响应实战:SSE 协议从原理到落地

做 AI 应用,流式响应(Streaming)几乎是标配。没人愿意等 10 秒钟看一个空白屏幕,然后”砰”一下全部输出。

本文从 SSE 协议原理讲起,给出 Python 和 TypeScript 的完整代码,帮你快速实现流式输出。

什么是 SSE?

Server-Sent Events 是一种单向通信协议,服务器主动向客户端推送数据。和 WebSocket 不同,SSE 只需要普通的 HTTP 连接,实现更简单。

在 AI API 场景下,流程是这样的:

  1. 客户端发送请求,设置 stream: true
  2. 服务器逐步返回内容片段(chunk)
  3. 每个 chunk 以 data: 前缀通过 SSE 发送
  4. 生成结束时发送 data: [DONE]

OpenAI 协议流式实现

cURL 快速测试

curl https://api.ofox.ai/v1/chat/completions \
  -H "Authorization: Bearer $OFOX_API_KEY" \
  -H "Content-Type: application/json" \
  -N \
  -d '{
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": "写一首关于编程的诗"}],
    "stream": true
  }'

注意 -N 参数,它禁用 cURL 的输出缓冲,让你实时看到每个 chunk。

Python 实现

from openai import OpenAI

client = OpenAI(
    base_url="https://api.ofox.ai/v1",
    api_key="your-api-key"
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "写一首关于编程的诗"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

关键点:flush=True 确保每个 token 立即输出,不会被 Python 的输出缓冲吃掉。

TypeScript 实现

import OpenAI from 'openai'

const client = new OpenAI({
    baseURL: 'https://api.ofox.ai/v1',
    apiKey: 'your-api-key'
})

const stream = await client.chat.completions.create({
    model: 'openai/gpt-4o',
    messages: [{ role: 'user', content: '写一首关于编程的诗' }],
    stream: true
})

for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content
    if (content) process.stdout.write(content)
}

Anthropic 协议流式实现

如果你用 Claude 系列模型,可以走 Anthropic 原生协议,获得更完整的功能支持。

import anthropic

client = anthropic.Anthropic(
    base_url="https://api.ofox.ai/anthropic",
    api_key="your-api-key"
)

with client.messages.stream(
    model="anthropic/claude-sonnet-4.5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "写一首关于编程的诗"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

流式 + 函数调用

流式响应和函数调用可以同时使用。当模型决定调用工具时,工具调用信息也会以 chunk 的形式返回:

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "今天北京天气怎么样?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取指定城市的天气",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"]
            }
        }
    }],
    stream=True
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        print(f"调用工具: {delta.tool_calls[0].function}")
    elif delta.content:
        print(delta.content, end="", flush=True)

断线重连

网络不稳定时,流式连接可能中断。建议实现指数退避重连:

import time

def stream_with_retry(client, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(stream=True, **kwargs)
            for chunk in stream:
                yield chunk
            return
        except Exception as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"\n连接中断,{wait}s 后重试...")
                time.sleep(wait)
            else:
                raise e

最佳实践

  1. 始终设置超时 — 避免无限等待挂起
  2. 处理不完整的 chunk — 某些 chunk 可能没有 content 字段
  3. 前端用 flush — 确保内容即时显示给用户
  4. 实现重连 — 用指数退避策略处理网络波动

流式响应是提升 AI 应用体验的基础。掌握了 SSE 协议和错误处理,你的应用就能像 ChatGPT 一样逐字输出了。


相关阅读