背景
自从2022年底chatgpt上线后,sse就进入了大众的视野,之前是谁知道这玩意是什么?但是打字机的效果看起来是真的很不错,一度吸引了很多人的趋之若鹜,当然了这个东西的确挺好用,而且实现很简单,之前我用python的demo讲了一下SSE的概念,看起来有很多人看,但是并没有说明白这个原理,这次再彻底把这个原理给说明白,而且我发现通过node.js 的Express框架来说明这个概念更加简洁,所以今天就用Express框架来说明SSE概念,这样对前端同学更加友好。
之前的Python SSE的文章:
https://blog.csdn.net/wangsenling/article/details/130911465
https://blog.csdn.net/wangsenling/article/details/130490769
回看协议演化历史
学过计算机网络的人都知道socket连接就是全双工的,怎么到http协议这里就变成了单向的了?且连一次就over了,这其实是浏览器编程者故意而为,另外服务端根本不保留会话信息,处理完成直接就把这次请求的相关信息从内存清理了,所以才出现这种单向沟通,后来网络资源越来越便宜,才逐渐的走向了SSE和Websocket
HTTP 协议设计背景
HTTP 是在 1990 年代早期互联网环境中设计出来的,当时网络资源(如带宽和连接数)非常有限。基于这种限制,HTTP 协议被设计为无状态、短连接的模型,以尽可能地节省服务器和客户端的资源。这意味着:
-
请求-响应模型:HTTP 是典型的请求-响应协议,客户端发起请求,服务器处理并返回响应,随后连接立即关闭。这样设计的目的是为了快速释放资源,特别是服务器的连接数。
-
短连接:早期的 HTTP/1.0 协议默认每个请求完成后都立即关闭连接,这种方式减少了保持大量长连接对服务器造成的负担,但也带来了效率上的问题,特别是在需要频繁通信的情况下。
-
无状态:HTTP 的无状态设计使得每个请求都是独立的,服务器不必保留任何会话信息,从而进一步降低了对服务器资源的需求。这在资源稀缺的互联网早期非常重要。
HTTP 的局限性
随着网络应用的复杂性增加,HTTP 的短连接和无状态特性逐渐暴露出一些问题,特别是在需要实时更新或双向通信的应用场景中,例如:
-
实时聊天、在线协作工具、股票行情更新、通知系统等。
-
在这些场景下,HTTP 的传统请求-响应模型显得过于笨重,因为每次更新都需要客户端主动发起新的请求。
为了弥补这些局限,出现了一些技术,例如:
-
轮询(Polling):客户端定期向服务器发送请求,以获取最新的数据。这种方法虽然可以在一定程度上实现实时更新,但效率较低,因为大量请求可能只会获得很少或没有新数据。
-
长轮询(Long Polling):客户端发送请求后,服务器保持连接打开直到有新数据,然后返回响应。这减少了一些不必要的请求,但仍然需要不断建立新连接,并且是非标准化的。
SSE 的诞生
为了解决 HTTP 请求-响应模型中的不足,SSE 被引入。SSE 是基于 HTTP 协议之上的一种扩展,它允许服务器在一个长时间保持的连接中,不断地向客户端推送事件流。与 WebSocket 相比,SSE 更加轻量,并且完全基于 HTTP 协议,这使它具有很好的兼容性。
-
长连接保持:SSE 通过建立一个 HTTP 长连接,使得服务器可以在连接保持的状态下,推送多个事件。这样一来,服务器可以在有新数据时立即推送给客户端,而客户端不必频繁发起新的请求。
-
单向通信:SSE 仅允许服务器向客户端发送数据,这与 WebSocket 的全双工通信形成对比。虽然通信方式有限,但它简化了很多实时更新场景中的开发工作,并且避免了 WebSocket 的复杂性。
-
兼容 HTTP:由于 SSE 基于 HTTP 协议,因此它能够很好地与现有的 HTTP 基础设施(如代理、防火墙等)配合工作,不容易遇到兼容性问题。
为什么 HTTP 自动关闭连接?
你提到的 HTTP 自动关闭连接是基于早期互联网的设计初衷——节省资源。在那个时代,保持长连接对于资源有限的服务器来说是一个很大的负担:
-
资源节约:每个连接都占用系统的文件描述符、内存和 CPU 资源。如果每个客户端保持长时间连接,服务器很容易耗尽这些资源。因此,HTTP 通过每次请求结束后立即关闭连接来减少服务器负担。
-
提高并发能力:通过让连接快速关闭,服务器可以同时处理更多的客户端请求,提高并发能力。
WebSocket 与 HTTP 的不同设计哲学
正如你提到的,WebSocket 是长连接全双工的,它在设计上更类似于低层的 TCP 套接字通信。这使得 WebSocket 更适合需要实时双向通信的应用,比如聊天、在线游戏等。
-
WebSocket 提供的长连接全双工通信:适用于需要双向持续交互的场景,但它不是基于 HTTP 的标准模型,因此需要在协议层上进行升级。
相比之下,HTTP 的设计初衷是基于短连接的请求-响应模型,更适合传统的静态内容传输场景。在现代互联网中,虽然有了更高效的 WebSocket,但 HTTP 的请求-响应模型仍然有其合理性,尤其在静态资源加载、API 请求等方面。
总结
SSE 的出现正是为了解决 HTTP 的局限性,提供一种简单、基于 HTTP 协议的长连接机制,适用于实时更新但无需复杂双向通信的场景。HTTP 早期设计为短连接的原因在于资源有限,自动关闭连接是为了节省资源。而像 WebSocket 这样的长连接协议则适用于实时性和双向通信要求较高的场景,因此两者都有各自的应用场景。
SSE就是HTTP协议下的一个协议补充
Demo效果
Gitee 源码地址:
https://gitee.com/sen2020/express-test
我们来看request和response的格式,请求核心就两个,返回核心是三个,不缓存,保持存活,事件流类型,只要发出去的http请求带着这些header参数,那么SSE协议连接就建立了,非常的简单。
结合之前的文档,你就能理解了,fetch只要拼凑出来这样header就可以建立SSE连接,不需要什么特殊的处理,几乎任何一个Web框架都支持,接下来用node.js的Express实现一个打字机的小demo
服务端代码
import express from 'express';
import fs from 'fs';
import readline from 'readline';
import { EventEmitter } from 'events';
import path from 'path';
import { fileURLToPath } from 'url';
const app = express();
const port = 3000;
// 获取当前文件的路径
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 创建事件管理器
const eventEmitter = new EventEmitter();
// 设置静态文件目录来提供前端页面和资源
app.use(express.static(path.join(__dirname, 'public')));
// SSE 路由,根据 ISBN 请求电子书
app.get('/events', (req, res) => {
const { isbn } = req.query;
if (!isbn) {
res.status(400).send('Missing ISBN');
return;
}
// 设置 headers 确保保持长连接
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 确保每个连接只会被注册一次,避免重复订阅同一事件
const onNewData = (data) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
eventEmitter.on(isbn, onNewData);
// 当客户端断开时,移除事件监听
req.on('close', () => {
eventEmitter.removeListener(isbn, onNewData);
res.end();
});
// 开始读取并发送字符流
readFileCharacterByCharacter(isbn);
});
// 从文件中逐字符读取数据并触发事件
const readFileCharacterByCharacter = async (isbn) => {
const filePath = path.join(__dirname, 'books', `${isbn}.txt`);
// 如果文件不存在,则返回错误
if (!fs.existsSync(filePath)) {
eventEmitter.emit(isbn, { message: `Error: Book with ISBN ${isbn} not found.` });
return;
}
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});
for await (const line of rl) {
for (const char of line) {
await new Promise(resolve => setTimeout(resolve, 50)); // 模拟打字机效果,每50ms发送一个字符
eventEmitter.emit(isbn, { message: char });
}
eventEmitter.emit(isbn, { message: '\n' });
}
// 告诉前端书本内容已经结束
eventEmitter.emit(isbn, { message: 'End of book.' });
};
app.listen(port, () => {
console.log(`SSE server and static files serving at http://localhost:${port}`);
});
html代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Book Reader</title>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<div class="container">
<h1>Book Reader</h1>
<div class="form-container">
<input type="text" id="isbn" value="1234567890" placeholder="Enter ISBN">
<button id="start">Start Reading</button>
</div>
<div id="content"></div>
</div>
<script src="scripts.js"></script>
</body>
</html>
前端JS代码
document.getElementById('start').addEventListener('click', () => {
const isbn = document.getElementById('isbn').value.trim();
if (!isbn) {
alert('Please enter a valid ISBN');
return;
}
const contentDiv = document.getElementById('content');
contentDiv.innerHTML = ''; // 清空之前的内容
// 如果已经有一个 EventSource,先关闭它
if (window.eventSource) {
window.eventSource.close();
}
// 创建一个新的 EventSource 连接
window.eventSource = new EventSource(`/events?isbn=${isbn}`);
let currentLine = ''; // 用于累积当前行的字符
let paragraph = document.createElement('p'); // 创建一个段落元素
contentDiv.appendChild(paragraph); // 初始添加一个段落
window.eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
const char = data.message;
// 检查是否接收到 "End of book." 消息
if (char === 'End of book.') {
paragraph = document.createElement('p');
paragraph.textContent = 'End of book.';
contentDiv.appendChild(paragraph);
// 关闭 EventSource 连接
window.eventSource.close();
return; // 停止进一步处理
}
// 如果是换行符,渲染当前行并创建新的段落
if (char === '\n') {
paragraph = document.createElement('p'); // 创建新的段落
contentDiv.appendChild(paragraph); // 添加新段落到内容区
currentLine = ''; // 清空当前行
} else {
// 累积字符到当前行并更新当前段落内容
currentLine += char;
paragraph.textContent = currentLine; // 逐字符更新当前段落内容
}
// 自动滚动到底部
contentDiv.scrollTop = contentDiv.scrollHeight;
};
window.eventSource.onerror = () => {
console.log('EventSource connection closed.');
window.eventSource.close();
};
});
CSS样式表
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}
.container {
text-align: center;
background-color: #fff;
padding: 20px;
border-radius: 8px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}
.form-container {
margin-bottom: 20px;
}
#isbn {
padding: 10px;
font-size: 16px;
width: 200px;
margin-right: 10px;
}
button {
padding: 10px 15px;
font-size: 16px;
cursor: pointer;
}
#content {
text-align: left;
margin-top: 20px;
max-height: 300px;
overflow-y: auto;
background-color: #f9f9f9;
padding: 15px;
border: 1px solid #ddd;
}
目录结构
package.json配置
SSE的应用场景注意事项
-
php这种语言是无法搞这个事情了,因为一个连接需要一个进程,一台主机4核4G,估计也只能建立200个进程就是极限了,再长期保持,就是扯淡,所以你必须用异步IO的Web框架来实现SSE的连接,才可以大大提高连接数。
用SSE做推送-负载均衡场景怎么破?
-
首先我们要知道的是,一个Web服务一旦启动,就是一个独立的进程,你想往这个进程中塞东西,要么你能拿到这个进程对外的变量app或者server,这个大概率不可能,因为启动时是不对外提供这种变量的,那怎么办?
-
这肯定就是进程间通信的事情了,这里不做深入探讨,无非就是管道,共享内存,队列等等,还有个套接字?套接字是啥?说实话我既懂又不懂
-
但是这个进程可以注册到一个端口上,这样外界就可以发消息进来,这也合理,就像人有个耳朵一样,如果一个人聋了,你当然是没办法跟他说话的。
-
可以让这个进程自己提供一个http服务,这样大家调用时就通用了,通过这个http接口,我们就可以将信息发送给这个服务里去
-
-
拿来做服务端推送机制,那就是要在服务器级别收集client,如果是负载均衡方式启动了多台服务器怎么办?
-
刚好1列表就讲到这个问题,每个独立服务都将自己的IP:端口号存放在redis中,同时也罢自己的收集到的client创建个编码塞到redis里去,例如user_id:client-ip-port 这种对应,这样如果有一个广播需要广播一批人,那么就能从redis中找到这批人,然后再提取他们所在的ip:port,然后把消息发给这些服务,这些服务接收到user_id后,再在自己Map表中找到client,然后一个个发送数据过去,就实现了负载均衡下的消息推送,你学废了吗?
-
类似hyperf框架,这种开了多个进程来承接不同的client,玩法是一样,找到user_id:ip:port:process_id,就可以实现同样的广播效果,不过这玩意自己实现起来有点麻烦,我也没玩过,看了下调度就是这样实现的。
-
Express实现单服务的广播机制代码
import express from 'express';
import fs from 'fs';
import readline from 'readline';
import { EventEmitter } from 'events';
import path from 'path';
import { fileURLToPath } from 'url';
const app = express();
const port = 3000;
// 获取当前文件的路径
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 创建事件管理器
const eventEmitter = new EventEmitter();
// 全局连接管理器
let clients = [];
// 设置静态文件目录来提供前端页面和资源
app.use(express.static(path.join(__dirname, 'public')));
// SSE 路由,客户端连接到服务器
app.get('/events', (req, res) => {
const headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
};
res.writeHead(200, headers);
// 新的客户端连接,加入全局连接管理器
clients.push(res);
// 清理客户端断开连接时的处理
req.on('close', () => {
clients = clients.filter(client => client !== res);
res.end();
});
});
// 广播新书上架通知给所有客户端
const broadcastNewBook = (bookTitle) => {
clients.forEach(client => {
client.write(`data: ${JSON.stringify({ message: `New book available: ${bookTitle}` })}\n\n`);
});
};
// 上架新书时调用此函数
app.post('/new-book', express.json(), (req, res) => {
const { title } = req.body;
if (!title) {
res.status(400).send('Book title is required');
return;
}
// 广播新书消息给所有客户端
broadcastNewBook(title);
res.status(200).send(`New book "${title}" broadcasted to all clients.`);
});
// 启动服务器
app.listen(port, () => {
console.log(`SSE server running at http://localhost:${port}`);
});
Express中如何一个启动一个服务同时支持http/websocket两种协议?
-
这个要想明白一个事情,Express本身分为两个模块,其他Web框架也是这样的,一个是协议处理模块app,另外,也即当http发送过来请求时,浏览器会根据你输入的协议
http://
和ws://
在request的header中追加一个upgrade:websocket,有了这个标识,被Express的http模块识别到之后,它调用的app,就是ws的模块的app,所有的逻辑都走这边,刚才我们已经了解到http的关闭是由app模块发的close来控制的,只要这边不发close,这个连接就会一直保持着,因为连接就是socket,socket就是全双工的,所以,只要ws模块不主动发close过来,那么这个连接就可以保持长连接,同时ws模块会把这个连接的信息记录下来,以方便后续不断地再接收数据,再发回数据,这就是两个协议可以共用一个端口的机制。 -
将两种协议整合在一个Express下有什么好处?他们连接对象都是在一个进程中,因为启动时就启动了一个进程,这个进程启动了http模块和ws模块,所以两者共用一个一套上下文,因此如果你在Express启动前创建一个clients = [],将连接过来的所有ws client都塞进去,那就意味着?
-
意味着你可以通过发送http请求给Express服务,来实现广播效果,是不是爽歪歪?
-
效果截图
服务端代码:
import express from 'express';
import { WebSocketServer } from 'ws';
import path from 'path';
import { fileURLToPath } from 'url';
const app = express();
const port = 3000;
// 使用 JSON 中间件解析 POST 请求体
app.use(express.json());
// 获取当前文件的路径
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 配置静态文件目录
app.use(express.static(path.join(__dirname, 'public/ws')));
app.use((req, res, next) => {
res.setHeader("Content-Security-Policy", "connect-src 'self' http://localhost:3000");
next();
});
// 创建 WebSocket 服务器并与 Express 集成
const wss = new WebSocketServer({ noServer: true });
// 保存所有 WebSocket 客户端
let clients = [];
// WebSocket 连接处理
wss.on('connection', (ws) => {
console.log('Client connected');
clients.push(ws);
// 处理消息
ws.on('message', (message) => {
console.log(`Received message: ${message}`);
});
// 当客户端断开连接时,移除它
ws.on('close', () => {
clients = clients.filter(client => client !== ws);
console.log('Client disconnected');
});
});
// 广播消息给所有 WebSocket 客户端
const broadcastMessage = (message) => {
clients.forEach(client => {
if (client.readyState === client.OPEN) {
client.send(message);
}
});
};
// 上架新书的 HTTP 接口
app.post('/new-book', (req, res) => {
const { title } = req.body;
if (!title) {
return res.status(400).send('Book title is required');
}
const message = `New book available: ${title}`;
broadcastMessage(message);
res.status(200).send(`New book "${title}" has been broadcasted to all WebSocket clients.`);
});
// 处理升级请求,WebSocket 连接时需要的处理逻辑
app.server = app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
app.server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
});
Html 代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Client</title>
</head>
<body>
<h1>WebSocket Book Notification</h1>
<div id="content"></div>
<script>
const contentDiv = document.getElementById('content');
// 创建 WebSocket 连接
const ws = new WebSocket('ws://localhost:3000');
// 处理接收到的消息
ws.onmessage = (event) => {
const paragraph = document.createElement('p');
paragraph.textContent = `Received: ${event.data}`;
contentDiv.appendChild(paragraph);
};
// 处理 WebSocket 连接打开
ws.onopen = () => {
console.log('WebSocket connected');
};
// 处理 WebSocket 连接关闭
ws.onclose = () => {
console.log('WebSocket disconnected');
};
</script>
</body>
</html>
js发送广播的请求fetch
fetch('http://localhost:3000/new-book', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
title: 'The Great Adventure WSSSSSSSS'
})
})
.then(response => response.text())
.then(data => console.log(data))
.catch(error => console.error('Error:', error));