import * as fs from "fs/promises"; import { UnitemporalVersion } from "./GlitchUnitemporalPartition"; import GlitchPartitionImpl, { GlitchPartition } from "./GlitchPartition"; import { INFINITY_TIME } from "./constants"; import GlitchDB from "."; interface BitemporalVersion extends UnitemporalVersion { validFrom: number; validTo: number; } export interface BitemporallyVersionedData extends BitemporalVersion { data: Type; } interface BitemporallyVersioned { data: BitemporallyVersionedData[]; } // todo caching export interface GlitchBitemporalPartition extends GlitchPartition { get: (key: string, validAsOf?: number) => Promise; set: ( key: string, value: Type, validFrom?: number, validTo?: number, metadata?: { [key: string]: string } ) => Promise; getVersion: ( key: string, validAsOf?: number ) => Promise>; getAllVersions: (key: string) => Promise[]>; } export default class GlitchBiTemporalPartitionImpl extends GlitchPartitionImpl implements GlitchBitemporalPartition { constructor( master: GlitchDB, localDir: string, cacheSize?: number, indices?: string[] ) { super(master, localDir, cacheSize, indices); } async get(key: string, validAsOf?: number): Promise { const data = await this.getVersion(key, validAsOf); return data ? Promise.resolve(data.data) : Promise.resolve(undefined); } async #getVersionedData(key: string): Promise> { const resolvedKey = this.resolveKey(key); const keyPath = this.getKeyPath(resolvedKey); try { const fileData = await fs.readFile(keyPath, { encoding: "utf8", }); const parsed = JSON.parse(fileData) as BitemporallyVersioned; return Promise.resolve(parsed); } catch (e) { // console.log( // `Could not read file at ${keyPath} due to error ${e}. Its likely that this key does not exist.` // ); return Promise.resolve(undefined); } } async getVersion( key: string, validAsOf?: number ): Promise> { await this.init(); const fileData = await this.#getVersionedData(key); const validFrom = validAsOf ?? INFINITY_TIME; if (fileData) { const required = fileData.data.filter((each) => { return ( each.deletedAt === INFINITY_TIME && each.validFrom <= validFrom && ((validFrom === INFINITY_TIME && each.validTo === INFINITY_TIME) || validFrom < each.validTo) ); }); if (required?.length) { return Promise.resolve(required[0]); // returns only the first element as more than one elements are not allowed } else { return Promise.resolve(undefined); } } else { return Promise.resolve(undefined); } } async getAllVersions( key: string ): Promise[]> { await this.init(); const data = await this.#getVersionedData(key); return Promise.resolve(data?.data); } // todo caching async set( key: string, value: Type, validFrom?: number, validTo?: number, metadata?: { [key: string]: string } ): Promise { await this.init(); try { let data = await this.#getVersionedData(key); const currentTime = new Date().valueOf(); const newValidFrom = validFrom ?? currentTime; const newValidTo = validTo ?? INFINITY_TIME; if (newValidTo !== INFINITY_TIME && newValidTo <= newValidFrom) { throw new Error("Valid To cannot be less than or equal to Valid From"); } if (!data?.data?.length) { data = { data: [ { data: value, createdAt: currentTime, deletedAt: INFINITY_TIME, validFrom: newValidFrom, validTo: newValidTo, metadata, }, ], }; await fs.writeFile(this.getKeyPath(key), JSON.stringify(data)); await this.setIndices(key, value); } else { let rowValidBeforeCurrentRow: BitemporallyVersionedData; data.data = data.data.map((row) => { const updatedRow = { ...row }; // todo - this logic has 3 buckets // inside span of existing row // leading edge conflict with existing row // trailing edge conflict with existing row if (row.validFrom <= newValidFrom && row.validTo > newValidFrom) { updatedRow.deletedAt = currentTime; rowValidBeforeCurrentRow = row; } else if ( newValidFrom <= row.validFrom && newValidTo > row.validFrom ) { updatedRow.deletedAt = currentTime; } return updatedRow; }); if (rowValidBeforeCurrentRow) { data.data.push({ ...rowValidBeforeCurrentRow, validTo: newValidFrom, }); } data.data.push({ data: value, createdAt: currentTime, deletedAt: INFINITY_TIME, validFrom: newValidFrom, validTo: newValidTo, metadata, }); await fs.writeFile(this.getKeyPath(key), JSON.stringify(data)); await this.deleteIndices(await this.get(key)); await this.setIndices(key, value); } return Promise.resolve(true); } catch (error) { console.log(`Error setting value for key: ${key}, due to error ${error}`); return Promise.resolve(false); } } }