import type { Readable } from "node:stream"; import { stat, open, type FileHandle } from "node:fs/promises"; import { join, resolve } from "node:path"; import { PFrameInternal } from "@milaboratories/pl-model-middle-layer"; import { ensureError, isAbortError } from "@milaboratories/pl-model-common"; /** Object store for serving files from a local directory */ export class FileSystemStore extends PFrameInternal.BaseObjectStore { private readonly rootDir: string; private constructor(options: PFrameInternal.FsStoreOptions) { super(options); this.rootDir = options.rootDir; } static async init(options: PFrameInternal.FsStoreOptions): Promise { const resolvedRootDir = resolve(options.rootDir); const rootStats = await stat(resolvedRootDir).catch(() => { throw new Error(`File system store root directory does not exist: ${resolvedRootDir}`); }); if (!rootStats.isDirectory()) { throw new Error(`File system store root path is not a directory: ${resolvedRootDir}`); } return new FileSystemStore({ ...options, rootDir: resolvedRootDir, }); } override async request( filename: PFrameInternal.ParquetFileName, params: { method: PFrameInternal.HttpMethod; range?: PFrameInternal.HttpRange; signal: AbortSignal; callback: (response: PFrameInternal.ObjectStoreResponse) => Promise; }, ): Promise { let file: FileHandle | undefined; const respond = async (response: PFrameInternal.ObjectStoreResponse) => { try { await params.callback(response).finally(async () => await file?.close()); } catch (error: unknown) { this.logger( "warn", `File system store received unexpected rejection from callback: ${ensureError(error)}`, ); } }; try { try { const path = join(this.rootDir, filename); file = await open(path, "r"); } catch (error: unknown) { this.logger( "error", `File system store failed to open file ${filename}: ${ensureError(error)}`, ); return await respond({ type: "NotFound" }); } params.signal.throwIfAborted(); let size: number; try { ({ size } = await file.stat()); } catch (error: unknown) { this.logger( "error", `File system store failed to get size of file ${filename}: ${ensureError(error)}`, ); return await respond({ type: "InternalError" }); } params.signal.throwIfAborted(); const range = this.translate(size, params.range); if (!range) { return await respond({ type: "RangeNotSatisfiable", size }); } if (params.method === "HEAD") { return await respond({ type: "Ok", size, range }); } let data: Readable; try { data = file.createReadStream({ start: range.start, end: range.end, autoClose: false, }); this.logger( "info", `File system store created read stream for ${filename}[${range.start}..=${range.end}]`, ); } catch (error: unknown) { this.logger( "error", `File system store failed to create read stream for ${filename}[${range.start}..=${range.end}]: ${ensureError(error)}`, ); return await respond({ type: "InternalError" }); } return await respond({ type: "Ok", size, range, data }); } catch (error: unknown) { if (!isAbortError(error)) { this.logger("warn", `File system store unhandled error: ${ensureError(error)}`); } return await respond({ type: "InternalError" }); } } }