import type { Socket } from 'node:net'
import type { ConnectionOptions } from 'node:tls'
import type {
  Command,
  CommandMethods,
  CommandResponse,
  Envelope,
  Message,
  MessageTypes,
  Notification,
  UnknownCommandResponse,
} from '../../types/index.ts'
import { logger } from '../../utils/logger.ts'
import { BlipError } from '../bliperror.ts'
import { RetryableError } from '../retryableerror.ts'
import { ConnectionSender, type ConnectionSenderConstructor, OpenConnectionSender } from '../sender.ts'
import { SessionNegotiator } from '../sessionnegotiator.ts'
import { EnvelopeThrottler } from '../throttler.ts'
import { buildTlsConnectOptions } from '../transporttls.ts'
import { tryParseJSON } from './json-parser.ts'

/** Number of consecutive socket closures tolerated before reconnection is abandoned. */
const MAX_CONNECTION_ATTEMPTS = 3

/**
 * @remarks
 * The TCP implementation has a known limitation where large command
 * responses can be lost and the awaiting promise will never resolve.
 * Unless TCP is explicitly required, prefer using {@link WebSocketSender}
 */
export class TCPSender extends OpenConnectionSender {
  private readonly throttler = new EnvelopeThrottler()
  private readonly connectionHandle: TCPHandle<Envelope>

  /**
   * Opens the TCP connection to `<tenantId.>tcp.<domain>:443` and wires the
   * session negotiation that runs every time the socket (re)connects.
   *
   * @param options - Sender options; `tenantId`, `authentication` and `node` are read here.
   */
  constructor(options: ConstructorParameters<ConnectionSenderConstructor>[0]) {
    super(options)

    const prefix = options.tenantId ? `${options.tenantId}.` : ''
    const host = `${prefix}tcp.${this.domain}`
    const auth = options.authentication
    // TLS options are only built for the 'transport' authentication scheme.
    const tlsOptions: ConnectionOptions | null =
      auth.scheme === 'transport' ? { ...buildTlsConnectOptions(auth), servername: host } : null

    this.connectionHandle = new TCPHandle<Envelope>(
      host,
      443,
      // onConnected: a fresh negotiator is created for every (re)connection.
      () => {
        this.sessionNegotiator = new SessionNegotiator(this, (session) => {
          this.connectionHandle
            .get()
            .then((s) => s.write(JSON.stringify(session)))
            .catch((err) => logger.warn('TCPSender', 'Failed to write session frame', err))
        })
        const upgradeToTls = tlsOptions ? () => this.connectionHandle.upgradeToTls(tlsOptions) : undefined
        return this.sessionNegotiator.negotiate({
          node: options.node,
          authentication: auth,
          upgradeToTls,
        })
      },
      // onClose: nothing will answer the envelopes that are still pending.
      () => this.envelopeResolver.rejectPendingEnvelopes('Connection was closed'),
      // onMessage: envelopes go to the negotiator while it is negotiating, otherwise to the resolver.
      (envelope: Envelope) => {
        if (this.sessionNegotiator?.negotiating) {
          return this.sessionNegotiator.handleEnvelope(envelope)
        }
        this.envelopeResolver.resolve(envelope)
      },
    )
  }

  /** Writes a notification to the current socket. Not throttled. */
  public async sendNotification(notification: Notification): Promise<void> {
    const socket = await this.connectionHandle.get()
    socket.write(JSON.stringify(notification))
  }

  /** Throttles, waits for the socket and for presence, then writes the message. */
  public async sendMessage(message: Message): Promise<void> {
    await this.throttler.throttle('message')
    const socket = await this.connectionHandle.get()
    await this.sessionNegotiator?.ensurePresence()
    socket.write(JSON.stringify(message))
  }

  /**
   * Sends a command and waits for the response envelope carrying the same id.
   * The whole exchange runs under {@link TCPSender.withRetryPolicy}.
   *
   * @returns The `resource` of a successful response.
   * @throws The error built by `BlipError.commandResponseToBlipError` for a failed
   * response, or an `Error` when the response is neither a failure nor a success.
   */
  public sendCommand(command: Command): Promise<unknown> {
    return this.withRetryPolicy(async () => {
      await this.throttler.throttle('command')

      // Registered before the write so the response cannot arrive unobserved.
      const envelopeResponsePromise = this.envelopeResolver.createEnvelopeResponsePromise(command.id)
      const socket = await this.connectionHandle.get()
      await this.sessionNegotiator?.ensurePresence(command.uri)
      socket.write(JSON.stringify(command))

      const response = (await envelopeResponsePromise) as CommandResponse<
        unknown,
        CommandMethods,
        'success' | 'failure'
      >

      if (BlipError.isFailedCommandResponse(response)) {
        throw BlipError.commandResponseToBlipError(command.uri, response)
      } else if (response.status === 'success') {
        return response.resource
      } else {
        throw new Error(`Unexpected response for command '${command.uri}': ${JSON.stringify(response)}`)
      }
    })
  }

  /** Writes a command response to the current socket. */
  public async sendCommandResponse(response: UnknownCommandResponse): Promise<void> {
    const socket = await this.connectionHandle.get()
    // NOTE(review): this is the only write path that appends '\n'; the other
    // send methods write the bare JSON document. Confirm which framing is intended.
    socket.write(`${JSON.stringify(response)}\n`)
  }

  /** Finishes the session, closes the socket, then closes the base sender. */
  public async close() {
    this.sessionNegotiator?.finish()
    await this.connectionHandle.close()
    await super.close()
  }

  public static login = ConnectionSender.login

  /**
   * Runs `fn`, retrying while it fails with an error that
   * `RetryableError.isRetryable` accepts.
   *
   * @param fn - Operation to run.
   * @param retries - Retries left; each retry waits a random delay below one second.
   * @throws The last error once it is not retryable or no retries are left.
   */
  private async withRetryPolicy<T>(fn: () => Promise<T>, retries = 10): Promise<T> {
    try {
      return await fn()
    } catch (err) {
      if (retries > 0 && RetryableError.isRetryable(err)) {
        // wait before retrying
        await new Promise<void>((resolve) => setTimeout(resolve, Math.random() * 1000))
        return this.withRetryPolicy(fn, retries - 1)
      }
      throw err
    }
  }
}

/**
 * Owns the TCP socket: connects, reconnects when the socket closes, optionally
 * upgrades it to TLS, and feeds every parsed JSON document to `onMessage`.
 */
class TCPHandle<T> {
  private currentSocketPromise: Promise<Socket> | null = null
  private closing = false
  private connectionAttempts = 0
  private buffer = Buffer.alloc(0)
  private readonly onMessage: (message: T) => void

  /**
   * Starts connecting immediately.
   *
   * @param host - Host to connect to.
   * @param port - Port to connect to.
   * @param onConnected - Awaited after every successful (re)connection.
   * @param onClose - Called when the socket closes while the handle is not closing.
   * @param onMessage - Called with every document parsed from the incoming stream.
   */
  constructor(
    host: string,
    port: number,
    onConnected: () => Promise<unknown>,
    onClose: () => void,
    onMessage: (message: T) => void,
  ) {
    this.onMessage = onMessage
    this.currentSocketPromise = this.track(this.connect(host, port, onConnected, onClose))
  }

  /**
   * @returns The promise of the current socket. It rejects when the connection
   * attempt it belongs to failed, or when reconnection was abandoned.
   * @throws Synchronously when no connection promise exists.
   */
  public get() {
    if (!this.currentSocketPromise) {
      throw new Error('TCP connection is not available.')
    }
    return this.currentSocketPromise
  }

  /** Ends and destroys the current socket; later calls are no-ops. */
  public async close(): Promise<void> {
    if (this.closing || this.currentSocketPromise === null) {
      return
    }
    this.closing = true

    let current: Socket
    try {
      current = await this.currentSocketPromise
    } catch {
      // The connection never came up, so there is no socket to tear down.
      return
    }
    current.end().removeAllListeners().destroySoon()
  }

  /**
   * Wraps the current plain socket in a TLS socket and makes it the current socket.
   *
   * @param options - Options forwarded to `tls.connect` together with the plain socket.
   * @throws When there is no socket promise, or with the error emitted by the TLS
   * socket before `secureConnect`.
   */
  public async upgradeToTls(options: ConnectionOptions): Promise<void> {
    if (!this.currentSocketPromise) {
      throw new Error('Cannot upgrade: no active socket.')
    }

    const plain = await this.currentSocketPromise
    // From here on the plain socket carries TLS records, not JSON documents.
    plain.removeAllListeners('data')
    this.buffer = Buffer.alloc(0)

    const { connect: tlsConnect } = await import('node:tls')
    const secured = tlsConnect({ ...options, socket: plain })

    await new Promise<void>((resolve, reject) => {
      const onError = (err: Error) => {
        secured.off('secureConnect', onConnect)
        reject(err)
      }
      const onConnect = () => {
        secured.off('error', onError)
        resolve()
      }
      secured.once('secureConnect', onConnect)
      secured.once('error', onError)
    })

    // The handshake listener is gone; an 'error' event without any listener
    // would be thrown as an uncaught exception.
    secured.on('error', (err) => {
      if (!this.closing) {
        logger.warn('TCPHandle', 'TLS socket error', err)
      }
    })

    this.attachDataListener(secured)
    this.currentSocketPromise = Promise.resolve(secured)
  }

  /** Accumulates incoming chunks and dispatches every document `tryParseJSON` extracts. */
  private attachDataListener(socket: NodeJS.ReadableStream) {
    socket.on('data', (chunk: Buffer) => {
      const pending = this.buffer.length === 0 ? chunk : Buffer.concat([this.buffer, chunk])
      const result = tryParseJSON(pending)
      // Whatever could not be parsed yet waits for the next chunk.
      this.buffer = result.remainingBuffer
      for (const parsed of result.parsedObjects) {
        this.onMessage(parsed)
      }
    })
  }

  /**
   * Logs a failed connection attempt when nobody else observes it, so it does not
   * surface as an unhandled rejection. Callers awaiting `attempt` still see the rejection.
   */
  private track(attempt: Promise<Socket>): Promise<Socket> {
    attempt.catch((err) => {
      if (!this.closing) {
        logger.warn('TCPHandle', 'Connection attempt failed', err)
      }
    })
    return attempt
  }

  /**
   * Opens a socket, waits for it to connect and runs `onConnected`.
   *
   * When the socket closes and the handle is not closing, `onClose` is called and
   * a new attempt replaces the current socket promise. After
   * {@link MAX_CONNECTION_ATTEMPTS} consecutive closures the current socket
   * promise becomes a rejection instead.
   *
   * @returns The connected socket, once `onConnected` has completed.
   * @throws The socket error, or a generic error, when the socket closes before
   * connecting; whatever `onConnected` rejects with.
   */
  private async connect(
    host: string,
    port: number,
    onConnected: () => Promise<unknown>,
    onClose: () => void,
  ): Promise<Socket> {
    const { connect } = await import('node:net')
    const socket = connect({ host, port }).setKeepAlive(true)
    this.buffer = Buffer.alloc(0)

    await new Promise<void>((resolve, reject) => {
      let connected = false
      let lastError: Error | undefined

      socket.once('connect', () => {
        connected = true
        resolve()
      })

      // Throwing inside an event listener becomes an uncaught exception, so the
      // error is only recorded here; the 'close' event that follows settles the
      // attempt and drives the reconnection.
      socket.on('error', (err) => {
        lastError = err
        if (connected && !this.closing) {
          // Nobody awaits this attempt any more; the log is the only place left to report it.
          logger.warn('TCPHandle', 'Socket error', err)
        }
      })

      socket.once('close', () => {
        socket.removeAllListeners('data')
        if (!this.closing) {
          onClose()
          this.connectionAttempts++
          this.currentSocketPromise = this.track(
            this.connectionAttempts < MAX_CONNECTION_ATTEMPTS
              ? this.connect(host, port, onConnected, onClose)
              : Promise.reject(new Error('Failed to connect/reconnect to TCP socket')),
          )
        }
        // No effect once 'connect' has resolved the promise; otherwise this
        // releases everyone still waiting on a socket that will never connect.
        reject(lastError ?? new Error('TCP socket closed before the connection was established'))
      })

      this.attachDataListener(socket)
    })

    // sendSession callbacks read currentSocketPromise via .get(); resolve it
    // before onConnected runs so negotiate() can write the 'new' session frame
    // (otherwise we deadlock waiting on the outer connect() promise).
    this.currentSocketPromise = Promise.resolve(socket)
    await onConnected()
    this.connectionAttempts = 0
    return socket
  }
}