import { Logger } from 'besonders-logger' import { DebouncedFunc, pull, sortedIndexBy } from 'lodash-es' import { Applog, ApplogForInsert } from '../applog/datom-types.ts' import { arrayIfSingle } from '../types/typescript-utils.ts' import { isInitEvent, Thread, ThreadEvent } from './basic.ts' const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars // ── ThreadDerivation ──────────────────────────────────────────── /** Delta event — only incremental changes (no init) */ export type DeltaEvent = { added: readonly Applog[], removed: readonly Applog[] | null } /** Context passed to mapDelta — explicit deps, no `this` needed */ export interface DeltaContext { /** The parent thread that emitted the delta */ source: Thread /** All current parent threads */ parents: readonly Thread[] /** The mapped thread's output before this delta is applied */ state: readonly Applog[] } /** * Derivation for a MappedThread — separates full recomputation from incremental updates. * * - `compute` produces full state from parents (construction, reset, parent reinit) * - `mapDelta` transforms a parent's incremental delta (optional — falls back to compute) */ export interface ThreadDerivation { /** Compute full state from parents. Called at construction, triggerRemap, and parent reinit. */ compute(parents: readonly Thread[]): Applog[] /** * Transform an incremental delta from a parent. * Return null to fall back to full recompute via compute(). * Optional — if omitted, every parent change triggers compute(). */ mapDelta?: (delta: DeltaEvent, ctx: DeltaContext) => DeltaEvent | null } // ── Write mapper (unchanged) ─────────────────────────────────── // export type ApplogWriteMapper = (this: MappedThread, applogs: T) => T | null // FIXME: the TS generics don't actually work how I want them to export type ApplogWriteMapper = (this: MappedThread, applogs: Applog[] | ApplogForInsert[]) => Applog[] | ApplogForInsert[] | null // ── MappedThread ─────────────────────────────────────────────── export class MappedThread extends Thread { static mapWrites(parent: Thread, name: string, mapper: ApplogWriteMapper) { return new MappedThread( `${name}<${parent.nameAndSizeUntracked}>`, parent, parent.filters, null, // no derivation — share parent array mapper, ) } static asReadOnly(parent: Thread) { if (parent.readOnly) return parent // already read-only, no need to wrap return new MappedThread( `readOnly(${parent.name})`, parent, parent.filters, null, // no derivation — share parent array null, true, // readOnly ) } private _parentSubscriptions: Map void)> = null // mapped to unsubscribe function constructor( readonly name: string, parents: Thread | readonly Thread[], filters: readonly string[], private _derivation: ThreadDerivation | null, private _writeMapper: ApplogWriteMapper = null, private _readOnly?: boolean, ) { const parentArr = arrayIfSingle(parents) as readonly Thread[] if (!_derivation && parentArr.length > 1) { throw new Error(`MappedThread without derivation must have exactly one parent`) } const initialLogs = _derivation ? _derivation.compute(parentArr) : (parentArr[0] as any)._applogs as Applog[] // no derivation = share parent array super( name, parents, filters, _derivation ? [...initialLogs] : initialLogs, // clone if derived, share if not ) if (_derivation) { this.subscribeToParents() } } public insert(appLogsToInsert: ApplogForInsert[]) { if (this.readOnly) throw ERROR(`[MappedThread] insert() called on read-only thread:`, this.nameAndSizeUntracked) const mapped = this._writeMapper ? this._writeMapper(appLogsToInsert) : appLogsToInsert if (this._writeMapper && !mapped) return return this.parents.forEach(parent => parent.insert(mapped)) } public insertRaw(appLogsToInsert: Applog[]) { if (this.readOnly) throw ERROR(`[MappedThread] insertRaw() called on read-only thread:`, this.nameAndSizeUntracked) const mapped = this._writeMapper ? this._writeMapper(appLogsToInsert) : appLogsToInsert if (this._writeMapper && !mapped) return return this.parents.forEach(parent => parent.insertRaw(mapped as Applog[])) } private subscribeToParents() { this._parentSubscriptions = new Map() if (!this.parents.length) { WARN(`MappedThread has no parents`, this) } VERBOSE(`[MappedThread: ${this.name}] subscribing to parents:`, this.parents.map(p => p.name)) for (const p of this.parents) { VERBOSE(`[MappedThread: ${this.name}] sub to parent`, p.nameAndSizeUntracked) const sub = this.onParentUpdate.bind(this, p) const unsubscribe = p.subscribe(sub, 'derived') this._parentSubscriptions.set(p, unsubscribe) } } /** Tear down parent subscriptions and clear internal state */ dispose() { if (this._parentSubscriptions) { for (const [, unsubscribe] of this._parentSubscriptions) { unsubscribe() } this._parentSubscriptions = null } } /** Swap parents at runtime — re-subscribes and recomputes applogs, notifying downstream */ setParents(newParents: readonly Thread[]) { this.dispose() ;(this as { parents: readonly Thread[] }).parents = newParents this.subscribeToParents() this.triggerRemap() } subscribe(callback: (event: ThreadEvent) => void, type?: 'derived' | 'reaction') { if (this._derivation && !this._parentSubscriptions) { this.subscribeToParents() } return super.subscribe(callback, type) } /** Recompute full state from parents via compute() */ triggerRemap() { if (!this._derivation) throw ERROR(`triggerRemap on a thread without derivation`, this.nameAndSizeUntracked) DEBUG(`MappedThread{${this.nameAndSizeUntracked}} triggerRemap`) const newLogs = this._derivation.compute(this.parents as readonly Thread[]) this._applogs.length = 0 this._applogs.push(...newLogs) this.notifySubscribers({ init: newLogs }) } protected onParentUpdate(sourceThread: Thread, event: ThreadEvent) { if (!this._derivation) return // no derivation = shared array, parent handles it VERBOSE(`MappedThread{${this.nameAndSizeUntracked}} parentUpdate`, event) let result: ThreadEvent if (isInitEvent(event)) { // Parent reinit → full recompute (ignore payload, read current parent state) const newLogs = this._derivation.compute(this.parents as readonly Thread[]) result = { init: newLogs } } else if (this._derivation.mapDelta) { const ctx: DeltaContext = { source: sourceThread, parents: this.parents as readonly Thread[], state: this._applogs, } const mapped = this._derivation.mapDelta(event, ctx) if (mapped === null) { // mapDelta can't handle it → fall back to full recompute const newLogs = this._derivation.compute(this.parents as readonly Thread[]) result = { init: newLogs } } else { result = mapped } } else { // No mapDelta → always full recompute const newLogs = this._derivation.compute(this.parents as readonly Thread[]) result = { init: newLogs } } // Apply result to _applogs if (isInitEvent(result)) { this._applogs.length = 0 this._applogs.push(...result.init) } else { VERBOSE(`MappedThread{${this.nameAndSizeUntracked}} parentUpdate => mapped`, result) for (const log of result.added) { // insert at right location to maintain sort order this._applogs.splice(sortedIndexBy(this._applogs, log, 'ts'), 0, log) } if (result.removed) { for (const toRemove of result.removed) { const idx = this._applogs.indexOf(toRemove) if (idx >= 0) { this._applogs.splice(idx, 1) } else { throw ERROR(`MappedThread{${this.name}} toRemove: log not found`, toRemove, { thread: this, event, result, }) } } } } this.notifySubscribers(result) } get readOnly() { return this._readOnly ?? false } }