概述
服务端发送事件(Server-Sent Events, SSE)是一种允许服务器向客户端推送实时更新的技术。SSE 提供了一种单向的通信通道,服务器可以持续地向客户端发送数据,而不需要客户端频繁发起请求。这对于需要实时更新的应用场景非常有用。
流式传输的特点是将数据逐步传输给客户端,而不需要等待完整的响应生成。这意味着在传输过程中,数据会逐步发送给客户端,而不是一次性发送所有数据,对于基于文本的AI对话来说,这意味着每个单词或短语可以随着模型预测它们时即时显示出来,从而营造出一种更加自然和动态的交流体验。
最近在对接大模型对话生成接口,查找官方文档中并没有找到明确的实现说明,本文根据 Uniapp 及微信小程序开发文档,基于 uni.request 实现了一个简单的SSE客户端。
准备
服务端已提供SSE接口,可通过 Apifox 直接访问进行测试。
Uniapp 客户端实现
要实现在微信小程序中接收 SSE 流式响应,我们需要做几个关键步骤:
- 配置 HTTP 请求:设置适当的请求头和参数,以确保服务器知道我们期望的是流式响应。
- 处理分块数据:由于 SSE 是分块传输的,我们需要监听每个数据块,并适当地解析它们。
- 错误和完成处理:定义当遇到错误或完成时的行为。
下面是一个使用 uni.request API 实现 SSE 的例子:
let buffer = ''
function decode(data: ArrayBuffer): string {
// 根据协议对数据进行解析,省略...
// 注意数据可能是不连续的,需要通过 buffer 进行拼接
}
function streamPost(url, data, onData, onError = null, onComplete = null) {
function onChunkReceived(res) {
onData(decode(res.data))
}
function onHeadersReceived(res) {
console.log('onHeadersReceived', res)
}
const requestTask = uni.request({
url: baseUrl + apiPath + url,
method: 'POST',
header: {
Accept: 'text/event-stream', // 确保服务器知道我们期望的是流式响应
Authorization: uni.getStorageSync('token'),
// ...其他参数
},
data,
enableChunked: true, // onChunkReceived, 否则走success()
responseType: 'arraybuffer',
success: (res) => {
console.log('Data received:', res.data) // 开启 enableChunked 时仅最后一次会走这个
},
fail: (error) => { // 错误处理
if (onError) {
onError(error)
}
console.error('SSE failed:', error)
},
complete: () => { // 完成接收
if (onComplete) {
onComplete()
}
if (onHeadersReceived) {
requestTask?.offHeadersReceived(onHeadersReceived)
}
if (onChunkReceived) {
// @ts-expect-error uni-app types lost
requestTask?.offChunkReceived(onChunkReceived)
}
},
})
if (onHeadersReceived) {
requestTask.onHeadersReceived(onHeadersReceived)
}
if (onChunkReceived) {
// @ts-expect-error uni-app types lost
requestTask.onChunkReceived(onChunkReceived) // 注册数据接收响应函数
}
return requestTask // 外部可通过 requestTask.abort(); 主动结束
}
在 nginx 中开启transfer_encoding, 同时关闭缓存 proxy_buffering。
location /ai/chat/stream {
proxy_set_header Transfer-Encoding "";
chunked_transfer_encoding on;
proxy_buffering off;
}
总结
- 开启:
enableChunked: true
- 设置请求 Header:
Accept: 'text/event-stream'
- 注册数据接收响应函数:
requestTask.onChunkReceived(onChunkReceived)
- 主动结束:
requestTask.abort()
- 分块数据解析:
decode()
通过以上步骤,我们成功地在 UniApp 中实现了 SSE 流式响应,增强了应用程序的实时交互能力。希望这篇文章能为你在 UniApp 中集成实时数据更新功能提供有价值的参考。
参考
- uniapp api 文档: https://uniapp.dcloud.net.cn/api/request/request.html
- 小程序开发文档:wx.request https://developers.weixin.qq.com/miniprogram/dev/api/network/request/RequestTask.onChunkReceived.html
欢迎合作
最近业余在做的个人项目:https://www.aaronzzh.cn
如果这篇文章对您有所帮助,欢迎点赞、分享和留言,让更多的人受益。感谢您的细心阅读,如果发现了任何错误或需要补充的地方,请随时告诉我,我会尽快处理 _