import {
  type CommonNotification,
  type Envelope,
  type FailedNotification,
  isCommand,
  isMessage,
  isNotification,
  type Json,
  type MaybePromise,
  Node,
  type Notification,
  type UnknownCommand,
  type UnknownMessage,
} from '../types/index.ts'
import { ReasonCodes } from '../types/reason.ts'
import { RetryableError } from './retryableerror.ts'
import type { OpenConnectionSender } from './sender.ts'

/**
 * Handler signatures for each kind of inbound envelope.
 *
 * A `command` handler may return a value; a defined return value is sent back
 * as the resource of the command response (see `EnvelopeResolver.resolve`).
 *
 * NOTE(review): the `MaybePromise` type arguments below were reconstructed
 * from usage (`Json` for command results, `void` elsewhere) — confirm.
 */
export type EventMap = {
  message: (message: UnknownMessage) => MaybePromise<void>
  command: (command: UnknownCommand) => MaybePromise<Json> | MaybePromise<void>
  notification: (notification: Notification) => MaybePromise<void>
}

/**
 * A registered handler for event `K`.
 *
 * When `predicate` is present, `callback` is only invoked for envelopes for
 * which the predicate returns `true`.
 */
export type Listener<K extends keyof EventMap> = {
  predicate?: (...args: Parameters<EventMap[K]>) => boolean
  callback: (...args: Parameters<EventMap[K]>) => ReturnType<EventMap[K]>
}

type Listeners = {
  [K in keyof EventMap]: Array<Listener<K>>
}

/** Settle callbacks of a promise that is waiting for a response envelope. */
type PendingResponse = {
  /** `Date.now()` at registration; used by the timeout sweep. */
  started: number
  resolve: (response: Envelope) => void
  reject: (reason?: Error) => void
}

/** How often (in milliseconds) pending responses are checked for timeout. */
const TIMEOUT_SWEEP_INTERVAL_MS = 5000

/**
 * Routes inbound envelopes: settles promises that are waiting for a response
 * with a matching id, answers ping commands, and dispatches everything else
 * to the registered listeners.
 */
export class EnvelopeResolver {
  private readonly timeoutListener: NodeJS.Timeout

  private readonly listeners: Listeners = {
    message: [],
    command: [],
    notification: [],
  }

  // A Map (not a plain object) because ids arrive from the remote party:
  // with an object, an id such as "constructor" or "__proto__" would resolve
  // to an inherited Object.prototype member and be mistaken for a pending entry.
  private readonly waitingEnvelopeResponseResolvers = new Map<string, PendingResponse>()

  /**
   * @param sender Used to send command responses and message notifications.
   * @param timeoutSeconds Age after which a pending response promise is
   *   rejected with a `RetryableError`.
   */
  constructor(
    private readonly sender: OpenConnectionSender,
    timeoutSeconds = 60 * 15, // Default timeout of 15 minutes
  ) {
    this.timeoutListener = setInterval(() => {
      const now = Date.now()
      // Deleting entries while iterating a Map is well-defined.
      for (const [id, pending] of this.waitingEnvelopeResponseResolvers) {
        if (now - pending.started > timeoutSeconds * 1000) {
          this.resolveEnvelopeResponse(
            id,
            new RetryableError(`Envelope response timed out after ${timeoutSeconds} seconds`),
          )
        }
      }
    }, TIMEOUT_SWEEP_INTERVAL_MS)
  }

  /**
   * Processes one inbound envelope.
   *
   * - If a promise is waiting on `envelope.id`, it is resolved with the envelope.
   * - A `get /ping` command is answered directly with a success response.
   * - Any other command is passed to the `command` listeners; their result
   *   (or the thrown error) is sent back as the command response.
   * - A message is passed to the `message` listeners, with `received`,
   *   then `consumed` or `failed` notifications sent around the call.
   * - A notification is passed to the `notification` listeners.
   */
  public async resolve(envelope: Envelope): Promise<void> {
    if (this.waitingEnvelopeResponseResolvers.has(envelope.id)) {
      this.resolveEnvelopeResponse(envelope.id, envelope)
    } else if (isCommand(envelope)) {
      if (envelope.method === 'get' && envelope.uri === '/ping') {
        await this.sender.sendCommandResponse({
          id: envelope.id,
          to: envelope.from,
          method: 'get',
          type: 'application/vnd.lime.ping+json',
          status: 'success',
          resource: {},
        })
      } else {
        try {
          const response = await this.emit('command', envelope)
          // Strings are sent as plain text, other truthy results as JSON;
          // anything else produces a response without a resource.
          const type = typeof response === 'string' ? 'text/plain' : response ? 'application/json' : undefined
          const resource = typeof response === 'string' || response ? response : undefined
          await this.sender.sendCommandResponse({
            id: envelope.id,
            method: envelope.method,
            to: envelope.from,
            status: 'success',
            type,
            resource,
          })
        } catch (err) {
          await this.sender.sendCommandResponse({
            id: envelope.id,
            method: envelope.method,
            to: envelope.from,
            status: 'failure',
            reason: {
              code: ReasonCodes.GeneralError,
              description: err instanceof Error ? err.message : String(err),
            },
          })
        }
      }
    } else if (isMessage(envelope)) {
      // Notifications are only sent when the message has no `to`, or when it
      // is addressed to the identity of this session's local node.
      const shouldNotify =
        !envelope.to || this.sender.session?.localNode.toIdentity() === Node.from(envelope.to).toIdentity()

      const notify = async (notification: CommonNotification | FailedNotification) => {
        if (shouldNotify) {
          await this.sender.sendNotification({
            ...notification,
            id: envelope.id,
            // `failed` always goes to `from`; other events prefer `pp` when set.
            to: notification.event === 'failed' ? envelope.from : (envelope.pp ?? envelope.from),
          })
        }
      }

      await notify({ event: 'received' })

      try {
        await this.emit('message', envelope)
      } catch (err) {
        await notify({
          event: 'failed',
          reason: {
            code: ReasonCodes.GeneralError,
            description: err instanceof Error ? err.message : String(err),
          },
        })
        return
      }

      // Sent outside the try block on purpose: an error while sending this
      // acknowledgement must not be reported as a `failed` message, because
      // the handler has already completed successfully.
      await notify({ event: 'consumed' })
    } else if (isNotification(envelope)) {
      await this.emit('notification', envelope)
    }
  }

  /**
   * Registers a promise that settles when an envelope with the given id is
   * passed to `resolve`, or rejects on timeout, `close`, or
   * `rejectPendingEnvelopes`.
   *
   * Registering the same id again replaces the previous entry.
   */
  public createEnvelopeResponsePromise(id: string): Promise<Envelope> {
    const { resolve, reject, promise } = Promise.withResolvers<Envelope>()
    this.waitingEnvelopeResponseResolvers.set(id, { started: Date.now(), resolve, reject })
    return promise
  }

  /**
   * Adds a listener for `ev`.
   *
   * @throws Error if a listener with the same `callback` is already registered for `ev`.
   */
  public addListener<K extends keyof EventMap>(ev: K, listener: Listener<K>) {
    if (this.listeners[ev].some((l) => l.callback === listener.callback)) {
      throw new Error(`Listener for '${ev}' already exists.`)
    }
    this.listeners[ev].push(listener)
  }

  /** Removes every listener for `ev` whose callback is `callback`. */
  public removeListener<K extends keyof EventMap>(ev: K, callback: Listener<K>['callback']) {
    this.listeners[ev] = this.listeners[ev].filter((l) => l.callback !== callback) as Listeners[K]
  }

  /** Stops the timeout sweep and rejects every pending response promise. */
  public close() {
    clearInterval(this.timeoutListener)
    for (const id of this.waitingEnvelopeResponseResolvers.keys()) {
      this.resolveEnvelopeResponse(id, new Error(`Connection for envelope '${id}' was closed`))
    }
  }

  /** Rejects every pending response promise with a `RetryableError` carrying `reason`. */
  public rejectPendingEnvelopes(reason: string) {
    for (const id of this.waitingEnvelopeResponseResolvers.keys()) {
      this.resolveEnvelopeResponse(id, new RetryableError(reason))
    }
  }

  /**
   * Settles and forgets the pending entry for `id`, if any: rejects when given
   * an `Error`, resolves otherwise. Does nothing when no entry exists.
   */
  private resolveEnvelopeResponse(id: string, envelopeOrError: Envelope | Error) {
    const deferred = this.waitingEnvelopeResponseResolvers.get(id)
    if (!deferred) {
      return
    }

    this.waitingEnvelopeResponseResolvers.delete(id)
    if (envelopeOrError instanceof Error) {
      deferred.reject(envelopeOrError)
    } else {
      deferred.resolve(envelopeOrError)
    }
  }

  /**
   * Invokes the listeners for `ev` in registration order, skipping those whose
   * predicate rejects the arguments, and stops at the first one that returns a
   * value other than `undefined`.
   *
   * NOTE(review): the return type was reconstructed from usage — confirm.
   *
   * @returns The first defined listener result, or `undefined` if there is none.
   */
  private async emit<K extends keyof EventMap>(
    ev: K,
    ...args: Parameters<EventMap[K]>
  ): Promise<Json | void> {
    for (const listener of this.listeners[ev]) {
      if (!listener.predicate || listener.predicate(...args)) {
        const result = await listener.callback(...args)
        if (result !== undefined) {
          return result
        }
      }
    }
    return undefined
  }
}