{"version":3,"file":"channel.cjs","names":["namespaceKey","isRootNamespace","openProjectionSubscription"],"sources":["../../../src/stream/projections/channel.ts"],"sourcesContent":["/**\n * Raw channel escape hatch.\n *\n * Subscribes to an arbitrary list of channels at an arbitrary\n * namespace and retains a bounded buffer of events. Consumers that\n * need assembly semantics should use `messagesProjection`,\n * `toolCallsProjection`, etc. instead; this one is for inspection,\n * custom reducers, or niche use-cases.\n */\nimport type { Channel, Event } from \"@langchain/protocol\";\nimport type { ProjectionSpec, ProjectionRuntime } from \"../types.js\";\nimport { isRootNamespace, namespaceKey } from \"../namespace.js\";\nimport { openProjectionSubscription } from \"./runtime.js\";\n\n/** Max events retained per raw subscription. Older events are dropped. */\nconst DEFAULT_BUFFER = 4096;\n\nexport interface ChannelProjectionOptions {\n  /**\n   * Maximum number of events retained in the projection snapshot.\n   * Defaults to 4096 so replay-backed discovery hooks can tolerate\n   * bursty token/media streams without dropping early lifecycle events.\n   */\n  bufferSize?: number;\n  /**\n   * Whether to open a real subscription and receive replayed history.\n   * Defaults to true. Set false for live-only root-bus inspection when\n   * replay is unnecessary.\n   */\n  replay?: boolean;\n}\n\nexport function channelProjection(\n  channels: readonly Channel[],\n  namespace: readonly string[],\n  options: ChannelProjectionOptions = {}\n): ProjectionSpec<Event[]> {\n  const chs = [...channels].sort();\n  const ns = [...namespace];\n  const bufferSize = options.bufferSize ?? DEFAULT_BUFFER;\n  const replay = options.replay ?? true;\n  const key = `channel|${bufferSize}|${replay ? \"replay\" : \"live\"}|${chs.join(\",\")}|${namespaceKey(ns)}`;\n\n  return {\n    key,\n    namespace: ns,\n    initial: [],\n    open({ thread, store, rootBus }): ProjectionRuntime {\n      // If this projection is scoped to the root namespace AND every\n      // requested channel is already covered by the controller's root\n      // pump, attach to the shared fan-out instead of opening a\n      // second server subscription. This is the common case for\n      // lightweight event-trace / debug panels.\n      const covered =\n        !replay &&\n        isRootNamespace(ns) &&\n        chs.every((c) => rootBus.channels.includes(c));\n\n      if (covered) {\n        const requestedSet = new Set(chs as Channel[]);\n        // Pre-compute `custom:<name>` sub-filters so incoming events\n        // can be matched in O(1). The server delivers named custom\n        // events as `{ method: \"custom\", params: { data: { name } } }`,\n        // so matching purely on `event.method` would miss them — we\n        // need to peek at `data.name` when the caller asked for a\n        // specific `custom:<name>` channel.\n        const namedCustom = new Set<string>();\n        for (const channel of chs) {\n          if (channel.startsWith(\"custom:\")) {\n            namedCustom.add(channel.slice(\"custom:\".length));\n          }\n        }\n        const matches = (event: Event): boolean => {\n          if (requestedSet.has(event.method as Channel)) return true;\n          if (event.method !== \"custom\" || namedCustom.size === 0) {\n            return false;\n          }\n          const data = (event.params as Record<string, unknown>).data as\n            | { name?: unknown }\n            | undefined;\n          return typeof data?.name === \"string\" && namedCustom.has(data.name);\n        };\n        const push = (event: Event): void => {\n          if (!matches(event)) return;\n          const current = store.getSnapshot();\n          const next =\n            current.length >= bufferSize\n              ? [...current.slice(current.length - bufferSize + 1), event]\n              : [...current, event];\n          store.setValue(next);\n        };\n        const unsubscribe = rootBus.subscribe(push);\n        return {\n          dispose() {\n            unsubscribe();\n          },\n        };\n      }\n\n      return openProjectionSubscription({\n        thread,\n        channels: chs as Channel[],\n        namespace: ns,\n        onEvent(event) {\n          const current = store.getSnapshot();\n          const next =\n            current.length >= bufferSize\n              ? [...current.slice(current.length - bufferSize + 1), event]\n              : [...current, event];\n          store.setValue(next);\n        },\n      });\n    },\n  };\n}\n"],"mappings":";;;;AAeA,MAAM,iBAAiB;AAiBvB,SAAgB,kBACd,UACA,WACA,UAAoC,EAAE,EACb;CACzB,MAAM,MAAM,CAAC,GAAG,SAAS,CAAC,MAAM;CAChC,MAAM,KAAK,CAAC,GAAG,UAAU;CACzB,MAAM,aAAa,QAAQ,cAAc;CACzC,MAAM,SAAS,QAAQ,UAAU;AAGjC,QAAO;EACL,KAHU,WAAW,WAAW,GAAG,SAAS,WAAW,OAAO,GAAG,IAAI,KAAK,IAAI,CAAC,GAAGA,kBAAAA,aAAa,GAAG;EAIlG,WAAW;EACX,SAAS,EAAE;EACX,KAAK,EAAE,QAAQ,OAAO,WAA8B;AAWlD,OAJE,CAAC,UACDC,kBAAAA,gBAAgB,GAAG,IACnB,IAAI,OAAO,MAAM,QAAQ,SAAS,SAAS,EAAE,CAAC,EAEnC;IACX,MAAM,eAAe,IAAI,IAAI,IAAiB;IAO9C,MAAM,8BAAc,IAAI,KAAa;AACrC,SAAK,MAAM,WAAW,IACpB,KAAI,QAAQ,WAAW,UAAU,CAC/B,aAAY,IAAI,QAAQ,MAAM,EAAiB,CAAC;IAGpD,MAAM,WAAW,UAA0B;AACzC,SAAI,aAAa,IAAI,MAAM,OAAkB,CAAE,QAAO;AACtD,SAAI,MAAM,WAAW,YAAY,YAAY,SAAS,EACpD,QAAO;KAET,MAAM,OAAQ,MAAM,OAAmC;AAGvD,YAAO,OAAO,MAAM,SAAS,YAAY,YAAY,IAAI,KAAK,KAAK;;IAErE,MAAM,QAAQ,UAAuB;AACnC,SAAI,CAAC,QAAQ,MAAM,CAAE;KACrB,MAAM,UAAU,MAAM,aAAa;KACnC,MAAM,OACJ,QAAQ,UAAU,aACd,CAAC,GAAG,QAAQ,MAAM,QAAQ,SAAS,aAAa,EAAE,EAAE,MAAM,GAC1D,CAAC,GAAG,SAAS,MAAM;AACzB,WAAM,SAAS,KAAK;;IAEtB,MAAM,cAAc,QAAQ,UAAU,KAAK;AAC3C,WAAO,EACL,UAAU;AACR,kBAAa;OAEhB;;AAGH,UAAOC,gBAAAA,2BAA2B;IAChC;IACA,UAAU;IACV,WAAW;IACX,QAAQ,OAAO;KACb,MAAM,UAAU,MAAM,aAAa;KACnC,MAAM,OACJ,QAAQ,UAAU,aACd,CAAC,GAAG,QAAQ,MAAM,QAAQ,SAAS,aAAa,EAAE,EAAE,MAAM,GAC1D,CAAC,GAAG,SAAS,MAAM;AACzB,WAAM,SAAS,KAAK;;IAEvB,CAAC;;EAEL"}