概述:
- SSE(Server-Sent Events)协议是一种允许服务器向客户端实时推送更新的技术,基于HTTP协议,常用于实时数据推送
- 特点:
- 单向通信:服务器向客户端推送数据,客户端无法发送数据。
- 基于HTTP:使用简单,无需额外协议。
- 长连接:保持连接,服务器可随时推送数据。
- 自动重连:连接中断时,客户端会自动尝试重连。
- 数据以文本格式发送,每条消息以
\n\n
结尾。
SSE和WebSocket区别
特性 | SSE | WebSocket |
---|---|---|
通信方向 | 单向(服务器 → 客户端) | 双向(服务器 ↔ 客户端) |
协议基础 | 基于 HTTP | 基于 WebSocket 协议 |
数据格式 | 文本(data: <message>\n\n ) | 文本或二进制 |
自动重连 | 内置支持 | 需手动实现 |
性能 | 适合低频、轻量级推送 | 适合高频、双向通信 |
浏览器兼容性 | 普遍支持,部分老的IE不支持 | 普遍支持 |
使用场景 | 实时通知、数据展示、进度更新等 | 实时聊天、游戏、协同编辑 |
实现复杂度 | 简单 | 较复杂 |
简单实现
- 下面是fastapi写的一个demo
import asyncio
from fastapi import FastAPI
from starlette.responses import StreamingResponse
app = FastAPI()
# 模拟一个异步数据源
async def generate_data():
for i in range(10):
await asyncio.sleep(1) # 模拟延迟
yield f"data: Message {i}\n\n" # SSE 格式的数据
yield "data: Done\n\n" # 结束标志
# SSE 接口
@app.get("/sse")
async def sse():
return StreamingResponse(
generate_data(),
media_type="text/event-stream", # 设置媒体类型为 SSE
)
用postman直接调用接口http://127.0.0.1:8000/sse
如果打开相应头,会发现里面的content-type:text/event-stream
,标记了返回数据为流式数据
字段含义
-
SSE 数据以纯文本格式传输,每条消息由若干字段组成,字段之间用换行符(
\n
)分隔,每条消息以两个换行符(\n\n
)结尾。常用字段包括:-
data:
:消息内容(必填)。 -
event:
:事件类型(可选)。 -
id:
:消息 ID(可选)。 -
retry:
:重连时间(可选)。
-
-
将消息内容改成如下,真正的消息只有data中的内容
async def generate_data():
for i in range(10):
await asyncio.sleep(1) # 模拟延迟
# 发送消息,包含 retry 字段(设置重连时间为 3000 毫秒)
yield f"event: update\nid: {i}\nretry: 3000\ndata: Message {i}\n\n"
yield "data: END\n\n" # 结束标志
接入deepseek
如果你用过AI大模型,不难发现AI回答问题一般都是流式输出的,不断接受消息并进行Markdown的渲染,我这里接入deepseek并简单做了一个接口
import asyncio
import os
import replicate
from fastapi import FastAPI
from starlette.responses import StreamingResponse
app = FastAPI()
async def deepseek_generate_data(question: str):
os.environ["REPLICATE_API_TOKEN"] = "your_secret_key"
input = {
"top_p": 1,
"prompt": question,
"max_tokens": 20480,
"temperature": 0.1,
"presence_penalty": 0,
"frequency_penalty": 0
}
try:
for event in replicate.stream("deepseek-ai/deepseek-r1", input=input):
yield f"retry: 5000\ndata: {event}\n\n"
yield "data: END\n\n" # 结束标志
except Exception as e:
yield f"data: {str(e)}\n\n"
yield "data: END\n\n" # 结束标志
# SSE 接口
@app.get("/sse")
async def sse(question: str):
return StreamingResponse(
deepseek_generate_data(question),
media_type="text/event-stream", # 设置媒体类型为 SSE
)