/**
* Client-side helpers for progressive Brain graph loading.
*
* Consumed by `src/routes/brain/+page.svelte` (Agent E's shell) from
* `onMount` to fetch tier-1 and tier-2 node batches after the tier-0
* first-paint payload is rendered.
*
* ## Usage (Agent E integration contract)
*
* ```svelte
*
* ```
*
* @module
* @task T990
*/
import type { BrainEdge, BrainGraph, BrainNode } from '@cleocode/brain';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Callbacks invoked by {@link streamRemainingNodes} as data arrives. */
export interface StreamCallbacks {
/**
* Called with each batch of newly-arrived nodes and edges.
* Merge these into your reactive graph state.
*/
onNodes: (nodes: BrainNode[], edges: BrainEdge[]) => void;
/**
* Called when a full tier has finished loading.
*
* @param tier - The tier that just completed (1 or 2).
* @param totalNodes - Total nodes received in this tier.
* @param elapsedMs - Server-reported duration in milliseconds.
*/
onTierComplete: (tier: 1 | 2, totalNodes: number, elapsedMs: number) => void;
/**
* Called if any network or parse error occurs.
* The stream is abandoned after the first error.
*/
onError: (error: unknown) => void;
}
/** Options for {@link streamRemainingNodes}. */
export interface StreamOptions extends StreamCallbacks {
/** The tier-0 graph already rendered — used to check whether more nodes exist. */
currentGraph: BrainGraph;
/**
* Whether to also fetch tier 2 after tier 1 completes.
* Defaults to `true` when `currentGraph.truncated` is `true`.
*/
fetchTier2?: boolean;
/**
* Comma-separated substrate filter forwarded to the chunks endpoint.
* Omit to fetch all substrates.
*/
substrates?: string;
/** Minimum weight filter forwarded to the chunks endpoint. */
minWeight?: number;
}
/** Return handle from {@link streamRemainingNodes}. */
export interface StreamHandle {
/** Abort the in-progress stream fetch. */
abort: () => void;
}
// ---------------------------------------------------------------------------
// NDJSON chunk types (wire format from /api/brain/chunks)
// ---------------------------------------------------------------------------
interface ChunkEvent {
kind: 'chunk';
tier: 1 | 2;
nodes: BrainNode[];
edges: BrainEdge[];
truncated: boolean;
}
interface DoneEvent {
kind: 'done';
tier: 1 | 2;
totalNodes: number;
elapsed: number;
}
interface ErrorEvent {
kind: 'error';
message: string;
}
type ChunkLine = ChunkEvent | DoneEvent | ErrorEvent;
// ---------------------------------------------------------------------------
// Streaming implementation
// ---------------------------------------------------------------------------
/**
* Fetches tier-1 (and optionally tier-2) brain graph data from
* `/api/brain/chunks` and merges it into the caller's reactive graph.
*
* Each response is a newline-delimited JSON (NDJSON) stream.
* The function reads the stream line-by-line and fires callbacks as
* data arrives so the renderer can append nodes without waiting for
* the full response.
*
* @param opts - Configuration and callback handlers.
* @returns A {@link StreamHandle} that can be used to abort the fetch.
*/
export function streamRemainingNodes(opts: StreamOptions): StreamHandle {
const controller = new AbortController();
const { signal } = controller;
// Start tier-1 fetch; chain tier-2 if needed.
void fetchTier(1, opts, signal).then((shouldFetchTier2) => {
if (!signal.aborted && shouldFetchTier2 && (opts.fetchTier2 ?? opts.currentGraph.truncated)) {
return fetchTier(2, opts, signal);
}
});
return {
abort: () => controller.abort(),
};
}
/**
* Fetches a single tier from `/api/brain/chunks` and processes the NDJSON stream.
*
* @param tier - Tier to fetch (1 or 2).
* @param opts - Stream options (callbacks, filter params).
* @param signal - AbortSignal to cancel the request.
* @returns Promise resolving to `true` when tier-2 should follow, `false` otherwise.
*/
async function fetchTier(tier: 1 | 2, opts: StreamOptions, signal: AbortSignal): Promise {
const params = new URLSearchParams({ tier: String(tier) });
if (opts.substrates) params.set('substrates', opts.substrates);
if (opts.minWeight !== undefined) params.set('min_weight', String(opts.minWeight));
let response: Response;
try {
response = await fetch(`/api/brain/chunks?${params.toString()}`, { signal });
} catch (err) {
if (!signal.aborted) opts.onError(err);
return false;
}
if (!response.ok || !response.body) {
opts.onError(new Error(`HTTP ${response.status} fetching tier-${tier}`));
return false;
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let hadNodes = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Process all complete lines in the buffer.
const lines = buffer.split('\n');
// Keep the last (possibly incomplete) line in the buffer.
buffer = lines.pop() ?? '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
let parsed: ChunkLine;
try {
parsed = JSON.parse(trimmed) as ChunkLine;
} catch {
// Malformed line — skip.
continue;
}
if (parsed.kind === 'error') {
opts.onError(new Error(parsed.message));
return false;
}
if (parsed.kind === 'chunk') {
if (parsed.nodes.length > 0 || parsed.edges.length > 0) {
opts.onNodes(parsed.nodes, parsed.edges);
hadNodes = true;
}
}
if (parsed.kind === 'done') {
opts.onTierComplete(tier, parsed.totalNodes, parsed.elapsed);
// tier-2 follow-up depends on whether tier-1 was truncated.
return tier === 1 && parsed.totalNodes >= 1000;
}
}
}
} finally {
reader.releaseLock();
}
return hadNodes && tier === 1;
}
// ---------------------------------------------------------------------------
// Warmup progress signal
// ---------------------------------------------------------------------------
/** Subscriber callback type. */
type ProgressSubscriber = (value: number) => void;
/** A minimal reactive signal for warmup progress (0–1). */
export interface WarmupProgressSignal {
/** Subscribe to progress updates. Returns an unsubscribe function. */
subscribe: (fn: ProgressSubscriber) => () => void;
/** Mark warmup as complete (sets progress to 1.0). */
markComplete: () => void;
/** Read the current progress value without subscribing. */
current: () => number;
}
/**
* Creates a simple pub/sub warmup progress signal.
*
* Agent E wires this into a `$state` variable and optionally into a
* progress bar component. The signal starts at `0` (tier-0 rendered)
* and progresses to `1.0` when all tiers have loaded.
*
* Expected milestone values (suggested — Agent E may override):
* - `0` — tier-0 rendered (initial state)
* - `0.6` — tier-1 complete
* - `1.0` — tier-2 complete (or markComplete called)
*
* @returns {@link WarmupProgressSignal}
*/
export function createWarmupProgressSignal(): WarmupProgressSignal {
let _value = 0;
const _subscribers = new Set();
function notify(): void {
for (const fn of _subscribers) {
fn(_value);
}
}
return {
subscribe(fn) {
_subscribers.add(fn);
fn(_value); // fire immediately with current value
return () => {
_subscribers.delete(fn);
};
},
markComplete() {
_value = 1.0;
notify();
},
current() {
return _value;
},
};
}