GlobalWebsoket.js 封装配置分析
- 前言
- 一、 封装好的 `GlobalWebsoket.js `
- 1. `GlobalWebsoket.js `
- 二、`GlobalWebsoket.js` 代码分析
- 1.`GlobalWebsoket.js ` import 分析
- 2.`GlobalWebsoket.js ` 整体分析
- 3. `initWebSoket()`
- 3. `getWebsoket`
- 4.` sendSocketMessage`
- 三、`GlobalWebsoket.js` 使用分析
前言
由于项目业务逻辑需要,此次前端界面需要接收后端服务器 WebSoket 实时传输的数据,并在页面当中显示实时数据
项目中已经用 js 封装好了能用的 GlobalWebsoket.js
一、 封装好的 GlobalWebsoket.js
代码如下
1. GlobalWebsoket.js
// GlobalWebsoket.js
import store from '@/store/index.js';
import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
import {
Observable
} from "rxjs";
// 后端api地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
const initWebSocket = () => {
let token = store.state.token ? store.state.token : store.getters.token;
const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;
try {
//微信websocket最大并发不能超过5个
//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html
if (count > 0) {
return ws;
}
clearInterval(timer);
ws = uni.connectSocket({
url: wsUrl,
complete: () => {}
});
count += 1;
uni.onSocketClose(function() {
socketOpen = false;
ws = undefined;
setTimeout(initWebSocket, 5000 * count);
});
uni.onSocketOpen(function() {
socketOpen = true;
});
uni.onSocketMessage(function(msg) {
var data = JSON.parse(msg.data);
if (data.type === 'error') {
uni.showToast({
title: data.message,
icon: "none",
duration: 3500
})
}
if (subs[data.requestId]) {
if (data.type === 'complete') {
subs[data.requestId].forEach(function(element) {
element.complete();
});;
} else if (data.type === 'result') {
subs[data.requestId].forEach(function(element) {
element.next(data);
});;
}
}
});
} catch (error) {
setTimeout(initWebSocket, 5000 * count);
}
timer = setInterval(function() {
try {
ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
"type": "ping"
})) : 0;
} catch (error) {
console.error(error, '发送心跳错误');
}
//同时判断
if (tempQueue.length > 0 && ws && ws.readyState === 1) {
sendSocketMessage(tempQueue[0], 1);
}
}, 2000);
return ws;
};
//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {
if (socketOpen) {
uni.sendSocketMessage({
data: msg
});
if (flag === 1) {
tempQueue.splice(0, 1);
}
} else {
if (flag != 1) {
tempQueue.push(msg);
}
}
}
const getWebsocket = (id, topic, parameter) => {
return Observable.create(function(observer) {
if (!subs[id]) {
subs[id] = [];
}
subs[id].push({
next: function(val) {
observer.next(val);
},
complete: function() {
observer.complete();
}
});
var msg = JSON.stringify({
id: id,
topic: topic,
parameter: parameter,
type: 'sub'
});
var thisWs = initWebSocket();
if (thisWs) {
try {
sendSocketMessage(msg);
} catch (error) {
initWebSocket();
uni.showToast({
title: 'websocket服务连接失败',
icon: "none",
duration: 3500
})
}
} else {
tempQueue.push(msg);
ws = undefined
count = 0
initWebSocket();
}
return function() {
var unsub = JSON.stringify({
id: id,
type: "unsub"
});
delete subs[id];
if (thisWs) {
sendSocketMessage(unsub)
}
};
});
};
exports.getWebsocket = getWebsocket;
二、GlobalWebsoket.js
代码分析
1.GlobalWebsoket.js
import 分析
import store from '@/store/index.js'; // vueX 做状态管理的
import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
import {
Observable
} from "rxjs";
Config 的地址是从 config.js 中来的
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。 可以把 RxJS 当做是用来处理事件的 Lodash 。
2.GlobalWebsoket.js
整体分析
GlobalWebsoket.js
文件导出了getWebsocket
,这是一个函数
import store from '@/store/index.js';
import Config from '@/core/config'
import {
Observable
} from "rxjs";
// 后端api地址
// 这里通过 Config 获取需要连接 websoket 的地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
exports.getWebsocket = getWebsocket;
3. initWebSoket()
const initWebSocket = () => {
// 先获取 token
let token = store.state.token ? store.state.token : store.getters.token;
// 这里是 websoket 的 url 地址
// ${wsHost} 是 通过 Config 获取需要连接 websoket 的地址
// ${token} 是 token 信息
const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;
try {
//微信websocket最大并发不能超过5个
//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html
// 如果连接数量大于 0
if (count > 0) {
// 这里就返回当前连接
return ws;
}
// 执行到这里说明连接数为 0 ,以下代码为创建新的 websoket 的连接
clearInterval(timer);
// 调用 uni.connectSocket 来创建连接
ws = uni.connectSocket({
url: wsUrl,
complete: () => {}
});
count += 1;
// 关闭连接的回调函数
uni.onSocketClose(function() {
socketOpen = false;
ws = undefined;
setTimeout(initWebSocket, 5000 * count);
});
// 连接打开的回调函数
uni.onSocketOpen(function() {
socketOpen = true;
});
// 当向 websoket 发送信息时的回调函数
uni.onSocketMessage(function(msg) {
var data = JSON.parse(msg.data);
if (data.type === 'error') {
uni.showToast({
title: data.message,
icon: "none",
duration: 3500
})
}
if (subs[data.requestId]) {
if (data.type === 'complete') {
subs[data.requestId].forEach(function(element) {
element.complete();
});;
} else if (data.type === 'result') {
subs[data.requestId].forEach(function(element) {
element.next(data);
});;
}
}
});
} catch (error) {
setTimeout(initWebSocket, 5000 * count);
}
// 设置定时器,每 2 秒执行一次,发送一次 'ping'
timer = setInterval(function() {
try {
ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
"type": "ping"
})) : 0;
} catch (error) {
console.error(error, '发送心跳错误');
}
//同时判断
if (tempQueue.length > 0 && ws && ws.readyState === 1) {
sendSocketMessage(tempQueue[0], 1);
}
}, 2000);
// 返回新建的 ws
return ws;
};
3. getWebsoket
const getWebsocket = (id, topic, parameter) => {
// 根据传递的 id, 处理 id, 获取需要监听的内容
return Observable.create(function(observer) {
if (!subs[id]) {
subs[id] = [];
}
subs[id].push({ // 所有需要监听的内容 push 到 subs[] 数组当中
next: function(val) {
observer.next(val);
},
complete: function() {
observer.complete();
}
});
// 根据传参的 id,topic,parameter ,讲需要发送的监听的内容封装到 msg 对象中
var msg = JSON.stringify({
id: id,
topic: topic,
parameter: parameter,
type: 'sub'
});
// 调用 initWebSoket,初始化 websocket,在 initWebSoket 当中发起连接
var thisWs = initWebSocket();
if (thisWs) { // 如果连接成功
try {
sendSocketMessage(msg); // 发送需要 websoket 绑定监听的 msg(上面封装好了)
} catch (error) { // 如果发送失败,再次发起连接
initWebSocket();
uni.showToast({
title: 'websocket服务连接失败',
icon: "none",
duration: 3500
})
}
} else { // 如果没有连接成功
tempQueue.push(msg); // 临时队列中先把 msg 存起来
ws = undefined // 断掉当前的连接
count = 0 // 并把连接数设为 0
initWebSocket(); // 再次初始化 websoket
}
return function() { // 这里是解绑的时候会执行的 (remove)
var unsub = JSON.stringify({
id: id,
type: "unsub"
});
delete subs[id];
if (thisWs) {
sendSocketMessage(unsub)
}
};
});
};
4. sendSocketMessage
// 发送 soket 信息
//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {
// 如果当前的 websoket 是打开的
if (socketOpen) {
// 向 websoket 发送消息
uni.sendSocketMessage({
data: msg
});
if (flag === 1) {
tempQueue.splice(0, 1);
}
} else {
if (flag != 1) {
tempQueue.push(msg);
}
}
}