// This is a simple implementation of a reactive dependency graph. // Key Terminology: // Ref: a mutable cell where values can be set // Thunk: a pure computation that depends on other values // Effect: a side effect that runs when a value changes; return value is ignored // Atom: a node returning a value that can be depended on: Ref | Thunk // Super computation: Nodes that depend on a given node ("downstream") // Sub computation: Nodes that a given node depends on ("upstream") // This vocabulary comes from the MiniAdapton paper linked below, although // we don't actually implement the MiniAdapton algorithm because we don't need lazy recomputation. // https://arxiv.org/abs/1609.05337 // Features: // - Dependencies are tracked automatically in thunk computations by using a getter function // to reference other atoms. // - Whenever a ref is updated, the graph is eagerly refreshed to be consistent with the new values. // - We minimize recomputation by refreshing the graph in topological sort order. (The topological height // is maintained eagerly as edges are added and removed.) // - At every thunk we check value equality with the previous value and cutoff propagation if possible. import type * as otel from '@opentelemetry/api' import { BoundArray } from '@livestore/common' import { deepEqual, omitUndefineds, shouldNeverHappen } from '@livestore/utils' import type { Types } from '@livestore/utils/effect' // import { getDurationMsFromSpan } from './otel.ts' export const NOT_REFRESHED_YET = Symbol.for('NOT_REFRESHED_YET') export type NOT_REFRESHED_YET = typeof NOT_REFRESHED_YET export type GetAtom = ( atom: Atom, otelContext?: otel.Context , debugRefreshReason?: TODO , ) => T export type Ref = { _tag: 'ref' id: string isDirty: false isDestroyed: boolean previousResult: T computeResult: () => T sub: Set> // always empty super: Set | Effect> label?: string | undefined /** Container for meta information (e.g. the LiveStore Store) */ meta?: unknown equal: (a: T, b: T) => boolean refreshes: number } export type Thunk = { _tag: 'thunk' id: string isDirty: boolean isDestroyed: boolean computeResult: (otelContext?: otel.Context, debugRefreshReason?: TDebugRefreshReason) => TResult previousResult: TResult | NOT_REFRESHED_YET sub: Set> super: Set | Effect> label?: string | undefined /** Container for meta information (e.g. the LiveStore Store) */ meta?: unknown equal: (a: TResult, b: TResult) => boolean recomputations: number __getResult: any } export type Atom = | Ref | Thunk export type Effect = { _tag: 'effect' id: string isDestroyed: boolean doEffect: (otelContext?: otel.Context , debugRefreshReason?: TDebugRefreshReason ) => void sub: Set> label?: string | undefined invocations: number } export type Node = | Atom | Effect export const isThunk = ( obj: unknown, ): obj is Thunk => { return typeof obj === 'object' && obj !== null && '_tag' in obj && (obj as { _tag: unknown })._tag === 'thunk' } export type DebugThunkInfo = { _tag: T durationMs: number } export type DebugRefreshReasonBase = /** Usually in response to some `commit` calls with `skipRefresh: true` */ | { _tag: 'runDeferredEffects' originalRefreshReasons?: ReadonlyArray | undefined manualRefreshReason?: DebugRefreshReasonBase | undefined } | { _tag: 'makeThunk'; label?: string | undefined } | { _tag: 'unknown' } export type DebugRefreshReason = DebugRefreshReasonBase | { _tag: T } export type AtomDebugInfo = { atom: SerializedAtom resultChanged: boolean debugInfo: TDebugThunkInfo } // TODO possibly find a better name for "refresh" export type RefreshDebugInfo = { /** Currently only used for easier handling in React (e.g. as key) */ id: string reason: TDebugRefreshReason refreshedAtoms: AtomDebugInfo[] skippedRefresh: boolean durationMs: number /** Note we're using a regular `Date.now()` timestamp here as it's faster to produce and we don't need the fine accuracy */ completedTimestamp: number graphSnapshot: ReactiveGraphSnapshot } const unknownRefreshReason = () => { // debugger return { _tag: 'unknown' as const } } export type EncodedOption = { _tag: 'Some'; value?: A | undefined } | { _tag: 'None' } const encodedOptionSome = (value: A): EncodedOption => ({ _tag: 'Some', value }) const encodedOptionNone = (): EncodedOption => ({ _tag: 'None' }) export type SerializedAtom = SerializedRef | SerializedThunk export type SerializedRef = Readonly< Types.Simplify< Pick, '_tag' | 'id' | 'label' | 'meta' | 'isDirty' | 'isDestroyed' | 'refreshes'> & { /** Is `None` if `getSnapshot` was called with `includeResults: false` which is the default */ previousResult: EncodedOption sub: ReadonlyArray super: ReadonlyArray } > > export type SerializedThunk = Readonly< Types.Simplify< Pick< Thunk, '_tag' | 'id' | 'label' | 'meta' | 'isDirty' | 'isDestroyed' | 'recomputations' > & { /** Is `None` if `getSnapshot` was called with `includeResults: false` which is the default */ previousResult: EncodedOption sub: ReadonlyArray super: ReadonlyArray } > > export type SerializedEffect = Readonly< Types.Simplify< Pick, '_tag' | 'id' | 'label' | 'invocations' | 'isDestroyed'> & { sub: ReadonlyArray } > > export type ReactiveGraphSnapshot = { readonly atoms: ReadonlyArray readonly effects: ReadonlyArray /** IDs of deferred effects */ readonly deferredEffects: ReadonlyArray } let globalGraphIdCounter = 0 const uniqueGraphId = () => `graph-${++globalGraphIdCounter}` /** Used for testing */ export const __resetIds = () => { globalGraphIdCounter = 0 } export class ReactiveGraph< TDebugRefreshReason extends DebugRefreshReason, TDebugThunkInfo extends DebugThunkInfo, TContext extends { effectsWrapper?: ((runEffects: () => void) => void) | undefined } = {}, > { id = uniqueGraphId() readonly atoms: Set> = new Set() readonly effects: Set> = new Set() context: TContext | undefined debugRefreshInfos: BoundArray> = new BoundArray(200) private currentDebugRefresh: { refreshedAtoms: AtomDebugInfo[]; startMs: number } | undefined private deferredEffects: Map, Set> = new Map() private refreshCallbacks: Set<() => void> = new Set() private nodeIdCounter = 0 private uniqueNodeId = () => `node-${++this.nodeIdCounter}` private refreshInfoIdCounter = 0 private uniqueRefreshInfoId = () => `refresh-info-${++this.refreshInfoIdCounter}` makeRef( val: T, options?: { label?: string; meta?: unknown; equal?: (a: T, b: T) => boolean }, ): Ref { const ref: Ref = { _tag: 'ref', id: this.uniqueNodeId(), isDirty: false, isDestroyed: false, previousResult: val, computeResult: () => ref.previousResult, sub: new Set(), super: new Set(), ...omitUndefineds({ label: options?.label, meta: options?.meta }), equal: options?.equal ?? deepEqual, refreshes: 0, } this.atoms.add(ref) return ref } makeThunk( getResult: ( get: GetAtom, setDebugInfo: (debugInfo: TDebugThunkInfo) => void, ctx: TContext, otelContext: otel.Context | undefined, debugRefreshReason: TDebugRefreshReason | undefined, ) => T, options?: | { label?: string meta?: any equal?: (a: T, b: T) => boolean } , ): Thunk { const thunk: Thunk = { _tag: 'thunk', id: this.uniqueNodeId(), previousResult: NOT_REFRESHED_YET, isDirty: true, isDestroyed: false, computeResult: (otelContext, debugRefreshReason) => { if (thunk.isDirty === true) { const neededCurrentRefresh = this.currentDebugRefresh === undefined let localDebugRefresh: { refreshedAtoms: any[]; startMs: number } | undefined if (neededCurrentRefresh === true) { // Use local variable to prevent corruption from nested computations localDebugRefresh = { refreshedAtoms: [], startMs: performance.now() } this.currentDebugRefresh = localDebugRefresh } // Reset previous subcomputations as we're about to re-add them as part of the `doEffect` call below thunk.sub = new Set() const getAtom = (atom: Atom, otelContext: otel.Context) => { this.addEdge(thunk, atom) return compute(atom, otelContext, debugRefreshReason) } let debugInfo: TDebugThunkInfo | undefined const setDebugInfo = (debugInfo_: TDebugThunkInfo) => { debugInfo = debugInfo_ } const result = getResult( getAtom as GetAtom, setDebugInfo, this.context ?? throwContextNotSetError(this), otelContext, debugRefreshReason, ) const resultChanged = ! thunk.equal(thunk.previousResult as T, result) const debugInfoForAtom = { atom: serializeAtom(thunk, false), resultChanged, debugInfo: debugInfo ?? (unknownRefreshReason() as TDebugThunkInfo), } satisfies AtomDebugInfo // Use currentDebugRefresh if available (could be from parent or local) const debugRefresh = localDebugRefresh ?? this.currentDebugRefresh if (debugRefresh !== undefined) { debugRefresh.refreshedAtoms.push(debugInfoForAtom) } thunk.isDirty = false thunk.previousResult = result thunk.recomputations++ if (neededCurrentRefresh === true && localDebugRefresh !== undefined) { // Use local reference which can't be corrupted by nested calls const refreshedAtoms = localDebugRefresh.refreshedAtoms const durationMs = performance.now() - localDebugRefresh.startMs this.currentDebugRefresh = undefined this.debugRefreshInfos.push({ id: this.uniqueRefreshInfoId(), reason: debugRefreshReason ?? ({ _tag: 'makeThunk', label: options?.label } as TDebugRefreshReason), skippedRefresh: false, refreshedAtoms, durationMs, completedTimestamp: Date.now(), graphSnapshot: this.getSnapshot({ includeResults: false }), }) } return result } else { return thunk.previousResult as T } }, sub: new Set(), super: new Set(), recomputations: 0, ...omitUndefineds({ label: options?.label, meta: options?.meta }), equal: options?.equal ?? deepEqual, __getResult: getResult, } this.atoms.add(thunk) return thunk } destroyNode(node: Node) { // console.debug(`destroying node (${node._tag})`, node.id, node.label) // Recursively destroy any supercomputations if (node._tag === 'ref' || node._tag === 'thunk') { for (const superComp of node.super) { this.destroyNode(superComp) } } // Destroy this node if (node._tag !== 'ref') { for (const subComp of node.sub) { this.removeEdge(node, subComp) } } if (node._tag === 'effect') { this.deferredEffects.delete(node) this.effects.delete(node) } else { this.atoms.delete(node) } node.isDestroyed = true } destroy() { // NOTE we don't need to sort the atoms first, as `destroyNode` will recursively destroy all supercomputations for (const node of this.atoms) { this.destroyNode(node) } } makeEffect( doEffect: ( get: GetAtom, otelContext: otel.Context | undefined, debugRefreshReason: DebugRefreshReason | undefined, ) => void, options?: { label?: string } , ): Effect { const effect: Effect = { _tag: 'effect', id: this.uniqueNodeId(), isDestroyed: false, doEffect: (otelContext, debugRefreshReason) => { effect.invocations++ // NOTE we're not tracking any debug refresh info for effects as they're tracked by the thunks they depend on // Reset previous subcomputations as we're about to re-add them as part of the `doEffect` call below effect.sub = new Set() const getAtom = ( atom: Atom, otelContext: otel.Context, debugRefreshReason: DebugRefreshReason | undefined, ) => { this.addEdge(effect, atom) return compute(atom, otelContext, debugRefreshReason) } doEffect(getAtom as GetAtom, otelContext, debugRefreshReason) }, sub: new Set(), ...omitUndefineds({ label: options?.label }), invocations: 0, } this.effects.add(effect) return effect } setRef( ref: Ref, val: T, options?: | { skipRefresh?: boolean debugRefreshReason?: TDebugRefreshReason otelContext?: otel.Context } , ) { this.setRefs([[ref, val]], options) } setRefs( refs: [Ref, T][], options?: | { skipRefresh?: boolean debugRefreshReason?: TDebugRefreshReason otelContext?: otel.Context } , ) { const effectsToRefresh = new Set>() for (const [ref, val] of refs) { ref.previousResult = val ref.refreshes++ markSuperCompDirtyRec(ref, effectsToRefresh) } if (options?.skipRefresh === true) { for (const effect of effectsToRefresh) { if (this.deferredEffects.has(effect) === false) { this.deferredEffects.set(effect, new Set()) } if (options?.debugRefreshReason !== undefined) { this.deferredEffects.get(effect)!.add(options.debugRefreshReason) } } } else { this.runEffects(effectsToRefresh, { debugRefreshReason: options?.debugRefreshReason ?? (unknownRefreshReason() as TDebugRefreshReason), ...omitUndefineds({ otelContext: options?.otelContext }), }) } } private runEffects = ( effectsToRefresh: Set>, options: { debugRefreshReason: TDebugRefreshReason otelContext?: otel.Context }, ) => { const effectsWrapper = this.context?.effectsWrapper ?? ((runEffects: () => void) => runEffects()) effectsWrapper(() => { // Capture debug state in local variable to prevent corruption from nested runEffects const previousDebugRefresh = this.currentDebugRefresh const localDebugRefresh = { refreshedAtoms: [], startMs: performance.now() } this.currentDebugRefresh = localDebugRefresh try { for (const effect of effectsToRefresh) { effect.doEffect(options?.otelContext, options.debugRefreshReason) } } finally { this.currentDebugRefresh = previousDebugRefresh } const refreshedAtoms = localDebugRefresh.refreshedAtoms const durationMs = performance.now() - localDebugRefresh.startMs const refreshDebugInfo: RefreshDebugInfo = { id: this.uniqueRefreshInfoId(), reason: options.debugRefreshReason, skippedRefresh: false, refreshedAtoms, durationMs, completedTimestamp: Date.now(), graphSnapshot: this.getSnapshot({ includeResults: false }), } this.debugRefreshInfos.push(refreshDebugInfo) this.runRefreshCallbacks() }) } runDeferredEffects = (options?: { debugRefreshReason?: TDebugRefreshReason; otelContext?: otel.Context }) => { // TODO improve how refresh reasons are propagated for deferred effect execution // TODO also improve "batching" of running deferred effects (i.e. in a single `this.runEffects` call) // but need to be careful to not overwhelm the main thread for (const [effect, debugRefreshReasons] of this.deferredEffects) { this.runEffects(new Set([effect]), { debugRefreshReason: { _tag: 'runDeferredEffects', originalRefreshReasons: Array.from(debugRefreshReasons) as ReadonlyArray, ...omitUndefineds({ manualRefreshReason: options?.debugRefreshReason }), } as unknown as TDebugRefreshReason, ...omitUndefineds({ otelContext: options?.otelContext }), }) } } runRefreshCallbacks = () => { for (const cb of this.refreshCallbacks) { cb() } } addEdge( superComp: Thunk | Effect, subComp: Atom, ) { superComp.sub.add(subComp) subComp.super.add(superComp) if (this.currentDebugRefresh === undefined) { this.runRefreshCallbacks() } } removeEdge( superComp: Thunk | Effect, subComp: Atom, ) { superComp.sub.delete(subComp) const effectsToRefresh = new Set>() markSuperCompDirtyRec(subComp, effectsToRefresh) for (const effect of effectsToRefresh) { this.deferredEffects.set(effect, new Set()) } subComp.super.delete(superComp) if (this.currentDebugRefresh === undefined) { this.runRefreshCallbacks() } } // NOTE This function is performance-optimized (i.e. not using `Array.from`) getSnapshot = (opts?: { includeResults: boolean }): ReactiveGraphSnapshot => { const { includeResults = false } = opts ?? {} const atoms: SerializedAtom[] = [] for (const atom of this.atoms) { atoms.push(serializeAtom(atom, includeResults)) } const effects: SerializedEffect[] = [] for (const effect of this.effects) { effects.push(serializeEffect(effect)) } const deferredEffects: string[] = [] for (const [effect] of this.deferredEffects) { deferredEffects.push(effect.id) } return { atoms, effects, deferredEffects } } subscribeToRefresh = (cb: () => void) => { this.refreshCallbacks.add(cb) return () => { this.refreshCallbacks.delete(cb) } } } const compute = ( atom: Atom, otelContext: otel.Context, debugRefreshReason: DebugRefreshReason | undefined, ): T => { // const __getResult = atom._tag === 'thunk' ? atom.__getResult.toString() : '' if (atom.isDestroyed === true) { shouldNeverHappen(`LiveStore Error: Attempted to compute destroyed ${atom._tag} (${atom.id}): ${atom.label ?? ''}`) } if (atom.isDirty === true) { // console.log('atom is dirty', atom.id, atom.label ?? '', atom._tag, __getResult) const result = atom.computeResult(otelContext, debugRefreshReason) atom.isDirty = false atom.previousResult = result return result } else { // console.log('atom is clean', atom.id, atom.label ?? '', atom._tag, __getResult) return atom.previousResult as T } } const markSuperCompDirtyRec = (atom: Atom, effectsToRefresh: Set>) => { for (const superComp of atom.super) { if (superComp._tag === 'thunk') { superComp.isDirty = true markSuperCompDirtyRec(superComp, effectsToRefresh) } else { effectsToRefresh.add(superComp) } } } export const throwContextNotSetError = (graph: ReactiveGraph): never => { throw new Error(`LiveStore Error: \`context\` not set on ReactiveGraph (${graph.id})`) } // NOTE This function is performance-optimized (i.e. not using `pick` and `Array.from`) const serializeAtom = (atom: Atom, includeResult: boolean): SerializedAtom => { const sub: string[] = [] for (const a of atom.sub) { sub.push(a.id) } const super_: string[] = [] for (const a of atom.super) { super_.push(a.id) } const previousResult: EncodedOption = includeResult === true ? encodedOptionSome( atom.previousResult === NOT_REFRESHED_YET ? '"SYMBOL_NOT_REFRESHED_YET"' : JSON.stringify(atom.previousResult), ) : encodedOptionNone() if (atom._tag === 'ref') { return { _tag: atom._tag, id: atom.id, ...omitUndefineds({ label: atom.label, meta: atom.meta }), isDirty: atom.isDirty, sub, super: super_, isDestroyed: atom.isDestroyed, refreshes: atom.refreshes, previousResult, } } return { _tag: 'thunk', id: atom.id, ...omitUndefineds({ label: atom.label, meta: atom.meta }), isDirty: atom.isDirty, sub, super: super_, isDestroyed: atom.isDestroyed, recomputations: atom.recomputations, previousResult, } } // NOTE This function is performance-optimized (i.e. not using `pick` and `Array.from`) const serializeEffect = (effect: Effect): SerializedEffect => { const sub: string[] = [] for (const a of effect.sub) { sub.push(a.id) } return { _tag: effect._tag, id: effect.id, ...omitUndefineds({ label: effect.label }), sub, invocations: effect.invocations, isDestroyed: effect.isDestroyed, } }