import * as ws from 'ws';
import { SessionRequest, OrisonServer, SessionState } from './server';
import fs from 'fs';
import path from 'path';

/**
 * Signature of a WebSocket endpoint (the default export of an endpoint module).
 *
 * The resolved value is sent back to the client as the reply to its request.
 * Resolving with `null` or `undefined` makes the service answer with a
 * "Message rejected by server" error instead.
 */
export type WebSocketHandler = (
    context: { req: SessionRequest; data: any; socket: ws; service: WebSocketService }
) => Promise<any>;

/**
 * Routes JSON messages received over WebSocket connections to endpoint
 * handlers loaded from disk and to registered subscriptions.
 *
 * Client messages are JSON objects with a `type` of `ping`, `message`,
 * `subscribe` or `unsubscribe`, plus optional `path`, `data` and `nonce`
 * fields. The `nonce` is echoed back in every direct reply.
 */
export default class WebSocketService {
    private subscriptions: Subscription[] = [];
    private endpoints: Record<string, WebSocketHandler> = {};

    /**
     * Loads every endpoint module found under `.orison/ws` (relative to the
     * current working directory).
     */
    public initialize() {
        this.getEndpoints(path.resolve('.orison/ws'), '.orison/ws');
    }

    /**
     * Recursively registers the `.js` files below `folder` as endpoints.
     *
     * The endpoint path is the file path relative to `root`, with forward
     * slashes, a leading `/` and without the `.js` extension.
     *
     * @param root absolute path of the endpoint root directory
     * @param folder directory to scan; silently ignored when it does not exist
     * @throws Error when a module's default export is not a function
     */
    private getEndpoints(root: string, folder: string) {
        if (!fs.existsSync(folder)) {
            return;
        }

        for (const item of fs.readdirSync(folder)) {
            const abs = path.resolve(path.join(folder, item));

            if (fs.lstatSync(abs).isDirectory()) {
                this.getEndpoints(root, abs);
                continue;
            }
            if (!item.endsWith('.js')) {
                continue;
            }

            // Normalise Windows separators so endpoint paths look the same on every platform.
            const relativeFile = '/' + abs.substring(root.length + 1).replace(/\\/g, '/');
            // Drop the trailing ".js".
            const endpoint = relativeFile.substring(0, relativeFile.length - 3);

            const file = require(abs);
            if (typeof file.default !== 'function') {
                throw new Error('Invalid WebSocket endpoint: ' + endpoint);
            }

            console.log('Initializing WebSocket endpoint: ' + endpoint);
            this.endpoints[endpoint] = file.default;
        }
    }

    /**
     * Attaches the message and close listeners for a newly connected socket.
     *
     * @param _ the server instance (unused)
     * @param socket the client socket
     * @param req the request that opened the connection; its session token
     *            identifies the client in subscription listener lists
     */
    public handleConnection(_: OrisonServer, socket: ws, req: SessionRequest) {
        socket.on('message', async msg => {
            let nonce: unknown;
            try {
                let message: any;
                try {
                    // String() applies the same coercion JSON.parse would apply to a non-string frame.
                    message = JSON.parse(String(msg));
                } catch {
                    this.sendError(socket, 'Malformed message');
                    return;
                }
                if (typeof message !== 'object' || message === null) {
                    this.sendError(socket, 'Malformed message');
                    return;
                }

                nonce = message.nonce;
                await this.handleMessage(socket, req, message);
            } catch (err) {
                // Nothing awaits this listener, so an escaping error would become an
                // unhandled promise rejection. Report it instead.
                console.error('WebSocket message handling failed:', err);
                try {
                    this.sendError(socket, 'Internal server error', nonce);
                } catch (sendErr) {
                    console.error('Failed to report error to WebSocket client:', sendErr);
                }
            }
        });

        socket.on('close', () => {
            const token = req.session.sessionToken;
            for (const subscription of this.subscriptions) {
                this.removeListener(subscription, token);
            }
        });
    }

    /** Dispatches one parsed client message according to its `type`. */
    private async handleMessage(socket: ws, req: SessionRequest, message: any): Promise<void> {
        switch (message.type) {
            case 'ping':
                socket.send(JSON.stringify({ type: 'ping', nonce: message.nonce }));
                return;
            case 'message':
                await this.handleEndpointCall(socket, req, message);
                return;
            case 'subscribe':
                await this.handleSubscribe(socket, req, message);
                return;
            case 'unsubscribe':
            case 'unsubcribe': // historical misspelling, kept so existing clients keep working
                await this.handleUnsubscribe(socket, req, message);
                return;
            default:
                // Unknown types are ignored.
                return;
        }
    }

    /** Invokes the endpoint registered for `message.path` and sends its result back. */
    private async handleEndpointCall(socket: ws, req: SessionRequest, message: any): Promise<void> {
        // Only own properties count: names such as "constructor" or "toString" would
        // otherwise resolve to functions inherited from Object.prototype.
        const known = typeof message.path === 'string'
            && Object.prototype.hasOwnProperty.call(this.endpoints, message.path);
        if (!known) {
            this.sendError(socket, 'Message path does not exist', message.nonce);
            return;
        }

        const endpoint = this.endpoints[message.path];
        const result = await endpoint({ req, socket, data: message.data, service: this });
        if (result === null || result === undefined) {
            this.sendError(socket, 'Message rejected by server', message.nonce);
            return;
        }

        this.sendResult(socket, result, message.nonce);
    }

    /** Adds the client to the subscription at `message.path`, consulting `onSubscribeRequest` if set. */
    private async handleSubscribe(socket: ws, req: SessionRequest, message: any): Promise<void> {
        const subscription = this.subscriptions.find(sub => sub.path === message.path);
        if (subscription === undefined) {
            this.sendError(socket, 'Subscription path does not exist', message.nonce);
            return;
        }

        const token = req.session.sessionToken;
        if (subscription.listeners.some(listener => listener[1].session.sessionToken === token)) {
            this.sendError(socket, 'Client already subscribed', message.nonce);
            return;
        }

        if (subscription.options?.onSubscribeRequest === undefined) {
            // No hook: the client is added without any reply being sent.
            subscription.listeners.push([socket, req]);
            return;
        }

        const result = await subscription.options.onSubscribeRequest({ req, socket, data: message.data });
        if (result === null || result === undefined) {
            this.sendError(socket, 'Subscription request rejected by the server', message.nonce);
            return;
        }

        subscription.listeners.push([socket, req]);
        this.sendResult(socket, result, message.nonce);
    }

    /** Removes the client from the subscription at `message.path`, consulting `onUnsubscribeRequest` if set. */
    private async handleUnsubscribe(socket: ws, req: SessionRequest, message: any): Promise<void> {
        const subscription = this.subscriptions.find(sub => sub.path === message.path);
        if (subscription === undefined) {
            this.sendError(socket, 'Subscription path does not exist', message.nonce);
            return;
        }

        const token = req.session.sessionToken;
        if (!subscription.listeners.some(listener => listener[1].session.sessionToken === token)) {
            this.sendError(socket, 'Client not subscribed', message.nonce);
            return;
        }

        let result: any;
        if (subscription.options?.onUnsubscribeRequest !== undefined) {
            result = await subscription.options.onUnsubscribeRequest({ req, socket, data: message.data });
        }

        // The hook's return value is only reply data, so the listener is removed either way.
        // The position is looked up after the await because the list may have changed meanwhile.
        this.removeListener(subscription, token);

        if (result !== null && result !== undefined) {
            this.sendResult(socket, result, message.nonce);
        }
    }

    /** Removes the first listener of `subscription` whose session token equals `token`, if any. */
    private removeListener(subscription: Subscription, token: unknown): void {
        const index = subscription.listeners.findIndex(
            listener => listener[1].session.sessionToken === token
        );
        if (index !== -1) {
            subscription.listeners.splice(index, 1);
        }
    }

    /** Sends an error reply (`data.__error === true`) carrying `reason`. */
    private sendError(socket: ws, reason: string, nonce?: unknown): void {
        socket.send(JSON.stringify({
            type: 'message',
            data: { __error: true, reason },
            nonce,
        }));
    }

    /** Sends a successful reply carrying `data`. */
    private sendResult(socket: ws, data: unknown, nonce: unknown): void {
        socket.send(JSON.stringify({ type: 'message', nonce, data }));
    }

    /**
     * Registers a subscription so clients can subscribe to its path.
     * @param subscription the subscription to add
     */
    public addSubscription(subscription: Subscription) {
        this.subscriptions.push(subscription);
    }

    /**
     * Gets a Subscription added with `addSubscription` by its path name.
     * @param path the path of the subscription
     * @throws Error when no subscription with that path has been added
     */
    public getSubscription(path: string) {
        const sub = this.subscriptions.find(s => s.path === path);
        if (sub === undefined) {
            throw new Error('Subscription does not exist: ' + path);
        }
        return sub;
    }
}

/**
 * A named channel that clients can subscribe to and that the server can
 * broadcast on.
 */
export class Subscription {
    /** Subscribed clients as `[socket, request]` pairs. */
    public listeners: [ws, SessionRequest][] = [];

    /**
     * @param path the path clients use to address this subscription
     * @param options optional hooks run on subscribe / unsubscribe requests
     */
    public constructor(public path: string, public options?: SubscriptionOptions) { }

    /**
     * Calls the given function for each subscribed client, and sends each client the data returned for it.
     * If the data returned by the function is `null` or `undefined`, then that client will not be sent any data.
     * @param dataGenerator the generator for the data to send; receives the client's session state
     */
    public broadcast(dataGenerator: (state: SessionState) => any) {
        for (const [socket, req] of this.listeners) {
            const data = dataGenerator(req.session);
            if (data === null || data === undefined) {
                continue;
            }
            socket.send(JSON.stringify({
                type: 'subscriptionMessage',
                path: this.path,
                data
            }));
        }
    }
}

/** Arguments passed to the subscription hooks. */
export interface SubscriptionArgs {
    /** The request that opened the client's connection. */
    req: SessionRequest;
    /** The client's socket. */
    socket: ws;
    /** The `data` field of the client's message, if any. */
    data?: any;
}

export interface SubscriptionOptions {
    /**
     * Called when a client socket requests to subscribe.
     * @returns data to return to the client, or `null` or `undefined` if they should not be added to the list of subscribed sockets.
     * An empty object (`{}`) will add the client to the list.
     */
    onSubscribeRequest?: (args: SubscriptionArgs) => Promise<any>;
    /**
     * Called when a client requests to unsubscribe.
     * @returns data to return to the client, or `null` or `undefined` if no data
     */
    onUnsubscribeRequest?: (args: SubscriptionArgs) => Promise<any>;
}