/**
 * Tests for joinThreads — the multi-parent thread merge.
 *
 * Covers: initial merge, incremental adds, deduplication, setParents,
 * triggerRemap, dispose, and subscriber notifications.
 */
import { afterEach, describe, expect, it, vi } from 'vitest'
import { finalizeApplogForInsert, joinThreads } from '../applog/applog-helpers.ts'
import { sortApplogsByTs } from '../applog/applog-utils.ts'
import type { Applog, ApplogForInsert, CidString } from '../applog/datom-types.ts'
import { isInitEvent, type ThreadEvent } from './basic.ts'
import { ThreadInMemory } from './writeable.ts'

// ─── Helpers ────────────────────────────────────────────────────

/**
 * Shape accepted by the log factories below: `en`, `at` and `vl` are required,
 * every other Applog field is optional.
 *
 * NOTE(review): the generic arguments were reconstructed from the call sites in
 * this file (every spec passes `en`, `at`, `vl`, optionally `ts`) — confirm
 * against the Applog declaration in datom-types.ts.
 */
type LogSpec = Partial<Applog> & Pick<Applog, 'en' | 'at' | 'vl'>

/** Monotonic counter used to derive distinct default timestamps; reset after each test. */
let tsCounter = 0

/**
 * Create a valid Applog with proper CID (can be used with insertRaw).
 *
 * @param overrides - Fields of the log; `ts` defaults to a generated ISO
 *   timestamp one second later than the previous generated one, `pv` to
 *   `null` and `ag` to `'testAgent'`.
 * @returns The result of passing the assembled log to finalizeApplogForInsert.
 */
function makeLog(overrides: LogSpec): Applog {
	// The counter advances on every call, even when an explicit `ts` is supplied.
	tsCounter++
	const base = {
		ts: overrides.ts ?? new Date(1700000000000 + tsCounter * 1000).toISOString(),
		pv: null,
		ag: 'testAgent',
		...overrides,
	}
	// Use finalizeApplogForInsert to get a real CID
	return finalizeApplogForInsert(base as ApplogForInsert, {})
}

/**
 * Build one Applog per spec via makeLog and pass the list through sortApplogsByTs.
 *
 * @param specs - Log specs, in the order their default timestamps should be generated.
 */
function makeLogs(...specs: Array<LogSpec>): Applog[] {
	return sortApplogsByTs(specs.map(s => makeLog(s)))
}

/**
 * Wrap the given logs in a ThreadInMemory.
 *
 * @param logs - Logs the thread starts with.
 * @param name - Thread name forwarded to ThreadInMemory.fromArray; defaults to `'test'`.
 */
function threadFrom(logs: Applog[], name = 'test'): ThreadInMemory {
	return ThreadInMemory.fromArray(logs, name)
}

// Restart the default-timestamp sequence so tests do not depend on execution order.
afterEach(() => {
	tsCounter = 0
})

// ═════════════════════════════════════════════════════════════════
// Initial merge (compute)
// ═════════════════════════════════════════════════════════════════

describe('joinThreads — initial merge', () => {
	it('merges applogs from two threads', () => {
		const logsA = makeLogs(
			{ en: 'e1', at: 'name', vl: 'Alice' },
		)
		const logsB = makeLogs(
			{ en: 'e2', at: 'name', vl: 'Bob' },
		)
		const joined = joinThreads([threadFrom(logsA, 'A'), threadFrom(logsB, 'B')])

		expect(joined.size).toBe(2)
		expect(joined.applogs.map(l => l.vl)).toContain('Alice')
		expect(joined.applogs.map(l => l.vl)).toContain('Bob')
	})

	it('deduplicates identical applogs across threads', () => {
		const shared = makeLog({ en: 'e1', at: 'name', vl: 'Alice' })
		const a = threadFrom([shared], 'A')
		const b = threadFrom([shared], 'B')
		const joined = joinThreads([a, b])

		expect(joined.size).toBe(1)
	})

	it('sorts merged result by timestamp', () => {
		const early = makeLog({ en: 'e1', at: 'x', vl: 1, ts: '2020-01-01T00:00:00Z' })
		const late = makeLog({ en: 'e2', at: 'x', vl: 2, ts: '2025-01-01T00:00:00Z' })
		// Give them to threads in reverse order
		const joined = joinThreads([threadFrom([late], 'A'), threadFrom([early], 'B')])

		expect(joined.applogs[0]).toBe(early)
		expect(joined.applogs[1]).toBe(late)
	})

	it('handles empty threads', () => {
		const logsA = makeLogs({ en: 'e1', at: 'name', vl: 'Alice' })
		const joined = joinThreads([threadFrom(logsA, 'A'), threadFrom([], 'B')])

		expect(joined.size).toBe(1)
	})

	it('handles joining a single thread', () => {
		const logsA = makeLogs({ en: 'e1', at: 'name', vl: 'Alice' })
		const joined = joinThreads([threadFrom(logsA, 'A')])

		expect(joined.size).toBe(1)
	})

	it('merges three threads', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const b = threadFrom(makeLogs({ en: 'e2', at: 'x', vl: 2 }), 'B')
		const c = threadFrom(makeLogs({ en: 'e3', at: 'x', vl: 3 }), 'C')
		const joined = joinThreads([a, b, c])

		expect(joined.size).toBe(3)
	})
})

// ═════════════════════════════════════════════════════════════════
// Incremental updates (mapDelta — added)
// ═════════════════════════════════════════════════════════════════

describe('joinThreads — incremental adds', () => {
	it('propagates new applog from one parent', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const b = threadFrom([], 'B')
		const joined = joinThreads([a, b])
		expect(joined.size).toBe(1)

		// Insert into B
		const newLog = makeLog({ en: 'e2', at: 'x', vl: 2 })
		b.insertRaw([newLog])

		expect(joined.size).toBe(2)
		expect(joined.applogs).toContain(newLog)
	})

	it('deduplicates when same log is added to second parent', () => {
		const shared = makeLog({ en: 'e1', at: 'x', vl: 1 })
		const a = threadFrom([shared], 'A')
		const b = threadFrom([], 'B')
		const joined = joinThreads([a, b])
		expect(joined.size).toBe(1)

		// Insert the same log into B — should not duplicate
		b.insertRaw([shared])

		expect(joined.size).toBe(1)
	})

	it('notifies subscribers on add', () => {
		const a = threadFrom([], 'A')
		const b = threadFrom([], 'B')
		const joined = joinThreads([a, b])

		const events: ThreadEvent[] = []
		joined.subscribe(e => events.push(e))

		const newLog = makeLog({ en: 'e1', at: 'x', vl: 1 })
		a.insertRaw([newLog])

		expect(events).toHaveLength(1)
		expect(isInitEvent(events[0])).toBe(false)
		// The guard narrows the event type so `.added` can be accessed.
		if (!isInitEvent(events[0])) {
			expect(events[0].added).toContain(newLog)
		}
	})

	it('inserts maintain sort order', () => {
		const early = makeLog({ en: 'e1', at: 'x', vl: 1, ts: '2020-01-01T00:00:00Z' })
		const a = threadFrom([early], 'A')
		const b = threadFrom([], 'B')
		const joined = joinThreads([a, b])

		const late = makeLog({ en: 'e2', at: 'x', vl: 2, ts: '2025-01-01T00:00:00Z' })
		b.insertRaw([late])

		expect(joined.applogs[0]).toBe(early)
		expect(joined.applogs[1]).toBe(late)

		// Now insert something in between
		const mid = makeLog({ en: 'e3', at: 'x', vl: 3, ts: '2022-06-01T00:00:00Z' })
		a.insertRaw([mid])

		expect(joined.applogs[0]).toBe(early)
		expect(joined.applogs[1]).toBe(mid)
		expect(joined.applogs[2]).toBe(late)
	})
})

// ═════════════════════════════════════════════════════════════════
// setParents — swap input threads without new result reference
// ═════════════════════════════════════════════════════════════════

describe('joinThreads — setParents', () => {
	it('replaces parents and recomputes', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const b = threadFrom(makeLogs({ en: 'e2', at: 'x', vl: 2 }), 'B')
		const joined = joinThreads([a, b])
		expect(joined.size).toBe(2)

		// Swap to a completely different set of parents
		const c = threadFrom(makeLogs({ en: 'e3', at: 'x', vl: 3 }), 'C')
		joined.setParents([c])

		expect(joined.size).toBe(1)
		expect(joined.applogs[0].vl).toBe(3)
	})

	it('notifies subscribers with init event on setParents', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const joined = joinThreads([a])

		const events: ThreadEvent[] = []
		joined.subscribe(e => events.push(e))

		const b = threadFrom(makeLogs({ en: 'e2', at: 'x', vl: 2 }), 'B')
		joined.setParents([b])

		expect(events).toHaveLength(1)
		expect(isInitEvent(events[0])).toBe(true)
	})

	it('subscribes to new parents after setParents', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const joined = joinThreads([a])

		const b = threadFrom([], 'B')
		joined.setParents([b])
		expect(joined.size).toBe(0)

		// Insert into new parent — should propagate
		const newLog = makeLog({ en: 'e2', at: 'x', vl: 2 })
		b.insertRaw([newLog])

		expect(joined.size).toBe(1)
		expect(joined.applogs).toContain(newLog)
	})

	it('unsubscribes from old parents after setParents', () => {
		const a = threadFrom([], 'A')
		const joined = joinThreads([a])

		const events: ThreadEvent[] = []
		joined.subscribe(e => events.push(e))

		const b = threadFrom([], 'B')
		joined.setParents([b])

		// Clear the init event from setParents
		events.length = 0

		// Insert into OLD parent — should NOT propagate
		a.insertRaw([makeLog({ en: 'e1', at: 'x', vl: 1 })])

		expect(events).toHaveLength(0)
		expect(joined.size).toBe(0)
	})
})

// ═════════════════════════════════════════════════════════════════
// triggerRemap — full recompute
// ═════════════════════════════════════════════════════════════════

describe('joinThreads — triggerRemap', () => {
	it('recomputes from current parents', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const b = threadFrom(makeLogs({ en: 'e2', at: 'x', vl: 2 }), 'B')
		const joined = joinThreads([a, b])
		expect(joined.size).toBe(2)

		joined.triggerRemap()

		expect(joined.size).toBe(2)
	})

	it('emits init event to subscribers', () => {
		const a = threadFrom(makeLogs({ en: 'e1', at: 'x', vl: 1 }), 'A')
		const joined = joinThreads([a])

		const events: ThreadEvent[] = []
		joined.subscribe(e => events.push(e))

		joined.triggerRemap()

		expect(events).toHaveLength(1)
		expect(isInitEvent(events[0])).toBe(true)
		// The guard narrows the event type so `.init` can be accessed.
		if (isInitEvent(events[0])) {
			expect(events[0].init).toHaveLength(1)
		}
	})
})

// ═════════════════════════════════════════════════════════════════
// dispose
// ═════════════════════════════════════════════════════════════════

describe('joinThreads — dispose', () => {
	it('stops receiving parent updates after dispose', () => {
		const a = threadFrom([], 'A')
		const joined = joinThreads([a])

		const events: ThreadEvent[] = []
		joined.subscribe(e => events.push(e))

		joined.dispose()

		a.insertRaw([makeLog({ en: 'e1', at: 'x', vl: 1 })])

		// No events should have been received
		expect(events).toHaveLength(0)
	})
})