angular + express 實現(xiàn)websocket通信
最近需要實現(xiàn)一個功能,后端通過TCP協(xié)議連接雷達硬件的控制器,前端通過websocket連接后端,當控制器觸發(fā)消息的時候,把信息通知給所以前端;
第一個思路是單獨寫一個后端服務(wù)用來實現(xiàn)websocket,調(diào)試成功了,后來又發(fā)現(xiàn)一個插件express-ws,于是決定改變思路,研究了下,最終代碼如下,希望幫助更多的朋友,不再害怕websocket
首先寫一個前端websocket服務(wù)。這里我選擇放棄單例模式,采用誰調(diào)用誰負責(zé)銷毀的思路
import { Injectable } from '@angular/core'; import { Observable } from 'rxjs'; import { LoginService } from '../login/login.service'; import { environment } from 'src/environments/environment'; export class WsConnect { ws!:WebSocket; sendWs!:(msg:string)=>void; closeWs!:()=>void; result!:Observable<any> } @Injectable({providedIn:"root"}) export class WebsocketService { origin = window.location.origin.replace('http', 'ws'); constructor( private loginService: LoginService ) { } getUrl(path:string){ return `${this.origin}${path}`; } connect(path:string):WsConnect{ let url = this.getUrl(path); let ws = new WebSocket(url, this.loginService.userInfo.jwt); // 在這里放入jwt信息,目前沒有找到其它地方可以放。有些網(wǎng)友建議先放入地址,然后在nginx里重新放入header,我覺得不夠接地氣 return { ws, sendWs:function(message:string){ ws.send(message); }, closeWs:function(){ ws.close(); }, result:new Observable( observer => { ws.onmessage = (event) => { observer.next(event.data)};//接收數(shù)據(jù) ws.onerror = (event) => {console.log("ws連接錯誤:",event);observer.error(event)};//發(fā)生錯誤 ws.onclose = (event) => {console.log("ws連接斷開:",event); observer.complete() };//結(jié)束事件 ws.onopen = (event) => { console.log("ws連接成功:",event);};//結(jié)束事件 } ) } } }
然后在組件里調(diào)用
import { Component, OnDestroy, OnInit } from '@angular/core';import { WebsocketService, WsConnect } from?'../utils/websocket-client.service'; @Component({ selector: 'app-car-measure', templateUrl: './car-measure.component.html', styleUrls: ['./car-measure.component.scss'] }) export class CarMeasureComponent implements OnInit , OnDestroy{ connect!:WsConnect; constructor(public wsService:WebsocketService) { } ngOnInit() { this.connectServer(); } connectServer(){ this.connect = this.wsService.connect('/websocket/carMeasure') this.connect.result.subscribe( (data:any) => { //接收到服務(wù)端發(fā)來的消息 console.log("服務(wù)器消息:",data); setTimeout(() => { this.connect.sendWs("這是從客戶端發(fā)出的消息"); }, 5000); } ) } ngOnDestroy() { this.connect.closeWs(); // 這個方法時把整個ws銷毀,而不是取消訂閱哦,所以有需要的同學(xué)可以考慮取消訂閱的方案 } }
后端引入express-ws,封裝一個可調(diào)用的文件,部分代碼借鑒了網(wǎng)上的代碼,做了一些改善
//websocket.js const express = require('express'); const router = express.Router(); const expressWs = require('express-ws') // 初始化 let WS = null; // 聲明一個通道類 let channels = null; let pathList = [ '/websocket/carMeasure', '/path2' ] function initWebSocket(app) { WS = expressWs(app) //混入app, wsServer 存儲所有已連接實例 // 創(chuàng)建通道 channels = new channel(router) pathList.forEach(path=>{ channels.createChannel(path) // channels.createChannel('/carMeasure/websocket/carSize') }) app.use(router) } // 通道類 class channel { router; constructor(props) { this.router = props; } createChannel(path) { // 建立通道 this.router.ws( path, (ws, req) => { //把自定義信息加入到socket里面取,expressws會自動放入到從WS.getWss().clients, // 并且會自動根據(jù)活動用戶刪除或者增加客戶端 ws['wsPath'] = path; ws['userId'] = req.userInfo._id; ws['roleId'] = req.userInfo.role; ws.on('message', (msg) => getMsg(msg, path)) ws.on('close', (code) => close(code, path)) ws.on('error', (e) => error(e, path)) }) } } /** * * @param {*} msg 消息內(nèi)容 * @param {String} from 消息來源 */ // 監(jiān)聽消息 let getMsg = (msg, from) => { console.log(msg, from); // SendMsgAll({path:'/path2', data: msg }) } // 發(fā)送消息 let sendMsg = (client, data) => { if (!client) return client.send(JSON.stringify(data)) } let close = (code) => { console.log('關(guān)閉連接', code); } let error = (e) => { console.log('error: ', e); } // 群發(fā) /** * * @param {String} path 需要發(fā)送的用戶來源 路由,默認全部 * @param {*} data 發(fā)送的數(shù)據(jù) */ function sendMsgToClients(clients,data){ clients.forEach((client)=> { if (client._readyState == 1) { sendMsg(client, data) } }) } function sendMsgToAll(data = "") { let allClientsList = Array.from(WS.getWss().clients) sendMsgToClients(allClientsList,data) } function sendMsgToPath(data = "", path = '') { let allClientsList = Array.from(WS.getWss().clients).filter((ws)=>ws['wsPath'] == path) sendMsgToClients(allClientsList,data) } function sendMsgToId(data = "", userId = '') { let allClientsList = Array.from(WS.getWss().clients).filter((ws)=>ws['userId'] == userId) sendMsgToClients(allClientsList,data) } function sendMsgToRole(data = "", roleId = '') { let allClientsList = Array.from(WS.getWss().clients).filter((ws)=>ws['roleId'] == roleId) sendMsgToClients(allClientsList,data) } module.exports = { initWebSocket, sendMsgToAll, sendMsgToPath, sendMsgToId, sendMsgToRole, }
然后再app.js里面調(diào)用就可以了
const {initWebSocket} = require('./public/utils/websocket') initWebSocket(app)
其中涉及到了權(quán)限驗證的問題,也可以直接驗證jwt
app.use((req,res,next) => { if(!whiteList.some(item => req.url.startsWith(item))) { let httpJwt= req.headers['jwt']; let wsJwt= req.headers['sec-websocket-protocol']; // 這里驗證websocket的身份信息,其它代碼 utils.verifyToken(httpJwt || wsJwt).then(res => { //utils.verifyToken封裝了jwt的驗證 req["userInfo"] = res; //放入一些信息,方便后續(xù)操作 next() }).catch(e => { console.error(e); res.status(401).send('invalid token') }) } else { next() } })
萬事具備,最后一步就是等待硬件設(shè)備的觸發(fā)了,其它tcp客戶端的代碼就不放出來干擾大家了,就是粗暴的調(diào)用即可
var {sendMsgToPath} = require('../public/utils/websocket'); sendMsgToPath(JSON.stringify(result), this.carMeasurePath); // 注意websocket或者tcp的傳輸都只能用字符串或者blob
另外注意要配置nginx代理,nginx的配置各位應(yīng)該都清楚吧,這里就不多說了,注意的是這里有幾個可選擇的地方,一個是前端,可以把ws服務(wù)做成單例,另一個是后端路由其實可以寫在http的路由文件里,還有一個是對后端ws client的使用,利用了express-ws自身的方法,當然也可以自己寫對象來搜集clients (不太建議)
想了以下還是放出來給小白,這里是proxy.config.json
{ "/api": { "target": "http://localhost:3000", "secure": false, "logLevel": "debug", "changeOrigin": true, "pathRewrite": { "^/api": "/" } }, "/websocket":{ "target": "http://localhost:3000", "secure": false, "ws": true } }
畢竟講究的是手把手把你教會,不會也得會,這里是放入服務(wù)器的nginx.cong
worker_processes 1; events { worker_connections 1024; } http { server { listen 80; server_name localhost; client_max_body_size 20M; underscores_in_headers on; include /etc/nginx/mime.types; gzip on; gzip_static on; gzip_min_length 1000; gzip_types text/plain text/css application/json application/javascript application/x-javascript text/xml application/xml application/xml+rss text/javascript; location /{ root /usr/share/nginx/html; index index.html index.htm; try_files $uri $uri/ /index.html; } location /api { rewrite ^/api/(.*)$ /$1 break; proxy_pass http://localhost:3000; } location /websocket { proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; proxy_pass http://localhost:3000; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } } }
到此這篇關(guān)于angular + express 實現(xiàn)websocket通信的文章就介紹到這了,更多相關(guān)angular websocket通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Angular 中 ngOnInit 和 constructor 使用場景
最初學(xué)習(xí)Angular的時候總是搞不清楚ngOnInit和constructor的區(qū)別,現(xiàn)在我們來稍微理一下兩者之間的區(qū)別。2017-06-06Angular2中監(jiān)聽數(shù)據(jù)更新的方法
今天小編就為大家分享一篇Angular2中監(jiān)聽數(shù)據(jù)更新的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-08-08angular 用攔截器統(tǒng)一處理http請求和響應(yīng)的方法
下面小編就為大家?guī)硪黄猘ngular 用攔截器統(tǒng)一處理http請求和響應(yīng)的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06淺析angularJS中的ui-router和ng-grid模塊
下面小編就為大家?guī)硪黄獪\析angularJS中的ui-router和ng-grid模塊。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-05-05Angularjs實現(xiàn)控制器之間通信方式實例總結(jié)
這篇文章主要介紹了Angularjs實現(xiàn)控制器之間通信方式,結(jié)合實例形式總結(jié)分析了AngularJS控制器常用通信方式及相關(guān)操作注意事項,需要的朋友可以參考下2018-03-03AngularJS修改model值時,顯示內(nèi)容不變的實例
今天小編就為大家分享一篇AngularJS修改model值時,顯示內(nèi)容不變的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-09-09