import fs from 'fs';
import https from 'https';
import FormData from 'form-data';
import * as stream from 'stream';

import { buildHttpError } from '../errors.js';
import * as HttpErrors from '../httpErrors.js';
import { Credentials } from '../middlewares/credentials.js';
import Logger from '../resources/logger.js';
import { RequestMetrics } from './requestMetrics.js';

/**
 * RateLimiter is a wrapper function that you can provide to limit the rate of calls to the provider based on the
 * caller's credentials.
 *
 * When necessary, the Provider's Response headers can be inspected to update the rate limit before being returned.
 *
 * NOTE: make sure to return one of the supported HttpErrors from the SDK, otherwise the error will be translated to a
 * generic server (500) error.
 *
 * @param options - The credentials and the logger from the RequestOptions passed with the provider call.
 * @param targetFunction - The function to call the provider.
 * @returns The response from the provider.
 * @throws RateLimitExceededError when the rate limit is exceeded.
 * @throws HttpError when the provider returns an error.
 */
export type RateLimiter = <T>(
  options: { credentials: Credentials; logger: Logger },
  targetFunction: () => Promise<Response<T>>,
) => Promise<Response<T>>;

/**
 * RequestOptions are the options passed to the Provider's call.
 *
 * @property credentials - The credentials to use for the call.
 * @property logger - The logger to use during the call.
 * @property signal - Optional AbortSignal; when it fires, the in-flight request is aborted and a 408 error is raised.
 * @property queryParams - The query parameters to add when calling the provider.
 * @property additionnalheaders - The headers to add when calling the provider.
 * @property rawBody - Whether to return the raw response body.
 * @property requestMetrics - Optional collector notified of the duration and throttle delay of each provider call.
 */
export type RequestOptions = {
  credentials: Credentials;
  logger: Logger;
  signal?: AbortSignal;
  queryParams?: { [key: string]: string };
  additionnalheaders?: { [key: string]: string };
  rawBody?: boolean;
  requestMetrics?: RequestMetrics;
};

type InternalRequestOptions = RequestOptions & {
  defaultHeaders: { 'Content-Type'?: string; Accept?: string };
  method: string;
};

/**
 * Response object returned by the Provider's method.
 *
 * Contains:
 * - the body typed as specified when calling the method
 * - the status code of the response
 * - the headers of the response.
 */
export type Response<T> = {
  body: T;
  status: number;
  headers: Record<string, string>;
};

export type PreparedRequest = {
  url: string;
  headers: Record<string, string>;
};

export type RequestBody = Record<string, unknown> | RequestBody[];

/**
 * The Provider class is a wrapper around the fetch function to call a provider's HTTP API.
 *
 * Defines methods for the following HTTP methods: GET, POST, PUT, PATCH, DELETE.
 *
 * Needs to be initialized with a prepareRequest function to define the Provider's base URL and any specific headers to
 * add to the requests, can also be configured to use a provided rate limiting function, and custom error handler.
 *
 * Multiple `Provider` instances can be created, with different configurations to call different providers APIs with
 * different rateLimiting functions, as needed.
 * @see {@link RateLimiter}
 * @see {@link prepareRequest}
 * @see {@link customErrorHandler}
 */
export class Provider {
  /**
   * The Rate Limiter function to use to limit the rate of calls made to the provider based on the caller's credentials.
   */
  protected rateLimiter: RateLimiter | undefined = undefined;

  /** Monotonic counter stamped on each recorded response entry (shared by all Provider instances). */
  private static recordingSeq = 0;

  /** File path where raw responses are appended, read from UNITO_SCHEMA_SNAPSHOT_RECORD_PATH; recording is off when unset. */
  private static get recordingPath(): string | undefined {
    return process.env.UNITO_SCHEMA_SNAPSHOT_RECORD_PATH;
  }

  /**
   * Function called before each request to define the Provider's base URL and any specific headers to add to the requests.
   *
   * This is applied at large to all requests made to the provider. If you need to add specific headers to a single request,
   * pass it through the RequestOptions object when calling the Provider's methods.
   */
  protected prepareRequest: (options: {
    credentials: Credentials;
    logger: Logger;
  }) => PreparedRequest | Promise<PreparedRequest>;

  /**
   * (Optional) Custom error handler to handle specific errors returned by the provider.
   *
   * If provided, this method should only care about custom errors returned by the provider and return the corresponding
   * HttpError from the SDK. If the error encountered is a standard error, it should return undefined and let the SDK handle it.
   *
   * @see buildHttpError for the list of standard errors the SDK can handle.
   */
  protected customErrorHandler:
    | ((
        responseStatus: number,
        message: string,
        options: {
          credentials: Credentials;
          logger: Logger;
        },
      ) => HttpErrors.HttpError | undefined)
    | undefined;

  /**
   * Initializes a Provider with the given options.
   *
   * @property {@link prepareRequest} - function to define the Provider's base URL and specific headers to add to the request.
   * @property {@link RateLimiter} - function to limit the rate of calls to the provider based on the caller's credentials.
   * @property {@link customErrorHandler} - function to handle specific errors returned by the provider.
   */
  constructor(options: {
    prepareRequest: typeof Provider.prototype.prepareRequest;
    rateLimiter: RateLimiter | undefined;
    customErrorHandler?: typeof Provider.prototype.customErrorHandler;
  }) {
    this.prepareRequest = options.prepareRequest;
    this.rateLimiter = options.rateLimiter;
    this.customErrorHandler = options.customErrorHandler;
  }

  /**
   * Performs a GET request to the provider.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Accept: application/json
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async get<T>(endpoint: string, options: RequestOptions): Promise<Response<T>> {
    return this.fetchWrapper<T>(endpoint, null, {
      ...options,
      method: 'GET',
      defaultHeaders: {
        Accept: 'application/json',
      },
    });
  }

  /**
   * Performs a GET request to the provider and returns the response as a ReadableStream.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Accept: application/octet-stream
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param options RequestOptions used to adjust the call made to the provider (e.g. used to override default headers).
   * @returns The streaming {@link Response} extracted from the provider.
   */
  public async streamingGet(endpoint: string, options: RequestOptions): Promise<Response<ReadableStream<Uint8Array>>> {
    return this.fetchWrapper<ReadableStream<Uint8Array>>(endpoint, null, {
      ...options,
      method: 'GET',
      defaultHeaders: {
        Accept: 'application/octet-stream',
      },
      rawBody: true,
    });
  }

  /**
   * Performs a POST request to the provider.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Content-Type: application/json
   * - Accept: application/json
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param body The payload to send; serialized according to the effective Content-Type header.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async post<T>(endpoint: string, body: RequestBody, options: RequestOptions): Promise<Response<T>> {
    return this.fetchWrapper<T>(endpoint, body, {
      ...options,
      method: 'POST',
      defaultHeaders: {
        'Content-Type': 'application/json',
        Accept: 'application/json',
      },
    });
  }

  /**
   * Performs a multipart POST request to the provider by piping a FormData into a native https request.
   *
   * Headers are built from the form's own headers, then the ones returned by prepareRequest, then
   * `options.additionnalheaders` (later sources win).
   *
   * The response body is always parsed as JSON. When the parsed body has an `error` property, the call is rejected with
   * a 400 error carrying `error.message`; otherwise it resolves with a hard-coded status of 201.
   *
   * NOTE(review): the provider's actual HTTP status code is never inspected here, and network errors are reported as
   * 400 (postStream reports them as 500). Left untouched because callers may depend on it - confirm before changing.
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param form The FormData to send.
   * @param options RequestOptions used to adjust the call made to the provider.
   * @returns The {@link Response} extracted from the provider.
   */
  public async postForm<T>(endpoint: string, form: FormData, options: RequestOptions): Promise<Response<T>> {
    const { url: providerUrl, headers: providerHeaders } = await this.prepareRequest(options);
    const absoluteUrl = this.generateAbsoluteUrl(providerUrl, endpoint, options.queryParams);
    const headers = { ...form.getHeaders(), ...providerHeaders, ...options.additionnalheaders };

    const reqOptions = {
      method: 'POST',
      headers,
    };

    /**
     * For some obscure reason we can't use the fetch API to send a form data, so we have to use the native https module
     * It seems that there is a miscalculation of the Content-Length headers that generates an error :
     * --> headers length is different from the actual body length
     * The go-to solution recommended across the internet for this, is to simply drop the header.
     * However, some integrations like Servicenow, will not accept the request if it doesn't contain that header
     */
    const callToProvider = async (): Promise<Response<T>> => {
      return new Promise((resolve, reject) => {
        try {
          const request = https.request(absoluteUrl, reqOptions, response => {
            response.setEncoding('utf8');

            let responseBody = '';
            response.on('data', chunk => {
              responseBody += chunk;
            });

            response.on('end', () => {
              try {
                const body = JSON.parse(responseBody);

                if (body.error) {
                  reject(this.handleError(400, body.error.message, options));
                  // Do not fall through to resolve() once the call has been rejected.
                  return;
                }

                resolve({
                  status: 201,
                  headers: Provider.normalizeNodeHeaders(response.headers),
                  body,
                });
              } catch (error) {
                reject(this.handleError(500, `Failed to parse response body: "${error}"`, options));
              }
            });
          });

          request.on('error', error => {
            reject(this.handleError(400, `Error while calling the provider: "${error}"`, options));
          });

          if (options.signal) {
            const abortHandler = () => {
              request.destroy();
              reject(this.handleError(408, 'Timeout', options));
            };

            if (options.signal.aborted) {
              // Already aborted: the request is destroyed, there is nothing left to listen to or to send.
              abortHandler();
              return;
            }

            options.signal.addEventListener('abort', abortHandler);

            // Detach the listener once the request is over so the signal does not retain this closure.
            request.on('close', () => {
              if (options.signal) {
                options.signal.removeEventListener('abort', abortHandler);
              }
            });
          }

          form.pipe(request);
        } catch (error) {
          reject(this.handleError(500, `Unexpected error while calling the provider: "${error}"`, options));
        }
      });
    };

    return this.timedCallToProvider(absoluteUrl, 'POST', callToProvider, options);
  }

  /**
   * Performs a POST request to the provider streaming a Readable directly without loading it into memory.
   *
   * By default adds the following headers (overridable through prepareRequest and `options.additionnalheaders`):
   * - Content-Type: application/octet-stream
   * - Accept: application/json
   *
   * The source stream is destroyed whenever the call is rejected.
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param stream The Readable stream containing the binary data to be sent.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async postStream<T>(endpoint: string, stream: stream.Readable, options: RequestOptions): Promise<Response<T>> {
    const { url: providerUrl, headers: providerHeaders } = await this.prepareRequest(options);
    const absoluteUrl = this.generateAbsoluteUrl(providerUrl, endpoint, options.queryParams);
    const headers = {
      'Content-Type': 'application/octet-stream',
      Accept: 'application/json',
      ...providerHeaders,
      ...options.additionnalheaders,
    };

    const callToProvider = async (): Promise<Response<T>> => {
      return new Promise((resolve, reject) => {
        let isSettled = false; // Prevent double rejection

        const cleanup = () => {
          if (!stream.destroyed) {
            stream.destroy();
          }
        };

        const safeReject = (error: HttpErrors.HttpError) => {
          if (!isSettled) {
            isSettled = true;
            cleanup();
            reject(error);
          }
        };

        const safeResolve = (response: Response<T>) => {
          if (!isSettled) {
            isSettled = true;
            resolve(response);
          }
        };

        try {
          const urlObj = new URL(absoluteUrl);
          const requestOptions: https.RequestOptions = {
            hostname: urlObj.hostname,
            // Forward an explicit port; without it a URL such as https://host:8443/ would be sent to the default port.
            port: urlObj.port || undefined,
            path: urlObj.pathname + urlObj.search,
            method: 'POST',
            headers,
            timeout: 0,
          };

          const request = https.request(requestOptions, response => {
            response.setEncoding('utf8');

            let responseBody = '';
            response.on('data', chunk => {
              responseBody += chunk;
            });

            response.on('error', error => {
              safeReject(this.handleError(500, `Response stream error: "${error}"`, options));
            });

            response.on('end', () => {
              try {
                if (response.statusCode && response.statusCode >= 400) {
                  safeReject(this.handleError(response.statusCode, responseBody, options));
                  return;
                }

                // An empty response body is reported as an undefined body rather than a parse failure.
                const body = responseBody ? JSON.parse(responseBody) : undefined;
                safeResolve({
                  status: response.statusCode || 200,
                  headers: Provider.normalizeNodeHeaders(response.headers),
                  body,
                });
              } catch (error) {
                safeReject(this.handleError(500, `Failed to parse response body: "${error}"`, options));
              }
            });
          });

          request.on('timeout', () => {
            request.destroy();
            safeReject(this.handleError(408, 'Request timeout', options));
          });

          request.on('error', error => {
            safeReject(this.handleError(500, `Error while calling the provider: "${error}"`, options));
          });

          stream.on('error', error => {
            request.destroy();
            safeReject(this.handleError(500, `Stream error: "${error}"`, options));
          });

          if (options.signal) {
            const abortHandler = () => {
              request.destroy();
              safeReject(this.handleError(408, 'Timeout', options));
            };

            if (options.signal.aborted) {
              // Already aborted: request and source stream are destroyed, so do not start piping.
              abortHandler();
              return;
            }

            options.signal.addEventListener('abort', abortHandler);

            // Detach the listener once the request is over so the signal does not retain this closure.
            request.on('close', () => {
              if (options.signal) {
                options.signal.removeEventListener('abort', abortHandler);
              }
            });
          }

          // Stream the data directly without buffering
          stream.pipe(request);
        } catch (error) {
          safeReject(this.handleError(500, `Unexpected error while calling the provider: "${error}"`, options));
        }
      });
    };

    return this.timedCallToProvider(absoluteUrl, 'POST', callToProvider, options);
  }

  /**
   * Performs a PUT request to the provider.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Content-Type: application/json
   * - Accept: application/json
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param body The payload to send; serialized according to the effective Content-Type header.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async put<T>(endpoint: string, body: RequestBody, options: RequestOptions): Promise<Response<T>> {
    return this.fetchWrapper<T>(endpoint, body, {
      ...options,
      method: 'PUT',
      defaultHeaders: {
        'Content-Type': 'application/json',
        Accept: 'application/json',
      },
    });
  }

  /**
   * Performs a PUT request to the provider with a Buffer body, typically used for sending binary data.
   *
   * IMPORTANT: This method should ONLY be used as a last resort when FormData cannot be used.
   * It bypasses normal form handling and is used to **manually send chunked** binary data, which may not be appropriate
   * for all providers. Always be mindful not to load entire binary files in memory!
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Content-Type: application/octet-stream
   * - Accept: application/json
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param body The Buffer containing the binary data to be sent.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async putBuffer<T>(endpoint: string, body: Buffer, options: RequestOptions): Promise<Response<T>> {
    return this.fetchWrapper<T>(endpoint, body, {
      ...options,
      method: 'PUT',
      defaultHeaders: {
        'Content-Type': 'application/octet-stream',
        Accept: 'application/json',
      },
    });
  }

  /**
   * Performs a PATCH request to the provider.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Content-Type: application/json
   * - Accept: application/json
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param body The payload to send; serialized according to the effective Content-Type header.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @returns The {@link Response} extracted from the provider.
   */
  public async patch<T>(endpoint: string, body: RequestBody, options: RequestOptions): Promise<Response<T>> {
    return this.fetchWrapper<T>(endpoint, body, {
      ...options,
      method: 'PATCH',
      defaultHeaders: {
        'Content-Type': 'application/json',
        Accept: 'application/json',
      },
    });
  }

  /**
   * Performs a DELETE request to the provider.
   *
   * Uses the prepareRequest function to get the base URL and any specific headers to add to the request and by default
   * adds the following headers:
   * - Accept: application/json
   * - Content-Type: application/json (only when a body is provided)
   *
   * @param endpoint Path to the provider's resource. Will be added to the URL returned by the prepareRequest function.
   * @param options RequestOptions used to adjust the call made to the provider (use to override default headers).
   * @param body Optional payload to send with the request.
   * @returns The {@link Response} extracted from the provider.
   */
  public async delete<T = undefined>(
    endpoint: string,
    options: RequestOptions,
    body: RequestBody | null = null,
  ): Promise<Response<T>> {
    const defaultHeaders: { Accept: string; 'Content-Type'?: string } = {
      Accept: 'application/json',
    };

    // Only add Content-Type header when body is provided
    if (body !== null) {
      defaultHeaders['Content-Type'] = 'application/json';
    }

    return this.fetchWrapper<T>(endpoint, body, {
      ...options,
      method: 'DELETE',
      defaultHeaders,
    });
  }

  /**
   * Builds the URL to call from the provider's base URL, the endpoint and the optional query parameters.
   *
   * - An endpoint starting with `http://` or `https://` is used as-is (the base URL is ignored).
   * - An empty endpoint resolves to the base URL itself.
   * - Otherwise the endpoint is appended to the base URL with a single `/` (a leading `/` on the endpoint is dropped).
   *
   * Query parameters are appended with `?`, or with `&` when the URL already carries a query string. Nothing is
   * appended when the parameters serialize to an empty string.
   */
  private generateAbsoluteUrl(providerUrl: string, endpoint: string, queryParams?: { [key: string]: string }): string {
    let absoluteUrl: string;

    if (/^https?:\/\//.test(endpoint)) {
      absoluteUrl = endpoint;
    } else if (endpoint === '') {
      absoluteUrl = providerUrl;
    } else {
      absoluteUrl = [providerUrl, endpoint.charAt(0) === '/' ? endpoint.substring(1) : endpoint].join('/');
    }

    if (queryParams) {
      const queryString = new URLSearchParams(queryParams).toString();

      if (queryString) {
        let separator = '?';
        if (absoluteUrl.includes('?')) {
          // The URL already has a query string: extend it instead of adding a second '?'.
          separator = /[?&]$/.test(absoluteUrl) ? '' : '&';
        }
        absoluteUrl = `${absoluteUrl}${separator}${queryString}`;
      }
    }

    return absoluteUrl;
  }

  /**
   * Shared implementation of the fetch-based methods (GET, POST, PUT, PATCH, DELETE).
   *
   * Headers are merged in this order (later sources win): the method's default headers, the headers returned by
   * prepareRequest, then `options.additionnalheaders`.
   *
   * The request body is serialized according to the effective Content-Type header:
   * - application/x-www-form-urlencoded: URL-encoded
   * - application/json or application/json-patch+json: JSON
   * - application/octet-stream: sent as-is, only when the body is a Buffer
   * Any other combination is rejected with a 400 error before the provider is called.
   *
   * The response body is decoded according to the effective Accept header: raw stream for `rawBody` or
   * application/octet-stream, JSON for `application/*json`, text for text/html. A 204 status or a null body yields an
   * undefined body.
   *
   * @throws HttpError 408 when the request is aborted or times out, the provider's status for responses >= 400, and
   * 500 for unexpected failures, unsupported content types or invalid JSON.
   */
  private async fetchWrapper<T>(
    endpoint: string,
    body: RequestBody | Buffer | null,
    options: InternalRequestOptions,
  ): Promise<Response<T>> {
    const { url: providerUrl, headers: providerHeaders } = await this.prepareRequest(options);
    const absoluteUrl = this.generateAbsoluteUrl(providerUrl, endpoint, options.queryParams);
    const headers = { ...options.defaultHeaders, ...providerHeaders, ...options.additionnalheaders };

    let fetchBody: string | Buffer | null = null;

    if (body) {
      if (headers['Content-Type'] === 'application/x-www-form-urlencoded') {
        fetchBody = new URLSearchParams(body as Record<string, string>).toString();
      } else if (
        headers['Content-Type'] === 'application/json' ||
        headers['Content-Type'] === 'application/json-patch+json'
      ) {
        fetchBody = JSON.stringify(body);
      } else if (headers['Content-Type'] === 'application/octet-stream' && body instanceof Buffer) {
        fetchBody = body;
      } else {
        throw this.handleError(400, `Content type not supported: ${headers['Content-Type']}`, options);
      }
    }

    const callToProvider = async (): Promise<Response<T>> => {
      let response: globalThis.Response;

      try {
        response = await fetch(absoluteUrl, {
          method: options.method,
          headers,
          body: fetchBody as BodyInit | null,
          ...(options.signal ? { signal: options.signal } : {}),
        });
      } catch (error) {
        if (error instanceof Error) {
          switch (error.name) {
            case 'AbortError':
              throw this.handleError(408, 'Request aborted', options);
            case 'TimeoutError':
              throw this.handleError(408, 'Request timeout', options);
          }

          throw this.handleError(
            500,
            `Unexpected error while calling the provider. ErrorName: "${error.name}" \n message: "${error.message}" \n stack: ${error.stack} \n cause: ${error.cause} \n causeStack: ${(error.cause as Error)?.stack}`,
            options,
          );
        }

        throw this.handleError(
          500,
          'Unexpected error while calling the provider - this is not normal, investigate',
          options,
        );
      }

      // Record raw response for schema-snapshot coverage analysis.
      // Clone the response before consuming the body so recording doesn't interfere with normal flow.
      // Only record JSON-like responses (skip binary streams and raw body requests).
      if (
        Provider.recordingPath &&
        !options.rawBody &&
        headers.Accept !== 'application/octet-stream' &&
        response.status < 400
      ) {
        try {
          const cloned = response.clone();
          const recordBody = await cloned.json().catch(() => undefined);

          if (recordBody !== undefined) {
            Provider.recordingSeq++;
            const entry = JSON.stringify({
              seq: Provider.recordingSeq,
              url: absoluteUrl,
              method: options.method,
              status: response.status,
              body: recordBody,
            });
            fs.appendFileSync(Provider.recordingPath, entry + '\n');
          }
        } catch {
          // Recording failure should never break the integration.
        }
      }

      if (response.status >= 400) {
        const textResult = await response.text();
        throw this.handleError(response.status, textResult, options);
      } else if (response.status === 204 || response.body === null) {
        // No content: return without inspecting the body
        return {
          status: response.status,
          headers: Object.fromEntries(response.headers.entries()),
          body: undefined as unknown as T,
        };
      }

      const responseContentType = response.headers.get('content-type');
      let body: T;

      if (options.rawBody || headers.Accept === 'application/octet-stream') {
        // When we expect octet-stream, we accept any Content-Type the provider sends us, we just want to stream it
        body = response.body as T;
      } else if (headers.Accept?.match(/application\/.*json/)) {
        // Validate that the response content type is at least similar to what we expect
        // (Provider's response Content-Type might be more specific, e.g. application/json;charset=utf-8)
        // Default to application/json if no Content-Type header is provided
        if (responseContentType && !responseContentType.match(/application\/.*json/)) {
          const textResult = await response.text();
          throw this.handleError(
            500,
            `Unsupported content-type, expected 'application/json' but got '${responseContentType}'. Original response (${response.status}): "${textResult}"`,
            options,
          );
        }

        try {
          body = response.body ? await response.json() : undefined;
        } catch {
          throw this.handleError(500, `Invalid JSON response`, options);
        }
      } else if (headers.Accept?.includes('text/html')) {
        // Accept text based content types
        body = (await response.text()) as T;
      } else {
        throw this.handleError(500, 'Unsupported Content-Type', options);
      }

      return { status: response.status, headers: Object.fromEntries(response.headers.entries()), body };
    };

    return this.timedCallToProvider(absoluteUrl, options.method, callToProvider, options);
  }

  /**
   * Normalizes Node.js IncomingHttpHeaders (which may have undefined or string[] values)
   * into a flat Record<string, string> by joining array values and dropping undefined entries.
   */
  private static normalizeNodeHeaders(headers: Record<string, string | string[] | undefined>): Record<string, string> {
    return Object.fromEntries(
      Object.entries(headers)
        .filter((entry): entry is [string, string | string[]] => entry[1] !== undefined)
        .map(([key, value]) => [key, Array.isArray(value) ? value.join(',') : value]),
    );
  }

  /**
   * Runs the given call to the provider (through the rate limiter when one is configured), logs its outcome and
   * reports its timings to `options.requestMetrics` when provided.
   *
   * Two timings are measured with `process.hrtime.bigint()`, both in nanoseconds:
   * - the request duration, from the moment the call actually starts until it settles
   * - the throttle delay, from the moment the call was requested until it actually started (i.e. the time spent
   *   waiting in the rate limiter)
   *
   * Any error that is not an HttpError is replaced by a generic 500 HttpError before being rethrown.
   *
   * @param absoluteUrl The URL being called (used for logging only).
   * @param method The HTTP method being used (used for logging only).
   * @param callToProvider The function performing the actual call.
   * @param options The RequestOptions of the call.
   * @returns The response returned by `callToProvider`.
   */
  private async timedCallToProvider<T>(
    absoluteUrl: string,
    method: string,
    callToProvider: () => Promise<Response<T>>,
    options: RequestOptions,
  ): Promise<Response<T>> {
    const requestedStartTime = process.hrtime.bigint();

    const timedCall = async (): Promise<Response<T>> => {
      const actualStartTime = process.hrtime.bigint();
      let response: Response<T> | undefined;
      let thrownError: HttpErrors.HttpError | undefined;

      try {
        response = await callToProvider();
        return response;
      } catch (error) {
        if (error instanceof HttpErrors.HttpError) {
          thrownError = error;
        } else {
          thrownError = new HttpErrors.HttpError(`Unexpected error while calling ${absoluteUrl}`, 500);
        }

        throw thrownError;
      } finally {
        const endTime = process.hrtime.bigint();
        const requestDurationInNS = Number(endTime - actualStartTime);
        // `| 0` truncates the duration to an integer number of milliseconds.
        const requestDurationInMs = (requestDurationInNS / 1_000_000) | 0;
        const throttleDelay = Number(actualStartTime - requestedStartTime);

        options.logger.log(
          thrownError ? 'error' : 'info',
          `Connector API Request ${method} ${absoluteUrl} ${response?.status ?? thrownError?.status} - ${requestDurationInMs} ms`,
          {
            duration: requestDurationInNS,
            throttleDelay,
            http: {
              method,
              status_code: response?.status ?? thrownError?.status,
              // Response headers built in this class come from fetch's Headers or Node's IncomingMessage, which both
              // expose lower-cased names; the original casing is kept as a fallback only.
              content_type: response?.headers['content-type'] ?? response?.headers['Content-Type'],
              url_details: {
                path: absoluteUrl,
              },
            },
          },
        );

        options.requestMetrics?.recordExternalApiCall(requestDurationInNS, throttleDelay);
      }
    };

    return this.rateLimiter ? this.rateLimiter(options, timedCall) : timedCall();
  }

  /**
   * Builds the HttpError for the given status and message, giving the custom error handler (when configured) the
   * first chance to produce it and falling back to the SDK's standard mapping otherwise.
   */
  private handleError(responseStatus: number, message: string, options: RequestOptions): HttpErrors.HttpError {
    const customError = this.customErrorHandler?.(responseStatus, message, options);

    return customError ?? buildHttpError(responseStatus, message);
  }
}