import { AgentHash, Applog, ApplogValue, CidString, DatalogQueryPattern, EntityID, SearchContext, ValueOrMatcher } from '../applog/datom-types.ts'
import { Logger } from 'besonders-logger'
import { isEmpty } from 'lodash-es'
import stringify from 'safe-stable-stringify'
import { isLaterByTsAndPv, isoDateStrCompare, isVariable, resolveOrRemoveVariables, sortApplogsByTs } from '../applog/applog-utils.ts'
import { createDebugName } from '../utils/debug-name.ts'
import { isInitEvent, StaticThread, Thread, ThreadEvent } from '../thread/basic.ts'
import { hasFilter, makeFilter, rollingFilter, rollingMapper, ThreadOnlyCurrent } from '../thread/filters.ts'
import { applogsByEntity } from '../thread/indexes.ts'
import { MappedThread, type ThreadDerivation } from '../thread/mapped.ts'
import { ThreadInMemory } from '../thread/writeable.ts'
import { memoizedFn } from './memoized.ts'
import { isArrayInitEvent, SubscribableArray, SubscribableArrayImpl, SubscribableImpl, Unsubscribe } from './subscribable.ts'
import { LiveQueryResult, QueryNode, QueryResult } from './types.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO, { prefix: '[q]' }) // eslint-disable-line no-unused-vars

/**
 * Throw unless `thread` carries the `lastWriteWins` filter, i.e. it only holds the
 * latest log per en|at. Used by queries that read "current state".
 */
function assertLWW(thread: Thread) {
	if (!hasFilter(thread, 'lastWriteWins')) {
		throw ERROR(`requires lastWriteWins-filtered thread, got filters:`, thread.filters, { name: thread.name })
	}
}

let globalQueryTimeoutTime = null

// util.inspect.defaultOptions.depth = 5;

// export interface QueryExecutorArguments {
// 	db: Thread
// 	// applogs: AppLog[]
// 	nodes: SearchContextWithLog[]
// }
// export interface QueryExecutorResult {
// 	// applogs: AppLog[]
// 	nodes: SearchContextWithLog[]
// }
// export type QueryExecutor = (args: QueryExecutorArguments) => QueryExecutorResult

/////////////
// QUERIES //
/////////////

/**
 * Keep only the latest logs for each en&at (= last write wins).
 *
 * - `inverseToOnlyReturnFirstLogs`: keep the *earliest* log per en&at instead.
 * - `tolerateAlreadyFiltered`: if `thread` is already LWW-filtered, return it unchanged
 *   instead of throwing.
 *
 * Incremental updates are driven by upstream *additions*: a superseded winner is emitted
 * as `removed`. Upstream removals are not re-resolved; when one hits a current winner a
 * warning is logged, because that en&at may now be stale.
 */
export const lastWriteWins = memoizedFn('lastWriteWins', function lastWriteWins(
	thread: Thread,
	{ inverseToOnlyReturnFirstLogs, tolerateAlreadyFiltered }: {
		inverseToOnlyReturnFirstLogs?: boolean
		tolerateAlreadyFiltered?: boolean
	} = {},
): ThreadOnlyCurrent {
	VERBOSE(`lastWriteWins${inverseToOnlyReturnFirstLogs ? '.inversed' : ''} < ${thread.nameAndSizeUntracked} > initializing`)
	if (thread.filters.includes('lastWriteWins')) {
		if (tolerateAlreadyFiltered) {
			DEBUG(`[lastWriteWins] already filtered, but tolerateAlreadyFiltered=true, so returning`)
			return thread as ThreadOnlyCurrent
		}
		throw ERROR(`thread already filtered lastWriteWins:`, thread.filters, { name: thread.name })
	}

	// Current winner per `en|at` key. Rebuilt by compute(), updated in place by mapDelta().
	let rollingMap = new Map<string, Applog>()

	/**
	 * Iterate `newLogs` (already chain-aware-sorted by `sortApplogsByTs`) updating
	 * `rollingMap` to hold the LWW winner per (en|at) key. Uses `isLaterByTsAndPv`
	 * — pairwise pv-aware predicate — so cross-batch comparisons in mapDelta also
	 * stay deterministic when same-ts chain links collide.
	 */
	const processLogs = (newLogs: readonly Applog[], toRemove: Applog[] | null): Applog[] => {
		const toAdd = [] as Applog[]
		let prevTs: string | undefined
		for (
			let i = inverseToOnlyReturnFirstLogs ? 0 : newLogs.length - 1;
			inverseToOnlyReturnFirstLogs ? i < newLogs.length : i >= 0;
			inverseToOnlyReturnFirstLogs ? i++ : i--
		) {
			const log = newLogs[i]
			const key = log.en + '|' + log.at
			if (prevTs !== undefined) {
				const cmp = isoDateStrCompare(prevTs, log.ts)
				if (inverseToOnlyReturnFirstLogs ? cmp > 0 : cmp < 0) {
					throw ERROR(`lastWriteWins.processLogs logs not ts-sorted:`, prevTs, inverseToOnlyReturnFirstLogs ? '>' : '<', log.ts, {
						log,
						i,
						newLogs,
						inverseToOnlyReturnFirstLogs,
					})
				}
			}
			prevTs = log.ts
			const existing = rollingMap.get(key)
			const replaces = !existing
				|| (inverseToOnlyReturnFirstLogs ? isLaterByTsAndPv(existing, log) : isLaterByTsAndPv(log, existing))
			if (replaces) {
				if (existing && toRemove) toRemove.push(existing)
				toAdd.push(log)
				rollingMap.set(key, log)
			}
		}
		return toAdd
	}

	const lwwName = `lastWriteWins${inverseToOnlyReturnFirstLogs ? '.inversed' : ''}`
	const derivation: ThreadDerivation = {
		compute(parents) {
			if (parents.length !== 1) {
				throw ERROR(`${lwwName} requires exactly one parent`, { parents: parents.length })
			}
			const [parent] = parents
			rollingMap = new Map<string, Applog>()
			const toAdd = processLogs(parent.applogs, null)
			sortApplogsByTs(toAdd)
			VERBOSE.isDisabled || VERBOSE(`${lwwName}<${thread.nameAndSizeUntracked}> compute`, { toAdd: toAdd.length })
			return toAdd
		},
		mapDelta(delta) {
			// Upstream removals are not re-resolved (that would need the parent's full history for
			// the key). Surface the case where a current winner disappears instead of hiding it.
			const removedWinners = delta.removed?.filter(log => rollingMap.get(log.en + '|' + log.at)?.cid === log.cid) ?? []
			if (removedWinners.length) {
				WARN(`${lwwName}<${thread.nameAndSizeUntracked}> upstream removed current winners - not re-resolved, results may be stale`, {
					removedWinners,
				})
			}
			const toRemove = [] as Applog[]
			const toAdd = processLogs(delta.added, toRemove)
			sortApplogsByTs(toAdd)
			VERBOSE.isDisabled || VERBOSE(`${lwwName}<${thread.nameAndSizeUntracked}> mapDelta`, { ...delta, toAdd, toRemove })
			return { added: toAdd, removed: toRemove }
		},
	}
	const mappedThread = rollingMapper(thread, derivation, { name: lwwName, extraFilterName: 'lastWriteWins' })
	VERBOSE.isDisabled || VERBOSE(`lastWriteWins<${thread.nameAndSizeUntracked}> filtered down to`, mappedThread.applogs.length)
	return mappedThread as ThreadOnlyCurrent
}, { argsDebugName: (thread) => createDebugName({ caller: 'lastWriteWins', thread }) })

/** Attributes whose `vl: true` marks an entity as deleted. */
const isDeletedAttrs = ['isDeleted', 'relation/isDeleted', 'block/isDeleted']

/** True when `log` is a deletion marker (`vl === true` on one of `isDeletedAttrs`). */
function isDeletionLog(log: Applog) {
	return log.vl === true && isDeletedAttrs.includes(log.at)
}

/**
 * Remove all applogs for entities that have an applog `{ at: 'isDeleted' | 'relation/isDeleted' | 'block/isDeleted', vl: true }`.
 *
 * Emits synthetic `removed` events for the entity's pre-existing applogs when an
 * entity becomes deleted (and synthetic `added` events for un-deletion).
 *
 * Un-deletion: canonical input is appending `{ at: 'isDeleted', vl: false }`. Requires
 * `lastWriteWins` upstream — without it, the `vl: true` log isn't superseded so we
 * can't observe a transition. Soft-warned at runtime.
*/
export const withoutDeleted = memoizedFn('withoutDeleted', function withoutDeleted(
	thread: Thread,
) {
	if (VERBOSE.isEnabled) VERBOSE(`withoutDeleted<${thread.nameAndSizeUntracked}>`)
	if (thread.filters.includes('withoutDeleted')) {
		throw ERROR(`thread already filtered withoutDeleted:`, thread.filters, { name: thread.name })
	}
	if (!thread.filters.includes('lastWriteWins')) {
		WARN(`withoutDeleted on non-lastWriteWins thread: un-deletion (isDeleted: false) won't take effect`, { thread: thread.name })
	}

	// FIFO contract: byEntity must subscribe to `thread` BEFORE `result` does, so the index
	// is up-to-date when our mapper runs. Calling applogsByEntity here forces that order;
	// memoization makes it idempotent if other code already subscribed.
	const byEntity = applogsByEntity(thread)

	// Per-entity count of currently-active isDeleted-class logs. 0→1 = entity becomes hidden;
	// 1→0 = entity becomes visible. Handles multiple isDeleted-class attrs per entity.
	const activeDeletionMarkers = new Map<EntityID, number>()

	/** Rebuild `activeDeletionMarkers` from scratch for the given full set of parent logs. */
	const recountDeletionMarkers = (logs: readonly Applog[]) => {
		activeDeletionMarkers.clear()
		for (const log of logs) {
			if (isDeletionLog(log)) {
				activeDeletionMarkers.set(log.en, (activeDeletionMarkers.get(log.en) ?? 0) + 1)
			}
		}
	}
	recountDeletionMarkers(thread.applogs)
	const isHidden = (en: EntityID) => activeDeletionMarkers.has(en)

	const derivation: ThreadDerivation = {
		compute(parents) {
			if (parents.length !== 1) throw ERROR(`withoutDeleted requires exactly one parent`, { parents: parents.length })
			const [parent] = parents
			recountDeletionMarkers(parent.applogs)
			return parent.applogs.filter(log => !isHidden(log.en))
		},
		mapDelta: (delta) => {
			// Snapshot of which entities were hidden BEFORE this delta. Required because
			// pass-through correctness depends on prior visibility (logs of pre-hidden
			// entities were never in result, so they must not appear in `removed`),
			// while the post-mutation `isHidden` state determines whether new content
			// should be admitted. Filtering only on post-mutation state breaks both
			// W→N (stale isDeleted=true log slips into removed → MappedThread crash)
			// and F→D-with-content-removal (legitimate content removal gets filtered).
			const hiddenBefore = new Set(activeDeletionMarkers.keys())
			const entitiesNowHidden: EntityID[] = []
			const entitiesNowVisible: EntityID[] = []

			for (const log of delta.added) {
				if (!isDeletionLog(log)) continue
				const prev = activeDeletionMarkers.get(log.en) ?? 0
				activeDeletionMarkers.set(log.en, prev + 1)
				if (prev === 0) entitiesNowHidden.push(log.en)
			}
			if (delta.removed) {
				for (const log of delta.removed) {
					if (!isDeletionLog(log)) continue
					const prev = activeDeletionMarkers.get(log.en) ?? 0
					if (prev === 0) {
						// A marker we never counted: treating this as a 1→0 transition would re-add the
						// entity's already-visible logs as duplicates, so skip it.
						WARN(`[withoutDeleted] removed deletion marker for entity without active markers`, { log })
						continue
					}
					if (prev > 1) {
						activeDeletionMarkers.set(log.en, prev - 1)
					} else {
						activeDeletionMarkers.delete(log.en)
						entitiesNowVisible.push(log.en)
					}
				}
			}

			// Synthetic removals: applogs of newly-hidden entities currently in result.
			// byEntity index is updated for this tick already (FIFO), so we filter out
			// applogs added in this very tick (they were never in result).
			const newAddedSet = new Set(delta.added)
			const syntheticRemovals: Applog[] = []
			for (const en of entitiesNowHidden) {
				const applogs = byEntity.get(en)
				if (!applogs) continue
				for (const log of applogs) {
					if (!newAddedSet.has(log)) syntheticRemovals.push(log)
				}
			}

			// Synthetic additions: all current parent applogs of newly-visible entities.
			const syntheticAdditions: Applog[] = []
			for (const en of entitiesNowVisible) {
				const applogs = byEntity.get(en)
				if (applogs) syntheticAdditions.push(...applogs)
			}

			// Pass-through: a parent log goes through to subscribers iff the entity
			// wasn't hidden before AND isn't hidden now. Transitions are owned by the
			// synthetic streams above.
			const passAdded = delta.added.filter(log => !hiddenBefore.has(log.en) && !isHidden(log.en))
			const passRemoved = delta.removed?.filter(log => !hiddenBefore.has(log.en)) ?? []

			return {
				added: [...passAdded, ...syntheticAdditions],
				removed: [...passRemoved, ...syntheticRemovals],
			}
		},
	}
	const result = rollingMapper(thread, derivation, { name: 'withoutDeleted', extraFilterName: 'withoutDeleted' })
	return result
})

///////////////////////////
// ONE-OFF QUERY (snapshot) //
///////////////////////////

/** Shared helper: create a QueryNode from a log and its context */
function makeQueryNode(
	log: Applog,
	parentNode: QueryNode | null,
	varMapper: (log: Applog) => SearchContext,
	threadName: string,
): QueryNode {
	const nodeVars = Object.assign({}, parentNode?.variables, varMapper(log))
	return new QueryNode(
		StaticThread.fromArray([log], threadName),
		nodeVars,
		parentNode,
	)
}

/** True when `vars` binds at least one variable. */
function hasStartVariables(vars: SearchContext | undefined): boolean {
	return !!vars && Object.keys(vars).length > 0
}

/** Log-less root QueryNode carrying only `startVariables`; used as the parent of first-step nodes. */
function makeStartVariablesNode(thread: Thread, startVariables: SearchContext): QueryNode {
	return new QueryNode(
		StaticThread.fromArray([], createDebugName({ caller: 'QueryStartNode', thread, args: startVariables })),
		{ ...startVariables },
		null,
	)
}

/**
 * One-off query — returns a plain snapshot. No subscriptions, no stale-data risk.
 *
 * `startVariables` are pre-bound: they are resolved into the patterns and are present
 * in every result node's `variables`.
 */
export const query = memoizedFn('query', function query(
	threadOrLogs: Thread | Applog[],
	patternOrPatterns: DatalogQueryPattern | DatalogQueryPattern[],
	startVariables: SearchContext = {},
	opts: { debug?: boolean } = {},
): QueryResult {
	throwOnTimeout()
	const thread = threadFromMaybeArray(threadOrLogs)
	DEBUG(`query<${thread.nameAndSizeUntracked}>:`, patternOrPatterns)
	const patterns = (Array.isArray(patternOrPatterns) ? patternOrPatterns : [patternOrPatterns]) as DatalogQueryPattern[]
	warnIfDisjointQuerySteps(patterns)

	let prevNodes: readonly QueryNode[] | null
	if (patterns.length === 1) {
		// First step: seed with a root node carrying `startVariables` (if any), so they
		// resolve into the pattern and propagate into every result node.
		prevNodes = hasStartVariables(startVariables) ? [makeStartVariablesNode(thread, startVariables)] : null
	} else {
		const patternsExceptLast = patterns.slice(0, -1)
		prevNodes = query(thread, patternsExceptLast, startVariables, opts).nodes
	}

	const lastPattern = patterns[patterns.length - 1]
	const stepResult = queryStepOnce(thread, prevNodes, lastPattern, opts)
	VERBOSE.isDisabled || VERBOSE(`query result:`, stepResult.nodes)
	return stepResult
}, {
	argsDebugName: (thread, pattern, startVars) =>
		createDebugName({ caller: 'query', thread, args: startVars ? { pattern, startVars } : pattern }),
})

/**
 * One-off query step — pure filtering via makeFilter, no subscriptions.
*/
export function queryStepOnce(
	thread: Thread,
	prevNodes: readonly QueryNode[] | null,
	pattern: DatalogQueryPattern,
	opts: { debug?: boolean } = {},
): QueryResult {
	DEBUG(`queryStepOnce<${thread.nameAndSizeUntracked}> with`, prevNodes?.length ?? 'all', 'nodes, pattern:', pattern)
	if (Object.entries(pattern).length === 0) throw new Error(`Pattern is empty`)

	/** Expand one input node (or the unbound root for `null`) into the nodes matching `pattern` under its bindings. */
	const expand = (inputNode: QueryNode | null): QueryNode[] => {
		const [resolvedPattern, unboundVars] = resolveOrRemoveVariables(pattern, inputNode?.variables ?? {})
		VERBOSE(`[queryStepOnce.doQuery] patternWithoutVars: `, resolvedPattern)
		const matches = makeFilter(resolvedPattern)(thread.applogs)
		const bindVars = createObjMapper(unboundVars)

		const expanded: QueryNode[] = []
		for (const log of matches) {
			const debugName = createDebugName({
				caller: 'QueryNode',
				thread,
				pattern: `${stringify(Object.assign({}, inputNode?.variables, bindVars(log)))}@${stringify(resolvedPattern)}`,
			})
			expanded.push(makeQueryNode(log, inputNode, bindVars, debugName))
		}

		if (VERBOSE.isEnabled) VERBOSE(`[queryStepOnce.doQuery] nodes:`, expanded.map(n => n.variables))
		if (opts.debug) {
			const summary = expanded.map(n => ({ variables: n.variables, thread: n.logsOfThisNode }))
			LOG(`[queryStepOnce] step result:`, summary)
		}
		return expanded
	}

	if (prevNodes == null) return new QueryResult(expand(null))
	return new QueryResult(prevNodes.flatMap(inputNode => expand(inputNode)))
}

///////////////////////////
// LIVE QUERY (reactive) //
///////////////////////////

/**
 * Live query — the initial result is computed synchronously; upstream subscriptions
 * are established lazily, on first subscribe.
 * Returns LiveQueryResult with subscribe + dispose.
*/
export const liveQuery = memoizedFn('liveQuery', function liveQuery(
	threadOrLogs: Thread | Applog[],
	patternOrPatterns: DatalogQueryPattern | DatalogQueryPattern[],
	startVariables: SearchContext = {},
	opts: { debug?: boolean } = {},
): LiveQueryResult {
	throwOnTimeout()
	const thread = threadFromMaybeArray(threadOrLogs)
	DEBUG(`liveQuery<${thread.nameAndSizeUntracked}>:`, patternOrPatterns)
	const patterns = (Array.isArray(patternOrPatterns) ? patternOrPatterns : [patternOrPatterns]) as DatalogQueryPattern[]

	let prevResult: LiveQueryResult | null
	if (patterns.length === 1) {
		// First step: seed with a static, log-less root node carrying `startVariables` (if any),
		// so they resolve into the pattern and propagate into every result node.
		if (startVariables && Object.keys(startVariables).length > 0) {
			const startNode = new QueryNode(
				StaticThread.fromArray([], createDebugName({ caller: 'QueryStartNode', thread, args: startVariables })),
				{ ...startVariables },
				null,
			)
			// Static seed: nothing upstream to subscribe to, so activation is a no-op.
			prevResult = new LiveQueryResult(new SubscribableArrayImpl([startNode], () => () => {}))
		} else {
			prevResult = null
		}
	} else {
		const patternsExceptLast = patterns.slice(0, -1)
		prevResult = liveQuery(thread, patternsExceptLast, startVariables, opts)
	}

	const lastPattern = patterns[patterns.length - 1]
	const stepResult = liveQueryStep(thread, prevResult, lastPattern, opts)
	VERBOSE.isDisabled || VERBOSE(`liveQuery result:`, stepResult.nodes)
	return stepResult
}, {
	argsDebugName: (thread, pattern, startVars) =>
		createDebugName({ caller: 'liveQuery', thread, args: startVars ? { pattern, startVars } : pattern }),
})

/**
 * One reactive query step: for each node of `nodeSet` (or once, unbound, when `nodeSet` is
 * null) match `pattern` against `thread` and aggregate the resulting nodes.
 * Initial items are computed synchronously; upstream subscriptions activate lazily.
 */
export const liveQueryStep = memoizedFn('liveQueryStep', function liveQueryStep(
	thread: Thread,
	nodeSet: LiveQueryResult | null,
	pattern: DatalogQueryPattern,
	opts: { debug?: boolean } = {},
): LiveQueryResult {
	DEBUG(`liveQueryStep<${thread.nameAndSizeUntracked}> with`, nodeSet?.untrackedSize ?? 'all', 'nodes, pattern:', pattern)
	if (!Object.entries(pattern).length) throw new Error(`Pattern is empty`)

	/** Reactive sub-query: nodes matching `pattern` under `node`'s bindings (unbound for `null`). */
	function doQuery(node: QueryNode | null): SubscribableArray<QueryNode> {
		const [patternWithResolvedVars, variablesToFill] = resolveOrRemoveVariables(pattern, node?.variables ?? {})
		VERBOSE(`[liveQueryStep.doQuery] patternWithoutVars: `, patternWithResolvedVars)
		const applogsMatchingStatic = rollingFilter(thread, patternWithResolvedVars)
		const varMapper = createObjMapper(variablesToFill)

		function makeNode(log: Applog): QueryNode {
			return makeQueryNode(
				log,
				node,
				varMapper,
				createDebugName({
					caller: 'QueryNode',
					thread: applogsMatchingStatic,
					pattern: `${stringify(Object.assign({}, node?.variables, varMapper(log)))}@${stringify(patternWithResolvedVars)}`,
				}),
			)
		}

		// Compute initial result synchronously
		const initialNodes = applogsMatchingStatic.applogs.map(makeNode)
		if (VERBOSE.isEnabled) VERBOSE(`[liveQueryStep.doQuery] initial nodes:`, initialNodes.map(n => n.variables))
		if (opts.debug) {
			LOG(`[liveQueryStep] step result:`, initialNodes.map(({ variables, logsOfThisNode: thread }) => ({
				variables,
				thread,
			})))
		}

		// Upstream subscription activates lazily — only when someone subscribes to us
		const result = new SubscribableArrayImpl(
			initialNodes,
			() =>
				applogsMatchingStatic.subscribe((event) => {
					if (isInitEvent(event)) {
						result._reset(event.init.map(makeNode))
					} else {
						if (event.added.length) {
							result._push(...event.added.map(makeNode))
						}
						if (event.removed?.length) {
							const removedCids = new Set(event.removed.map(log => log.cid))
							const toRemove = result.items.filter(qn => removedCids.has(qn.logsOfThisNode.applogs[0]?.cid))
							if (toRemove.length) result._remove(toRemove)
						}
					}
				}, 'derived'),
		)
		return result
	}

	// ── Single-step query (nodeSet === null) ──────────────────────
	if (!nodeSet) {
		return new LiveQueryResult(doQuery(null))
	}

	// ── Multi-step query (nodeSet !== null) ────────────────────────
	// Compute initial result synchronously
	const initialInners = nodeSet.nodes.map(inputNode => ({
		inputNode,
		inner: doQuery(inputNode),
	}))
	const initialItems = initialInners.flatMap(({ inner }) => [...inner.items])
	// The pre-computed inners can be wired only once: deactivation disposes them, so any
	// later re-activation rebuilds the per-input sub-queries from the current upstream nodes.
	let unwiredInitialInners: typeof initialInners | null = initialInners

	type InnerEntry = {
		inner: SubscribableArray<QueryNode>
		unsub: Unsubscribe
		nodes: QueryNode[]
	}

	// Lazy activation: upstream subscriptions only created when someone subscribes
	const aggregated = new SubscribableArrayImpl(
		initialItems,
		() => {
			const subsByInputNode = new Map<QueryNode, InnerEntry>()

			/** Subscribe to one input node's sub-query, mirroring its changes into `aggregated`. Returns its current nodes. */
			function wireInner(inputNode: QueryNode, inner: SubscribableArray<QueryNode>): QueryNode[] {
				const entry: InnerEntry = { inner, unsub: null! as Unsubscribe, nodes: [...inner.items] }
				entry.unsub = inner.subscribe((event) => {
					if (isArrayInitEvent(event)) {
						if (entry.nodes.length) aggregated._remove(entry.nodes)
						entry.nodes = [...event.init]
						if (entry.nodes.length) aggregated._push(...entry.nodes)
					} else {
						if (event.added.length) {
							entry.nodes.push(...event.added)
							aggregated._push(...event.added)
						}
						if (event.removed?.length) {
							for (const r of event.removed) {
								const idx = entry.nodes.indexOf(r)
								if (idx >= 0) entry.nodes.splice(idx, 1)
							}
							aggregated._remove(event.removed)
						}
					}
				}, 'derived')
				subsByInputNode.set(inputNode, entry)
				return entry.nodes
			}

			/** Create and wire a fresh sub-query for `inputNode`. */
			function addInputNode(inputNode: QueryNode): QueryNode[] {
				return wireInner(inputNode, doQuery(inputNode))
			}

			/** Tear down `inputNode`'s sub-query; returns the nodes it contributed. */
			function removeInputNode(inputNode: QueryNode): QueryNode[] {
				const entry = subsByInputNode.get(inputNode)
				if (!entry) return []
				entry.unsub()
				entry.inner.dispose()
				subsByInputNode.delete(inputNode)
				return entry.nodes
			}

			/** Unsubscribe and dispose every wired sub-query. */
			function teardownAll() {
				for (const entry of subsByInputNode.values()) {
					entry.unsub()
					entry.inner.dispose()
				}
				subsByInputNode.clear()
			}

			if (unwiredInitialInners) {
				// First activation: reuse pre-computed inners (no re-creation of sub-queries)
				for (const { inputNode, inner } of unwiredInitialInners) {
					wireInner(inputNode, inner)
				}
				unwiredInitialInners = null
			} else {
				// Re-activation: the previous teardown disposed the inners, so rebuild them
				const allNodes: QueryNode[] = []
				for (const node of nodeSet.nodes) {
					allNodes.push(...addInputNode(node))
				}
				aggregated._reset(allNodes)
			}

			// Subscribe to previous step for FUTURE changes only (no init)
			const prevUnsub = nodeSet.subscribe((event) => {
				if (isArrayInitEvent(event)) {
					teardownAll()
					const allNodes: QueryNode[] = []
					for (const node of event.init) {
						allNodes.push(...addInputNode(node))
					}
					aggregated._reset(allNodes)
				} else {
					if (event.added.length) {
						const allAdded: QueryNode[] = []
						for (const node of event.added) {
							allAdded.push(...addInputNode(node))
						}
						if (allAdded.length) aggregated._push(...allAdded)
					}
					if (event.removed?.length) {
						const allRemoved: QueryNode[] = []
						for (const node of event.removed) {
							allRemoved.push(...removeInputNode(node))
						}
						if (allRemoved.length) aggregated._remove(allRemoved)
					}
				}
			}, 'derived')

			return () => {
				prevUnsub()
				teardownAll()
			}
		},
	)

	if (VERBOSE.isEnabled) VERBOSE(`[liveQueryStep] aggregated initial:`, [...aggregated.items])
	return new LiveQueryResult(aggregated)
}, { argsDebugName: (thread, _nodes, pattern) => createDebugName({ caller: 'liveQueryStep', thread, pattern }) })

/** Throw if any pattern is empty (an empty pattern would match every applog). */
function assertNonEmptyPatterns(patterns: readonly DatalogQueryPattern[]) {
	for (const pattern of patterns) {
		if (!Object.entries(pattern).length) throw new Error(`Pattern is empty`)
	}
}

/**
 * Run `patterns` as a joined multi-step query over `applogs`, starting from `variables`.
 * Returns every complete binding; stops early (returning []) once a step matches nothing.
 */
function joinPatternBindings(
	patterns: readonly DatalogQueryPattern[],
	variables: SearchContext,
	applogs: readonly Applog[],
): SearchContext[] {
	let bindings: SearchContext[] = [variables]
	for (const pattern of patterns) {
		const nextBindings: SearchContext[] = []
		for (const binding of bindings) {
			const [resolved, varsToFill] = resolveOrRemoveVariables(pattern, binding)
			const filter = makeFilter(resolved)
			const varMapper = createObjMapper(varsToFill)
			for (const log of filter(applogs)) {
				nextBindings.push({ ...binding, ...varMapper(log) })
			}
		}
		bindings = nextBindings
		if (bindings.length === 0) break // no matches — skip remaining patterns
	}
	return bindings
}

/**
 * Keep only the nodes of `startNodes` for which the (joined) patterns have NO complete match.
 */
export const queryNot = memoizedFn('queryNot', function queryNot(
	thread: Thread,
	startNodes: QueryResult,
	patternOrPatterns: DatalogQueryPattern | DatalogQueryPattern[],
	opts: { debug?: boolean } = {},
) {
	const nodes = startNodes.nodes
	DEBUG(`queryNot<${thread.nameAndSizeUntracked}> from: ${nodes.length} nodes`)
	const patterns = (Array.isArray(patternOrPatterns) ? patternOrPatterns : [patternOrPatterns]) as DatalogQueryPattern[]
	assertNonEmptyPatterns(patterns)

	// For each node, run all patterns as a joined multi-step query.
	// Exclude the node if ANY complete binding exists across all steps.
	const filtered = nodes.filter(function innerNodeFilter({ variables }) {
		const bindings = joinPatternBindings(patterns, variables ?? {}, thread.applogs)
		VERBOSE(`[queryNot] node:`, variables, '=> bindings:', bindings.length)
		if (opts.debug) LOG(`[queryNot] node result:`, variables, '=>', bindings)
		return bindings.length === 0 // keep node if no complete match found
	})
	return new QueryResult([...filtered])
}, { argsDebugName: (thread, nodes, pattern) => createDebugName({ caller: 'queryNot', thread, pattern }) })

/** Live variant: queryNot with incremental updates.
 * - Thread additions: single pattern → O(new_applogs × included_nodes), only new applogs are checked;
 *   joined (multi-pattern) NOTs re-check included nodes against the full thread, because an
 *   added applog can complete a join with older ones
 * - Thread removals/resets: full recompute (rare for append-mostly logs)
 * - Upstream node additions: O(new_nodes × applogs)
 * - Upstream node removals: removed from output
 */
export const liveQueryNot = memoizedFn('liveQueryNot', function liveQueryNot(
	thread: Thread,
	upstream: LiveQueryResult,
	patternOrPatterns: DatalogQueryPattern | DatalogQueryPattern[],
	opts: { debug?: boolean } = {},
) {
	const patterns = (Array.isArray(patternOrPatterns) ? patternOrPatterns : [patternOrPatterns]) as DatalogQueryPattern[]
	assertNonEmptyPatterns(patterns)

	/** Check if a node should be excluded (matches the NOT patterns as a joined multi-step query) */
	function nodeMatchesNot(node: QueryNode, applogs: readonly Applog[]): boolean {
		return joinPatternBindings(patterns, node.variables ?? {}, applogs).length > 0
	}

	/** Full recompute: filter all upstream nodes against all thread applogs */
	function computeAll(): QueryNode[] {
		return upstream.nodes.filter(node => !nodeMatchesNot(node, thread.applogs))
	}

	const result = new SubscribableArrayImpl(
		computeAll(),
		() => {
			// Subscribe to thread changes
			const threadUnsub = thread.subscribe((event) => {
				if (isInitEvent(event)) {
					// Full reset — recompute everything
					result._reset(computeAll())
					return
				}
				if (event.removed?.length) {
					// Removals: a previously-excluded node might now pass — full recompute
					result._reset(computeAll())
					return
				}
				if (event.added.length) {
					// A single NOT pattern can only be newly satisfied by an added applog; a joined
					// NOT may be completed by an added applog together with older ones.
					const candidates = patterns.length === 1 ? event.added : thread.applogs
					const toRemove = result.items.filter(node => nodeMatchesNot(node, candidates))
					if (toRemove.length > 0) {
						result._remove(toRemove)
					}
				}
			}, 'derived')

			// Subscribe to upstream node changes
			const upstreamUnsub = upstream.subscribe((event) => {
				if (isArrayInitEvent(event)) {
					result._reset(computeAll())
					return
				}
				// New upstream nodes: check each against full thread
				if (event.added.length) {
					const passing = event.added.filter(node => !nodeMatchesNot(node, thread.applogs))
					if (passing.length > 0) result._push(...passing)
				}
				// Removed upstream nodes: remove from our output
				if (event.removed?.length) {
					const removedSet = new Set(event.removed)
					const toRemove = result.items.filter(node => removedSet.has(node))
					if (toRemove.length > 0) result._remove(toRemove)
				}
			}, 'derived')

			return () => {
				threadUnsub()
				upstreamUnsub()
			}
		},
	)

	return new LiveQueryResult(result)
}, { argsDebugName: (thread, _nodes, pattern) => createDebugName({ caller: 'liveQueryNot', thread, pattern }) })

// export function or(queries: QueryExecutor[]) {
// 	return tagged(
// 		`or{${stringify(queries)} } `,
// 		function orExecutor(args: QueryExecutorArguments) {
// 			const { db, nodes: contexts } = args
// 			VERBOSE('[or]', { queries, contexts })
// 			let results = []
// 			for (const query of queries) {
// 				const res = query(args)
// 				VERBOSE('[or] query', query, 'result =>', res)
// 				results.push(...res.nodes)
// 			}
// 			return { contexts: results }
// 		}
// 	)
// }
// export type Tagged<T> = T & { tag: string }
// export function tagged<T>(tag: string, thing: T): Tagged<T> {
// 	const e = thing as (T & { tag: string })
// 	e.tag = tag
// 	return e
// }

//////////////////////
// COMPOSED QUERIES //
//////////////////////

/** One-off: filter thread by pattern, map to values. Returns plain array. */
export const filterAndMap = memoizedFn('filterAndMap', function filterAndMap<R>(
	thread: Thread,
	pattern: DatalogQueryPattern,
	mapper: (keyof Applog) | (Partial<{ [key in keyof Applog]: string }>) | ((applog: Applog) => R),
) {
	DEBUG(`filterAndMap<${thread.nameAndSizeUntracked}>`, pattern)
	const filter = makeFilter(pattern)
	const filtered = filter(thread.applogs)
	return mapApplogsWith(filtered, mapper)
}, { argsDebugName: (thread, pattern) => createDebugName({ caller: 'filterAndMap', thread, pattern }) })

/** Live variant: returns SubscribableArray that updates when thread changes.
*/
export const liveFilterAndMap = memoizedFn('liveFilterAndMap', function liveFilterAndMap<R>(
	thread: Thread,
	pattern: DatalogQueryPattern,
	mapper: (keyof Applog) | (Partial<{ [key in keyof Applog]: string }>) | ((applog: Applog) => R),
) {
	DEBUG(`liveFilterAndMap<${thread.nameAndSizeUntracked}>`, pattern)
	const filtered = rollingFilter(thread, pattern)
	const mapFn = makeApplogMapper(mapper)

	// Mapped output per source-log cid, so a removed log can be matched to the exact output item.
	const cidToMapped = new Map<CidString, R>()
	const mapAndTrack = (log: Applog): R => {
		const r = mapFn(log)
		cidToMapped.set(log.cid, r)
		return r
	}

	const initial = filtered.applogs.map(mapAndTrack)
	const result = new SubscribableArrayImpl(
		initial,
		() =>
			filtered.subscribe((event) => {
				if (isInitEvent(event)) {
					cidToMapped.clear()
					result._reset(event.init.map(mapAndTrack))
				} else {
					if (event.added.length) result._push(...event.added.map(mapAndTrack))
					if (event.removed?.length) {
						const toRemove: R[] = []
						for (const log of event.removed) {
							// Use `has`, not an `undefined` check: a mapper may legitimately yield `undefined`.
							if (!cidToMapped.has(log.cid)) {
								WARN(`[liveFilterAndMap] removed log not in cidToMapped`, { log })
								continue
							}
							toRemove.push(cidToMapped.get(log.cid) as R)
							cidToMapped.delete(log.cid)
						}
						if (toRemove.length) result._remove(toRemove)
					}
				}
			}, 'derived'),
	)
	return result
}, { argsDebugName: (thread, pattern) => createDebugName({ caller: 'liveFilterAndMap', thread, pattern }) })

/** One-off: query and map results. Returns plain array.
*/
export const queryAndMap = memoizedFn('queryAndMap', function queryAndMap<R>(
	threadOrLogs: Thread | Applog[],
	patternOrPatterns: Parameters<typeof query>[1],
	mapDef: string | (Partial<{ [key in keyof SearchContext]: string }>) | ((record: SearchContext) => R),
	variables: SearchContext = {},
) {
	const thread = threadFromMaybeArray(threadOrLogs)
	DEBUG(`queryAndMap<${thread.nameAndSizeUntracked}>`, { patternOrPatterns, variables, map: mapDef })
	// `variables` are the query's start bindings and must be forwarded, not just logged.
	const queryResult = query(thread, patternOrPatterns, variables)
	return mapQueryResultWith(queryResult, mapDef)
}, { argsDebugName: (thread, pattern) => createDebugName({ caller: 'queryAndMap', thread, pattern }) })

/** Live variant: query and map results, returns SubscribableArray that updates reactively. */
export const liveQueryAndMap = memoizedFn('liveQueryAndMap', function liveQueryAndMap<R>(
	thread: Thread,
	patternOrPatterns: Parameters<typeof liveQuery>[1],
	mapDef: string | (Partial<{ [key in keyof SearchContext]: string }>) | ((record: SearchContext) => R),
) {
	DEBUG(`liveQueryAndMap<${thread.nameAndSizeUntracked}>`, { patternOrPatterns, map: mapDef })
	const live = liveQuery(thread, patternOrPatterns)

	/** Map a snapshot of the live query's current nodes (recomputed in full on every change). */
	function computeAll(): R[] {
		const snapshot = new QueryResult(live.nodes)
		return mapQueryResultWith(snapshot, mapDef) as R[]
	}

	const result = new SubscribableArrayImpl(
		computeAll(),
		() =>
			live.subscribe(() => {
				result._reset(computeAll())
			}, 'derived'),
	)
	return result
}, { argsDebugName: (thread, pattern) => createDebugName({ caller: 'liveQueryAndMap', thread, pattern }) })

/** One-off: query entity attributes. Returns a Record of attribute → value (with the `name` prefix stripped) or null. Requires current-state thread (LWW).
*/
export const queryEntity = memoizedFn('queryEntity', function queryEntity(
	thread: Thread,
	name: string,
	entityID: EntityID,
	attributes: readonly string[],
) {
	assertLWW(thread)
	DEBUG(`queryEntity<${thread.nameAndSizeUntracked}>`, entityID, name)
	const filter = makeFilter({ en: entityID, at: prefixAttrs(name, attributes) })
	const filtered = filter(thread.applogs)
	VERBOSE(`queryEntity applogs:`, filtered)
	return entityRecordFromLogs(name, filtered)
}, {
	argsDebugName: (thread, name, entityID) => createDebugName({ caller: 'queryEntity', thread, args: { name, entityID } }),
})

/**
 * Build `{ attribute: vl }` from an entity's logs, dropping the first `name.length + 1`
 * characters (the `name` prefix plus one separator) from each `at`. Null when `logs` is empty.
 */
function entityRecordFromLogs(name: string, logs: readonly Applog[]): Record<string, ApplogValue> | null {
	if (logs.length === 0) return null
	return Object.fromEntries(
		logs.map(({ at, vl }) => [at.slice(name.length + 1), vl]),
	)
}

/** Live variant: returns Subscribable that updates when entity attributes change. Requires current-state thread (LWW). */
export const liveQueryEntity = memoizedFn('liveQueryEntity', function liveQueryEntity(
	thread: Thread,
	name: string,
	entityID: EntityID,
	attributes: readonly string[],
) {
	assertLWW(thread)
	DEBUG(`liveQueryEntity<${thread.nameAndSizeUntracked}>`, entityID, name)
	const filtered = rollingFilter(thread, { en: entityID, at: prefixAttrs(name, attributes) })

	function compute() {
		if (filtered.isEmpty) return null
		return entityRecordFromLogs(name, filtered.applogs)
	}

	const result = new SubscribableImpl<Record<string, ApplogValue> | null>(
		compute(),
		() =>
			filtered.subscribe(() => {
				result._set(compute())
			}, 'derived'),
	)
	return result
}, {
	argsDebugName: (thread, name, entityID) => createDebugName({ caller: 'liveQueryEntity', thread, args: { name, entityID } }),
})

/** Live single-attribute query. Requires current-state thread (LWW). Returns Subscribable<T | null>.
* Emits the `vl` of the last matching log, or null when the entity has no such attribute.
*/
export const liveEntityAt = memoizedFn('liveEntityAt', function liveEntityAt<T = ApplogValue>(
	thread: Thread,
	entityID: EntityID,
	at: string,
) {
	assertLWW(thread)
	DEBUG(`liveEntityAt<${thread.nameAndSizeUntracked}>`, entityID, at)
	const filtered = rollingFilter(thread, { en: entityID, at })
	function compute(): T | null {
		if (filtered.isEmpty) return null
		const logs = filtered.applogs
		return logs[logs.length - 1].vl as T
	}
	const result = new SubscribableImpl<T | null>(
		compute(),
		() => filtered.subscribe(() => { result._set(compute()) }, 'derived'),
	)
	return result
}, {
	argsDebugName: (thread, entityID, at) => createDebugName({ caller: 'liveEntityAt', thread, args: { entityID, at } }),
})

/**
 * Count applogs per agent (`ag`) in `thread`. The returned Map is mutated in place
 * by a 'derived' subscription as logs are added/removed.
 */
export const agentsOfThread = memoizedFn('agentsOfThread', function agentsOfThread(
	thread: Thread,
) {
	DEBUG(`agentsOfThread<${thread.nameAndSizeUntracked}>`)
	const mapped = new Map<AgentHash, number>()
	const onEvent = (event: ThreadEvent) => {
		if (isInitEvent(event)) {
			// An init event carries the full state: start over so a re-init
			// does not double-count logs that were already tallied.
			mapped.clear()
		}
		for (const log of (isInitEvent(event) ? event.init : event.added)) {
			mapped.set(log.ag, (mapped.get(log.ag) ?? 0) + 1)
		}
		for (const log of (!isInitEvent(event) && event.removed || [])) {
			const prev = mapped.get(log.ag)
			if (!prev || prev < 1) throw ERROR(`[agentsOfThread] number is now negative`, { log, event, mapped, prev })
			mapped.set(log.ag, prev - 1)
		}
		// Per-event trace; VERBOSE so it doesn't flood INFO output on every thread change.
		VERBOSE(`agentsOfThread<${thread.nameAndSizeUntracked}> processed event`, { event, mapped })
	}
	onEvent({ init: thread.applogs })
	thread.subscribe(onEvent, 'derived') // TODO: cleanup via ref-counted disposal when no longer needed
	return mapped
})

/** Entity IDs present in both threads, computed from their current applogs (no subscription). */
function computeEntityOverlap(threadA: Thread, threadB: Thread) {
	const entitiesA = new Set(threadA.map(log => log.en))
	const entitiesB = new Set(threadB.map(log => log.en))
	return [...entitiesA].filter(en => entitiesB.has(en))
}

/** Entities present in both threads. Snapshot — not reactive (TODO: migrate to Subscribable). */
export const entityOverlap = memoizedFn('entityOverlap', function entityOverlap(
	threadA: Thread,
	threadB: Thread,
) {
	LOG(`entityOverlap<${threadA.nameAndSizeUntracked}, ${threadB.nameAndSizeUntracked}>`)
	return computeEntityOverlap(threadA, threadB)
})

/**
 * Map each entity present in both threads to `{ [threadAName]: …, [threadBName]: … }`.
 * NOTE(review): the per-thread value is currently a placeholder that echoes the entity ID.
 */
export const entityOverlapMap = function entityOverlapMap(
	threadA: Thread,
	threadB: Thread,
	threadAName = 'incoming',
	threadBName = 'current',
) {
	// Placeholder — presumably meant to derive a per-thread view of the entity; TODO confirm
	const useInferredVM = (en: EntityID, _thread: Thread) => en
	const mapped = new Map<EntityID, Record<string, EntityID>>()
	for (const eachEntityID of entityOverlap(threadA, threadB)) {
		mapped.set(eachEntityID, {
			[threadAName]: useInferredVM(eachEntityID, threadA),
			[threadBName]: useInferredVM(eachEntityID, threadB),
		})
	}
	// Previously the map was built but never returned.
	return mapped
}

/** Number of entities present in both threads (snapshot). */
export const entityOverlapCount = memoizedFn(
	'entityOverlapCount',
	function entityOverlapCount(threadA: Thread, threadB: Thread) {
		return entityOverlap(threadA, threadB).length
	},
)

/** Live variant: entity overlap count as Subscribable. */
export const liveEntityOverlapCount = memoizedFn(
	'liveEntityOverlapCount',
	function liveEntityOverlapCount(threadA: Thread, threadB: Thread) {
		const compute = () => computeEntityOverlap(threadA, threadB).length
		const result = new SubscribableImpl<number>(
			compute(),
			() => {
				// Recount whenever either side changes; unsubscribe from both together.
				const unsubA = threadA.subscribe(() => result._set(compute()), 'derived')
				const unsubB = threadB.subscribe(() => result._set(compute()), 'derived')
				return () => {
					unsubA()
					unsubB()
				}
			},
		)
		return result
	},
)

/**
 * Run `query` and return its single matching applog, or null when nothing matches.
 * Throws when there is more than one result node, or the node has a log count other than 1.
 * Snapshot — not reactive (TODO: migrate to Subscribable).
 */
export const querySingle = memoizedFn('querySingle', function querySingle(
	threadOrLogs: Thread | Applog[],
	patternOrPatterns: Parameters<typeof query>[1],
	variables: SearchContext = {},
) {
	const result = query(threadOrLogs, patternOrPatterns, variables)
	if (result.isEmpty) return null
	if (result.size > 1) throw ERROR(`[querySingle] got`, result.size, `results:`, result)
	const logsOfThisNode = result.nodes[0].logsOfThisNode
	if (logsOfThisNode.size !== 1) throw ERROR(`[querySingle] single result, but got`, logsOfThisNode.size, `logs:`, logsOfThisNode.applogs)
	return logsOfThisNode.applogs[0]
}, {
	argsDebugName: (thread, pattern) => createDebugName({ caller: 'querySingle', thread, pattern }),
})

/**
 * querySingle + map the found log: a string `mapDef` picks that applog field,
 * an object `mapDef` renames fields via createObjMapper. Returns undefined when nothing matches.
 * Snapshot — not reactive (TODO: migrate to Subscribable).
 */
export const querySingleAndMap = memoizedFn(
	'querySingleAndMap',
	function querySingleAndMap<MAP extends ((keyof Applog) | (Partial<{ [key in keyof Applog]: string }>))>(
		threadOrLogs: Thread | Applog[],
		patternOrPatterns: Parameters<typeof query>[1],
		mapDef: MAP,
		variables: SearchContext = {},
	) {
		const log = querySingle(threadOrLogs, patternOrPatterns, variables)
		if (!log) return undefined
		if (typeof mapDef === 'string') return log[mapDef as keyof Applog]
		return createObjMapper(mapDef)(log)
	},
	{
		argsDebugName: (thread, pattern) => createDebugName({ caller: 'querySingleAndMap', thread, pattern }),
	},
)

/** Live variant: querySingle returning Subscribable (null when nothing matches; throws on ambiguity). */
export const liveQuerySingle = memoizedFn('liveQuerySingle', function liveQuerySingle(
	thread: Thread,
	patternOrPatterns: Parameters<typeof liveQuery>[1],
) {
	DEBUG(`liveQuerySingle<${thread.nameAndSizeUntracked}>`)
	const live = liveQuery(thread, patternOrPatterns)
	function compute(): Applog | null {
		if (live.isEmpty) return null
		if (live.size > 1) throw ERROR(`[liveQuerySingle] got`, live.size, `results`)
		const logsOfThisNode = live.nodes[0].logsOfThisNode
		if (logsOfThisNode.size !== 1) throw ERROR(`[liveQuerySingle] single result, but got`, logsOfThisNode.size, `logs`)
		return logsOfThisNode.applogs[0]
	}
	const result = new SubscribableImpl<Applog | null>(
		compute(),
		() => live.subscribe(() => { result._set(compute()) }),
	)
	return result
}, {
	argsDebugName: (thread, pattern) => createDebugName({ caller: 'liveQuerySingle', thread, pattern }),
})

/**
 * Live variant: querySingleAndMap returning Subscribable.
* Emits undefined while there is no matching log.
*/
export const liveQuerySingleAndMap = memoizedFn(
	'liveQuerySingleAndMap',
	function liveQuerySingleAndMap<MAP extends ((keyof Applog) | (Partial<{ [key in keyof Applog]: string }>))>(
		thread: Thread,
		patternOrPatterns: Parameters<typeof liveQuery>[1],
		mapDef: MAP,
	) {
		DEBUG(`liveQuerySingleAndMap<${thread.nameAndSizeUntracked}>`)
		const liveSingle = liveQuerySingle(thread, patternOrPatterns)
		function compute() {
			const log = liveSingle.value
			if (!log) return undefined
			if (typeof mapDef === 'string') return log[mapDef as keyof Applog]
			return createObjMapper(mapDef)(log)
		}
		const result = new SubscribableImpl(
			compute(),
			() => liveSingle.subscribe(() => { result._set(compute()) }),
		)
		return result
	},
	{
		argsDebugName: (thread, pattern) => createDebugName({ caller: 'liveQuerySingleAndMap', thread, pattern }),
	},
)

/////////////
// HELPERS //
/////////////

/**
 * Create a single-applog mapper function from a mapDef:
 * a function is used as-is, a string picks that applog field, an object renames fields.
 */
export function makeApplogMapper<R>(
	mapDef: (keyof Applog) | (Partial<{ [key in keyof Applog]: string }>) | ((applog: Applog) => R),
): (applog: Applog) => R {
	if (typeof mapDef === 'function') {
		return mapDef as (applog: Applog) => R
	} else if (typeof mapDef === 'string') {
		return (log: Applog) => log[mapDef] as R
	} else {
		return createObjMapper(mapDef) as (applog: Applog) => R
	}
}

/** Map an array of applogs using a mapDef (see makeApplogMapper). */
export function mapApplogsWith<R>(
	applogs: readonly Applog[],
	mapDef: (keyof Applog) | (Partial<{ [key in keyof Applog]: string }>) | ((applog: Applog) => R),
) {
	return applogs.map(makeApplogMapper(mapDef))
}

/** Map a thread's current applogs using a mapDef (see makeApplogMapper). */
export const mapThreadWith = function mapThreadWith<R>(
	thread: Thread,
	mapDef: (keyof Applog) | (Partial<{ [key in keyof Applog]: string }>) | ((applog: Applog) => R),
) {
	return mapApplogsWith(thread.applogs, mapDef)
}

/**
 * Map a query result:
 * - function: applied to each record,
 * - string: the bound variable of that name, or — if it is not a variable — that field of the
 *   node's single applog (throws when the node does not have exactly one log),
 * - object: renames record keys via createObjMapper.
 */
export const mapQueryResultWith = function mapQueryResultWith<R>(
	queryResult: QueryResult,
	mapDef: string | (Partial<{ [key in keyof SearchContext]: string }>) | ((record: SearchContext) => R),
) {
	if (typeof mapDef === 'function') {
		return queryResult.records.map(mapDef)
	}
	if (typeof mapDef === 'string') {
		return queryResult.nodes.map((node) => {
			if (Object.hasOwn(node.record, mapDef)) return node.record[mapDef]
			if (node.logsOfThisNode.size !== 1) {
				throw ERROR(`not sure what to map (it's not a var and a result node log count of ${node.logsOfThisNode.size})`)
			}
			return node.logsOfThisNode.firstLog[mapDef as keyof Applog]
		})
	}
	// Build the object mapper once instead of once per node.
	const mapRecord = createObjMapper(mapDef)
	return queryResult.nodes.map(node => mapRecord(node.record))
}

/**
 * Map Applog to custom named record, e.g.:
 * { en: 'movieID', vl: 'movieName' }
 * will map the applog to { movieID: .., movieName: .. }
 * Entries whose target name is undefined (allowed by Partial) are skipped.
 */
export function createObjMapper<FROM extends string, TO extends string>(applogFieldMap: Partial<{ [key in FROM]: TO }>) {
	return (applog: { [key in FROM]: any }) => {
		const mapped = {} as Partial<{ [key in TO]: ApplogValue }>
		for (const [from, to] of Object.entries(applogFieldMap) as [FROM, TO | undefined][]) {
			// Without this guard an explicit `undefined` target would write an "undefined" key.
			if (to === undefined) continue
			mapped[to] = applog[from]
		}
		return mapped
	}
}

/** Predicate factory: does a string value start with `str`? */
export function startsWith(str: string) {
	return (value: string) => value.startsWith(str)
}

/** Prefix every attribute with `${prefix}/` (see prefixAt). */
export function prefixAttrs(prefix: string, attrs: readonly string[]) {
	return attrs.map(at => prefixAt(prefix, at))
}

/** Build a namespaced attribute: `${prefix}/${attr}`. */
export function prefixAt(prefix: string, attr: string) {
	return `${prefix}/${attr}`
}

/** Inverse of prefixAt — strips everything up to and including the first `/` */
export function stripAtPrefix(attr: string): string {
	const idx = attr.indexOf('/')
	return idx >= 0 ? attr.slice(idx + 1) : attr
}

/** Create a key mapper from an explicit attribute→key record (unmapped attributes pass through). */
export function mapAttributes<A extends string>(mapping: Record<A, string>): (attr: A) => string {
	return (attr) => mapping[attr] ?? attr
}

/** Resolve key mapping options to a concrete mapper function */
export function resolveKeyMapper(opts?: { stripAtPrefix?: true | string; mapKeys?: (attr: string) => string }): (attr: string) => string {
	if (!opts) return (attr) => attr
	if (opts.mapKeys) return opts.mapKeys
	if (opts.stripAtPrefix === true) return stripAtPrefix
	if (typeof opts.stripAtPrefix === 'string') {
		const prefix = opts.stripAtPrefix + '/'
		return (attr) => attr.startsWith(prefix) ? attr.slice(prefix.length) : attr
	}
	return (attr) => attr
}

/** Return a Thread as-is, or wrap an applog array in an in-memory thread. */
export function threadFromMaybeArray(threadOrLogs: Thread | Applog[], name?: string) {
	if (!Array.isArray(threadOrLogs)) {
		return threadOrLogs
	}
	return ThreadInMemory.fromArray(threadOrLogs, name || `threadFromArray[${threadOrLogs.length}]`, true)
}

/**
 * Run `func` with a global query deadline `timeoutMilliseconds` from now. While it runs,
 * `throwOnTimeout()` throws once the deadline has passed. The deadline is cleared when
 * `func` returns or throws. Nesting is not supported.
 * NOTE: only synchronous work is covered — if `func` returns a Promise, the deadline is
 * already cleared by the time it settles.
 */
export function withTimeout<R>(timeoutMilliseconds: number, func: () => R): R {
	if (globalQueryTimeoutTime != null) throw ERROR(`Nested timeout not supported`)
	globalQueryTimeoutTime = performance.now() + timeoutMilliseconds
	try {
		return func()
	} finally {
		globalQueryTimeoutTime = null
	}
}

/** Names (without the leading variable marker) of all variables used in a pattern. */
function getPatternVariableNames(pattern: DatalogQueryPattern): Set<string> {
	const vars = new Set<string>()
	for (const value of Object.values(pattern)) {
		if (isVariable(value)) {
			vars.add((value as string).slice(1))
		}
	}
	return vars
}

/**
 * Warn if a multi-step query has steps that are not connected via shared variables.
 * Disconnected steps produce a cartesian product instead of a join.
 */
function warnIfDisjointQuerySteps(patterns: DatalogQueryPattern[]) {
	if (patterns.length < 2) return
	const varSets = patterns.map(getPatternVariableNames)
	// Variables bound by any earlier step.
	const reachable = new Set(varSets[0])
	for (let i = 1; i < varSets.length; i++) {
		const stepVars = varSets[i]
		if (stepVars.size === 0) {
			WARN(
				`[query] Step ${i} has no variables — it produces identical results regardless of previous steps (cartesian product).`,
				`Patterns:`,
				patterns,
			)
			continue
		}
		const connected = [...stepVars].some(v => reachable.has(v))
		if (!connected) {
			WARN(
				`[query] Step ${i} is disconnected from previous steps — no shared variable.`,
				`This produces a cartesian product instead of a join.`,
				`Step ${i} variables: {${[...stepVars].join(', ')}}`,
				`Reachable from prior steps: {${[...reachable].join(', ')}}`,
				`Patterns:`,
				patterns,
			)
		}
		for (const v of stepVars) reachable.add(v)
	}
}

/** Throw a QueryTimeoutError if a withTimeout deadline is active and has passed; no-op otherwise. */
export function throwOnTimeout() {
	if (globalQueryTimeoutTime == null) return
	const now = performance.now()
	if (now >= globalQueryTimeoutTime) {
		// Previously the numeric deadline itself was passed as the (string) message.
		throw new QueryTimeoutError(`Query timed out: deadline ${globalQueryTimeoutTime}ms passed (now ${now}ms)`)
	}
}

/** Raised by throwOnTimeout when the withTimeout deadline has passed. */
class QueryTimeoutError extends Error {
	constructor(message: string) {
		super(message)
		this.name = 'QueryTimeoutError'
	}
}