{"version":3,"file":"http.cjs","names":["AsyncQueue","isProtocolResponse","BytesLineDecoder","SSEDecoder","IterableReadableStream","isRecord","toAbsoluteUrl","mergeHeaders","toError"],"sources":["../../../../src/client/stream/transport/http.ts"],"sourcesContent":["import { AsyncQueue } from \"./queue.js\";\nimport type {\n  Message,\n  SubscribeParams,\n  Command,\n  CommandResponse,\n  ErrorResponse,\n} from \"@langchain/protocol\";\n\nimport type {\n  HeaderValue,\n  ProtocolRequestHook,\n  ProtocolSseTransportOptions,\n} from \"./types.js\";\nimport type { TransportAdapter, EventStreamHandle } from \"../transport.js\";\nimport {\n  toAbsoluteUrl,\n  isRecord,\n  mergeHeaders,\n  toError,\n  isProtocolResponse,\n} from \"./utils.js\";\nimport { BytesLineDecoder, SSEDecoder } from \"./decoder.js\";\nimport { IterableReadableStream } from \"./stream.js\";\n\n/**\n * Transport adapter that speaks the thread-centric protocol over HTTP\n * commands plus SSE event streams. Bound to a specific `threadId`\n * at construction. Each {@link openEventStream} call opens an independent\n * filtered SSE connection via `POST /threads/:thread_id/stream/events`.\n */\nexport class ProtocolSseTransportAdapter implements TransportAdapter {\n  readonly threadId: string;\n\n  private readonly queue = new AsyncQueue<Message>();\n\n  private readonly fetchImpl: typeof fetch;\n\n  private readonly apiUrl: string;\n\n  private readonly defaultHeaders: Record<string, HeaderValue>;\n\n  private readonly onRequest?: ProtocolRequestHook;\n\n  private readonly fetchFactory?: () => typeof fetch | Promise<typeof fetch>;\n\n  private readonly commandsUrl: string;\n\n  private readonly streamUrl: string;\n\n  private readonly sessionAbortController = new AbortController();\n\n  private readonly eventStreams = new Set<AbortController>();\n\n  private closed = false;\n\n  constructor(options: ProtocolSseTransportOptions) {\n    this.fetchImpl = options.fetch ?? fetch;\n    this.apiUrl = options.apiUrl;\n    this.defaultHeaders = options.defaultHeaders ?? {};\n    this.onRequest = options.onRequest;\n    this.fetchFactory = options.fetchFactory;\n    this.threadId = options.threadId;\n    this.commandsUrl =\n      options.paths?.commands ?? `/threads/${this.threadId}/commands`;\n    this.streamUrl =\n      options.paths?.stream ?? `/threads/${this.threadId}/stream/events`;\n  }\n\n  private async resolveFetch(): Promise<typeof fetch> {\n    if (this.fetchFactory) {\n      return await this.fetchFactory();\n    }\n    return this.fetchImpl;\n  }\n\n  /**\n   * HTTP/SSE transports have no handshake — connections are made\n   * per-command and per-subscription.\n   */\n  async open(): Promise<void> {\n    // no-op\n  }\n\n  async send(\n    command: Command\n  ): Promise<CommandResponse | ErrorResponse | void> {\n    const response = await this.request(this.commandsUrl, {\n      method: \"POST\",\n      headers: { \"content-type\": \"application/json\" },\n      body: JSON.stringify(command),\n      signal: this.sessionAbortController.signal,\n    });\n\n    if (response.status === 202 || response.status === 204) {\n      return undefined;\n    }\n\n    const payload = (await response.json()) as unknown;\n    if (!isProtocolResponse(payload)) {\n      throw new Error(\"Protocol command did not return a valid response.\");\n    }\n    return payload;\n  }\n\n  /**\n   * WebSocket-style single event stream.\n   * For the SSE transport this returns a dummy iterable; real event\n   * delivery happens via {@link openEventStream}.\n   */\n  events(): AsyncIterable<Message> {\n    const queue = this.queue;\n    return {\n      [Symbol.asyncIterator]: () => ({\n        next: async () => await queue.shift(),\n        return: async () => {\n          queue.close();\n          return { done: true, value: undefined };\n        },\n      }),\n    };\n  }\n\n  openEventStream(params: SubscribeParams): EventStreamHandle {\n    if (this.closed) {\n      throw new Error(\"Protocol transport is closed.\");\n    }\n\n    const ac = new AbortController();\n    this.eventStreams.add(ac);\n    const streamQueue = new AsyncQueue<Message>();\n    const streamUrl = this.streamUrl;\n\n    let resolveReady!: () => void;\n    let rejectReady!: (err: unknown) => void;\n    const ready = new Promise<void>((resolve, reject) => {\n      resolveReady = resolve;\n      rejectReady = reject;\n    });\n\n    const since = (params as SubscribeParams & { since?: unknown }).since;\n\n    const startStream = async () => {\n      try {\n        const response = await this.request(streamUrl, {\n          method: \"POST\",\n          headers: {\n            \"content-type\": \"application/json\",\n            accept: \"text/event-stream\",\n          },\n          body: JSON.stringify({\n            channels: params.channels,\n            ...(params.namespaces ? { namespaces: params.namespaces } : {}),\n            ...(params.depth != null ? { depth: params.depth } : {}),\n            ...(typeof since === \"number\" ? { since } : {}),\n          }),\n          signal: ac.signal,\n        });\n\n        resolveReady();\n\n        const readable =\n          response.body ??\n          new ReadableStream<Uint8Array>({\n            start(controller) {\n              controller.close();\n            },\n          });\n\n        const stream = readable\n          .pipeThrough(BytesLineDecoder())\n          .pipeThrough(SSEDecoder());\n        const iterable = IterableReadableStream.fromReadableStream(stream);\n\n        for await (const event of iterable) {\n          if (ac.signal.aborted || this.closed) {\n            break;\n          }\n          if (isRecord(event.data)) {\n            const msg = event.data as Message & {\n              seq?: number;\n              method?: string;\n            };\n            streamQueue.push(msg);\n          }\n        }\n        streamQueue.close();\n      } catch (error) {\n        rejectReady(error);\n        if (ac.signal.aborted || this.closed) {\n          streamQueue.close();\n          return;\n        }\n        streamQueue.close(error);\n      }\n    };\n\n    void startStream();\n\n    const cleanup = () => {\n      this.eventStreams.delete(ac);\n      ac.abort();\n      streamQueue.close();\n    };\n\n    return {\n      events: {\n        [Symbol.asyncIterator]: () => ({\n          next: async () => await streamQueue.shift(),\n          return: async () => {\n            cleanup();\n            return { done: true, value: undefined };\n          },\n        }),\n      },\n      ready,\n      close: cleanup,\n    };\n  }\n\n  async close(): Promise<void> {\n    if (this.closed) {\n      return;\n    }\n    this.closed = true;\n    this.sessionAbortController.abort();\n    for (const ac of this.eventStreams) ac.abort();\n    this.eventStreams.clear();\n    this.queue.close();\n  }\n\n  private async request(path: string, init: RequestInit): Promise<Response> {\n    const url = toAbsoluteUrl(this.apiUrl, path);\n    let requestInit: RequestInit = {\n      ...init,\n      headers: mergeHeaders(this.defaultHeaders, init.headers),\n    };\n\n    if (this.onRequest) {\n      requestInit = await this.onRequest(url, requestInit);\n    }\n\n    try {\n      const fetchImpl = await this.resolveFetch();\n      const response = await fetchImpl(url.toString(), requestInit);\n      if (!response.ok) {\n        let detail = \"\";\n        try {\n          const body = await response.text();\n          const parsed = JSON.parse(body);\n          if (typeof parsed === \"object\" && parsed != null) {\n            detail =\n              ((parsed as Record<string, unknown>).message as string) ??\n              ((parsed as Record<string, unknown>).error as string) ??\n              \"\";\n          }\n          if (!detail) detail = body;\n        } catch {\n          // body unreadable or not JSON — fall through\n        }\n        const message = detail\n          ? `Protocol request failed: ${response.status} ${response.statusText} — ${detail}`\n          : `Protocol request failed: ${response.status} ${response.statusText}`;\n        throw new Error(message);\n      }\n      return response;\n    } catch (error) {\n      throw toError(error);\n    }\n  }\n}\n"],"mappings":";;;;;;;;;;;AA+BA,IAAa,8BAAb,MAAqE;CACnE;CAEA,QAAyB,IAAIA,cAAAA,YAAqB;CAElD;CAEA;CAEA;CAEA;CAEA;CAEA;CAEA;CAEA,yBAA0C,IAAI,iBAAiB;CAE/D,+BAAgC,IAAI,KAAsB;CAE1D,SAAiB;CAEjB,YAAY,SAAsC;AAChD,OAAK,YAAY,QAAQ,SAAS;AAClC,OAAK,SAAS,QAAQ;AACtB,OAAK,iBAAiB,QAAQ,kBAAkB,EAAE;AAClD,OAAK,YAAY,QAAQ;AACzB,OAAK,eAAe,QAAQ;AAC5B,OAAK,WAAW,QAAQ;AACxB,OAAK,cACH,QAAQ,OAAO,YAAY,YAAY,KAAK,SAAS;AACvD,OAAK,YACH,QAAQ,OAAO,UAAU,YAAY,KAAK,SAAS;;CAGvD,MAAc,eAAsC;AAClD,MAAI,KAAK,aACP,QAAO,MAAM,KAAK,cAAc;AAElC,SAAO,KAAK;;;;;;CAOd,MAAM,OAAsB;CAI5B,MAAM,KACJ,SACiD;EACjD,MAAM,WAAW,MAAM,KAAK,QAAQ,KAAK,aAAa;GACpD,QAAQ;GACR,SAAS,EAAE,gBAAgB,oBAAoB;GAC/C,MAAM,KAAK,UAAU,QAAQ;GAC7B,QAAQ,KAAK,uBAAuB;GACrC,CAAC;AAEF,MAAI,SAAS,WAAW,OAAO,SAAS,WAAW,IACjD;EAGF,MAAM,UAAW,MAAM,SAAS,MAAM;AACtC,MAAI,CAACC,cAAAA,mBAAmB,QAAQ,CAC9B,OAAM,IAAI,MAAM,oDAAoD;AAEtE,SAAO;;;;;;;CAQT,SAAiC;EAC/B,MAAM,QAAQ,KAAK;AACnB,SAAO,GACJ,OAAO,uBAAuB;GAC7B,MAAM,YAAY,MAAM,MAAM,OAAO;GACrC,QAAQ,YAAY;AAClB,UAAM,OAAO;AACb,WAAO;KAAE,MAAM;KAAM,OAAO,KAAA;KAAW;;GAE1C,GACF;;CAGH,gBAAgB,QAA4C;AAC1D,MAAI,KAAK,OACP,OAAM,IAAI,MAAM,gCAAgC;EAGlD,MAAM,KAAK,IAAI,iBAAiB;AAChC,OAAK,aAAa,IAAI,GAAG;EACzB,MAAM,cAAc,IAAID,cAAAA,YAAqB;EAC7C,MAAM,YAAY,KAAK;EAEvB,IAAI;EACJ,IAAI;EACJ,MAAM,QAAQ,IAAI,SAAe,SAAS,WAAW;AACnD,kBAAe;AACf,iBAAc;IACd;EAEF,MAAM,QAAS,OAAiD;EAEhE,MAAM,cAAc,YAAY;AAC9B,OAAI;IACF,MAAM,WAAW,MAAM,KAAK,QAAQ,WAAW;KAC7C,QAAQ;KACR,SAAS;MACP,gBAAgB;MAChB,QAAQ;MACT;KACD,MAAM,KAAK,UAAU;MACnB,UAAU,OAAO;MACjB,GAAI,OAAO,aAAa,EAAE,YAAY,OAAO,YAAY,GAAG,EAAE;MAC9D,GAAI,OAAO,SAAS,OAAO,EAAE,OAAO,OAAO,OAAO,GAAG,EAAE;MACvD,GAAI,OAAO,UAAU,WAAW,EAAE,OAAO,GAAG,EAAE;MAC/C,CAAC;KACF,QAAQ,GAAG;KACZ,CAAC;AAEF,kBAAc;IAUd,MAAM,UAPJ,SAAS,QACT,IAAI,eAA2B,EAC7B,MAAM,YAAY;AAChB,gBAAW,OAAO;OAErB,CAAC,EAGD,YAAYE,gBAAAA,kBAAkB,CAAC,CAC/B,YAAYC,gBAAAA,YAAY,CAAC;IAC5B,MAAM,WAAWC,eAAAA,uBAAuB,mBAAmB,OAAO;AAElE,eAAW,MAAM,SAAS,UAAU;AAClC,SAAI,GAAG,OAAO,WAAW,KAAK,OAC5B;AAEF,SAAIC,cAAAA,SAAS,MAAM,KAAK,EAAE;MACxB,MAAM,MAAM,MAAM;AAIlB,kBAAY,KAAK,IAAI;;;AAGzB,gBAAY,OAAO;YACZ,OAAO;AACd,gBAAY,MAAM;AAClB,QAAI,GAAG,OAAO,WAAW,KAAK,QAAQ;AACpC,iBAAY,OAAO;AACnB;;AAEF,gBAAY,MAAM,MAAM;;;AAIvB,eAAa;EAElB,MAAM,gBAAgB;AACpB,QAAK,aAAa,OAAO,GAAG;AAC5B,MAAG,OAAO;AACV,eAAY,OAAO;;AAGrB,SAAO;GACL,QAAQ,GACL,OAAO,uBAAuB;IAC7B,MAAM,YAAY,MAAM,YAAY,OAAO;IAC3C,QAAQ,YAAY;AAClB,cAAS;AACT,YAAO;MAAE,MAAM;MAAM,OAAO,KAAA;MAAW;;IAE1C,GACF;GACD;GACA,OAAO;GACR;;CAGH,MAAM,QAAuB;AAC3B,MAAI,KAAK,OACP;AAEF,OAAK,SAAS;AACd,OAAK,uBAAuB,OAAO;AACnC,OAAK,MAAM,MAAM,KAAK,aAAc,IAAG,OAAO;AAC9C,OAAK,aAAa,OAAO;AACzB,OAAK,MAAM,OAAO;;CAGpB,MAAc,QAAQ,MAAc,MAAsC;EACxE,MAAM,MAAMC,cAAAA,cAAc,KAAK,QAAQ,KAAK;EAC5C,IAAI,cAA2B;GAC7B,GAAG;GACH,SAASC,cAAAA,aAAa,KAAK,gBAAgB,KAAK,QAAQ;GACzD;AAED,MAAI,KAAK,UACP,eAAc,MAAM,KAAK,UAAU,KAAK,YAAY;AAGtD,MAAI;GAEF,MAAM,WAAW,OADC,MAAM,KAAK,cAAc,EACV,IAAI,UAAU,EAAE,YAAY;AAC7D,OAAI,CAAC,SAAS,IAAI;IAChB,IAAI,SAAS;AACb,QAAI;KACF,MAAM,OAAO,MAAM,SAAS,MAAM;KAClC,MAAM,SAAS,KAAK,MAAM,KAAK;AAC/B,SAAI,OAAO,WAAW,YAAY,UAAU,KAC1C,UACI,OAAmC,WACnC,OAAmC,SACrC;AAEJ,SAAI,CAAC,OAAQ,UAAS;YAChB;IAGR,MAAM,UAAU,SACZ,4BAA4B,SAAS,OAAO,GAAG,SAAS,WAAW,KAAK,WACxE,4BAA4B,SAAS,OAAO,GAAG,SAAS;AAC5D,UAAM,IAAI,MAAM,QAAQ;;AAE1B,UAAO;WACA,OAAO;AACd,SAAMC,cAAAA,QAAQ,MAAM"}