websocket具体实践
参考:
如何使用websocket
WebSocket客户端连接不上和掉线的问题以及解决方案
继6月份对websocket一顿了解之后,我们的项目也要上websocket了,虽然这部分不是我做,但是借此机会,我也想要尝试一下:假如是我来写这个模块,我会写成什么样呢?
抱着这样的想法,在11月份开始着手查了一些资料,打算完整地写一份前端的socket实例,一开始完全没有头绪,网上查了查大家写的都很简略的样子,达不到想要的效果,后来写小程序的时候想到可以借鉴小程序里的websocket封装,然后终于确定了类的api和基本功能。
修改过几次后,实现的功能和流程如下:(当然我自己写的这一份只是在自己的网站里做了一些简单测试,里面肯定还有很多bug待解决的)
- 1.0 第一版:引入scoketio客户端依赖做的,已经全面否决
- 1.1 第二版:使用函数实现,实际使用感乱七八糟的。
- 1.2 第三版:借鉴小程序的类封装,定义好了简单的success/fail/onError API 实现
- 1.3 第四版:增加多场景(1、时效性强不需要重连机制,连接失败立马更换轮询场景。2、保持长连接的可重连场景),增加心跳、断线重连
- 1.4 第五版:断线后不会发送close事件,心跳事件仍然照常发送,故取消断线重连的机制,但是检测定时器仍然保留。
疑问:- 心跳事件如果只有客户端发送保活,后端收到后不响应,会不会有什么问题?
- 基本不会有问题,保活是为了确保正确的连接到俩端,出于服务器性能考虑可以这样做。
- 如果断线后没有检测到连接断开,会不会重连时多个客户端连接到同一个后端上去了,这样有办法检测吗?
- 后端可以检测连接的websocket session,把之前的取消掉,否则可能会出现服务端发送了消息但是客户端接受不到的情况,
编写中遇到的问题:
- 重连需要注意把之前的连接关闭,否则就会出现内存泄漏的问题,如下图一下开了好多个连接,后端也收到好多个连接
- websocket和MQ消息队列有什么不同,为什么不采用MQ.
- MQ没办法精准定位到用户,如果有多个用户需要消息,往消息队列中取消息不知道哪个才是自己的。
- 前端获取MQ信息不像后端那么好操作易上手,而ws是html支持的API
前端代码实现如下:
后端是使用express-ws简单地建立了一个本地服务器,代码不贴了吧,网上老多了。
const READY_STATE = {
CONNECTING: 'CONNECTING',
OPEN: 'OPEN',
CLOSING: 'CLOSING',
CLOSED: 'CLOSED',
0: 'CONNECTING',
1: 'OPEN',
2: 'CLOSING',
3: 'CLOSED',
};
/**
* eg: new Socket('/connect',{success:(ws)=>{}, fail:(e)=>{switch(e.code){}}, onMessage:(res)=>{} })
* onclose 如果允许重连则重连,不允许则直接回调fail
* onerror 仅在后台发出警示,回调中不开放这个回调,一切在fail函数中处理
* @param url 需要连接的ws链接,仅 /+pathname host拼接
* @param success 连接成功后调用
* @param fail 连接失败 除了以下code,其他为未知bug
* code=1 -> 不支持websocket
* code=2 -> 不允许重连-连接失败、允许重连-重连三次后也还是连接失败
* code=3 -> 身份验证未通过
* code=4 -> 未收到心跳事件主动断开连接
* code=5 -> (暂时不处理) 因为客户端断线 / 服务端主动断开等原因导致连接关闭后的回调
* @param onMessage 收到消息的回调函数。
* @param allowReConnect 允许连接失败后再重新连接而不立刻返回失败结果。(比如结果页就不需要重连,只需要第一次连接的结果,否则重连几次后,支付结果都出来了)
* 连接成功后将发送身份验证,每次连接发送数据需要带上本次会话的发送时间戳(不然前端无法确定本次回复是哪次消息)
* ? 如果意外在同一个页面同时创建了多个链接,前端无法检测出来,因为对象之间互相不联系,需要后端查询是否有相同用户链接,主动断开。
*/
class Socket {
constructor(connectUrl = '', options = {}) {
this.initData(connectUrl, options);
this.connect();
}
initData = (connectUrl, options) => {
this.connectUrl = connectUrl;
this.props = options;
// socket连接事件
this.socketOpen = false;
this.socketConnecting = false;
this.socketMsgQueue = []; // 待发送、发送后未收到回复 的消息队列
this.maxSocketConnectCount = 3; // 最多断线重连3次
this.socketConnectCount = 0; // 断线重连次数
this.socketConnectTimer = null; // 断线重连定时器
// 心跳事件
this.heartBeatFailCount = 0; // 心跳连接失败次数
this.heartBeatTimer = null; // 心跳事件定时器
this.heartBeatEventCb = null; // 心跳事件回调函数,一旦回应则清空,未回应则重连或关闭连接。
};
/**
* 建立连接
* @param {string} from reconnet | undefined 默认为初始化连接,reconnet为连接失败重连
*/
connect = (from = 'connect') => {
try {
if (!window.WebSocket) {
this.options.fail({ code: 1, error: '不支持' });
}
const ws = new WebSocket('ws://localhost:8100' + this.connectUrl);
this.socketConnecting = true;
// 注册ws相关事件
ws.onopen = () => {
console.log('success');
this.ws = ws;
this.socketOpen = true;
this.socketConnecting = false;
this.cancelReConnect();
this.ping();
};
ws.onerror = (e) => {
this.socketOpen = false;
this.socketConnecting = false;
console.error('ERROR:' + from + '-连接失败');
};
ws.onclose = () => {
console.error('CLOSE:' + from + '-连接失败');
this.socketOpen = false;
this.cancelHeartBeat();
if(this.socketConnecting){
ws.close();
this.socketConnecting = false;
}
if (this.props.allowReConnect) {
this.reConnect();
} else {
this.props.fail?.({ code: 2, error: '连接失败' });
}
};
} catch (error) {
this.options.fail({ code: 1000, error });
}
};
/** 取消重连 */
cancelReConnect = () => {
if (this.socketConnectTimer) {
clearTimeout(this.socketConnectTimer);
this.socketConnectTimer = null;
this.socketConnectCount = 0;
}
};
/** 重连 */
reConnect = () => {
if (this.socketConnectCount < this.maxSocketConnectCount) {
this.socketConnectCount = this.socketConnectCount + 1;
this.socketConnectTimer = setTimeout(()=>{
this.connect();
},1000)
} else {
this.socketConnectCount = 0;
clearTimeout(this.socketConnectTimer);
this.socketConnectTimer = null;
}
};
onmessage = (e, cb) => {
if (e.data === 'pong') {
// 清空当前的心跳回调事件
clearTimeout(this.heartBeatEventCb);
this.heartBeatEventCb = null;
}
console.log('from server: ' + e.data);
cb(e);
};
// 关闭心跳事件
cancelHeartBeat = () => {
clearInterval(this.heartBeatTimer);
this.heartBeatTimer = null;
};
// 注册心跳事件
ping = () => {
if (this.heartBeatTimer) {
this.cancelHeartBeat();
}
this.heartBeatTimer = setInterval(() => {
this.ws.send('ping');
// 向心跳事件列表push一个心跳事件,30秒发送一次,setTimeOut 10s后没有收到Pong消息则重连或关闭连接。
this.heartBeatEventCb = setTimeout(()=>{
this.ws.close();
this.props.fail({code:4,error:'未收到心跳事件'})
},1 * 10 * 1000)
}, 1 * 30 * 1000);
};
}
export default Socket;