{"version":3,"file":"messages.cjs","names":["namespaceKey","MessageAssembler","isRootNamespace","reconcileMessagesFromValues","ensureMessageInstances","shouldPreferValuesMessageForToolCalls","buildMessageIndex","assembledMessageToBaseMessage","openProjectionSubscription"],"sources":["../../../src/stream/projections/messages.ts"],"sourcesContent":["/**\n * Namespace-scoped `messages` projection.\n *\n * Opens `thread.subscribe({ channels: [\"messages\"], namespaces: [ns] })`\n * and folds each `messages` event through {@link MessageAssembler}.\n * Every update — start, block delta, block finish, message finish —\n * re-derives a `BaseMessage` class instance for the currently-active\n * message and updates its slot in the store.\n *\n * The projection emits `BaseMessage[]` (class instances from\n * `@langchain/core/messages`), never plain serialized objects.\n */\nimport type {\n  MessagesEvent,\n  MessageRole,\n  MessageStartData,\n  ValuesEvent,\n} from \"@langchain/protocol\";\nimport type { BaseMessage } from \"@langchain/core/messages\";\nimport { MessageAssembler } from \"../../client/stream/messages.js\";\nimport {\n  assembledMessageToBaseMessage,\n  type ExtendedMessageRole,\n} from \"../assembled-to-message.js\";\nimport { ensureMessageInstances } from \"../../ui/messages.js\";\nimport type { Message } from \"../../types.messages.js\";\nimport type { ProjectionSpec, ProjectionRuntime } from \"../types.js\";\nimport { isRootNamespace, namespaceKey } from \"../namespace.js\";\nimport {\n  buildMessageIndex,\n  reconcileMessagesFromValues,\n  shouldPreferValuesMessageForToolCalls,\n} from \"../message-reconciliation.js\";\nimport { openProjectionSubscription } from \"./runtime.js\";\n\nexport function messagesProjection(\n  namespace: readonly string[]\n): ProjectionSpec<BaseMessage[]> {\n  const ns = [...namespace];\n  const key = `messages|${namespaceKey(ns)}`;\n\n  return {\n    key,\n    namespace: ns,\n    initial: [],\n    open({ thread, store, rootBus }): ProjectionRuntime {\n      const assembler = new MessageAssembler();\n      // Per-messageId state needed for BaseMessage projection:\n      //  - `role` is only in the `message-start` event; we cache it\n      //    so subsequent delta events still produce a typed message.\n      //  - `toolCallId` is pulled from message-start extras when role\n      //    is `tool` (a convention we keep compatible with serialized\n      //    v1 tool messages).\n      const roleByKey = new Map<\n        string,\n        { role: ExtendedMessageRole; toolCallId?: string }\n      >();\n      const indexById = new Map<string, number>();\n      // Ids this projection has observed via the `messages` channel\n      // (token-level deltas). Used by `applyValuesEvent` to prefer the\n      // stream-assembled version over the values-coerced shape while a\n      // turn is streaming, matching the root controller's policy.\n      const streamMessageIds = new Set<string>();\n      // Ids observed in the most recent `values.messages` snapshot.\n      // Messages that were present in a prior snapshot but are absent\n      // from this one are treated as explicit removals (server-side\n      // `RemoveMessage` reducer deltas). Stream-only messages (seen on\n      // the messages channel but never echoed in a values snapshot)\n      // are preserved — their enclosing superstep may simply not have\n      // committed yet.\n      let valuesMessageIds = new Set<string>();\n\n      // Root-scoped projections whose channels are already covered by\n      // the controller's root pump attach to the shared fan-out\n      // instead of opening a second server subscription. The root\n      // pump runs at `{namespaces: [[]], depth: 1}`, which is exactly\n      // the scope a root-namespace `messagesProjection` wants.\n      const rootShortCircuit =\n        isRootNamespace(ns) && rootBus.channels.includes(\"messages\");\n\n      if (rootShortCircuit) {\n        const unsubscribe = rootBus.subscribe((event) => {\n          if (event.method !== \"messages\") return;\n          if (!isRootNamespace(event.params.namespace)) return;\n          applyEvent(event as MessagesEvent);\n        });\n        return {\n          dispose() {\n            unsubscribe();\n          },\n        };\n      }\n\n      let disposed = false;\n\n      // Local mirror of the store contents. Every `applyEvent` /\n      // `applyValuesEvent` mutates this synchronously; a coalesced\n      // `scheduleFlush` copies it to `store` once per macrotask.\n      //\n      // Why the indirection? When a namespace-scoped projection\n      // (e.g. a subagent modal opened after the run finished) first\n      // subscribes, the server replays the entire history from\n      // `seq=0`. Dozens of `messages`-channel events can land in a\n      // single SSE parse — they drain through the `for await` loop\n      // as a long microtask chain. Microtasks run before any\n      // macrotask, so React's concurrent scheduler never gets a\n      // chance to commit between updates. Calling `store.setValue`\n      // per event in that burst overflows React's\n      // `nestedUpdateCount` guard and throws \"Maximum update depth\n      // exceeded\", permanently killing the projection and\n      // leaving the store stuck at its first few messages.\n      //\n      // Batching via `MessageChannel` (macrotask) coalesces the\n      // replay burst into one `setValue` call and lets React\n      // commit between flushes for live token streaming too.\n      const pendingMessages: BaseMessage[] = [];\n      let dirty = false;\n      let flushScheduled = false;\n      const flushChannel =\n        typeof MessageChannel !== \"undefined\" ? new MessageChannel() : null;\n\n      const flush = (): void => {\n        flushScheduled = false;\n        if (!dirty || disposed) return;\n        dirty = false;\n        // `.slice()` breaks identity so React's `Object.is` bail-out\n        // in `StreamStore.setValue` propagates the change.\n        store.setValue(pendingMessages.slice());\n      };\n      if (flushChannel != null) {\n        flushChannel.port1.onmessage = flush;\n      }\n\n      const scheduleFlush = (): void => {\n        dirty = true;\n        if (flushScheduled) return;\n        flushScheduled = true;\n        if (flushChannel != null) {\n          flushChannel.port2.postMessage(null);\n        } else {\n          setTimeout(flush, 0);\n        }\n      };\n\n      // Rebuild the store from `values.messages` snapshots.\n      //\n      // `values` events carry the full, committed state of the\n      // thread's `messages` channel at a checkpoint — they fire\n      // on node completion, AFTER every `messages`-channel delta\n      // for that turn has been emitted. They are the authoritative\n      // source of truth for ORDER and for non-streamed messages\n      // (human turns, serialised tool results, subagent echoes, …).\n      //\n      // Why rebuild rather than merge-by-id?\n      //\n      // In practice the server may emit the same logical message\n      // with DIFFERENT ids across successive `values` snapshots at\n      // the same namespace — e.g. a subagent first surfaces its\n      // seed prompt with a synthetic id like\n      // `subagent:<tool_call_id>:human`, then a later superstep\n      // echoes the same prompt back with a real UUID (or vice\n      // versa). A naive \"match-or-append by id\" strategy treats\n      // each fresh id as a new entry and the list grows\n      // monotonically, showing the same content twice (or more)\n      // in the UI.\n      //\n      // Policy (mirrors the root controller's `#applyValues`):\n      //\n      //  1. Walk `values.messages` in order. For each id, prefer\n      //     the stream-assembled entry if we have one for that id\n      //     (keeps in-progress token streaming visible); otherwise\n      //     take the values-coerced instance. This self-heals the\n      //     two classes of glitch the old merge-by-id handler\n      //     targeted:\n      //       - tool messages arriving without `tool_call_id` on\n      //         the messages channel — the values snapshot always\n      //         carries it;\n      //       - AI messages whose finalized `tool_calls` didn't\n      //         fully land via the messages channel — the values\n      //         snapshot's AI message has them populated.\n      //\n      //  2. Append any stream-only ids (seen on the messages\n      //     channel but never echoed in ANY values snapshot yet)\n      //     — their enclosing superstep hasn't committed yet, so\n      //     dropping them would flash the UI.\n      //\n      //  3. Ids that WERE in a prior values snapshot but are gone\n      //     from this one are treated as explicit removals\n      //     (`RemoveMessage` reducer deltas) and dropped.\n      //\n      // Unkeyed messages (no stable id) are passed through in\n      // their values order because we can't dedupe them safely.\n      const applyValuesEvent = (event: ValuesEvent): void => {\n        const data = event.params.data;\n        if (data == null || typeof data !== \"object\" || Array.isArray(data)) {\n          return;\n        }\n        const state = data as Record<string, unknown>;\n        const rawMessages = state.messages;\n        if (!Array.isArray(rawMessages) || rawMessages.length === 0) return;\n\n        const coerced = ensureMessageInstances(\n          rawMessages as (Message | BaseMessage)[]\n        );\n\n        const reconciliation = reconcileMessagesFromValues({\n          valueMessages: coerced,\n          currentMessages: pendingMessages,\n          currentIndexById: indexById,\n          previousValueMessageIds: valuesMessageIds,\n          streamedMessageIds: streamMessageIds,\n          preferValuesMessage: shouldPreferValuesMessageForToolCalls,\n        });\n        valuesMessageIds = reconciliation.valueMessageIds;\n        const reconciledMessages = [...reconciliation.messages];\n\n        pendingMessages.length = 0;\n        for (const message of reconciledMessages) pendingMessages.push(message);\n        indexById.clear();\n        for (const [id, idx] of buildMessageIndex(pendingMessages)) {\n          indexById.set(id, idx);\n        }\n        scheduleFlush();\n      };\n\n      const applyEvent = (event: MessagesEvent): void => {\n        const data = event.params.data;\n\n        if (data.event === \"message-start\") {\n          const startData = data as MessageStartData;\n          const role = (startData.role ?? \"ai\") as MessageRole;\n          // \"tool\" role is a v1 convention not represented in the\n          // protocol enum but common in practice — keep it working\n          // for graphs that emit it as an extensible field.\n          const extendedRole =\n            (startData as { role?: ExtendedMessageRole }).role ?? role;\n          const maybeToolCallId = (startData as { tool_call_id?: string })\n            .tool_call_id;\n          if (startData.id != null) {\n            roleByKey.set(startData.id, {\n              role: extendedRole,\n              toolCallId: maybeToolCallId,\n            });\n          }\n        }\n\n        const update = assembler.consume(event);\n        if (update == null) return;\n        const msg = update.message;\n        const id = msg.id;\n        if (id == null) return;\n        const captured = roleByKey.get(id) ?? { role: \"ai\" as const };\n        const base = assembledMessageToBaseMessage(msg, captured.role, {\n          toolCallId: captured.toolCallId,\n        });\n\n        streamMessageIds.add(id);\n        const existingIdx = indexById.get(id);\n        if (existingIdx == null) {\n          indexById.set(id, pendingMessages.length);\n          pendingMessages.push(base);\n        } else {\n          pendingMessages[existingIdx] = base;\n        }\n        scheduleFlush();\n      };\n\n      const runtime = openProjectionSubscription({\n        thread,\n        // Subscribe to both `messages` (live token deltas that drive\n        // the in-flight assistant bubble) and `values` (periodic full-\n        // state snapshots). Consuming values lets late-mounted scoped\n        // projections backfill history after the run has finished.\n        channels: [\"messages\", \"values\"],\n        namespace: ns,\n        onEvent(event) {\n          if (event.method === \"messages\") {\n            applyEvent(event as MessagesEvent);\n          } else if (event.method === \"values\") {\n            applyValuesEvent(event as ValuesEvent);\n          }\n        },\n      });\n\n      return {\n        async dispose() {\n          disposed = true;\n          if (flushChannel != null) {\n            flushChannel.port1.onmessage = null;\n            flushChannel.port1.close();\n            flushChannel.port2.close();\n          }\n          await runtime.dispose();\n        },\n      };\n    },\n  };\n}\n"],"mappings":";;;;;;;AAmCA,SAAgB,mBACd,WAC+B;CAC/B,MAAM,KAAK,CAAC,GAAG,UAAU;AAGzB,QAAO;EACL,KAHU,YAAYA,kBAAAA,aAAa,GAAG;EAItC,WAAW;EACX,SAAS,EAAE;EACX,KAAK,EAAE,QAAQ,OAAO,WAA8B;GAClD,MAAM,YAAY,IAAIC,mBAAAA,kBAAkB;GAOxC,MAAM,4BAAY,IAAI,KAGnB;GACH,MAAM,4BAAY,IAAI,KAAqB;GAK3C,MAAM,mCAAmB,IAAI,KAAa;GAQ1C,IAAI,mCAAmB,IAAI,KAAa;AAUxC,OAFEC,kBAAAA,gBAAgB,GAAG,IAAI,QAAQ,SAAS,SAAS,WAAW,EAExC;IACpB,MAAM,cAAc,QAAQ,WAAW,UAAU;AAC/C,SAAI,MAAM,WAAW,WAAY;AACjC,SAAI,CAACA,kBAAAA,gBAAgB,MAAM,OAAO,UAAU,CAAE;AAC9C,gBAAW,MAAuB;MAClC;AACF,WAAO,EACL,UAAU;AACR,kBAAa;OAEhB;;GAGH,IAAI,WAAW;GAsBf,MAAM,kBAAiC,EAAE;GACzC,IAAI,QAAQ;GACZ,IAAI,iBAAiB;GACrB,MAAM,eACJ,OAAO,mBAAmB,cAAc,IAAI,gBAAgB,GAAG;GAEjE,MAAM,cAAoB;AACxB,qBAAiB;AACjB,QAAI,CAAC,SAAS,SAAU;AACxB,YAAQ;AAGR,UAAM,SAAS,gBAAgB,OAAO,CAAC;;AAEzC,OAAI,gBAAgB,KAClB,cAAa,MAAM,YAAY;GAGjC,MAAM,sBAA4B;AAChC,YAAQ;AACR,QAAI,eAAgB;AACpB,qBAAiB;AACjB,QAAI,gBAAgB,KAClB,cAAa,MAAM,YAAY,KAAK;QAEpC,YAAW,OAAO,EAAE;;GAoDxB,MAAM,oBAAoB,UAA6B;IACrD,MAAM,OAAO,MAAM,OAAO;AAC1B,QAAI,QAAQ,QAAQ,OAAO,SAAS,YAAY,MAAM,QAAQ,KAAK,CACjE;IAGF,MAAM,cADQ,KACY;AAC1B,QAAI,CAAC,MAAM,QAAQ,YAAY,IAAI,YAAY,WAAW,EAAG;IAM7D,MAAM,iBAAiBC,+BAAAA,4BAA4B;KACjD,eALcC,iBAAAA,uBACd,YACD;KAIC,iBAAiB;KACjB,kBAAkB;KAClB,yBAAyB;KACzB,oBAAoB;KACpB,qBAAqBC,+BAAAA;KACtB,CAAC;AACF,uBAAmB,eAAe;IAClC,MAAM,qBAAqB,CAAC,GAAG,eAAe,SAAS;AAEvD,oBAAgB,SAAS;AACzB,SAAK,MAAM,WAAW,mBAAoB,iBAAgB,KAAK,QAAQ;AACvE,cAAU,OAAO;AACjB,SAAK,MAAM,CAAC,IAAI,QAAQC,+BAAAA,kBAAkB,gBAAgB,CACxD,WAAU,IAAI,IAAI,IAAI;AAExB,mBAAe;;GAGjB,MAAM,cAAc,UAA+B;IACjD,MAAM,OAAO,MAAM,OAAO;AAE1B,QAAI,KAAK,UAAU,iBAAiB;KAClC,MAAM,YAAY;KAClB,MAAM,OAAQ,UAAU,QAAQ;KAIhC,MAAM,eACH,UAA6C,QAAQ;KACxD,MAAM,kBAAmB,UACtB;AACH,SAAI,UAAU,MAAM,KAClB,WAAU,IAAI,UAAU,IAAI;MAC1B,MAAM;MACN,YAAY;MACb,CAAC;;IAIN,MAAM,SAAS,UAAU,QAAQ,MAAM;AACvC,QAAI,UAAU,KAAM;IACpB,MAAM,MAAM,OAAO;IACnB,MAAM,KAAK,IAAI;AACf,QAAI,MAAM,KAAM;IAChB,MAAM,WAAW,UAAU,IAAI,GAAG,IAAI,EAAE,MAAM,MAAe;IAC7D,MAAM,OAAOC,6BAAAA,8BAA8B,KAAK,SAAS,MAAM,EAC7D,YAAY,SAAS,YACtB,CAAC;AAEF,qBAAiB,IAAI,GAAG;IACxB,MAAM,cAAc,UAAU,IAAI,GAAG;AACrC,QAAI,eAAe,MAAM;AACvB,eAAU,IAAI,IAAI,gBAAgB,OAAO;AACzC,qBAAgB,KAAK,KAAK;UAE1B,iBAAgB,eAAe;AAEjC,mBAAe;;GAGjB,MAAM,UAAUC,gBAAAA,2BAA2B;IACzC;IAKA,UAAU,CAAC,YAAY,SAAS;IAChC,WAAW;IACX,QAAQ,OAAO;AACb,SAAI,MAAM,WAAW,WACnB,YAAW,MAAuB;cACzB,MAAM,WAAW,SAC1B,kBAAiB,MAAqB;;IAG3C,CAAC;AAEF,UAAO,EACL,MAAM,UAAU;AACd,eAAW;AACX,QAAI,gBAAgB,MAAM;AACxB,kBAAa,MAAM,YAAY;AAC/B,kBAAa,MAAM,OAAO;AAC1B,kBAAa,MAAM,OAAO;;AAE5B,UAAM,QAAQ,SAAS;MAE1B;;EAEJ"}