// Copyright (C) 2018 Zilliqa // // This file is part of zilliqa-js // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . import { Buffer } from "buffer"; /* tslint:disable:no-unused-variable */ import mitt from "mitt"; import * as websocket from "websocket"; import { MessageType, NewBlockQuery, NewEventQuery, QueryParam, SocketConnect, SocketState, StatusType, SubscriptionOption, Unsubscribe, } from "./types"; type W3CWebsocketType = websocket.w3cwebsocket; const W3CWebsocket = websocket.w3cwebsocket; export class WebSocketProvider { public static NewWebSocket( url: string, options?: SubscriptionOption ): WebSocket | W3CWebsocketType { if (typeof window !== "undefined" && (window).WebSocket) { return new WebSocket(url, options !== undefined ? options.protocol : []); } else { const headers = options !== undefined ? options.headers || {} : undefined; const urlObject = new URL(url); if ( headers !== undefined && !headers.authorization && urlObject.username && urlObject.password ) { const authToken = Buffer.from( `${urlObject.username}:${urlObject.password}` ).toString("base64"); headers.authorization = `Basic ${authToken}`; } return new W3CWebsocket( url, options !== undefined ? options.protocol : undefined, undefined, headers, undefined, options !== undefined ? options.clientConfig : undefined ); } } url: string; options?: SubscriptionOption; emitter: mitt.Emitter; handlers: any = {}; websocket: WebSocket | W3CWebsocketType; subscriptions: any; // basically, options is a collection of metadata things like protocol or headers constructor(url: string, options?: SubscriptionOption) { this.url = url; this.options = options; // TODO: type cast to prevent: // zilliqa/js/subscriptions/src/ws.ts(77,20): error TS7009: 'new' expression, whose target lacks a construct signature, implicitly has an 'any' type. this.emitter = new (mitt as any)(this.handlers) as mitt.Emitter; this.websocket = WebSocketProvider.NewWebSocket(url, options); this.subscriptions = {}; this.registerEventListeners(); } registerEventListeners() { this.websocket.onopen = this.onConnect.bind(this); this.websocket.onclose = this.onClose.bind(this); this.websocket.onmessage = this.onMessage.bind(this); this.websocket.onerror = this.onError.bind(this); } removeAllSocketListeners() { this.removeEventListener(SocketState.SOCKET_MESSAGE); this.removeEventListener(SocketState.SOCKET_READY); this.removeEventListener(SocketState.SOCKET_CLOSE); this.removeEventListener(SocketState.SOCKET_ERROR); this.removeEventListener(SocketState.SOCKET_CONNECT); } removeEventListener(type?: string, handler?: mitt.Handler) { if (!type) { this.handlers = {}; return; } if (!handler) { delete this.handlers[type]; } else { return this.emitter.off(type, handler); } } reconnect() { setTimeout(() => { this.removeAllSocketListeners(); this.websocket = WebSocketProvider.NewWebSocket(this.url, this.options); this.registerEventListeners(); }, 5000); } async onClose(event: CloseEvent) { // reconnect if (this.subscriptions !== null && !event.wasClean) { this.emitter.emit(SocketConnect.RECONNECT, event); this.reconnect(); return; } // normal close if (this.websocket.CONNECTING) { this.emitter.emit(SocketConnect.CLOSE, event); this.websocket.close(); return; } } onError(event: Event) { this.emitter.emit(SocketConnect.ERROR, event); if (this.websocket.CONNECTING) { this.websocket.close(); } return; } async onConnect() { if (!this.subscriptions) { this.subscriptions = {}; } // retry logic const subscriptionKeys = Object.keys(this.subscriptions); if (subscriptionKeys.length > 0) { for (const key of subscriptionKeys) { const id = key; const parameters = this.subscriptions[key].parameters; delete this.subscriptions[id]; await this.subscribe(parameters); } } this.emitter.emit(SocketState.SOCKET_CONNECT); this.emitter.emit(SocketConnect.CONNECT); } onMessage(msg: MessageEvent) { if (msg.data) { const dataObj = JSON.parse(msg.data); if (dataObj.type === MessageType.NOTIFICATION) { this.emitter.emit(SocketState.SOCKET_MESSAGE, dataObj); for (const value of dataObj.values) { if (value.query === MessageType.NEW_BLOCK) { this.emitter.emit(MessageType.NEW_BLOCK, value); } else if (value.query === MessageType.EVENT_LOG) { this.emitter.emit(MessageType.EVENT_LOG, value); } else if (value.query === MessageType.UNSUBSCRIBE) { this.emitter.emit(MessageType.UNSUBSCRIBE, value); } else { throw new Error("unsupported value type"); } } } else if (dataObj.query === QueryParam.NEW_BLOCK) { // subscribe NewBlock succeed this.subscriptions[dataObj.query] = { id: dataObj.query, parameters: dataObj, }; this.emitter.emit(StatusType.SUBSCRIBE_NEW_BLOCK, dataObj); this.emitter.emit(SocketConnect.RECONNECT); } else if (dataObj.query === QueryParam.EVENT_LOG) { // subscribe EventLog succeed this.subscriptions[dataObj.query] = { id: dataObj.query, parameters: dataObj, }; this.emitter.emit(StatusType.SUBSCRIBE_EVENT_LOG, dataObj); this.emitter.emit(SocketConnect.RECONNECT); } else if (dataObj.query === QueryParam.UNSUBSCRIBE) { this.emitter.emit(MessageType.UNSUBSCRIBE, dataObj); } else { throw new Error("unsupported message type"); } } else { throw new Error("message data is empty"); } } addEventListener(type: string, handler: mitt.Handler) { this.emitter.on(type, handler); } connecting() { return this.websocket.readyState === this.websocket.CONNECTING; } send(query: NewBlockQuery | NewEventQuery): Promise { return new Promise((resolve, reject) => { if (!this.connecting()) { try { this.websocket.send(JSON.stringify(query)); } catch (error) { throw error; } let queryParam; if (query.query === QueryParam.NEW_BLOCK) { queryParam = StatusType.SUBSCRIBE_NEW_BLOCK; } else if (query.query === QueryParam.EVENT_LOG) { queryParam = StatusType.SUBSCRIBE_EVENT_LOG; } else { queryParam = query.query; } this.emitter.on(queryParam, (data) => { resolve(data); }); this.emitter.on(SocketConnect.ERROR, reject); } const connectHandler = () => { this.send(query).then(resolve).catch(reject); }; const offConnectHandler = () => { this.emitter.off(SocketConnect.CONNECT, connectHandler); }; this.emitter.on(SocketConnect.CONNECT, connectHandler); this.emitter.on(SocketConnect.RECONNECT, offConnectHandler); }); } async subscribe(payload: NewBlockQuery | NewEventQuery): Promise { const result = await this.send(payload); return result.query === payload.query; } async unsubscribe(payload: Unsubscribe): Promise { const result = await this.send(payload); // todo handle separately const succeed = result.query === payload.query; if (succeed) { this.subscriptions[payload.query] = null; } return succeed; } }