import type { IncomingMessage, RequestListener, ServerResponse } from "node:http";
import { pipeline } from "node:stream/promises";
import { timingSafeEqual } from "node:crypto";
import type { PFrameInternal } from "@milaboratories/pl-model-middle-layer";
import {
  createETag,
  getFilenameFromUrl,
  parseRange,
  isGetOrHead,
  isGet,
  isHead,
  Options,
  StatusCode,
  HeaderName,
  HeaderValue,
} from "./utils";
import { isAbortError } from "@milaboratories/pl-model-common";

/**
 * Main request handler for parquet files.
 *
 * Status mapping:
 * - non-GET/HEAD method: 405 Method Not Allowed (with `Allow`)
 * - URL that `getFilenameFromUrl` rejects: 410 Gone
 * - failed `If-Match` / `If-Unmodified-Since`: 412 Precondition Failed
 * - matched `If-None-Match` / `If-Modified-Since`: 304 Not Modified
 * - malformed `Range`: 400 Bad Request
 *
 * Otherwise the object is requested from `store`. Its asynchronous callback writes
 * one of: 500 / 404 / 416 / 200 / 206. For GET, the body is streamed.
 *
 * @param request - incoming HTTP request
 * @param response - response to populate
 * @param store - object store that serves the parquet files
 */
function handleRequest(
  request: IncomingMessage,
  response: ServerResponse,
  store: PFrameInternal.ObjectStore,
): void {
  // RFC 9110 section 6.6.1: Date header should be present in all responses
  response.sendDate = true;

  // RFC 9110 section 8.6: Content-Length 0 as default for error responses
  response.strictContentLength = true;
  response.setHeader(HeaderName.ContentLength, 0);
  // Note: setting Content-Length disables Node.js default Transfer-Encoding: chunked

  // RFC 9111 section 5.2: Cache-Control header with public allows to cache authenticated responses
  response.setHeader(HeaderName.CacheControl, HeaderValue.CacheControl);

  // RFC 9110 section 15.5.6: Method not allowed
  const method = request.method;
  if (!isGetOrHead(method)) {
    response.setHeader(HeaderName.Allow, HeaderValue.Allow);
    return void response.writeHead(StatusCode.MethodNotAllowed).end();
  }

  // RFC 9110 section 15.5.11: Gone
  const filename = getFilenameFromUrl(request);
  if (filename === null) {
    return void response.writeHead(StatusCode.Gone).end();
  }

  // From now on we are sure that the response would be a Parquet file
  response.setHeader(HeaderName.AcceptRanges, HeaderValue.AcceptRanges);
  response.setHeader(HeaderName.ContentType, HeaderValue.ContentType);

  // RFC 9110 section 8.8.3: ETag header is used for cache versioning
  const etag = createETag(filename);
  // RFC 9110 section 8.8.2: Last-Modified header field for cache validation
  const mtime = new Date(0); // Using fake fixed date since files are immutable
  response.setHeader(HeaderName.ETag, etag);
  response.setHeader(HeaderName.LastModified, mtime.toUTCString());

  const options = new Options(request);
  // RFC 9110 section 13.1.1: If-Match precondition evaluation
  // RFC 9110 section 13.1.4: If-Unmodified-Since precondition evaluation
  if (options.preconditionFailed(etag, mtime)) {
    return void response.writeHead(StatusCode.PreconditionFailed).end();
  }
  // RFC 9110 section 13.1.2: If-None-Match precondition evaluation
  // RFC 9110 section 13.1.3: If-Modified-Since precondition evaluation
  if (options.notModified(etag, mtime)) {
    // RFC 9110 section 8.6: a 304 MUST NOT carry Content-Length unless it equals the
    // length a 200 would have had. The object size is unknown here, so drop the
    // `Content-Length: 0` default. Node.js writes neither a body nor chunked framing for 304.
    response.removeHeader(HeaderName.ContentLength);
    return void response.writeHead(StatusCode.NotModified).end();
  }

  // `null` signals a malformed Range header; any other falsy value means "no range requested"
  const range = parseRange(request);
  if (range === null) {
    return void response.writeHead(StatusCode.BadRequest).end();
  }

  // Abort the store request / body streaming once the request emits 'close'.
  // NOTE(review): on Node.js >= 16 IncomingMessage 'close' is emitted when the request
  // completes, not only on client disconnect. `response.on("close")` combined with
  // `!response.writableFinished` may be a more precise disconnect signal.
  // Confirm against the supported Node versions.
  const abortController = new AbortController();
  request.on("close", () => abortController.abort());
  const signal = abortController.signal;

  store.request(filename, {
    method,
    range,
    signal,
    // pipeline automatically destroys the streams if they were not gracefully closed
    callback: async (result) => {
      if (request.destroyed) {
        // request was destroyed before the store answered (e.g. timed out): close the connection
        if (response.destroyed) {
          return;
        } else if (response.headersSent) {
          return void response.end();
        } else {
          response.setHeader(HeaderName.Connection, HeaderValue.Connection);
          // RFC 9110 section 15.5.9: Request timeout
          return void response.writeHead(StatusCode.RequestTimeout).end();
        }
      }

      switch (result.type) {
        case "InternalError":
          // object store encountered network error, retry by client can help
          return void response.writeHead(StatusCode.InternalServerError).end();
        case "NotFound":
          // RFC 9110 section 15.5.5: Not found
          return void response.writeHead(StatusCode.NotFound).end();
        case "RangeNotSatisfiable":
          // RFC 9110 section 15.5.17: Range not satisfiable
          response.setHeader(HeaderName.ContentRange, `bytes */${result.size}`);
          return void response.writeHead(StatusCode.RangeNotSatisfiable).end();
        case "Ok":
          break;
      }

      if (isGet(method) && !result.data) {
        // object store implementation is incorrect, retry by client cannot help
        // NOTE(review): 504 usually implies a retry may succeed; 502 Bad Gateway
        // (invalid upstream response) may fit better. Confirm with API consumers.
        return void response.writeHead(StatusCode.GatewayTimeout).end();
      }

      if (range) {
        // RFC 9110 section 15.3.7: Partial content response (Content-Range per section 14.4)
        response.setHeader(HeaderName.ContentLength, result.range.end - result.range.start + 1);
        response.setHeader(
          HeaderName.ContentRange,
          `bytes ${result.range.start}-${result.range.end}/${result.size}`,
        );
        response.writeHead(StatusCode.PartialContent);
      } else {
        // RFC 9110 section 15.3.1: OK response
        response.setHeader(HeaderName.ContentLength, result.size);
        response.writeHead(StatusCode.Ok);
      }

      // RFC 9110 section 9.3.2: HEAD method must not return message body
      if (isHead(method)) {
        return void response.end();
      }

      try {
        // Only GET reaches this point, and GET without data was rejected above,
        // so `result.data` is present here.
        return await pipeline(result.data!, response, { signal });
      } catch (error: unknown) {
        // Aborts are expected when the request closes mid-stream; anything else propagates
        if (!isAbortError(error)) throw error;
      }
    },
  });
}

/**
 * Create a request handler for serving files from an object store
 * compatible with HTTP/1.1 as defined in RFC 9110 and RFC 9111:
 * - RFC 9110 (HTTP Semantics): https://www.rfc-editor.org/rfc/rfc9110.html
 * - RFC 9111 (HTTP Caching): https://www.rfc-editor.org/rfc/rfc9111.html
 *
 * Accepts only paths of the form `/<filename>.parquet` (as recognized by
 * `getFilenameFromUrl`), returns 410 Gone otherwise.
 * Assumes that files are immutable (and sets cache headers accordingly).
 *
 * @param options - handler options; only `store` is used
 * @returns a Node.js request listener
 */
export function createRequestHandler(
  options: PFrameInternal.RequestHandlerOptions,
): RequestListener {
  const { store } = options;
  return (request, response) => handleRequest(request, response, store);
}

/** Ends `response` with 401 Unauthorized and the configured WWW-Authenticate challenge. */
function respondUnauthorized(response: ServerResponse): void {
  // RFC 9110 section 11.6.1: WWW-Authenticate header field
  response.setHeader(HeaderName.WWWAuthenticate, HeaderValue.WWWAuthenticate);
  response.writeHead(StatusCode.Unauthorized).end();
}

/**
 * Request authorization middleware.
 *
 * Delegates to `handler` only when the request's Authorization header equals
 * `authHeader` exactly; otherwise responds 401 Unauthorized.
 *
 * @param request - incoming HTTP request
 * @param response - response to populate
 * @param handler - listener invoked for authorized requests
 * @param authHeader - expected full Authorization header value
 */
function authorizeRequest(
  request: IncomingMessage,
  response: ServerResponse,
  handler: RequestListener,
  authHeader: string,
): void {
  // RFC 9110 section 6.6.1: Date header should be present in all responses
  response.sendDate = true;

  // RFC 9110 section 8.6: Content-Length 0 as default for error responses
  response.strictContentLength = true;
  response.setHeader(HeaderName.ContentLength, 0);
  // Note: setting Content-Length disables Node.js default Transfer-Encoding: chunked

  const actualHeader = request.headers[HeaderName.Authorization];

  // Early length check to avoid unnecessary processing
  // (this reveals only whether the length matches, not the content)
  if (!actualHeader || actualHeader.length !== authHeader.length) {
    return respondUnauthorized(response);
  }

  // Use timing-safe comparison to prevent timing attacks
  const encoder = new TextEncoder();
  const receivedBuffer = encoder.encode(actualHeader);
  const expectedBuffer = encoder.encode(authHeader);
  // timingSafeEqual throws on differing byte lengths. Equal string lengths do not
  // guarantee equal UTF-8 byte lengths for non-ASCII input, so compare them first.
  if (
    receivedBuffer.byteLength !== expectedBuffer.byteLength ||
    !timingSafeEqual(receivedBuffer, expectedBuffer)
  ) {
    return respondUnauthorized(response);
  }

  return handler(request, response);
}

/**
 * Wrap `handler` with Bearer token authorization.
 *
 * @param handler - listener to protect
 * @param authToken - token expected in an `Authorization: Bearer <token>` header
 * @returns a listener that responds 401 unless the header matches, else calls `handler`
 */
export function authorizeRequestHandler(
  handler: RequestListener,
  authToken: PFrameInternal.HttpAuthorizationToken,
): RequestListener {
  const authHeader = `Bearer ${authToken}`;
  return (request, response) => authorizeRequest(request, response, handler, authHeader);
}