// Global import WebSocket from 'ws'; import { Readable, Writable } from 'stream'; import pino from 'pino'; import { EventEmitter } from 'events'; // Project import './config'; import * as Api from './api'; import { ImagePacketTypes } from './api'; import * as Types from './types'; import { getMacros } from './macros'; import { webSocketStateNames } from './ws'; import { decodeCodecHeader, getTime, packPacket, unpackPacket } from './utils'; import delay from 'delay'; import { initAudioStream } from './audio'; import pEvent from 'p-event'; import { DEFAULT_LOGGER_OPTIONS } from './logger'; // // Const // const ZELLO_PUBLIC_SERVER = 'wss://zello.io/ws'; const DEFAULT_COMMAND_TIMEOUT = process.env.COMMAND_TIMEOUT != null ? parseInt(process.env.COMMAND_TIMEOUT) : 10; export const DEFAULT_ZELLO_OPTIONS: Types.Options = { logger: DEFAULT_LOGGER_OPTIONS, name: 'bot', }; const botCounters = new Map(); async function zello( address?: string, script?: Types.Script, options?: Partial, ): Promise; async function zello(address?: string, options?: Partial): Promise; async function zello(script?: Types.Script, options?: Partial): Promise; async function zello(options?: Partial): Promise; async function zello( a?: string | Types.Script | Partial, b?: Types.Script | Partial, c?: Partial, ): Promise { const address = Types.isServerAddress(a) ? a : ZELLO_PUBLIC_SERVER; const script = Types.isScript(a) ? a : Types.isScript(b) ? b : null; const options: Types.Options = { ...DEFAULT_ZELLO_OPTIONS, ...(Types.isOptions(a) ? a : Types.isOptions(b) ? b : Types.isOptions(c) ? c : null), }; const pinoLogger = options.logger instanceof EventEmitter ? options.logger : pino(options.logger); // Get bot name if (!botCounters.has(options.name)) { botCounters.set(options.name, 0); } const counter = botCounters.get(options.name)!; const name = counter > 0 ? options.name + '-' + counter : options.name; botCounters.set(options.name, counter + 1); pinoLogger.debug(`Assigning name "${name}" to the bot`); const logger = pinoLogger.child({ bot: name }); let closeRequested = false; let deferredClosePromise: Types.DeferredPromise; const closePromise = new Promise((resolve) => { deferredClosePromise = { resolve }; }); logger.info('Starting Zello'); let seq: number = 0; let seqReservedCount: number = 0; let ws: WebSocket; const commandPromises = new Map }>(); let deferredExceptionPromise: Types.DeferredPromise; const exceptionPromise = new Promise((resolve) => { deferredExceptionPromise = { resolve }; }); // TODO: Rewrite callbacks to be queued // Currently they get easily overwritten const callbacks: Types.EventCallbacks = {}; const expectedAudioStreams = new Map(); // const expectedImageStreams = new Map(); function runCommand( commandCode: T, request: Omit, ): Promise { logger.debug(`Running command: ${commandCode}`); logger.trace(request, 'Command request'); seqReservedCount++; const seqCurrent = seq + seqReservedCount; const promise = new Promise(function (resolve, reject) { commandPromises.set(seqCurrent, { command: commandCode, promise: { resolve, reject } }); }); if (ws.readyState === WebSocket.OPEN) { if (Types.isCommandSendAudioData(request, commandCode)) { // Special case for sendAudioData command: create and return getDataStream // This promise resolves when send stream is created const commandPromise = commandPromises.get(seqCurrent)!; commandPromises.delete(seqCurrent); const streamInfo = setupAudioDataStream(request); commandPromise.promise.resolve(streamInfo); } else if (Types.isCommandSendImageData(request, commandCode)) { // Special case for sendImageData command: create and return getDataStream // This promise resolves when send stream is created const commandPromise = commandPromises.get(seqCurrent)!; commandPromises.delete(seqCurrent); const sendImageFunction = sendImage(request); commandPromise.promise.resolve(sendImageFunction); } else { // Sending command via websocket //const startTime = getTime(); ws.send( JSON.stringify({ ...request, command: commandCode, seq: seqCurrent, }), (err) => { //logger.debug(`Command sent in ${Number(getTime() - startTime) / 1000} μs`); if (err == null) { seq = seqCurrent; } else { if (commandPromises.has(seqCurrent)) { const commandPromise = commandPromises.get(seqCurrent)!; commandPromises.delete(seqCurrent); commandPromise.promise.reject(err); } } }, ); } } else { const msg = `Cannot send command: socket state is ${webSocketStateNames[ws.readyState]}`; logger.warn(msg); // Cannot send command, reject and remove it immediately const commandPromise = commandPromises.get(seqCurrent)!; commandPromises.delete(seqCurrent); commandPromise.promise.reject(new Error(msg)); } return promise; } async function sendBinaryPacket(packet: Buffer) { return new Promise((resolve, reject) => { if (ws.readyState === WebSocket.OPEN) { ws.send(packet, { binary: true }, function (err) { if (err) { logger.warn(`Cannot send packet`); reject(err); } else { resolve(true); } }); } else { reject(new Error('Socket is not open, cannot send image data')); } }); } function sendImage(request: Types.CommandMap['send_image_data'][0]): Types.CommandMap['send_image_data'][1] { const { imageId, fullSizeData, thumbnailData } = request; const fullSizePacket = packPacket({ data: fullSizeData, type: Api.PacketTypes.IMAGE, imageId, packetType: ImagePacketTypes.FULL_SIZE, }); const thumbnailPacket = packPacket({ data: thumbnailData, type: Api.PacketTypes.IMAGE, imageId, packetType: ImagePacketTypes.THUMBNAIL, }); return async function () { await sendBinaryPacket(thumbnailPacket); await sendBinaryPacket(fullSizePacket); }; } let outgoingDataStream: Writable | null = null; function setupAudioDataStream( request: Types.CommandMap['send_audio_data'][0], ): Types.CommandMap['send_audio_data'][1] { let packetId = 0; let lastTime: bigint; // Create a writable stream outgoingDataStream = new Writable({ write: async function (chunk: Buffer | undefined, encoding, callback) { if (chunk) { logger.trace(`Received packet: ${packetId}`); const packet = packPacket({ data: chunk, type: Api.PacketTypes.AUDIO, streamId: request.streamId, packetId: packetId++, }); if (lastTime != null) { // The timer has been started when sending previous packet. // Measure in milliseconds the time passed since then. const diff = Math.ceil(Number(getTime() - lastTime) / 1000000); // Delay the rest milliseconds logger.trace(`Will send packet ${packetId} after: ${request.frameSize - diff} ms `); await delay(request.frameSize - diff); } try { await sendBinaryPacket(packet); } catch (err) { callback(err); return; } lastTime = getTime(); callback(); return; } callback(new Error('Cannot send: not a packet')); }, }); return { stream: outgoingDataStream, }; } function stopAudioDataStream() { if (outgoingDataStream != null) { logger.info('Stopping outgoing stream'); outgoingDataStream.destroy(); pEvent(outgoingDataStream, 'close').then(() => { outgoingDataStream = null; }); } } async function executeScript(script: Types.Script, props: Types.Zello): Promise { logger.debug('Executing user script'); const gen = script(props); // Check to see if our script is Generator or Promise if (Types.isScriptGenerator(gen)) { // It's not necessary to use this, but it allows to catch // async exceptions which otherwise would be missed. let param: any = undefined; let error: Error | undefined = undefined; const yieldCounter = 1; while (true) { logger.debug(`Executing yield ${yieldCounter} of the user script (${error == null ? 'normal' : 'error'})`); const obj = !error ? gen.next(param) : gen.throw(error); const value = obj.value; const done = obj.done; if (done) { logger.debug('Exiting user script'); return value; } if (Types.isPromise(value)) { let res: any; try { res = await Promise.race([value, exceptionPromise]); } catch (err) { logger.debug('User script promise rejected'); res = err; } // noinspection SuspiciousTypeOfGuard if (res instanceof Error) { error = res; } else { param = res; } } else { logger.error(`User script must yield only promises, "${typeof value}" received`); } } } else { // It's a Promise most likely or just anything return gen; } } function setEventHandler(eventCode: T): Types.Event { return function (cb) { logger.debug(`Setting event callback: ${eventCode}`); // @ts-ignore callbacks[eventCode] = cb; }; } function setAwaitHandler(eventName: T): Types.Await { return async function (filter, timeout) { logger.debug(`Running await "${eventName}" with timeout ${timeout} seconds`); const delayPromise = delay(timeout * 1000); const res = await Promise.race([ new Promise((resolve) => { // @@ts-ignore const event = events[eventName] as Types.Event; event((event) => { if ((typeof filter === 'function' && filter(event)) || filter) { resolve(event); } }); }), exceptionPromise, delayPromise, ]); if (res != null) { delayPromise.clear(); if (res instanceof Error) { // exceptionPromise worked out throw Error; } else { // Normal return return res; } } else { throw new Error('Command timeout'); } }; } function setCommandHandler(commandCode: T): Types.Command { return async function (request, timeout: number = DEFAULT_COMMAND_TIMEOUT) { logger.debug(`Running command "${commandCode}" with timeout ${timeout} seconds`); const delayPromise = delay(timeout * 1000); let error: Error | null = null; let res: Types.CommandMap[T][1] | Error | void; try { res = await Promise.race([runCommand(commandCode, request), exceptionPromise, delayPromise]); } catch (err) { error = err; } // Cancel the timers delayPromise.clear(); if (error != null) { throw error; } if (res != null) { if (res instanceof Error) { // exceptionPromise worked out throw res; } else { // Normal return return res; } } else { // delayPromise worked out throw new Error('Command timeout'); } }; } const events: Types.Events = { onChannelStatus: setEventHandler('on_channel_status'), onTextMessage: setEventHandler('on_text_message'), onStreamStart: setEventHandler('on_stream_start'), onStreamStop: setEventHandler('on_stream_stop'), onAudioData: setEventHandler('on_audio_data'), onImage: setEventHandler('on_image'), onImageData: setEventHandler('on_image_data'), onError: setEventHandler('on_error'), }; const awaits: Types.Awaits = { onChannelStatus: setAwaitHandler('onChannelStatus'), onTextMessage: setAwaitHandler('onTextMessage'), onStreamStart: setAwaitHandler('onStreamStart'), onStreamStop: setAwaitHandler('onStreamStop'), onAudioData: setAwaitHandler('onAudioData'), onImage: setAwaitHandler('onImage'), onImageData: setAwaitHandler('onImageData'), onError: setAwaitHandler('onError'), }; const commands: Types.Commands = { logon: setCommandHandler('logon'), sendTextMessage: setCommandHandler('send_text_message'), startStream: setCommandHandler('start_stream'), stopStream: setCommandHandler('stop_stream'), sendAudioData: setCommandHandler('send_audio_data'), sendImage: setCommandHandler('send_image'), sendImageData: setCommandHandler('send_image_data'), }; try { ws = await new Promise(function (resolve, reject) { try { logger.debug(`Connecting to: ${address}`); const ws = new WebSocket(address, { host: 'zello.io', }); ws.addEventListener( 'open', () => { logger.info(`Connected to: ${address}!`); resolve(ws); }, { once: true }, ); ws.addEventListener( 'error', (err) => { logger.error('Connection error'); logger.debug(err, 'Connection error'); reject(err); }, { once: true }, ); } catch (err) { reject(err); } }); ws.addEventListener('error', (err) => { logger.error('Socket error'); logger.debug(err); const exception = new Error(`Unexpected websocket error: ${err.message}`); deferredExceptionPromise.resolve(exception); }); ws.addEventListener('close', (event) => { if (closeRequested) { logger.info('Closing socket (normal)'); } else { logger.warn('Closing socket (emergency)'); } deferredClosePromise.resolve(); if (!closeRequested) { const exception = new Error(`Unexpected close, code: ${event.code}, reason: ${event.reason}`); deferredExceptionPromise.resolve(exception); } }); ws.addEventListener('message', ({ type, data }) => { logger.trace('Received message'); //logger.trace({ type, data }, 'Message details'); if (ws.readyState === WebSocket.OPEN) { if (type === 'message') { if (typeof data === 'string') { const json = JSON.parse(data); if (Api.isCommandResponse(json)) { // // Command response // logger.trace(`Message is command response, seq = ${seq}`); if (commandPromises.has(json.seq)) { const commandPromise = commandPromises.get(json.seq)!; logger.trace(`Found original command: ${commandPromise.command}`); commandPromises.delete(json.seq); commandPromise.promise.resolve(json); } } else if (Api.isEvent(json)) { // // Event // const eventCode = json.command; logger.trace(`Message is event: ${eventCode}`); const callback = callbacks[eventCode]; if (callback != null) { logger.trace(`Running callback`); callback(json as any); } // Special case for audio stream starts if (Api.isEventStreamStart(json)) { // TODO: close stream if it's not the stream we requested // Check if we have a callback for it const callback = callbacks['on_audio_data']; if (callback != null) { logger.trace('Found audio stream callback, creating audio stream'); // Create audio stream const stream = new Readable({ read: () => {} }); expectedAudioStreams.set(json.stream_id, stream); const opusInfo = decodeCodecHeader(json.codec_header); callback({ event: json, opusInfo, getStream: initAudioStream(json, opusInfo, stream, logger), }); } } else if (Api.isEventStreamStop(json)) { // Stop outgoing stream if any if (outgoingDataStream != null) { logger.info('Received on_stream_stop while transmitting data'); stopAudioDataStream(); } const callback = callbacks['on_audio_data']; if (callback != null) { logger.trace('Found audio stream callback, creating audio stream'); if (expectedAudioStreams.has(json.stream_id)) { const stream = expectedAudioStreams.get(json.stream_id)!; expectedAudioStreams.delete(json.stream_id); // Finishing stream stream.push(null); } } } else if (Api.isEventError(json)) { // Special case for errors // We should reject all deferred commands for (const key of Array.from(commandPromises.keys())) { const commandPromise = commandPromises.get(key)!; logger.trace(`Cancelling command: ${commandPromise.command}`); commandPromises.delete(key); commandPromise.promise.reject(new Error(json.error)); } } } else { logger.warn(json, 'Unknown message'); } } else if (typeof data === 'object' && Buffer.isBuffer(data)) { logger.trace(`Message is data packet of length ${data.length}`); //logger.trace(data, 'Data packet'); const packet = unpackPacket(data); if (packet != null) { if (Api.isPacketAudio(packet)) { logger.trace(`Message is audio packet number: ${packet.packetId}`); if (expectedAudioStreams.has(packet.streamId)) { const stream = expectedAudioStreams.get(packet.streamId)!; stream.push(packet.data); } else { // Stop outgoing stream if any as we cannot receive a packet of // this type during a transmission. if (outgoingDataStream != null) { logger.debug('Received audio data while transmitting audio'); stopAudioDataStream(); } } } else if (Api.isPacketImage(packet)) { logger.warn('Image receiving is not implemented'); } else { logger.debug('Unknown packet'); } } else { logger.warn('Empty packet'); } } else { logger.warn('Unknown message data type'); } } else { logger.warn('Type of message is not "message"'); } } else { logger.warn('Received message while ws.readyState = ' + webSocketStateNames[ws.readyState]); } }); const ctl: Types.Ctl = { close: () => { logger.debug('Ctl: requested socket close'); ws.close(); closeRequested = true; // Destroy outgoing stream if any if (outgoingDataStream != null) { stopAudioDataStream(); } // Close all incoming streams for (const key of expectedAudioStreams.keys()) { const item = expectedAudioStreams.get(key)!; item.destroy(); expectedAudioStreams.delete(key); } return closePromise; }, status: () => { logger.debug('Ctl: requested status'); return webSocketStateNames[ws.readyState]; }, run: async (script) => { logger.debug('Ctl: requested user script execution'); return await executeScript(script, zello); }, }; const zelloMacro: Types.ZelloMacro = { ctl, events, commands, awaits, logger, name, }; const zello: Types.Zello = { ...zelloMacro, macros: getMacros(zelloMacro), }; if (script) { await executeScript(script, zello); } return zello; } catch (err) { throw new Error(err.message); } } export default zello;