import { type ChildProcess, spawn } from "node:child_process"; import { createInterface, type Interface } from "node:readline/promises"; import { join, dirname } from "node:path"; import { fileURLToPath } from "node:url"; import { Command, InvalidArgumentError } from "commander"; import { HttpHelpers } from "./export"; import { parseJson, stringifyJson, type StringifiedJson } from "@milaboratories/pl-model-common"; import { type PFrameInternal } from "@milaboratories/pl-model-middle-layer"; const Options = { NoHttps: "--no-https", NoAuth: "--no-auth", Port: "--port", } as const; type Info = StringifiedJson; /** * Serves parquet files from the given root directory. * Manages the server lifecycle with graceful shutdown. */ export async function runParquetServer(): Promise { const program = new Command(); program .name("parquet-server") .description("Serve parquet files from a directory over HTTP(S)") .argument("", "Root directory containing parquet files") .option(Options.NoHttps, "Downgrade HTTPS to HTTP") .option(Options.NoAuth, "Disable authorization") .option( `${Options.Port} `, "Port to listen on", (value) => { const port = parseInt(value, 10); if (isNaN(port) || port < 0 || port > 65535) { throw new InvalidArgumentError("valid port numbers are 0-65535"); } return port; }, 0, ) .action( async ( rootDir: string, options: { https: boolean; auth: boolean; port: number; }, ) => { const abortController = new AbortController(); process .on("SIGINT", () => abortController.abort()) .on("SIGTERM", () => abortController.abort()); abortController.signal.throwIfAborted(); const store = await HttpHelpers.createFsStore({ rootDir, logger: (level, message) => { const timestamp = new Date(Date.now()).toISOString(); console.log(`[${timestamp}] [${level}] ${message}`); }, }); abortController.signal.throwIfAborted(); const handler = HttpHelpers.createRequestHandler({ store }); const server = await HttpHelpers.createHttpServer({ handler, ...(!options.https && { noHttps: true }), ...(!options.auth && { noAuth: true }), port: options.port, }); abortController.signal.onabort = () => server.stop(); abortController.signal.throwIfAborted(); const serverInfo: Info = stringifyJson(server.info); console.log(serverInfo); await server.stopped; }, ); await program.parseAsync(); } /** * Reference implementation of a parquet server runner for tests: * - Reads the server configuration from the spawned process stdout * - Forwards the server logs to the console * - Shuts down the server on dispose */ export class ParquetServer implements Disposable { readonly #process: ChildProcess; readonly #info: PFrameInternal.HttpServerInfo; readonly #lineReader: Interface; private constructor( process: ChildProcess, info: PFrameInternal.HttpServerInfo, lineReader: Interface, ) { this.#process = process; this.#info = info; this.#lineReader = lineReader; } get info(): PFrameInternal.HttpServerInfo { return this.#info; } static async serve( rootDir: string, options?: { noHttps?: true; noAuth?: true; port?: number; }, ): Promise { const nodeDirname = dirname(fileURLToPath(import.meta.url)); const binPath = join(nodeDirname, "..", "bin", "parquet-server.mjs"); const serverProcess = spawn( "node", [ binPath, rootDir, ...(options?.noHttps ? [Options.NoHttps] : []), ...(options?.noAuth ? [Options.NoAuth] : []), ...(options?.port ? [Options.Port, options.port.toString()] : []), ], { stdio: ["ignore", "pipe", "inherit"], }, ); const lineReader = createInterface({ input: serverProcess.stdout! }); const exitPromise = new Promise((_, reject) => { serverProcess.once("exit", (code, signal) => { reject( new Error( `parquet-server exited before emitting server info (code=${code}, signal=${signal})`, ), ); }); serverProcess.once("error", reject); }); const firstLine = await Promise.race([lineReader[Symbol.asyncIterator]().next(), exitPromise]); if (firstLine.value === undefined) { throw new Error("parquet-server stdout closed without emitting server info"); } const serverInfo = parseJson(firstLine.value as Info); lineReader.on("line", console.log); return new ParquetServer(serverProcess, serverInfo, lineReader); } [Symbol.dispose](): void { this.#lineReader.close(); this.#process.kill(); } }