由于项目业务逻辑需要,此次前端界面需要接收后端服务器 WebSoket 实时传输的数据,并在页面当中显示实时数据
项目中已经用 js 封装好了能用的 GlobalWebsoket.js
GlobalWebsoket.js
代码如下
GlobalWebsoket.js
// GlobalWebsoket.js
import store from '@/store/index.js';
import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
import {Observable
} from "rxjs";// 后端api地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
const initWebSocket = () => {let token = store.state.token ? store.state.token : store.getters.token;const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;try {//微信websocket最大并发不能超过5个//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.htmlif (count > 0) {return ws;}clearInterval(timer);ws = uni.connectSocket({url: wsUrl,complete: () => {}});count += 1;uni.onSocketClose(function() {socketOpen = false;ws = undefined;setTimeout(initWebSocket, 5000 * count);});uni.onSocketOpen(function() {socketOpen = true;});uni.onSocketMessage(function(msg) {var data = JSON.parse(msg.data);if (data.type === 'error') {uni.showToast({title: data.message,icon: "none",duration: 3500})}if (subs[data.requestId]) {if (data.type === 'complete') {subs[data.requestId].forEach(function(element) {element.complete();});;} else if (data.type === 'result') {subs[data.requestId].forEach(function(element) {element.next(data);});;}}});} catch (error) {setTimeout(initWebSocket, 5000 * count);}timer = setInterval(function() {try {ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({"type": "ping"})) : 0;} catch (error) {console.error(error, '发送心跳错误');}//同时判断if (tempQueue.length > 0 && ws && ws.readyState === 1) {sendSocketMessage(tempQueue[0], 1);}}, 2000);return ws;
};//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {if (socketOpen) {uni.sendSocketMessage({data: msg});if (flag === 1) {tempQueue.splice(0, 1);}} else {if (flag != 1) {tempQueue.push(msg);}}
}const getWebsocket = (id, topic, parameter) => {return Observable.create(function(observer) {if (!subs[id]) {subs[id] = [];}subs[id].push({next: function(val) {observer.next(val);},complete: function() {observer.complete();}});var msg = JSON.stringify({id: id,topic: topic,parameter: parameter,type: 'sub'});var thisWs = initWebSocket();if (thisWs) {try {sendSocketMessage(msg);} catch (error) {initWebSocket();uni.showToast({title: 'websocket服务连接失败',icon: "none",duration: 3500})}} else {tempQueue.push(msg);ws = undefinedcount = 0initWebSocket();}return function() {var unsub = JSON.stringify({id: id,type: "unsub"});delete subs[id];if (thisWs) {sendSocketMessage(unsub)}};});
};
exports.getWebsocket = getWebsocket;
GlobalWebsoket.js
代码分析GlobalWebsoket.js
import 分析import store from '@/store/index.js'; // vueX 做状态管理的
import Config from '@/core/config' // 引入 cofig ,Config.js 当中配置的是 url 地址
import {Observable
} from "rxjs";
Config 的地址是从 config.js 中来的
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。 可以把 RxJS 当做是用来处理事件的 Lodash 。
GlobalWebsoket.js
整体分析GlobalWebsoket.js
文件导出了getWebsocket
,这是一个函数
import store from '@/store/index.js';
import Config from '@/core/config'
import {Observable
} from "rxjs";// 后端api地址
// 这里通过 Config 获取需要连接 websoket 的地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;exports.getWebsocket = getWebsocket;
initWebSoket()
const initWebSocket = () => {// 先获取 token let token = store.state.token ? store.state.token : store.getters.token;// 这里是 websoket 的 url 地址// ${wsHost} 是 通过 Config 获取需要连接 websoket 的地址// ${token} 是 token 信息const wsUrl = `${wsHost}/messaging/${token}?:X_Access_Token=${token}`;try {//微信websocket最大并发不能超过5个//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html// 如果连接数量大于 0 if (count > 0) {// 这里就返回当前连接return ws;}// 执行到这里说明连接数为 0 ,以下代码为创建新的 websoket 的连接clearInterval(timer);// 调用 uni.connectSocket 来创建连接ws = uni.connectSocket({url: wsUrl,complete: () => {}});count += 1;// 关闭连接的回调函数uni.onSocketClose(function() {socketOpen = false;ws = undefined;setTimeout(initWebSocket, 5000 * count);});// 连接打开的回调函数uni.onSocketOpen(function() {socketOpen = true;});// 当向 websoket 发送信息时的回调函数uni.onSocketMessage(function(msg) {var data = JSON.parse(msg.data);if (data.type === 'error') {uni.showToast({title: data.message,icon: "none",duration: 3500})}if (subs[data.requestId]) {if (data.type === 'complete') {subs[data.requestId].forEach(function(element) {element.complete();});;} else if (data.type === 'result') {subs[data.requestId].forEach(function(element) {element.next(data);});;}}});} catch (error) {setTimeout(initWebSocket, 5000 * count);}// 设置定时器,每 2 秒执行一次,发送一次 'ping'timer = setInterval(function() {try {ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({"type": "ping"})) : 0;} catch (error) {console.error(error, '发送心跳错误');}//同时判断if (tempQueue.length > 0 && ws && ws.readyState === 1) {sendSocketMessage(tempQueue[0], 1);}}, 2000);// 返回新建的 wsreturn ws;
};
getWebsoket
const getWebsocket = (id, topic, parameter) => { // 根据传递的 id, 处理 id, 获取需要监听的内容return Observable.create(function(observer) {if (!subs[id]) {subs[id] = [];}subs[id].push({ // 所有需要监听的内容 push 到 subs[] 数组当中next: function(val) {observer.next(val);},complete: function() {observer.complete();}});// 根据传参的 id,topic,parameter ,讲需要发送的监听的内容封装到 msg 对象中var msg = JSON.stringify({id: id,topic: topic,parameter: parameter,type: 'sub'});// 调用 initWebSoket,初始化 websocket,在 initWebSoket 当中发起连接var thisWs = initWebSocket();if (thisWs) { // 如果连接成功try {sendSocketMessage(msg); // 发送需要 websoket 绑定监听的 msg(上面封装好了)} catch (error) { // 如果发送失败,再次发起连接initWebSocket();uni.showToast({title: 'websocket服务连接失败',icon: "none",duration: 3500})}} else { // 如果没有连接成功tempQueue.push(msg); // 临时队列中先把 msg 存起来ws = undefined // 断掉当前的连接count = 0 // 并把连接数设为 0initWebSocket(); // 再次初始化 websoket}return function() { // 这里是解绑的时候会执行的 (remove)var unsub = JSON.stringify({ id: id,type: "unsub"});delete subs[id];if (thisWs) {sendSocketMessage(unsub)}};});
};
sendSocketMessage
// 发送 soket 信息
//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {// 如果当前的 websoket 是打开的if (socketOpen) {// 向 websoket 发送消息uni.sendSocketMessage({data: msg});if (flag === 1) {tempQueue.splice(0, 1);}} else {if (flag != 1) {tempQueue.push(msg);}}
}
GlobalWebsoket.js
使用分析