import WebSocket from 'ws'; import { Observable, combineLatest } from 'rxjs'; import { filter, ignoreElements, scan, map, startWith } from 'rxjs/operators'; import { EOL } from 'os'; import { localNow } from '../time'; import { isString, isBuffer } from './helpers'; import { Logger, logEvents } from '../logging'; export function logSocketStats( logger: Logger, data: Observable ) { const numberOfMessages = data.pipe( filter(isString), map((_, i) => i + 1), startWith(0) ); const bytesReceived = data.pipe( filter(isBuffer), scan((sum, item: Buffer) => item.byteLength + sum, 0), startWith(0) ); const stats = combineLatest(numberOfMessages, bytesReceived).pipe( map(([msgs, bytes]) => ({ bytesReceived: bytes, numberOfMessages: msgs, timestamp: localNow(), })), logEvents({ prefix: `${EOL}🔃 Connection stats`, suffix: [EOL], on: ['audit', 'unsubscribe'], logger, unsubscribe: { prefix: `${EOL}🔃 Connection stats upon closure`, }, }), ignoreElements() ); return stats; }