import { Logger } from 'besonders-logger'
import { valueKey, type ValueKey } from '../applog/applog-utils.ts'
import { Applog, EntityID } from '../applog/datom-types.ts'
import { anyOf } from '../query/matchers.ts'
import { memoizedFn } from '../query/memoized.ts'
import { SubscribableImpl, type Subscribable } from '../query/subscribable.ts'
import { isInitEvent, Thread } from './basic.ts'
import { rollingFilter } from './filters.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

/**
 * Live index of applogs grouped by entity ID. Memoized per thread — one subscription
 * per thread regardless of how many consumers call this. Updated incrementally on
 * each parent event; lookup is a single `map.get(en)`.
 *
 * Subscriber order: registers a 'derived' subscription on `thread` at first call. By
 * the FIFO subscriber-order convention (`thread/basic.ts:notifySubscribers`),
 * consumers that subscribe to `thread` AFTER calling `applogsByEntity(thread)` will
 * see an up-to-date index when their handler runs.
 *
 * @param thread - Thread whose applogs are indexed.
 * @returns A map from entity ID to that entity's applogs. The map and its arrays are
 *   mutated in place as the thread changes; treat them as read-only.
 */
export const applogsByEntity = memoizedFn(
	'applogsByEntity',
	function applogsByEntity(thread: Thread): ReadonlyMap<EntityID, Applog[]> {
		const map = new Map<EntityID, Applog[]>()

		const add = (log: Applog) => {
			let arr = map.get(log.en)
			if (!arr) {
				arr = []
				map.set(log.en, arr)
			}
			arr.push(log)
		}

		// Removal is by object identity and scans the entity's bucket.
		const remove = (log: Applog) => {
			const arr = map.get(log.en)
			if (!arr) return
			const idx = arr.indexOf(log)
			if (idx >= 0) arr.splice(idx, 1)
			// Drop empty buckets so `map.has(en)` means "has at least one applog".
			if (arr.length === 0) map.delete(log.en)
		}

		for (const log of thread.applogs) add(log)

		thread.subscribe(event => {
			if (isInitEvent(event)) {
				map.clear()
				for (const log of event.init) add(log)
			} else {
				// Adds run before removals: an entity whose logs are replaced within one
				// event never has its bucket emptied, so the same array instance is kept.
				for (const log of event.added) add(log)
				if (event.removed) for (const log of event.removed) remove(log)
			}
		}, 'derived')

		return map
	},
)

/**
 * Live index of applogs grouped by value for a given attribute. Value-based counterpart
 * to `applogsByEntity` (which groups by entity ID).
 *
 * Uses `rollingFilter` internally for the attribute filter (memoized, shared with any
 * other code filtering by the same attribute).
 *
 * Returns a `Subscribable` with lazy upstream activation:
 * - Snapshot use: read `.value` — zero subscription overhead
 * - Reactive use: `.subscribe()` activates the chain for incremental updates
 *
 * @param thread - Thread whose applogs are indexed.
 * @param attr - Attribute name; only applogs with `at === attr` are indexed.
 * @returns Subscribable whose value maps `valueKey(vl)` to the applogs carrying that value.
 */
export const applogsByAttrValue = memoizedFn(
	'applogsByAttrValue',
	function applogsByAttrValue(
		thread: Thread,
		attr: string,
	): Subscribable<ReadonlyMap<ValueKey, Applog[]>> {
		// Keyed by `valueKey(vl)`: object/array values can't be Map keys by identity, so they're
		// canonicalized. Look up via `valueKey(value)`; for primitive values the key IS the value.
		const map = new Map<ValueKey, Applog[]>()

		const add = (log: Applog) => {
			const key = valueKey(log.vl)
			let arr = map.get(key)
			if (!arr) {
				arr = []
				map.set(key, arr)
			}
			arr.push(log)
		}

		const remove = (log: Applog) => {
			const key = valueKey(log.vl)
			const arr = map.get(key)
			if (!arr) return
			const idx = arr.indexOf(log)
			if (idx >= 0) arr.splice(idx, 1)
			if (arr.length === 0) map.delete(key)
		}

		/** Discards the index and rebuilds it from the given applogs. */
		const rebuild = (applogs: Iterable<Applog>) => {
			map.clear()
			for (const log of applogs) add(log)
		}

		const filtered = rollingFilter(thread, { at: attr })

		// Initial build from current filtered state
		rebuild(filtered.applogs)

		const result = new SubscribableImpl<ReadonlyMap<ValueKey, Applog[]>>(
			map,
			() => {
				// Events emitted between the initial build (or a previous deactivation) and
				// this activation were never seen, so resync before listening again.
				// NOTE(review): assumes SubscribableImpl invokes this callback when upstream
				// activation is needed (first subscriber) — confirm.
				rebuild(filtered.applogs)
				return filtered.subscribe(event => {
					if (isInitEvent(event)) {
						rebuild(event.init)
					} else {
						if (event.removed) for (const log of event.removed) remove(log)
						for (const log of event.added) add(log)
					}
					// The map is mutated in place, so the same reference is re-published.
					result._set(map)
				}, 'derived')
			},
			// NOTE(review): presumably disables equality short-circuiting so that
			// re-publishing the same map instance still notifies — confirm.
			{ equals: false },
		)

		return result
	},
)

// ── entityLinkIndex ─────────────────────────────────────────────

export interface LinkEntry {
	/** The link entity's own ID */
	linkId: EntityID
	/** Value of attrA on this link entity */
	aValue: EntityID
	/** Value of attrB on this link entity */
	bValue: EntityID
}

export interface EntityLinkIndexValue {
	/** Lookup by attrA value → link entries (each has linkId + bValue) */
	byA: ReadonlyMap<EntityID, LinkEntry[]>
	/** Lookup by attrB value → link entries (each has linkId + aValue) */
	byB: ReadonlyMap<EntityID, LinkEntry[]>
}

/**
 * Live bidirectional index for entity-link relations: a common wovin pattern where
 * a **link entity** connects two targets via two attributes.
 *
 * Example: note3 relations use a relation entity with `relation/block` (→ child block)
 * and `relation/childOf` (→ parent block).
 *
 * Returns a `Subscribable` with lazy upstream activation:
 * - Snapshot use: read `.value` — zero subscription overhead
 * - Reactive use: `.subscribe()` activates the chain for incremental updates
 *
 * Link entity IDs are included in entries so consumers can look up full link state
 * via `applogsByEntity`.
 *
 * A link entity is listed only while both attributes are known for it, and at most
 * once: a newer value for either attribute replaces the previously indexed entry.
 *
 * @param thread - Thread whose applogs are indexed.
 * @param attrA - Attribute holding the first target of the link entity.
 * @param attrB - Attribute holding the second target of the link entity.
 * @returns Subscribable whose value exposes the `byA` and `byB` lookups.
 */
export const entityLinkIndex = memoizedFn(
	'entityLinkIndex',
	function entityLinkIndex(
		thread: Thread,
		attrA: string,
		attrB: string,
	): Subscribable<EntityLinkIndexValue> {
		const byA = new Map<EntityID, LinkEntry[]>()
		const byB = new Map<EntityID, LinkEntry[]>()

		// Internal state: track partial link entities (only one attr seen so far)
		const aByLink = new Map<EntityID, EntityID>() // linkId → aValue
		const bByLink = new Map<EntityID, EntityID>() // linkId → bValue

		/** Appends `entry` to the bucket for `key`, creating the bucket if needed. */
		function pushEntry(index: Map<EntityID, LinkEntry[]>, key: EntityID, entry: LinkEntry) {
			const bucket = index.get(key)
			if (bucket) bucket.push(entry)
			else index.set(key, [entry])
		}

		/** Removes the first entry for `linkId` from the bucket for `key`; drops empty buckets. */
		function dropEntry(index: Map<EntityID, LinkEntry[]>, key: EntityID, linkId: EntityID) {
			const bucket = index.get(key)
			if (!bucket) return
			const idx = bucket.findIndex(e => e.linkId === linkId)
			if (idx >= 0) bucket.splice(idx, 1)
			if (bucket.length === 0) index.delete(key)
		}

		function addLink(linkId: EntityID, aValue: EntityID, bValue: EntityID) {
			// One shared entry object is referenced from both directions.
			const entry: LinkEntry = { linkId, aValue, bValue }
			pushEntry(byA, aValue, entry)
			pushEntry(byB, bValue, entry)
		}

		function removeLink(linkId: EntityID, aValue: EntityID, bValue: EntityID) {
			dropEntry(byA, aValue, linkId)
			dropEntry(byB, bValue, linkId)
		}

		/** Indexes the link if both of its attribute values are currently known. */
		function publish(linkId: EntityID) {
			const aVal = aByLink.get(linkId)
			const bVal = bByLink.get(linkId)
			if (aVal !== undefined && bVal !== undefined) addLink(linkId, aVal, bVal)
		}

		/** Removes the link's indexed entry, if it currently has one. */
		function unpublish(linkId: EntityID) {
			const aVal = aByLink.get(linkId)
			const bVal = bByLink.get(linkId)
			if (aVal !== undefined && bVal !== undefined) removeLink(linkId, aVal, bVal)
		}

		/** Returns the per-link tracking map for the log's attribute (attrA takes precedence). */
		function trackerFor(log: Applog): Map<EntityID, EntityID> | undefined {
			if (log.at === attrA) return aByLink
			if (log.at === attrB) return bByLink
			return undefined
		}

		function processLog(log: Applog) {
			const tracker = trackerFor(log)
			if (!tracker) return
			// Withdraw the entry built from the previous value first. Without this, an
			// overwrite that was not preceded by a removal (e.g. several logs for the same
			// entity/attribute in the initial build) left a stale duplicate in the index.
			unpublish(log.en)
			tracker.set(log.en, log.vl as EntityID)
			publish(log.en)
		}

		function removeLog(log: Applog) {
			const tracker = trackerFor(log)
			if (!tracker) return
			// Ignore removals of logs whose value is not the one currently tracked:
			// they are superseded and must not tear down the live link.
			if (tracker.get(log.en) !== log.vl) return
			unpublish(log.en)
			tracker.delete(log.en)
		}

		function buildFull(applogs: readonly Applog[]) {
			byA.clear()
			byB.clear()
			aByLink.clear()
			bByLink.clear()
			for (const log of applogs) processLog(log)
		}

		const filtered = rollingFilter(thread, { at: anyOf(attrA, attrB) })

		// Initial build
		buildFull(filtered.applogs)

		const value: EntityLinkIndexValue = { byA, byB }

		const result = new SubscribableImpl<EntityLinkIndexValue>(
			value,
			() => {
				// Events emitted between the initial build (or a previous deactivation) and
				// this activation were never seen, so resync before listening again.
				// NOTE(review): assumes SubscribableImpl invokes this callback when upstream
				// activation is needed (first subscriber) — confirm.
				buildFull(filtered.applogs)
				return filtered.subscribe(event => {
					if (isInitEvent(event)) {
						buildFull(event.init)
					} else {
						// Process removes before adds (LWW updates appear as remove+add)
						if (event.removed) for (const log of event.removed) removeLog(log)
						for (const log of event.added) processLog(log)
					}
					// The maps are mutated in place, so the same wrapper object is re-published.
					result._set(value)
				}, 'derived')
			},
			// NOTE(review): presumably disables equality short-circuiting so that
			// re-publishing the same object still notifies — confirm.
			{ equals: false },
		)

		return result
	},
)