import { Logger } from 'besonders-logger'
import { Applog, ApplogValue, DatalogQueryPattern, EntityID } from '../applog/datom-types.ts'
import { isInitEvent, Thread } from '../thread/basic.ts'
import { makeFilter, rollingFilter } from '../thread/filters.ts'
import { resolveKeyMapper } from './basic.ts'
import { anyOf } from './matchers.ts'
import { memoizedFn } from './memoized.ts'
import { SubscribableImpl } from './subscribable.ts'
import type { Subscribable } from './subscribable.ts'
import type { StripExplicitPrefix, StripFirstPrefix } from './attr-helpers.ts'

const { DEBUG } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

// NOTE(review): the generic parameter/argument lists in this file (`<A extends string>`,
// `Map<EntityID, Record<…>>`, `StripFirstPrefix<A>`, `StripExplicitPrefix<A, P>`) were restored from a
// copy in which they had been stripped. Confirm them against `./attr-helpers.ts` and the call sites —
// in particular the argument order of `StripExplicitPrefix`.

/**
 * Maintains a live `Map` from entity ID to a record of attribute values.
 *
 * An entity gets an entry as soon as one of its applogs matches `discoveryPattern`, and loses it once
 * no applog of that entity matches any more. Each record has one key per entry of `liveAttributes`
 * (run through the key mapper obtained from `resolveKeyMapper(opts)`), holding the value of the last
 * matching applog in the filtered thread, or `null` when there is none.
 *
 * The same `Map` instance is mutated in place and re-published on every change, so subscribers must
 * not rely on identity changes to detect updates.
 *
 * @param thread - Source thread; only applogs whose `at` is the discovery attribute or one of
 *   `liveAttributes` are observed.
 * @param discoveryPattern - Pattern deciding which entities belong to the collection. Its `at` is
 *   treated as a plain attribute name.
 * @param liveAttributes - Attributes tracked per entity.
 * @returns A subscribable holding the live map.
 */
export function liveEntityCollection<A extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
): Subscribable<Map<EntityID, Record<A, ApplogValue | null>>>
/** Variant with `stripAtPrefix: true`; record keys are typed as `StripFirstPrefix<A>`. */
export function liveEntityCollection<A extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
	opts: { stripAtPrefix: true },
): Subscribable<Map<EntityID, Record<StripFirstPrefix<A>, ApplogValue | null>>>
/** Variant with an explicit prefix `P`; record keys are typed as `StripExplicitPrefix<A, P>`. */
export function liveEntityCollection<A extends string, P extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
	opts: { stripAtPrefix: P },
): Subscribable<Map<EntityID, Record<StripExplicitPrefix<A, P>, ApplogValue | null>>>
/** Variant with a custom `mapKeys` function; record keys are plain strings. */
export function liveEntityCollection<A extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
	opts: { mapKeys: (attr: A) => string },
): Subscribable<Map<EntityID, Record<string, ApplogValue | null>>>
/** Catch-all variant for options that are not statically known; record keys are plain strings. */
export function liveEntityCollection<A extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
	opts: { stripAtPrefix?: true | string; mapKeys?: (attr: A) => string },
): Subscribable<Map<EntityID, Record<string, ApplogValue | null>>>
export function liveEntityCollection<A extends string>(
	thread: Thread,
	discoveryPattern: DatalogQueryPattern,
	liveAttributes: readonly A[],
	opts?: { stripAtPrefix?: true | string; mapKeys?: (attr: A) => string },
): Subscribable<Map<EntityID, Record<string, ApplogValue | null>>> {
	// The overloads only refine the static key type; all of them share the implementation below.
	// The cast widens `mapKeys` from `(attr: A) => string` to `(attr: string) => string`.
	return _liveEntityCollection(
		thread,
		discoveryPattern,
		liveAttributes,
		opts as { stripAtPrefix?: true | string; mapKeys?: (attr: string) => string },
	)
}

// Shared implementation, wrapped in `memoizedFn` under the name 'liveEntityCollection'
// (presumably so equal arguments reuse one subscribable — see ./memoized.ts).
const _liveEntityCollection = memoizedFn(
	'liveEntityCollection',
	function liveEntityCollection<A extends string>(
		thread: Thread,
		discoveryPattern: DatalogQueryPattern,
		liveAttributes: readonly A[],
		opts?: { stripAtPrefix?: true | string; mapKeys?: (attr: string) => string },
	): Subscribable<Map<EntityID, Record<string, ApplogValue | null>>> {
		DEBUG('liveEntityCollection', discoveryPattern, liveAttributes)

		const discoveryAttr = discoveryPattern.at as string
		const allAttrs = new Set<string>([discoveryAttr, ...liveAttributes])
		// Only applogs for the discovery attribute or a live attribute are relevant.
		const filtered = rollingFilter(thread, { at: anyOf(...allAttrs) })
		const isDiscoveryMatch = makeFilter(discoveryPattern)
		const attrSet = new Set<string>(liveAttributes)
		const key = resolveKeyMapper(opts)

		const map = new Map<EntityID, Record<string, ApplogValue | null>>()

		/** True when the single applog satisfies the discovery pattern. */
		function matchesDiscovery(log: Applog): boolean {
			return isDiscoveryMatch([log]).length > 0
		}

		/** Record with every (mapped) live attribute present and set to `null`. */
		function emptyRecord(): Record<string, ApplogValue | null> {
			const record: Record<string, ApplogValue | null> = {}
			for (const attr of liveAttributes) record[key(attr)] = null
			return record
		}

		/**
		 * Value of `attr` for `entityId` in the current filtered state, or `null` if there is none.
		 * Scans forward and lets the last match win — the same rule `makeRecord` and `buildFull`
		 * apply — so incremental updates agree with a fresh build.
		 */
		function latestValue(entityId: EntityID, attr: string): ApplogValue | null {
			let value: ApplogValue | null = null
			for (const log of filtered.applogs) {
				if (log.en === entityId && log.at === attr) value = log.vl ?? null
			}
			return value
		}

		/** Builds the record for one entity, backfilled from the current filtered state. */
		function makeRecord(entityId: EntityID): Record<string, ApplogValue | null> {
			const record = emptyRecord()
			for (const log of filtered.applogs) {
				if (log.en === entityId && attrSet.has(log.at)) {
					record[key(log.at)] = log.vl
				}
			}
			return record
		}

		/** Rebuilds the whole map: discover entities in `applogs`, then fill values in one pass. */
		function buildFull(applogs: readonly Applog[]) {
			map.clear()
			for (const log of isDiscoveryMatch(applogs)) {
				if (!map.has(log.en)) map.set(log.en, emptyRecord())
			}
			if (map.size === 0) return
			// Single pass over the filtered state instead of one full scan per discovered entity.
			for (const log of filtered.applogs) {
				if (!attrSet.has(log.at)) continue
				const record = map.get(log.en)
				if (record) record[key(log.at)] = log.vl
			}
		}

		/** Applies one added applog to the map. */
		function addLog(log: Applog) {
			// Discovery match → ensure entity exists
			if (matchesDiscovery(log) && !map.has(log.en)) {
				map.set(log.en, makeRecord(log.en))
				return // makeRecord already backfilled attrs
			}
			// Attribute match → update value (ignored for entities that are not in the collection)
			if (attrSet.has(log.at)) {
				const record = map.get(log.en)
				if (record) record[key(log.at)] = log.vl
			}
		}

		/** Applies one removed applog to the map. */
		function removeLog(log: Applog) {
			if (matchesDiscovery(log)) {
				// Drop the entity only if no other discovery match for it remains
				const stillDiscovered = filtered.applogs.some(
					l => l.en === log.en && matchesDiscovery(l),
				)
				if (!stillDiscovered) {
					map.delete(log.en)
					return
				}
			}
			if (attrSet.has(log.at)) {
				const record = map.get(log.en)
				// Re-resolve the value from the remaining applogs
				if (record) record[key(log.at)] = latestValue(log.en, log.at)
			}
		}

		// Initial build
		buildFull(filtered.applogs)

		const result = new SubscribableImpl<Map<EntityID, Record<string, ApplogValue | null>>>(
			map,
			() =>
				filtered.subscribe((event) => {
					if (isInitEvent(event)) {
						buildFull(event.init)
					} else {
						// Process removes before adds — LWW updates appear as remove+add in same delta
						if (event.removed) for (const log of event.removed) removeLog(log)
						for (const log of event.added) addLog(log)
					}
					// Same Map instance every time; `equals: false` below presumably keeps the
					// subscribable from suppressing the notification — confirm in ./subscribable.ts.
					result._set(map)
				}, 'derived'),
			{ equals: false },
		)

		return result
	},
)