import {
  type Command,
  type CommandMethods,
  type Message,
  type MessageTypes,
  Node,
  type UnknownCommandResponse,
} from '../../types/index.ts'
import { BlipError } from '../bliperror.ts'
import { RetryableError } from '../retryableerror.ts'
import { ConnectionSender, type ConnectionSenderConstructor, type Sender } from '../sender.ts'
import { EnvelopeThrottler } from '../throttler.ts'

/**
 * Sender that delivers messages and commands by POSTing JSON envelopes to the
 * `/messages` and `/commands` HTTP endpoints.
 *
 * Every request is throttled through an {@link EnvelopeThrottler} and wrapped in a
 * retry policy that re-attempts transient failures (see `withFetchRetryPolicy`).
 */
export class HttpSender extends ConnectionSender implements Sender {
  /** Origin every request is sent to: `https://[<tenantId>.]http.<domain>`. */
  private readonly baseurl: string
  /** Authentication headers, computed once in the constructor and attached to every request. */
  private readonly authHeaders: Record<string, string>
  private readonly throttler = new EnvelopeThrottler()

  /**
   * Builds the base URL and the authentication headers from the given options.
   *
   * - `key`: `Authorization: Key <token>`, where the token comes from `createToken(node, key)`.
   * - `token`: `Authorization: Key <token>`, using the supplied token as-is.
   * - `external`: `Authorization: Bearer <token>` plus the external issuer and identity headers.
   *
   * @param options Connection options; `tenantId` (when set) becomes a host prefix.
   * @throws Error when the authentication scheme is not `key`, `token` or `external`.
   */
  // NOTE(review): the parameter type is assumed to be derived from ConnectionSenderConstructor — confirm.
  constructor(options: ConstructorParameters<ConnectionSenderConstructor>[0]) {
    super(options)

    const prefix = options.tenantId ? `${options.tenantId}.` : ''
    this.baseurl = `https://${prefix}http.${this.domain}`

    const auth = options.authentication
    if (auth.scheme === 'key') {
      const token = HttpSender.createToken(options.node, auth.key)
      this.authHeaders = { Authorization: `Key ${token}` }
    } else if (auth.scheme === 'token') {
      this.authHeaders = { Authorization: `Key ${auth.token}` }
    } else if (auth.scheme === 'external') {
      this.authHeaders = {
        Authorization: `Bearer ${auth.token}`,
        'http-session-authentication-external-issuer': auth.issuer,
        'http-session-identity': Node.from(options.node).toIdentity(),
      }
    } else {
      throw new Error('HttpSender only supports key, token, or external authentication')
    }
  }

  /**
   * Sends a message envelope to the `messages` endpoint.
   *
   * @param message Envelope to serialize as the JSON request body.
   * @returns A promise that resolves once the server answers with a 2xx status.
   * @throws Error when the response is not ok and the retry policy gives up.
   */
  public sendMessage<Type extends MessageTypes>(message: Message<Type>): Promise<void> {
    return this.withFetchRetryPolicy(async () => {
      await this.throttler.throttle('message')
      const response = await this.fetch('messages', JSON.stringify(message))
      if (!response.ok) {
        // The trailing "(status)" is what the retry policy matches on; keep the format stable.
        throw new Error(`Failed to send message: ${response.statusText} (${response.status})`)
      }
    })
  }

  /**
   * Sends a command envelope to the `commands` endpoint and unwraps its response.
   *
   * @param command Envelope to serialize as the JSON request body.
   * @returns The `resource` of a successful command response.
   * @throws BlipError when the response body is a failed command response.
   * @throws Error when the HTTP response is not ok, or when the body is neither a
   *   failure nor a success response.
   */
  public sendCommand<Method extends CommandMethods>(command: Command<Method>): Promise<unknown> {
    return this.withFetchRetryPolicy(async () => {
      await this.throttler.throttle('command')
      const response = await this.fetch('commands', JSON.stringify(command))
      if (!response.ok) {
        // The trailing "(status)" is what the retry policy matches on; keep the format stable.
        throw new Error(`Failed to send command: ${response.statusText} (${response.status})`)
      }

      const result: UnknownCommandResponse = await response.json()
      if (BlipError.isFailedCommandResponse(result)) {
        throw BlipError.commandResponseToBlipError(command.uri, result)
      }
      if (result.status === 'success') {
        return result.resource
      }
      // Report the parsed payload: a fetch Response has no enumerable own
      // properties, so stringifying it would only ever produce "{}".
      throw new Error(`Unexpected response for command '${command.uri}': ${JSON.stringify(result)}`)
    })
  }

  public static login = ConnectionSender.login

  /**
   * POSTs a JSON body to `<baseurl>/<path>` with the authentication headers attached.
   *
   * @param path Endpoint to call.
   * @param body Already-serialized JSON payload.
   * @returns The raw fetch response; status checking is left to the caller.
   */
  private async fetch(path: 'commands' | 'messages', body: string) {
    return await fetch(`${this.baseurl}/${path}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...this.authHeaders,
      },
      body,
    })
  }

  /** Tells whether a failure thrown by a send attempt is considered transient and worth retrying. */
  private static isTransientError(err: unknown): boolean {
    if (!(err instanceof Error)) {
      return false
    }
    return (
      err.message.endsWith('(429)') ||
      err.message.endsWith('(504)') ||
      err.message.endsWith('(502)') ||
      RetryableError.isRetryable(err) ||
      // Probably cloudflare trying to mitigate a DDoS attack
      (err.cause instanceof Error && err.cause.name === 'ConnectTimeoutError') ||
      err.message === 'terminated' ||
      err.message === 'fetch failed'
    )
  }

  /**
   * Runs `fn`, re-running it after a random delay of up to one second whenever it
   * fails with a transient error, for at most `retries` additional attempts.
   *
   * @param fn Operation to execute; it is invoked again from scratch on every retry.
   * @param retries Maximum number of retries after the first attempt.
   * @returns Whatever `fn` resolves to.
   * @throws The last error thrown by `fn` when it is not transient or retries are exhausted.
   */
  private async withFetchRetryPolicy<T>(fn: () => Promise<T>, retries = 5): Promise<T> {
    for (let remaining = retries; ; remaining--) {
      try {
        return await fn()
      } catch (err) {
        if (remaining <= 0 || !HttpSender.isTransientError(err)) {
          throw err
        }
        // wait before retrying
        await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000))
      }
    }
  }
}