import { Observable, Subject } from 'rxjs';
import { Server } from 'ws';
import { AllActions } from '../enum';
import { AddPayload, ClientMessage, DeletePayload, ServerMessage, UpdatePayload } from './interfaces';
import WebSocket = require('ws');

/**
 * Thin wrapper around a `ws` server that tracks client sockets by a
 * client-supplied id and republishes incoming add/update/delete messages as
 * RxJS observables.
 *
 * The connection id is read from the `_id=` part of the request URL. A client
 * that connects without one is rejected.
 *
 * NOTE(review): the generic type arguments in this file appear to have been
 * lost at some point (e.g. `Subject>;`). They are reconstructed here as
 * `ClientMessage<Payload>`; `ServerMessage` is left exactly as it appeared.
 * Confirm both against `./interfaces`.
 */
export class WebsocketServer {
  private _server: Server;

  /**
   * Live sockets keyed by connection id. A `Map` is used instead of a plain
   * object because ids come from the client, and keys such as `__proto__`
   * must not collide with the object prototype.
   */
  private _connections: Map<string, WebSocket>;

  private _message_subjects: {
    add: Subject<ClientMessage<AddPayload>>;
    update: Subject<ClientMessage<UpdatePayload>>;
    delete: Subject<ClientMessage<DeletePayload>>;
  };

  /** Read-only streams of the client messages received for each action. */
  public messages: {
    add: Observable<ClientMessage<AddPayload>>;
    update: Observable<ClientMessage<UpdatePayload>>;
    delete: Observable<ClientMessage<DeletePayload>>;
  };

  /**
   * Creates the underlying `ws` server and starts accepting connections.
   *
   * @param port Port passed to the `ws` `Server` constructor.
   */
  constructor(port: number = 3000) {
    this._server = new Server({ port });
    this._connections = new Map();
    this._message_subjects = {
      add: new Subject(),
      update: new Subject(),
      delete: new Subject(),
    };
    this.messages = {
      add: this._message_subjects.add.asObservable(),
      update: this._message_subjects.update.asObservable(),
      delete: this._message_subjects.delete.asObservable(),
    };
    this._listen();
  }

  /**
   * Sends `msg` to every registered connection whose id is not in `except`.
   *
   * @param msg Message to serialise and send.
   * @param except Connection ids that must not receive the message.
   */
  broadcast(msg: ServerMessage, except: string[] = []) {
    // Serialise once; the payload is identical for every recipient.
    const encoded = this._encode(msg);
    const excluded = new Set(except);
    this._connections.forEach((socket, id) => {
      if (!excluded.has(id)) {
        this._send_encoded(socket, encoded);
      }
    });
  }

  /**
   * Sends `msg` to a single connection. Does nothing when the id is unknown.
   *
   * @param id Connection id of the recipient.
   * @param msg Message to serialise and send.
   */
  send(id: string, msg: ServerMessage) {
    const socket = this._connections.get(id);
    if (socket) {
      this._send_encoded(socket, this._encode(msg));
    }
  }

  /** Wires up the connection, message and close handlers on the server. */
  private _listen() {
    this._server.on('connection', (socket, req) => {
      const connection_id = this._parse_id(req.url ?? '');
      if (connection_id === undefined) {
        // Without an id the socket cannot be addressed; reject it rather
        // than letting an exception escape the event handler.
        socket.close(1008, 'Missing _id');
        return;
      }

      this._add_connection(connection_id, socket);

      socket.on('message', raw => {
        // String() applies the same coercion JSON.parse would have applied
        // to a non-string frame.
        const msg = this._decode(String(raw));
        if (msg) {
          this._parse_message(msg);
        }
      });
      socket.on('close', () => this._remove_connection(connection_id, socket));
    });
  }

  /** Registers `socket` under `connection_id`, replacing any previous socket. */
  private _add_connection(connection_id: string, socket: WebSocket) {
    this._connections.set(connection_id, socket);
  }

  /**
   * Unregisters a connection.
   *
   * @param connection_id Id to remove.
   * @param socket When given, the entry is only removed if it still refers to
   *   this socket, so a stale socket closing after a reconnect with the same
   *   id does not evict the newer connection.
   */
  private _remove_connection(connection_id: string, socket?: WebSocket) {
    if (socket === undefined || this._connections.get(connection_id) === socket) {
      this._connections.delete(connection_id);
    }
  }

  /** Routes a decoded client message to the handler or subject for its action. */
  private _parse_message(msg: ClientMessage<unknown>) {
    switch (msg.msg) {
      case AllActions.Connect:
        this._on_connect(msg);
        break;
      case AllActions.Disconnect:
        this._on_disconnect(msg);
        break;
      case AllActions.Add:
        this._message_subjects.add.next(msg as ClientMessage<AddPayload>);
        break;
      case AllActions.Update:
        this._message_subjects.update.next(msg as ClientMessage<UpdatePayload>);
        break;
      case AllActions.Delete:
        this._message_subjects.delete.next(msg as ClientMessage<DeletePayload>);
        break;
      default:
        // Unrecognised actions are ignored.
        break;
    }
  }

  /** Hook for `Connect` messages; currently a no-op. */
  private _on_connect(msg: ClientMessage<unknown>) {}

  /** Hook for `Disconnect` messages; currently a no-op. */
  private _on_disconnect(msg: ClientMessage<unknown>) {}

  /** Sends an already-serialised payload, skipping sockets that are not open. */
  private _send_encoded(socket: WebSocket, encoded: string) {
    if (socket.readyState === WebSocket.OPEN) {
      socket.send(encoded);
    }
  }

  /** Serialises an outgoing message to JSON. */
  private _encode(data: ServerMessage): string {
    return JSON.stringify(data);
  }

  /**
   * Parses an incoming frame as JSON.
   *
   * @returns The parsed message, or `undefined` when the frame is not valid
   *   JSON or does not decode to an object. Client input is untrusted, so a
   *   bad frame is dropped instead of throwing inside the socket handler.
   */
  private _decode(data: string): ClientMessage<unknown> | undefined {
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      return undefined;
    }
    if (typeof parsed !== 'object' || parsed === null) {
      return undefined;
    }
    return parsed as ClientMessage<unknown>;
  }

  /**
   * Extracts the connection id from a request URL.
   *
   * @param query Request URL (path and query string).
   * @returns Everything after `_id=` up to the next `/`, or `undefined` when
   *   the URL contains no `_id=`.
   */
  private _parse_id(query: string): string | undefined {
    const match = /_id=([^\/]*)/.exec(query);
    return match ? match[1] : undefined;
  }
}