Python基于Quart框架实现SSE数据传输
- 前言
- SSE简介
- 理论分析
- 代码实现
前言
在类似Chatgpt的应用中要实现数据的流式传输,模仿实现打字机效果,SSE是不二之选。传统的Flask框架不能满足异步处理的要求,没有异步处理就很难实现实时交互的需求,因此全新的Quart框架出现,但是Quart框架并没有原生好用的SSE类,官网只给出了如下的封装:
SSE简介
Server-Sent Events (SSE) 是一种基于 HTTP 的协议,服务器可以使用它来向客户端推送实时更新。在人工智能(AI)领域,SSE 的重要性主要体现在以下几个方面:
-
实时交互:在某些 AI 应用中,如聊天机器人、实时推荐系统等,需要服务器能够实时地向客户端推送新的信息或更新。SSE 提供了一种有效的方式来实现这种实时交互。
-
异步处理:AI 模型的计算过程可能会耗费一定的时间,特别是在处理大量数据或复杂模型时。通过 SSE,服务器可以在计算完成后立即将结果推送给客户端,而无需让客户端等待整个计算过程。
-
减少网络负载:相比于传统的轮询方式,SSE 可以大大减少网络请求的数量,从而降低服务器的负载。这对于处理大量实时数据的 AI 应用来说尤其重要。
-
易于实现和使用:SSE 是基于 HTTP 的,因此在大多数编程语言和框架中都很容易实现。此外,由于它是一种标准协议,因此客户端(如浏览器)通常也提供了对 SSE 的原生支持。
理论分析
Server-Sent Events (SSE) 是一种服务器向客户端推送数据的技术,它基于 HTTP 协议,允许服务器单向地向客户端发送事件。以下是 SSE 的数据传输格式和通信过程原理:
数据传输格式
SSE 使用纯文本格式发送数据,每个事件由一系列以换行符分隔的字段组成。每个字段都以字段名开始,后跟一个冒号和字段值。例如:
data: This is the first message.
data: This is the second message.
这里,data
是一个字段,表示事件的数据。你也可以使用其他字段,如 event
(指定事件类型)和 id
(指定事件 ID)。
通信过程原理
-
建立连接:客户端通过发送一个 GET 请求到服务器上的某个 URL 来建立连接。这个请求的
Accept
头部字段应该设置为text/event-stream
,以告诉服务器客户端希望使用 SSE。 -
发送事件:一旦连接建立,服务器就可以开始发送事件。每个事件都是一个独立的消息,由一系列字段组成。服务器可以在任何时候发送事件,不需要客户端的请求。
-
接收事件:客户端通过监听
message
事件来接收服务器发送的事件。每当服务器发送一个事件,message
事件就会被触发,事件的数据可以通过事件对象的data
属性获取。 -
断开和重新连接:如果连接被断开,客户端会自动尝试重新连接。你可以通过设置
retry
字段来控制重新连接的时间间隔。如果服务器发送了一个带有id
字段的事件,那么在重新连接时,客户端会发送一个Last-Event-ID
头部,值为最后一个接收到的事件的 ID,这样服务器就可以知道客户端接收到哪些事件。
因此需要定义如下四种内容:
- data:数据内容
- event:事件类型,一般是message
- id:事件编号
- retry:断开重传的时间
代码实现
from dataclasses import dataclass
from quart import make_response, json, Response
@dataclass
class ServerSentEvent:
"""
Server Sent Event服务器
服务端作用:
将文本数据变成数据流传向客户端
数据格式:
data: string 传输数据内容, 公有变量
event: string 传输事件类型, 私有变量
id: string 事件id, 私有变量
retry: int 断开重连时间, 私有变量
"""
_data: str
_event: str = None
_id: int = 0
_retry: int = 0
def encode(self):
"""
将数据转换成SSE的传输格式
"""
message = f"data: {self._data}"
if self._event is not None:
message = f"{message}\nevent: {self._event}"
if self._id is not None:
message = f"{message}\nid: {self._id}"
if self._retry is not None:
message = f"{message}\nretry: {self._retry}"
message = f"{message}\n\n"
return message
async def response_sse(chat_generator):
"""
发送请求的响应
chat_generator: generator
return: response
"""
async def send_events():
"""
将数据编码成SSE传输的格式进行传输
"""
# 遍历chat_generator获取其中的字符串内容
for data in chat_generator:
print("data in generator:"+data)
event = ServerSentEvent(data)
encoded_event = event.encode()
yield encoded_event
# 返回响应数据
response = await make_response(
send_events(),
{
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Transfer-Encoding': 'chunked',
'Connection': 'keep-alive',
},
)
response.timeout = None
return response