import { Logger } from 'besonders-logger'
import { createBLAKE3 } from 'hash-wasm'
import { pick } from 'lodash-es'
import { CID } from 'multiformats'
import { arraysContainSameElements } from '../applog/applog-utils.ts'
import { areApplogsEqual, valueEq } from '../applog/applog-utils.ts'
import { type Applog, ApplogForInsert, CidString } from '../applog/datom-types.ts'
import type { SubscribableArray, ArrayEvent } from '../query/subscribable.ts'
import { isArrayInitEvent } from '../query/subscribable.ts'
import { areCidsEqual } from '../ipfs/ipfs-utils.ts'
import { prettifyThreadName } from '../utils/debug-name.ts'
import { arrayIfSingle, ArrayOrSingle } from '../types/typescript-utils.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO, { prefix: '[thread]' })

// NOTE(review): the generic arguments on ArrayEvent / SubscribableArray / ArrayOrSingle were
// reconstructed from how the values are used in this file - confirm against their declarations.

/** Event delivered to thread subscribers. */
export type ThreadEvent = ArrayEvent<Applog>

/** @deprecated Use isArrayInitEvent from @wovin/core/query */
export const isInitEvent = isArrayInitEvent

/** Either a Thread or a plain (readonly) array of applogs. */
export type ApplogsOrThread = Thread | readonly Applog[]

// const blakeHasher = await createBLAKE3()

/**
 * Base class for an ordered collection of applogs with optional parent threads
 * and two subscriber lists ('derived' and regular).
 *
 * The base implementation never writes to its own `_applogs`: `insert()` / `insertRaw()`
 * only delegate to a single parent, and throw otherwise.
 */
export abstract class Thread implements SubscribableArray<Applog> {
	readonly filters: readonly string[]
	readonly parents: Thread[] | readonly Thread[] | null

	/** Callbacks registered via `subscribe(cb, 'derived')`; notified before `_subscribers`. */
	protected _derivedSubscribers: ((event: ThreadEvent) => void)[] = []
	/** Callbacks registered via `subscribe(cb)` with any type other than 'derived'. */
	protected _subscribers: ((event: ThreadEvent) => void)[] = []

	/**
	 * Monotonic counter incremented on every mutation. Used by memoizedFn to invalidate caches.
	 * (In this class it is bumped by `notifySubscribers`.)
	 */
	_version = 0

	/**
	 * @param name - thread name, used in log and error messages
	 * @param parents - a single parent, an array of parents, or `null`; a single value is
	 *   normalised through `arrayIfSingle`
	 * @param filters - stored as-is on `this.filters`
	 * @param _applogs - backing array, exposed (read-only typed) via `applogs` / `items`
	 */
	constructor(
		readonly name: string,
		/* = null */
		parents: ArrayOrSingle<Thread> | readonly Thread[] | null,
		filters: readonly string[],
		protected _applogs: Applog[] = [],
	) {
		this.parents = parents === null ? null : arrayIfSingle(parents) as readonly Thread[]
		this.filters = filters // ? uniq([...parents?.map(p => p.filters), filters])
		if (this.parents?.length === 0) {
			WARN(`[Thread] empty parents array`, name) // just to see where it happens, is actually mostly fine
		}
	}

	/**
	 * `true` unless the thread has exactly one parent, in which case the parent's
	 * `readOnly` is returned.
	 */
	get readOnly(): boolean {
		// `parents` may be null - optional chaining makes a parent-less thread read-only instead of
		// throwing a TypeError (which also pre-empted the explicit guards in insert()/insertRaw()).
		// ? multi-parent writable stream? - we don't have a use-case for this yet, but could this be a thing?
		if (this.parents?.length !== 1) return true
		return this.parents[0].readOnly
	}

	/**
	 * Delegates the insert to the single parent thread.
	 * @throws if the thread is read-only, has no parents, or has more than one parent
	 */
	public insert(appLogsToInsert: ArrayOrSingle<ApplogForInsert>) {
		if (this.readOnly) throw ERROR(`[Thread] insert() called on read-only thread:`, this.nameAndSizeUntracked)
		if (!this.parents) throw ERROR(`[Thread] insert() called on non-writable thread without parents:`, this.nameAndSizeUntracked)
		if (this.parents?.length !== 1) throw ERROR(`[Thread] insert() called on thread with multiple parents:`, this.nameAndSizeUntracked)
		return this.parents[0].insert(appLogsToInsert)
	}

	/**
	 * Delegates the raw insert to the single parent thread.
	 * @throws if the thread is read-only, has no parents, or has more than one parent
	 */
	public insertRaw(appLogsToInsert: readonly Applog[]) {
		if (this.readOnly) throw ERROR(`[Thread] insertRaw() called on read-only thread:`, this.nameAndSizeUntracked)
		if (!this.parents) throw ERROR(`[Thread] insertRaw() called on non-writable thread without parents:`, this.nameAndSizeUntracked)
		if (this.parents?.length !== 1) throw ERROR(`[Thread] insertRaw() called on thread with multiple parents:`, this.nameAndSizeUntracked)
		return this.parents[0].insertRaw(appLogsToInsert)
	}

	/**
	 * Registers `callback` for future events.
	 * @param type - 'derived' adds it to the derived list (notified first); anything else
	 *   adds it to the regular subscriber list
	 * @returns a function that removes the callback again (no-op if already removed)
	 */
	subscribe(callback: (event: ThreadEvent) => void, type?: 'derived' | 'reaction') {
		const list = type === 'derived' ? this._derivedSubscribers : this._subscribers
		list.push(callback)
		return () => {
			const idx = list.indexOf(callback)
			if (idx >= 0) list.splice(idx, 1)
		}
	}

	/** Bumps `_version`, then calls all derived subscribers followed by all regular subscribers. */
	protected notifySubscribers(event: ThreadEvent) {
		this._version++
		DEBUG(`[thread: ${this.name}] notifying`, this._derivedSubscribers.length, 'derived +', this._subscribers.length, 'subscribers of', {
			...event,
			subs: this._subscribers,
		})
		const derived = [...this._derivedSubscribers] // snapshot - safe if a subscriber unsubs during iteration
		for (const subscriber of derived) {
			subscriber(event)
		}
		const subs = [...this._subscribers]
		for (const subscriber of subs) {
			subscriber(event)
		}
	}

	// ── SubscribableArray ──

	/** Same backing array as `applogs`. */
	get items(): readonly Applog[] {
		return this._applogs
	}

	/** Drops all subscribers (both lists are emptied in place). */
	dispose() {
		this._derivedSubscribers.length = 0
		this._subscribers.length = 0
	}

	/** The backing array. (i) only type hint, not actually immutable */
	get applogs(): readonly Applog[] {
		// VERBOSE.isDisabled || trace()
		return this._applogs
	}

	/** CIDs of all applogs, in thread order. Builds a new array on every access. */
	get applogsCids(): readonly CidString[] {
		return this._applogs.map(l => l.cid)
	}

	/** CIDs of all applogs as a Set. Builds a new Set on every access. */
	get applogsCidSet(): ReadonlySet<CidString> {
		return new Set(this._applogs.map(l => l.cid))
	}

	/** Maps `fn` over the applogs in thread order. */
	public map<R>(fn: (applog: Applog) => R) {
		// if (!this.applogs.map) throw ERROR(`thread.applogs is not an array?!`, this.applogs)
		return this.applogs.map(fn)
	}

	/** `Array.prototype.findLast`, bound to the applogs array. */
	public get findLast() {
		return this.applogs.findLast.bind(this.applogs)
	}

	/** `Array.prototype.find`, bound to the applogs array. */
	public get findFirst() {
		return this.applogs.find.bind(this.applogs)
	}

	/** First applog, or `undefined` when empty. */
	get firstLog() {
		return this.applogs[0]
	}

	/** Last applog, or `undefined` when empty. */
	get latestLog() {
		return this.applogs[this.applogs.length - 1]
	}

	/**
	 * Checks whether `applog` is part of this thread.
	 * @param byRef - when true, compares by object identity; otherwise by CID
	 * @throws when comparing by CID and `applog.cid` is falsy
	 */
	public hasApplog(applog: Applog, byRef: boolean) {
		if (byRef) {
			return this.applogs.includes(applog)
		} else {
			if (!applog.cid) throw ERROR(`[hasApplogs] applog without CID:`, applog) // trying to make this be always the case
			return this.hasApplogCid(applog.cid)
			// const keySet = Object.keys(applog) // HACK: sanity check to catch bugs
			// return !!this.applogs.find(log => {
			// 	if (!arraysContainSameElements(keySet, Object.keys(log))) {
			// 		/* throw */ ERROR(`[hasApplog] field set mismatch:`, { applog, log }) // HACK: properly handle this
			// 		return comparer.structural(pick(log, ['en', 'at', 'vl', 'ag', 'ts']), pick(applog, ['en', 'at', 'vl', 'ag', 'ts']))
			// 	}
			// 	return areApplogsEqual(log, applog)
			// })
		}
	}

	/** Whether an applog with the given CID (compared via `toString()`) is in this thread. */
	public hasApplogCid(cid: CID | CidString) {
		// Set lookup instead of Array#includes. Note: the `applogsCidSet` getter in this class rebuilds
		// the Set on every access, so the call is O(n) here; it is only O(1) if a subclass overrides
		// the getter with a cached Set (NOTE(review): no such override is visible in this file).
		return this.applogsCidSet.has(cid.toString())
	}

	/** Map from CID to applog. Builds a new Map on every access; for duplicate CIDs the later applog wins. */
	get applogsByCid() {
		return new Map(this.applogs.map(log => [log.cid, log]))
	}

	/** The applog with the given CID (compared via `toString()`), or `undefined`. */
	public getApplog(cid: CID | CidString) {
		return this.applogsByCid.get(cid.toString())
		// .find(function findApplogInThread(log) {
		// 	return areCidsEqual(log.cid, cid)
		// })
	}

	/**
	 * Returns the first existing applog whose `en`, `at`, `vl` (via `valueEq`) and `ag` match
	 * `applog`, or `undefined`. The timestamp is not compared.
	 */
	public hasApplogWithDiffTs(applog: ApplogForInsert) {
		// HACK this is basically as inefficient as it gets
		return this.applogs.find(existing => (
			existing.en === applog.en
			&& existing.at === applog.at
			&& valueEq(existing.vl, applog.vl)
			&& existing.ag === applog.ag
		))
	}

	// get stateHash() {
	// 	blakeHasher.init()
	// 	for (const log of this.applogs) {
	// 		blakeHasher.update(log.cid)
	// 	}
	// 	return blakeHasher.digest()
	// }

	get isEmpty() {
		return this.size === 0
	}
	get size() {
		return this.applogs.length
	}
	get length() {
		return this.applogs.length
	}
	get untrackedSize() {
		return this.size
	}
	/** `"<name> (<size>)"`, used in error messages. */
	get nameAndSizeUntracked() {
		return `${this.name} (${this.size})`
	}
	get prettyName() {
		return prettifyThreadName(this.name)
	}
	/** `true` when `parents` is a non-empty array. */
	get hasParents() {
		return !!this.parents?.length
	}
}

/** Returns the applogs of a Thread, or the array itself when given a plain array. */
export const getLogsFromThread = (logsOrThread: ApplogsOrThread) =>
	logsOrThread instanceof Thread ? logsOrThread.applogs : logsOrThread

/** A Thread that always reports itself as read-only. */
export class StaticThread extends Thread {
	/** Creates a parent-less, filter-less StaticThread over `applogs` (name defaults to 'static'). */
	static fromArray(applogs: Applog[], name?: string) {
		return new StaticThread(name || 'static', null, [], applogs)
	}

	constructor(
		name: string,
		/* = null */
		parents: ArrayOrSingle<Thread> | readonly Thread[] | null,
		filters: readonly string[],
		_applogs: Applog[],
	) {
		super(name, parents, filters, _applogs)
	}

	get readOnly() {
		return true
	}
}