SSE介绍
在日常web开发中经常会遇到查看数据最新状态的业务场景,例如查看任务状态与日志内容等。比较场景的解决方案是轮循和SSE。
Server-Sent Events (SSE) 是一种允许服务器通过单向通道向客户端推送更新的技术。它基于HTTP协议,客户端使用一个标准的HTTP请求来连接到服务器,并保持这个连接打开,服务器可以在这个连接上持续地发送数据。
SSE的工作原理
- 客户端通过HTTP请求连接到服务器,并保持连接不断开。
- 服务器推送text/event-stream类型的数据给客户端。
- 客户端监听服务器发送的消息,并进行处理。
SSE对比WebSocket
通信方式
- SSE 是单向的,服务器向客户端推送消息,客户端不能主动向服务器发送数据。
- WebSocket 是双向的,允许客户端和服务器之间的双向通信。
协议
- SSE基于HTTP协议,使用
HTTP/1.1
或HTTP/2
,更适合需要基于现有HTTP/HTTPS基础设施的应用。 - WebSocket 是独立协议,虽然也从HTTP握手开始,但连接升级为WebSocket协议后不再使用HTTP,适合需要低延迟、双向通信的应用。
重连与状态管理
- SSE内置自动重连机制,如果连接断开,客户端会自动尝试重新连接。
- WebSocket需要额外的代码来处理断开后的重连机制。
SSE对比HTTP轮询
通信效率
- HTTP轮询客户端周期性地发送请求以检查服务器是否有新数据,这种方法在没有新数据时会产生大量无效请求,浪费带宽和资源。
- SSE可以保持一个持久连接,只在有新数据时推送消息,效率更高。
实时性
- HTTP轮询由于间隔请求,通常会有较大的延迟。
- SSE提供准实时的推送服务,延迟较小。
SSE的优势
- 简单实现: SSE是基于HTTP的标准,使用方便,客户端只需通过标准的JavaScript API就可以处理消息推送。
- 自动重连: SSE内置自动重连机制,开发者不需要额外编写代码来处理连接丢失问题。
- 消息订阅: SSE支持事件流的多类型消息订阅,客户端可以根据不同的事件类型处理不同的数据。
- 高兼容性: SSE使用HTTP协议,兼容性好,适用于各种网络环境。
SSE的局限性
- 单向通信: 只能从服务器推送消息到客户端,不能实现双向通信。如果需要双向通信,WebSocket更为合适。
- 浏览器兼容性: 虽然现代浏览器都支持SSE,但某些旧版本浏览器可能需要使用Polyfill等兼容解决方案。
- 消息丢失: 如果连接断开且未能自动重连,有可能会丢失未接收到的消息。
适用场景
SSE非常适合需要从服务器向客户端推送实时更新的应用场景,例如:
- 实时数据更新(例如新闻、股票行情)
- 服务器事件通知
- 轻量级的实时通信应用
在需要简单、可靠的服务器推送方案且无需双向通信的场景下,SSE是一个不错的选择。如果需要更复杂的通信机制,WebSocket可能更为合适。
Django后端SSE
创建sse应用
首先创建一个新的APP来实现SSE的功能。
python manage.py startapp sse
在settings.py的INSTALLED_APPS中添加新的APP。
INSTALLED_APPS = [
...,
"sse",
]
创建路由
接下来修改根路由,新增sse前缀的子路由配置,并在APP目录中,新建一个urls.py文件来处理指向这个应用的请求。
- DRF/urls.py
from django.contrib import admin
from django.urls import path, include, re_path
from django.views.static import serve
from api import views
from django.conf import settings
urlpatterns = [
……
# SSE
path('v1/sse/', include('sse.urls', namespace='sse'))
]
- sse/urls.py
from rest_framework import routers
from sse import views
from django.urls import path
app_name = "sse"
urlpatterns = [
# SSE
path('demo/', views.DemoAPIView.as_view())
]
router = routers.DefaultRouter()
urlpatterns += router.urls
创建视图函数
创建APIView视图,模拟后端实时触发消息通知。需要注意的是后端发送符合SSE协议的消息必须满足以下要求:
- 消息格式正确。SSE 要求每个消息必须以
data:
开头,且后面有一个换行符分隔。例如:
data: This is a message
data: {"message": "This is a message"}
- 响应内容类型设置:服务器响应的
Content-Type
必须设置为text/event-stream
,否则浏览器和客户端会将其视为普通的 HTTP 响应。 - 后端跨域配置:如果你的请求是跨域的,确保后端 CORS 配置允许
EventSource
的跨域请求,具体可参考文档:跨域访问-崔亮的博客 (cuiliangblog.cn)
通常情况下我们都会用异步实现SSE,所以生成器event_stream()也得是异步的。如果我们需要处理长时间的任务,那么可以写一个异步函数并通过yield返回状态。
- sse/views.py
import asyncio
import random
import time
from django.http import StreamingHttpResponse
from loguru import logger
from rest_framework.views import APIView
class DemoAPIView(APIView):
"""
SSE示例数据
"""
@staticmethod
def get(request):
# 定义一个生成器,持续输出数据流
async def event_stream():
while True:
value = random.randint(0, 5)
await asyncio.sleep(value) # 模拟随机延时
msg = 'data: {{"message": "now time:{0} value:{1}"}}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'), value)
yield msg
# 返回 StreamingHttpResponse,Content-Type 设置为 text/event-stream
response = StreamingHttpResponse(event_stream(), content_type='text/event-stream',
headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'})
return response
启动服务
为了让项目异步运行,我们需要使用ASGI来启动项目,ASGI配置可参考文档:Django新特性汇总-崔亮的博客 (cuiliangblog.cn)。
uvicorn DRF.asgi:application --reload
访问验证
接下来使用浏览器访问api接口地址,浏览器会持续不断的接收响应类型为text/event-stream的最新实时数据并打印。
Nginx中间件SSE
为了保证SSE实时推送,需要对Nginx配置确保禁用缓冲区并确保Nginx不会中断SSE流,确保Nginx可以正确处理SSE连接。
基础反向代理配置
首先配置一个基础的Nginx反向代理,用于反向代理Django应用。
server {
listen 80;
server_name ~^.*$;
location / {
proxy_pass http://drf:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
为SSE配置特定的location
为了让Nginx正确处理SSE,需要禁用Nginx的响应缓冲区,并启用HTTP/1.1协议支持(SSE依赖此协议的持续连接功能)。
server {
listen 80;
server_name ~^.*$;
location /v1/sse/ {
# 代理到Django应用
proxy_pass http://drf:8000;
# 禁用响应缓冲,确保数据流立即发送到客户端
proxy_buffering off;
# 允许Nginx处理的最大响应头的大小(防止头部被缓存)
proxy_cache_bypass $http_upgrade;
# HTTP/1.1 协议支持,防止默认使用HTTP/1.0
proxy_http_version 1.1;
# 防止Nginx在长连接时设置 "Connection: close"
proxy_set_header Connection '';
# 设置请求头
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location / {
# 其他反向代理配置
……
}
}
通过这个配置,Nginx可以正确处理和代理Django Rest Framework中的SSE流,并且不会因为默认的缓冲机制导致SSE数据延迟。这样,前端Vue.js应用就可以实时接收来自后端的SSE推送消息。
VUE前端SSE
前端实现SSE可以使用浏览器的原生API——EventSource来实现。但是EventSource并不支持直接设置自定义的 HTTP 请求头,如 Accept 或 Authorization。如果使用需要自定义请求头,推荐使用 @microsoft/fetch-event-source。
这个库基于 Fetch API 实现,提供了比原生 EventSource 更灵活的控制,可以自定义 headers 和其他配置。并内置了自动重连机制,可以轻松地实现高级的错误处理逻辑。如果连接断开,浏览器会自动重试。
仓库地址:Azure/fetch-event-source: A better API for making Event Source requests, with all the features of fetch() (github.com)
封装请求函数
- src/api/home.js
// 获取SSE数据
export function getSSE() {
return import.meta.env.VITE_APP_BASE_URL + '/sse/demo/'
}
配置路由
- src/router/index.js
import {createRouter, createWebHistory} from 'vue-router';
const router = createRouter({
history: createWebHistory(), //h5模式createWebHistory
routes: [
……
{
path: '/test',
component: () => import('@/views/Test.vue'),
meta: {
title: '测试',
}
}
]
})
// 路由导航守卫
router.beforeEach((to, from, next) => {
document.title = to.meta.title
next()
})
export default router;
创建测试页
在 @microsoft/fetch-event-source 中,主要使用 fetchEventSource 函数来创建一个新的 EventSource 连接。这个函数接受一个 URL 参数,以及一个配置对象,其中可以包含一些选项,如请求方法、请求头、请求体等。当服务器向客户端推送事件时,可以通过 onmessage 回调函数来处理这些事件。此外,还可以提供 onerror 和 onclose 回调函数来处理连接错误和关闭事件。
- src/views/Test.vue
<template>
<h1>这是测试页</h1>
<p v-for="item in data">
{{ item.message }}
</p>
</template>
<script setup>
import {ref, onMounted, onUnmounted} from "vue";
import {fetchEventSource} from '@microsoft/fetch-event-source';
import {getSSE} from "@/api/home";
const data = ref([])
const connectSSE = async () => {
const url = getSSE()
console.log(url)
await fetchEventSource(url, {
method: 'GET',
headers: {
'Accept': '*/*'
},
onmessage: async (event) => {
console.log(event)
console.log(JSON.parse(event.data))
data.value.push(JSON.parse(event.data))
},
onerror(err) {
console.error('Error:', err);
if (err.status === 500) {
// 服务器错误时重新连接
setTimeout(() => connectSSE(), 5000);
}
},
onopen(response) {
if (response.ok) {
console.log('Connection start');
}
},
onclose() {
console.log('Connection close');
}
})
}
onMounted(() => {
connectSSE();
});
onUnmounted(() => {
});
</script>
<style scoped lang="scss">
</style>
访问验证
接下来访问vue测试页面路由,查看控制台信息验证,可以持续打印最新的数据流内容。
查看更多
微信公众号
微信公众号同步更新,欢迎关注微信公众号《崔亮的博客》第一时间获取最近文章。
博客网站
崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn