{"version":3,"file":"stream-channel.cjs","names":["#items","#wake","#onPush","#done","#error","#waiters"],"sources":["../../src/stream/stream-channel.ts"],"sourcesContent":["/**\n * StreamChannel — projection channel for local or remote streaming.\n *\n * A `StreamChannel<T>` is an append-only async stream with independent\n * cursors. Local channels stay in-process only. Remote channels declare a\n * protocol channel name; when registered with a {@link StreamMux} (via a\n * transformer's `init()` return value), every {@link push} is automatically\n * forwarded as a {@link ProtocolEvent} on `custom:<channelName>` — making the\n * data available both in-process (via `run.extensions`) and to remote clients\n * (via `session.subscribe(\"custom:<channelName>\")`).\n *\n * Lifecycle (`close` / `fail`) is managed by the mux automatically;\n * transformers do not need to call them.\n */\n\n/**\n * Branded symbol placed on every {@link StreamChannel} instance.\n *\n * Uses `Symbol.for` so the same symbol is shared across multiple\n * copies of this package that may coexist in a dependency graph\n * (e.g. when a user app imports `@langchain/langgraph` directly and a\n * wrapping library like `langchain` bundles its own copy). Using a\n * symbol brand instead of `instanceof` lets channels created against\n * one copy of the class be recognised by a mux from another.\n * @internal\n */\nexport const STREAM_CHANNEL_BRAND: unique symbol = Symbol.for(\n  \"langgraph.stream_channel\"\n) as typeof STREAM_CHANNEL_BRAND;\n\nexport interface StreamChannelEventStreamOptions<T> {\n  /**\n   * SSE event name. Defaults to the channel's remote protocol name, if any.\n   * Set this for local channels or when exposing the same channel under a\n   * route-specific event name.\n   */\n  event?: string;\n  /**\n   * Cursor position to start streaming from. Useful for reconnects or\n   * secondary subscribers that already consumed the first N buffered items and\n   * only need replay from a known offset.\n   */\n  startAt?: number;\n  /**\n   * Serialize each item into the SSE `data:` field. Defaults to JSON. Use this\n   * when a channel item needs a wire format other than its raw JSON shape, or\n   * when the consumer expects line-oriented text payloads.\n   */\n  serialize?: (item: T) => string;\n}\n\n/**\n * A projection channel for {@link StreamTransformer}s.\n *\n * Implements `AsyncIterable<T>` so it can be iterated directly by\n * in-process consumers via `run.extensions.<key>`. Channels created with\n * {@link StreamChannel.remote} or `new StreamChannel(name)` are also\n * auto-forwarded to remote clients.\n *\n * @typeParam T - The type of items pushed into the channel.\n */\nexport class StreamChannel<T> implements AsyncIterable<T> {\n  /** @internal Brand used by {@link StreamChannel.isInstance}. */\n  readonly [STREAM_CHANNEL_BRAND] = true as const;\n\n  /** Protocol channel name used for auto-forwarded events, if remote. */\n  readonly channelName?: string;\n\n  #items: T[] = [];\n  #waiters: Array<() => void> = [];\n  #done = false;\n  #error: unknown;\n  #onPush?: (item: T) => void;\n\n  constructor(name?: string) {\n    this.channelName = name;\n  }\n\n  /**\n   * Create an in-process-only channel.  Values remain available through\n   * `run.extensions.<key>` but are not forwarded to remote clients.\n   */\n  static local<T>(): StreamChannel<T> {\n    return new StreamChannel<T>();\n  }\n\n  /**\n   * Create a channel whose pushes are forwarded to remote clients under\n   * the given protocol channel name.\n   */\n  static remote<T>(name: string): StreamChannel<T> {\n    return new StreamChannel<T>(name);\n  }\n\n  /**\n   * Brand-based type guard that recognises any {@link StreamChannel}\n   * instance, even ones originating from a different copy of this\n   * package. Prefer this over `instanceof StreamChannel` when code\n   * may observe channels that were constructed elsewhere.\n   */\n  static isInstance(value: unknown): value is StreamChannel<unknown> {\n    return (\n      typeof value === \"object\" &&\n      value !== null &&\n      STREAM_CHANNEL_BRAND in value &&\n      (value as { [STREAM_CHANNEL_BRAND]: unknown })[STREAM_CHANNEL_BRAND] ===\n        true\n    );\n  }\n\n  /**\n   * Append an item to the channel.  If this is a remote channel wired to a\n   * mux, the item is also injected into the main protocol event stream under\n   * {@link channelName}.\n   */\n  push(item: T): void {\n    this.#items.push(item);\n    this.#wake();\n    this.#onPush?.(item);\n  }\n\n  /**\n   * Returns an async iterator starting at position {@link startAt}. Each call\n   * returns an independent cursor so multiple consumers can iterate the same\n   * channel concurrently.\n   */\n  iterate(startAt = 0): AsyncIterator<T> {\n    let cursor = startAt;\n    return {\n      next: async (): Promise<IteratorResult<T>> => {\n        // eslint-disable-next-line no-constant-condition\n        while (true) {\n          if (cursor < this.#items.length) {\n            return { value: this.#items[cursor++], done: false };\n          }\n          if (this.#done) {\n            if (this.#error) throw this.#error;\n            return { value: undefined as unknown as T, done: true };\n          }\n          await new Promise<void>((resolve) => this.#waiters.push(resolve));\n        }\n      },\n    };\n  }\n\n  /**\n   * Creates an {@link AsyncIterable} backed by this channel, starting from\n   * {@link startAt}.\n   */\n  toAsyncIterable(startAt = 0): AsyncIterable<T> {\n    return {\n      [Symbol.asyncIterator]: () => this.iterate(startAt),\n    };\n  }\n\n  /**\n   * Creates a web {@link ReadableStream} that emits channel items as\n   * Server-Sent Events. Useful for returning a channel directly from\n   * `new Response(channel.toEventStream())`.\n   */\n  toEventStream(\n    options: StreamChannelEventStreamOptions<T> = {}\n  ): ReadableStream<Uint8Array> {\n    const encoder = new TextEncoder();\n    const iterator = this.iterate(options.startAt);\n    const event = options.event ?? this.channelName;\n    const serialize =\n      options.serialize ?? ((item: T) => JSON.stringify(item) ?? \"null\");\n\n    return new ReadableStream<Uint8Array>({\n      async pull(controller) {\n        try {\n          const next = await iterator.next();\n          if (next.done) {\n            controller.close();\n            return;\n          }\n\n          const lines: string[] = [];\n          if (event != null) {\n            lines.push(`event: ${event}`);\n          }\n          for (const line of serialize(next.value).split(/\\r\\n|\\r|\\n/)) {\n            lines.push(`data: ${line}`);\n          }\n\n          controller.enqueue(encoder.encode(`${lines.join(\"\\n\")}\\n\\n`));\n        } catch (error) {\n          controller.error(error);\n        }\n      },\n      async cancel() {\n        await iterator.return?.();\n      },\n    });\n  }\n\n  /**\n   * Returns the item at the given zero-based index.\n   *\n   * @throws {RangeError} If the index is out of bounds.\n   */\n  get(index: number): T {\n    if (index < 0 || index >= this.#items.length) {\n      throw new RangeError(\n        `StreamChannel index ${index} out of bounds (size=${this.#items.length})`\n      );\n    }\n    return this.#items[index];\n  }\n\n  /** The number of items currently buffered in the channel. */\n  get size(): number {\n    return this.#items.length;\n  }\n\n  /** Whether the channel has been closed or failed. */\n  get done(): boolean {\n    return this.#done;\n  }\n\n  /** Mark the channel as complete after all buffered items are consumed. */\n  close(): void {\n    this.#done = true;\n    this.#wake();\n  }\n\n  /** Mark the channel as failed after all buffered items are consumed. */\n  fail(err: unknown): void {\n    this.#error = err;\n    this.#done = true;\n    this.#wake();\n  }\n\n  /** @internal Called by the mux to wire auto-forwarding. */\n  _wire(fn: (item: T) => void): void {\n    this.#onPush = fn;\n  }\n\n  /** @internal Called by the mux on normal completion. */\n  _close(): void {\n    this.close();\n  }\n\n  /** @internal Called by the mux on failure. */\n  _fail(err: unknown): void {\n    this.fail(err);\n  }\n\n  [Symbol.asyncIterator](): AsyncIterator<T> {\n    return this.iterate();\n  }\n\n  #wake(): void {\n    const waiters = this.#waiters.splice(0);\n    for (const w of waiters) w();\n  }\n}\n\n/**\n * Type guard that tests whether a value is a {@link StreamChannel}.\n *\n * Uses a symbol brand rather than `instanceof` so channels built\n * against a different copy of this package (e.g. one bundled by the\n * `langchain` umbrella package) are still recognised.\n */\nexport function isStreamChannel(\n  value: unknown\n): value is StreamChannel<unknown> {\n  return StreamChannel.isInstance(value);\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;AA0BA,MAAa,uBAAsC,OAAO,IACxD,2BACD;;;;;;;;;;;AAiCD,IAAa,gBAAb,MAAa,cAA6C;;CAExD,CAAU,wBAAwB;;CAGlC;CAEA,SAAc,EAAE;CAChB,WAA8B,EAAE;CAChC,QAAQ;CACR;CACA;CAEA,YAAY,MAAe;AACzB,OAAK,cAAc;;;;;;CAOrB,OAAO,QAA6B;AAClC,SAAO,IAAI,eAAkB;;;;;;CAO/B,OAAO,OAAU,MAAgC;AAC/C,SAAO,IAAI,cAAiB,KAAK;;;;;;;;CASnC,OAAO,WAAW,OAAiD;AACjE,SACE,OAAO,UAAU,YACjB,UAAU,QACV,wBAAwB,SACvB,MAA8C,0BAC7C;;;;;;;CASN,KAAK,MAAe;AAClB,QAAA,MAAY,KAAK,KAAK;AACtB,QAAA,MAAY;AACZ,QAAA,SAAe,KAAK;;;;;;;CAQtB,QAAQ,UAAU,GAAqB;EACrC,IAAI,SAAS;AACb,SAAO,EACL,MAAM,YAAwC;AAE5C,UAAO,MAAM;AACX,QAAI,SAAS,MAAA,MAAY,OACvB,QAAO;KAAE,OAAO,MAAA,MAAY;KAAW,MAAM;KAAO;AAEtD,QAAI,MAAA,MAAY;AACd,SAAI,MAAA,MAAa,OAAM,MAAA;AACvB,YAAO;MAAE,OAAO,KAAA;MAA2B,MAAM;MAAM;;AAEzD,UAAM,IAAI,SAAe,YAAY,MAAA,QAAc,KAAK,QAAQ,CAAC;;KAGtE;;;;;;CAOH,gBAAgB,UAAU,GAAqB;AAC7C,SAAO,GACJ,OAAO,sBAAsB,KAAK,QAAQ,QAAQ,EACpD;;;;;;;CAQH,cACE,UAA8C,EAAE,EACpB;EAC5B,MAAM,UAAU,IAAI,aAAa;EACjC,MAAM,WAAW,KAAK,QAAQ,QAAQ,QAAQ;EAC9C,MAAM,QAAQ,QAAQ,SAAS,KAAK;EACpC,MAAM,YACJ,QAAQ,eAAe,SAAY,KAAK,UAAU,KAAK,IAAI;AAE7D,SAAO,IAAI,eAA2B;GACpC,MAAM,KAAK,YAAY;AACrB,QAAI;KACF,MAAM,OAAO,MAAM,SAAS,MAAM;AAClC,SAAI,KAAK,MAAM;AACb,iBAAW,OAAO;AAClB;;KAGF,MAAM,QAAkB,EAAE;AAC1B,SAAI,SAAS,KACX,OAAM,KAAK,UAAU,QAAQ;AAE/B,UAAK,MAAM,QAAQ,UAAU,KAAK,MAAM,CAAC,MAAM,aAAa,CAC1D,OAAM,KAAK,SAAS,OAAO;AAG7B,gBAAW,QAAQ,QAAQ,OAAO,GAAG,MAAM,KAAK,KAAK,CAAC,MAAM,CAAC;aACtD,OAAO;AACd,gBAAW,MAAM,MAAM;;;GAG3B,MAAM,SAAS;AACb,UAAM,SAAS,UAAU;;GAE5B,CAAC;;;;;;;CAQJ,IAAI,OAAkB;AACpB,MAAI,QAAQ,KAAK,SAAS,MAAA,MAAY,OACpC,OAAM,IAAI,WACR,uBAAuB,MAAM,uBAAuB,MAAA,MAAY,OAAO,GACxE;AAEH,SAAO,MAAA,MAAY;;;CAIrB,IAAI,OAAe;AACjB,SAAO,MAAA,MAAY;;;CAIrB,IAAI,OAAgB;AAClB,SAAO,MAAA;;;CAIT,QAAc;AACZ,QAAA,OAAa;AACb,QAAA,MAAY;;;CAId,KAAK,KAAoB;AACvB,QAAA,QAAc;AACd,QAAA,OAAa;AACb,QAAA,MAAY;;;CAId,MAAM,IAA6B;AACjC,QAAA,SAAe;;;CAIjB,SAAe;AACb,OAAK,OAAO;;;CAId,MAAM,KAAoB;AACxB,OAAK,KAAK,IAAI;;CAGhB,CAAC,OAAO,iBAAmC;AACzC,SAAO,KAAK,SAAS;;CAGvB,QAAc;EACZ,MAAM,UAAU,MAAA,QAAc,OAAO,EAAE;AACvC,OAAK,MAAM,KAAK,QAAS,IAAG;;;;;;;;;;AAWhC,SAAgB,gBACd,OACiC;AACjC,QAAO,cAAc,WAAW,MAAM"}