import type { Socket } from 'node:net'
import type {
    Command,
    CommandMethods,
    CommandResponse,
    Envelope,
    Message,
    MessageTypes,
    Notification,
    UnknownCommandResponse,
} from '../../types/index.ts'
import { BlipError } from '../bliperror.ts'
import { RetryableError } from '../retryableerror.ts'
import { ConnectionSender, type ConnectionSenderConstructor, OpenConnectionSender } from '../sender.ts'
import { SessionNegotiator } from '../sessionnegotiator.ts'
import { EnvelopeThrottler } from '../throttler.ts'
import { tryParseJSON } from './json-parser.ts'

/**
 * Sender that exchanges JSON-serialized envelopes over a `node:net` socket.
 *
 * @remarks
 * The TCP implementation has a known limitation where large command
 * responses can be lost and the awaiting promise will never resolve.
 * Unless TCP is explicitly required, prefer using {@link WebSocketSender}
 */
export class TCPSender extends OpenConnectionSender {
    private readonly throttler = new EnvelopeThrottler()
    private readonly connectionHandle: TCPHandle<Envelope>

    /**
     * Opens the connection to `[<tenantId>.]tcp.<domain>` on port 443 and wires the
     * socket callbacks to the session negotiator and the envelope resolver.
     *
     * NOTE(review): the type argument of `ConstructorParameters` is inferred from the
     * otherwise unused `ConnectionSenderConstructor` import — confirm against `../sender.ts`.
     *
     * @param options - Sender options; `tenantId`, `node` and `authentication` are read here,
     * the whole object is forwarded to the base class.
     */
    constructor(options: ConstructorParameters<ConnectionSenderConstructor>[0]) {
        super(options)

        const prefix = options.tenantId ? `${options.tenantId}.` : ''
        this.connectionHandle = new TCPHandle<Envelope>(
            `${prefix}tcp.${this.domain}`,
            443,
            // Every (re)connection gets a fresh negotiator that writes session envelopes to that socket.
            (socket) => {
                this.sessionNegotiator = new SessionNegotiator(this, (session) => {
                    socket.write(JSON.stringify(session))
                })
                return this.sessionNegotiator.negotiate({
                    node: options.node,
                    authentication: options.authentication,
                })
            },
            // A dropped connection can no longer answer what is in flight.
            () => this.envelopeResolver.rejectPendingEnvelopes('Connection was closed'),
            // While a session is being negotiated, incoming envelopes belong to the negotiator.
            (envelope: Envelope) => {
                if (this.sessionNegotiator?.negotiating) {
                    return this.sessionNegotiator.handleEnvelope(envelope)
                }
                this.envelopeResolver.resolve(envelope)
            },
        )
    }

    /** Writes the notification to the socket as soon as a connection is available. */
    public async sendNotification(notification: Notification): Promise<void> {
        const socket = await this.connectionHandle.get()
        socket.write(JSON.stringify(notification))
    }

    /**
     * Throttles, waits for the connection and for presence, then writes the message.
     *
     * NOTE(review): `MessageTypes` is imported but not referenced in this file as shown;
     * the parameter may originally have been `Message<MessageTypes>` — confirm against the base class.
     */
    public async sendMessage(message: Message): Promise<void> {
        await this.throttler.throttle('message')
        const socket = await this.connectionHandle.get()
        await this.sessionNegotiator?.ensurePresence()
        socket.write(JSON.stringify(message))
    }

    /**
     * Sends a command and resolves with the `resource` of its successful response.
     * The whole exchange runs under {@link TCPSender.withRetryPolicy}.
     *
     * @throws The error built by `BlipError.commandResponseToBlipError` when the response is a failure.
     * @throws `Error` when the response is neither a failure nor a success.
     */
    public sendCommand(command: Command): Promise<unknown> {
        return this.withRetryPolicy(async () => {
            await this.throttler.throttle('command')

            // Registered before the command is written; presumably so that a fast reply cannot be missed.
            const envelopeResponsePromise = this.envelopeResolver.createEnvelopeResponsePromise(command.id)
            const socket = await this.connectionHandle.get()
            await this.sessionNegotiator?.ensurePresence(command.uri)
            socket.write(JSON.stringify(command))

            const response = (await envelopeResponsePromise) as CommandResponse<
                unknown,
                CommandMethods,
                'success' | 'failure'
            >
            if (BlipError.isFailedCommandResponse(response)) {
                throw BlipError.commandResponseToBlipError(command.uri, response)
            } else if (response.status === 'success') {
                return response.resource
            } else {
                throw new Error(`Unexpected response for command '${command.uri}': ${JSON.stringify(response)}`)
            }
        })
    }

    /**
     * Writes a command response to the socket.
     *
     * NOTE(review): this is the only send method that appends a newline after the JSON
     * payload — confirm whether that difference is intentional.
     */
    public async sendCommandResponse(response: UnknownCommandResponse): Promise<void> {
        const socket = await this.connectionHandle.get()
        socket.write(`${JSON.stringify(response)}\n`)
    }

    /** Finishes the session, closes the socket, then runs the base class `close`. */
    public async close() {
        this.sessionNegotiator?.finish()
        await this.connectionHandle.close()
        await super.close()
    }

    public static login = ConnectionSender.login

    /**
     * Runs `fn`, retrying while it fails with an error `RetryableError.isRetryable` accepts.
     *
     * @param fn - Operation to run; invoked again from scratch on every retry.
     * @param retries - Retries allowed after the first failure (default 10).
     * @returns Whatever `fn` resolves with.
     * @throws The last error, once it is not retryable or no retries are left.
     */
    private async withRetryPolicy<T>(fn: () => Promise<T>, retries = 10): Promise<T> {
        try {
            return await fn()
        } catch (err) {
            if (retries > 0 && RetryableError.isRetryable(err)) {
                // wait a random delay of up to one second before retrying
                await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000))
                return this.withRetryPolicy(fn, retries - 1)
            }
            throw err
        }
    }
}

/**
 * Owns the current TCP socket and replaces it when the connection drops.
 *
 * `T` is the type of the objects parsed from incoming data and handed to `onMessage`.
 */
class TCPHandle<T> {
    private currentSocketPromise: Promise<Socket> | null = null
    /** Set by {@link TCPHandle.close}; suppresses reconnection for a deliberate shutdown. */
    private closing = false
    /** Closes seen since the last fully established connection; reconnection stops at 3. */
    private connectionAttempts = 0

    /**
     * Starts connecting immediately.
     *
     * @param host - Host to connect to.
     * @param port - Port to connect to.
     * @param onConnected - Awaited after each successful connect, before the socket is handed out.
     * @param onClose - Called whenever the socket closes while the handle is not closing.
     * @param onMessage - Called for every object parsed from incoming data.
     */
    constructor(
        host: string,
        port: number,
        onConnected: (socket: Socket) => Promise<unknown>,
        onClose: () => void,
        onMessage: (message: T) => void,
    ) {
        this.currentSocketPromise = this.connect(host, port, onConnected, onClose, onMessage)
    }

    /**
     * Returns the promise of the current socket. It rejects when the connection could not
     * be (re)established or when `onConnected` failed.
     *
     * @throws `Error` synchronously when no connection promise exists.
     */
    public get() {
        if (!this.currentSocketPromise) {
            throw new Error('TCP connection is not available.')
        }

        return this.currentSocketPromise
    }

    /** Ends and destroys the current socket. Only the first call has any effect. */
    public async close() {
        if (!this.closing && this.currentSocketPromise !== null) {
            this.closing = true

            let current: Socket
            try {
                current = await this.currentSocketPromise
            } catch {
                // The connection was never established, so there is no socket to shut down.
                // The failure itself has already been delivered through `get()`.
                return
            }

            current
                .end()
                .removeAllListeners()
                // An emitter without an 'error' listener throws on 'error'; keep a no-op one
                // so an error during the final flush cannot surface as an uncaught exception.
                .on('error', () => {})
                .destroySoon()
        }
    }

    /**
     * Opens one socket and resolves once it is connected and `onConnected` has completed.
     *
     * Socket failures are never thrown from inside event listeners, where no caller could
     * catch them. Instead:
     * - a socket that closes before connecting hands its waiters over to the next attempt;
     * - a socket that closes after connecting is replaced for future `get()` calls;
     * - on the third consecutive close without an established connection the promise
     *   rejects. If nothing awaits that promise, Node reports it as an unhandled
     *   rejection, so exhausting the retries is never silent.
     */
    private async connect(
        host: string,
        port: number,
        onConnected: (socket: Socket) => Promise<unknown>,
        onClose: () => void,
        onMessage: (message: T) => void,
    ): Promise<Socket> {
        const { connect } = await import('node:net')
        const socket = connect({ host, port }).setKeepAlive(true)

        // Bytes received so far that do not yet form a complete JSON value.
        let buffer = Buffer.alloc(0)
        let connected = false
        let lastError: Error | undefined

        // Settles with `undefined` when this socket connects, or with the socket of the
        // replacement attempt when this socket closed before connecting.
        const replacement = await new Promise<Socket | undefined>((resolve, reject) => {
            socket.once('connect', () => {
                connected = true
                resolve(undefined)
            })

            // Only record the error: 'close' is emitted right after 'error' and decides
            // what happens next. The listener stays attached for the socket's lifetime.
            socket.on('error', (err) => {
                lastError = err
            })

            socket.once('close', () => {
                if (this.closing) {
                    // Deliberate shutdown: release anyone still waiting (no-op once settled).
                    reject(new Error('TCP connection is not available.'))
                    return
                }

                onClose()
                this.connectionAttempts++

                if (this.connectionAttempts < 3) {
                    const next = this.connect(host, port, onConnected, onClose, onMessage)
                    this.currentSocketPromise = next
                    // Before connecting: waiters follow the new attempt. After connecting
                    // this is a no-op because the promise has already settled.
                    resolve(next)
                    return
                }

                const failure = Object.assign(new Error('Failed to connect/reconnect to TCP socket'), {
                    cause: lastError,
                })
                if (connected) {
                    // Callers already hold the dead socket; make future get() calls fail.
                    this.currentSocketPromise = Promise.reject(failure)
                } else {
                    reject(failure)
                }
            })

            socket.on('data', (chunk: Buffer) => {
                const result = tryParseJSON(buffer.length === 0 ? chunk : Buffer.concat([buffer, chunk]))
                buffer = result.remainingBuffer
                for (const parsed of result.parsedObjects) {
                    onMessage(parsed)
                }
            })
        })

        if (replacement !== undefined) {
            // The replacement attempt already ran `onConnected` and reset the attempt counter.
            return replacement
        }

        await onConnected(socket)
        this.connectionAttempts = 0
        return socket
    }
}