/*
 * Philip Crotwell
 * University of South Carolina, 2019
 * https://www.seis.sc.edu
 */
import * as util from "./util"; // for util.log
import * as miniseed from "./miniseed";
import { DataRecord } from "./miniseed";
import { DateTime } from "luxon";
import { dataViewToString, stringify, toError } from "./util";

export const WS_SEEDLINK3_SUBPROTOCOL = "SeedLink3.1";

export type SequencedDataRecord = {
  rawsequence: string;
  sequence: number;
  miniseed: DataRecord;
};

/**
 * Type guard for binary websocket payloads.
 *
 * The SharedArrayBuffer global is not exposed in every runtime (browsers hide
 * it unless the page is cross-origin isolated), and a bare
 * `x instanceof SharedArrayBuffer` throws a ReferenceError there, so its
 * existence is checked before it is used.
 *
 * @param data value taken from MessageEvent.data
 * @returns true if data is an ArrayBuffer or a SharedArrayBuffer
 */
function isArrayBufferLike(data: unknown): data is ArrayBufferLike {
  return (
    data instanceof ArrayBuffer ||
    (typeof SharedArrayBuffer !== "undefined" &&
      data instanceof SharedArrayBuffer)
  );
}

/**
 * A seedlink websocket connection to the given url.
 * The connection is not made until the connect() method is called.
 * Note this cannot connect directly to a native TCP socket, instead it
 * sends the seedlink protocol over a websocket. Currently only the IRIS
 * ringserver, https://github.com/iris-edu/ringserver,
 * supports websockets, but it may be possible to use third party
 * tools to proxy the websocket to a TCP seedlink socket.
 *
 * The seedlink (ver 3) protocol does not have an official spec document, but
 * some details are here:
 * https://www.seiscomp.de/doc/apps/seedlink.html
 *
 * @param url websocket URL to connect to
 * @param requestConfig an array of seedlink commands
 * like:
 *
 *   [ 'STATION JSC CO',
 *     'SELECT 00BHZ.D' ]
 *
 * @param receiveMiniseedFn the callback function that
 * will be invoked for each seedlink packet received
 * which contains 'sequence', a sequence number
 * and 'miniseed', a single miniseed record.
 * @param errorHandler callback invoked with any error that arises
 */
export class SeedlinkConnection {
  url: string;
  requestConfig: Array<string>;
  receiveMiniseedFn: (packet: SequencedDataRecord) => void;
  errorHandler: (error: Error) => void;
  closeFn: null | ((close: CloseEvent) => void);
  webSocket: null | WebSocket;
  subprotocol: string | Array<string>;
  command: string;
  helloLines: Array<string> = [];

  constructor(
    url: string,
    requestConfig: Array<string>,
    receiveMiniseedFn: (packet: SequencedDataRecord) => void,
    errorHandler: (error: Error) => void,
  ) {
    this.url = url;
    this.requestConfig = requestConfig;
    this.receiveMiniseedFn = receiveMiniseedFn;
    this.errorHandler = errorHandler;
    this.closeFn = null;
    this.command = "DATA";
    this.webSocket = null;
    this.subprotocol = WS_SEEDLINK3_SUBPROTOCOL;
  }

  /**
   * Replaces the default DATA command with a TIME command, so that
   * connect() requests data starting at the given time.
   *
   * @param startTime start of the requested data; converted to UTC before
   * being formatted, so a DateTime in any zone names the same instant
   */
  setTimeCommand(startTime: DateTime) {
    // NOTE(review): assumes the server interprets TIME arguments as UTC.
    this.command =
      "TIME " + startTime.toUTC().toFormat("yyyy,LL,dd,HH,mm,ss");
  }

  /**
   * Sets the callback invoked when an error arises.
   *
   * @param errorHandler callback that receives the error
   */
  setOnError(errorHandler: (error: Error) => void) {
    this.errorHandler = errorHandler;
  }

  /**
   * Sets the callback invoked when the websocket closes.
   *
   * @param closeFn callback that receives the CloseEvent
   */
  setOnClose(closeFn: (close: CloseEvent) => void) {
    this.closeFn = closeFn;
  }

  /**
   * Opens the websocket and runs the full handshake: HELLO, each command in
   * requestConfig, the DATA/TIME command, and finally END, after which
   * incoming messages are routed to handle().
   *
   * On failure the connection is closed and the error is passed to the
   * errorHandler if one is set, otherwise it is rethrown.
   *
   * @returns Promise that resolves to the reply to the last command, or to
   * undefined if an error was passed to the errorHandler.
   */
  connect() {
    return this.interactiveConnect()
      .then(() => {
        return this.sendHello();
      })
      .then((lines) => {
        this.helloLines = lines;
        return this.sendCmdArray(this.requestConfig);
      })
      .then(() => {
        return this.sendCmdArray([this.command]);
      })
      .then((val) => {
        if (this.webSocket === null) {
          throw new Error("websocket is null");
        }
        // handshake is done, so from here on every message goes to handle()
        this.webSocket.onmessage = (event) => {
          this.handle(event);
        };
        this.webSocket.send("END\r");
        return val;
      })
      .catch((err) => {
        this.close();
        const insureErr = err instanceof Error ? err : new Error(stringify(err));
        if (this.errorHandler) {
          this.errorHandler(insureErr);
        } else {
          throw insureErr;
        }
      });
  }

  /**
   * Opens the websocket, closing any existing one first, but sends no
   * seedlink commands.
   *
   * @returns Promise that resolves to this connection once the websocket
   * is open, or rejects if the websocket cannot be created or errors.
   */
  interactiveConnect(): Promise<SeedlinkConnection> {
    if (this.webSocket) {
      this.webSocket.close();
      this.webSocket = null;
    }

    return new Promise<SeedlinkConnection>((resolve, reject) => {
      try {
        const webSocket = new WebSocket(this.url, this.subprotocol);
        this.webSocket = webSocket;
        webSocket.binaryType = "arraybuffer";

        webSocket.onopen = () => {
          resolve(this);
        };

        webSocket.onerror = (event: Event) => {
          const evtError = toError(event);
          this.handleError(evtError);
          reject(evtError);
        };

        webSocket.onclose = (closeEvent) => {
          if (this.closeFn) {
            this.closeFn(closeEvent);
          }
          // Only forget the socket this handler belongs to. The close event
          // of a socket replaced by a reconnect arrives after the new socket
          // is stored, and must not clear the new one.
          if (this.webSocket === webSocket) {
            this.webSocket = null;
          }
        };
      } catch (err) {
        this.close();
        const evtError = toError(err);
        if (this.errorHandler) {
          this.errorHandler(evtError);
        }
        reject(evtError);
      }
    }).catch((e) => {
      // an empty negotiated protocol replaces the original error with a
      // hint about the subprotocol
      if (!this.webSocket?.protocol || this.webSocket.protocol.length === 0) {
        throw new Error(
          `fail to create websocket, possible due to subprotocol: sent subprotocol=${this.subprotocol} received empty`,
        );
      }
      throw e;
    });
  }

  /**
   * Closes the websocket, if there is one, and forgets it.
   */
  close(): void {
    if (this.webSocket) {
      this.webSocket.close();
      this.webSocket = null;
    }
  }

  /**
   * Handles a message received after the handshake. Binary messages of at
   * least 64 bytes are parsed as seedlink packets, shorter binary messages
   * are assumed to be text and ignored, and anything else is reported as
   * an error.
   *
   * @param event the websocket message event
   */
  handle(event: MessageEvent): void {
    const data: unknown = event.data;
    if (isArrayBufferLike(data)) {
      if (data.byteLength < 64) {
        //assume text
      } else {
        this.handleMiniseed(data);
      }
    } else {
      // ?? error??
      this.handleError(
        new Error("Unknown message type" + JSON.stringify(event)),
      );
    }
  }

  /**
   * Parses a seedlink packet, an 8 byte header of 'SL' plus a 6 character
   * hex sequence number followed by one miniseed record, and passes the
   * result to receiveMiniseedFn.
   *
   * Any error, including one thrown by receiveMiniseedFn, is reported to
   * the error handler and closes the connection. A message shorter than
   * 64 bytes is reported but does not close the connection.
   *
   * @param data raw bytes of one seedlink packet
   */
  handleMiniseed(data: ArrayBufferLike): void {
    try {
      if (data.byteLength < 64) {
        this.handleError(
          new Error(
            "message too small to be miniseed: " +
              data.byteLength +
              " " +
              dataViewToString(new DataView(data)),
          ),
        );
        return;
      }

      const slHeader = new DataView(data, 0, 8);

      // check for 'SL' at start
      if (slHeader.getInt8(0) === 83 && slHeader.getInt8(1) === 76) {
        let seqStr = "";
        for (let i = 0; i < 6; i++) {
          seqStr = seqStr + String.fromCharCode(slHeader.getInt8(2 + i));
        }

        const dataView = new DataView(data, 8, data.byteLength - 8);
        const out: SequencedDataRecord = {
          rawsequence: seqStr,
          sequence: parseInt(seqStr, 16),
          miniseed: miniseed.parseSingleDataRecord(dataView),
        };
        this.receiveMiniseedFn(out);
      } else {
        throw new Error(
          "Not a seedlink packet, no starting SL: " +
            slHeader.getInt8(0) +
            " " +
            slHeader.getInt8(1),
        );
      }
    } catch (e) {
      this.handleError(toError(e));
      this.close();
    }
  }

  /**
   * Reports whether a websocket currently exists. This only checks that
   * the socket is non-null, not that it has finished opening.
   *
   * @returns true if there is a websocket
   */
  isConnected(): boolean {
    return this.webSocket !== null;
  }

  /**
   * Sends initial HELLO to server and waits for response.
   *
   * @returns Promise that resolves to the two line response from the
   * server, or rejects if the reply is not binary or is not two lines.
   */
  sendHello(): Promise<[string, string]> {
    const webSocket = this.webSocket;
    return new Promise<[string, string]>((resolve, reject) => {
      if (!webSocket) {
        reject(new Error("webSocket has been closed"));
        return;
      }

      webSocket.onmessage = (event: MessageEvent) => {
        const data: unknown = event.data;
        if (!isArrayBufferLike(data)) {
          reject(new Error("event.data not ArrayBufferLike?"));
          return;
        }
        const replyMsg = dataViewToString(new DataView(data));
        // Lines are split on "\r"; trimming each one drops the "\n" that
        // is left behind when the server separates lines with "\r\n".
        const lines = replyMsg
          .trim()
          .split("\r")
          .map((line) => line.trim());
        const [first, second] = lines;
        if (
          lines.length === 2 &&
          first !== undefined &&
          second !== undefined
        ) {
          resolve([first, second]);
        } else {
          reject(new Error("not 2 lines: " + replyMsg));
        }
      };

      webSocket.send("HELLO\r");
    });
  }

  /**
   * Sends an array of commands, each as a Promise waiting for the 'OK' response
   * before sending the next.
   *
   * @param cmd array of commands to send
   * @returns Promise that resolves to the 'OK' returned by the last
   *   command if successful, or rejects on the first failure.
   */
  sendCmdArray(cmd: Array<string>): Promise<string> {
    return cmd.reduce(
      (accum: Promise<string>, next: string): Promise<string> => {
        return accum.then((): Promise<string> => {
          return this.createCmdPromise(next);
        });
      },
      Promise.resolve("OK"),
    );
  }

  /**
   * Creates a Promise that sends a command and is resolved with the result.
   *
   * @param mycmd command string to send.
   * @returns Promise that resolves to the reply from the server, or rejects
   * if the reply is anything other than 'OK'.
   */
  createCmdPromise(mycmd: string): Promise<string> {
    const webSocket = this.webSocket;
    return new Promise<string>((resolve, reject) => {
      if (!webSocket) {
        reject(new Error("webSocket has been closed"));
        return;
      }

      webSocket.onmessage = (event: MessageEvent) => {
        const data: unknown = event.data;
        if (!isArrayBufferLike(data)) {
          reject(new Error("event.data not ArrayBufferLike?"));
          return;
        }
        const replyMsg = dataViewToString(new DataView(data)).trim();
        if (replyMsg === "OK") {
          resolve(replyMsg);
        } else {
          reject(new Error("msg not OK: " + replyMsg));
        }
      };

      webSocket.send(mycmd + "\r\n");
    });
  }

  /**
   * handle errors that arise
   *
   * @private
   * @param error the error
   */
  handleError(error: Error): void {
    if (this.errorHandler) {
      this.errorHandler(error);
    } else {
      util.log("seedlink handleError: " + error.message);
    }
  }
}