{"version":3,"file":"stream.cjs","names":["IterableReadableStream","BaseCallbackHandler"],"sources":["../../src/pregel/stream.ts"],"sourcesContent":["import { IterableReadableStream } from \"@langchain/core/utils/stream\";\nimport type { RunnableConfig } from \"@langchain/core/runnables\";\nimport { BaseCallbackHandler } from \"@langchain/core/callbacks/base\";\nimport { Serialized } from \"@langchain/core/load/serializable\";\nimport type { StreamMode, StreamOutputMap } from \"./types.js\";\nimport { TAG_HIDDEN } from \"../constants.js\";\n\n// [namespace, streamMode, payload]\nexport type StreamChunk = [string[], StreamMode, unknown];\n\ntype StreamCheckpointsOutput<StreamValues> = StreamOutputMap<\n  \"checkpoints\",\n  false,\n  StreamValues,\n  unknown,\n  string,\n  unknown,\n  unknown,\n  undefined\n>;\n\ntype AnyStreamOutput = StreamOutputMap<\n  StreamMode[],\n  true,\n  unknown,\n  unknown,\n  string,\n  unknown,\n  unknown,\n  undefined\n>;\n\ntype ToolRunInfo = {\n  ns: string[];\n  toolCallId?: string;\n  toolName: string;\n  input: unknown;\n};\n\n/**\n * A wrapper around an IterableReadableStream that allows for aborting the stream when\n * {@link cancel} is called.\n */\nexport class IterableReadableStreamWithAbortSignal<\n  T,\n> extends IterableReadableStream<T> {\n  protected _abortController: AbortController;\n\n  protected _innerReader: ReadableStreamDefaultReader<T>;\n\n  /**\n   * @param readableStream - The stream to wrap.\n   * @param abortController - The abort controller to use. Optional. One will be created if not provided.\n   */\n  constructor(\n    readableStream: ReadableStream<T>,\n    abortController?: AbortController\n  ) {\n    const reader = readableStream.getReader();\n    const ac = abortController ?? new AbortController();\n    super({\n      start(controller: ReadableStreamDefaultController<T>) {\n        return pump();\n        function pump(): Promise<T | undefined> {\n          return reader.read().then(({ done, value }) => {\n            // When no more data needs to be consumed, close the stream\n            if (done) {\n              controller.close();\n              return;\n            }\n            // Enqueue the next data chunk into our target stream\n            controller.enqueue(value);\n            return pump();\n          });\n        }\n      },\n    });\n    this._abortController = ac;\n    this._innerReader = reader;\n  }\n\n  /**\n   * Aborts the stream, abandoning any pending operations in progress. Calling this triggers an\n   * {@link AbortSignal} that is propagated to the tasks that are producing the data for this stream.\n   * @param reason - The reason for aborting the stream. Optional.\n   */\n  override async cancel(reason?: unknown) {\n    this._abortController.abort(reason);\n    this._innerReader.releaseLock();\n  }\n\n  /**\n   * The {@link AbortSignal} for the stream. Aborted when {@link cancel} is called.\n   */\n  get signal() {\n    return this._abortController.signal;\n  }\n}\n\nexport class IterableReadableWritableStream extends IterableReadableStream<StreamChunk> {\n  modes: Set<StreamMode>;\n\n  private controller: ReadableStreamDefaultController;\n\n  private passthroughFn?: (chunk: StreamChunk) => void;\n\n  private _closed: boolean = false;\n\n  get closed() {\n    return this._closed;\n  }\n\n  constructor(params: {\n    passthroughFn?: (chunk: StreamChunk) => void;\n    modes: Set<StreamMode>;\n  }) {\n    let streamControllerPromiseResolver: (\n      controller: ReadableStreamDefaultController\n    ) => void;\n    const streamControllerPromise: Promise<ReadableStreamDefaultController> =\n      new Promise<ReadableStreamDefaultController>((resolve) => {\n        streamControllerPromiseResolver = resolve;\n      });\n\n    super({\n      start: (controller) => {\n        streamControllerPromiseResolver!(controller);\n      },\n    });\n\n    // .start() will always be called before the stream can be interacted\n    // with anyway\n    void streamControllerPromise.then((controller) => {\n      this.controller = controller;\n    });\n\n    this.passthroughFn = params.passthroughFn;\n    this.modes = params.modes;\n  }\n\n  push(chunk: StreamChunk) {\n    this.passthroughFn?.(chunk);\n    this.controller.enqueue(chunk);\n  }\n\n  close() {\n    try {\n      this.controller.close();\n    } catch {\n      // pass\n    } finally {\n      this._closed = true;\n    }\n  }\n\n  // eslint-disable-next-line @typescript-eslint/no-explicit-any\n  error(e: any) {\n    this.controller.error(e);\n  }\n}\n\n/**\n * A callback handler that implements stream_mode=tools.\n * Emits on_tool_start, on_tool_event, on_tool_end, on_tool_error events.\n */\nexport class StreamToolsHandler extends BaseCallbackHandler {\n  name = \"StreamToolsHandler\";\n\n  streamFn: (streamChunk: StreamChunk) => void;\n\n  runs: Record<string, ToolRunInfo | undefined> = {};\n\n  constructor(streamFn: (streamChunk: StreamChunk) => void) {\n    super();\n    this.streamFn = streamFn;\n  }\n\n  handleToolStart(\n    _tool: Serialized,\n    input: string,\n    runId: string,\n    _parentRunId?: string,\n    tags?: string[],\n    metadata?: Record<string, unknown>,\n    runName?: string,\n    toolCallId?: string\n  ) {\n    if (!metadata || (tags && tags.includes(TAG_HIDDEN))) return;\n\n    const ns = (metadata.langgraph_checkpoint_ns as string)?.split(\"|\") ?? [];\n    const info: ToolRunInfo = {\n      ns,\n      toolCallId,\n      toolName: runName ?? \"unknown\",\n      input,\n    };\n    this.runs[runId] = info;\n\n    this.streamFn([\n      ns,\n      \"tools\",\n      {\n        event: \"on_tool_start\",\n        toolCallId: info.toolCallId,\n        name: info.toolName,\n        input,\n      },\n    ]);\n  }\n\n  handleToolEvent(chunk: unknown, runId: string) {\n    const info = this.runs[runId];\n    if (!info) return;\n\n    this.streamFn([\n      info.ns,\n      \"tools\",\n      {\n        event: \"on_tool_event\",\n        toolCallId: info.toolCallId,\n        name: info.toolName,\n        data: chunk,\n      },\n    ]);\n  }\n\n  handleToolEnd(output: unknown, runId: string) {\n    const info = this.runs[runId];\n    delete this.runs[runId];\n    if (!info) return;\n\n    this.streamFn([\n      info.ns,\n      \"tools\",\n      {\n        event: \"on_tool_end\",\n        toolCallId: info.toolCallId,\n        name: info.toolName,\n        output,\n      },\n    ]);\n  }\n\n  handleToolError(err: unknown, runId: string) {\n    const info = this.runs[runId];\n    delete this.runs[runId];\n    if (!info) return;\n\n    this.streamFn([\n      info.ns,\n      \"tools\",\n      {\n        event: \"on_tool_error\",\n        toolCallId: info.toolCallId,\n        name: info.toolName,\n        error: err,\n      },\n    ]);\n  }\n}\n\nfunction _stringifyAsDict(obj: unknown) {\n  return JSON.stringify(obj, function (key: string | number, value: unknown) {\n    const rawValue = this[key];\n    if (\n      rawValue != null &&\n      typeof rawValue === \"object\" &&\n      \"toDict\" in rawValue &&\n      typeof rawValue.toDict === \"function\"\n    ) {\n      const { type, data } = rawValue.toDict();\n      return { ...data, type };\n    }\n\n    return value;\n  });\n}\n\nfunction _serializeError(error: unknown) {\n  // eslint-disable-next-line no-instanceof/no-instanceof\n  if (error instanceof Error) {\n    return { error: error.name, message: error.message };\n  }\n  return { error: \"Error\", message: JSON.stringify(error) };\n}\n\nfunction _isRunnableConfig(\n  config: unknown\n): config is RunnableConfig & { configurable: Record<string, unknown> } {\n  if (typeof config !== \"object\" || config == null) return false;\n  return (\n    \"configurable\" in config &&\n    typeof config.configurable === \"object\" &&\n    config.configurable != null\n  );\n}\n\nfunction _extractCheckpointFromConfig(\n  config: RunnableConfig | null | undefined\n) {\n  if (!_isRunnableConfig(config) || !config.configurable.thread_id) {\n    return null;\n  }\n\n  return {\n    thread_id: config.configurable.thread_id,\n    checkpoint_ns: config.configurable.checkpoint_ns || \"\",\n    checkpoint_id: config.configurable.checkpoint_id || null,\n    checkpoint_map: config.configurable.checkpoint_map || null,\n  };\n}\n\nfunction _serializeConfig(config: unknown) {\n  if (_isRunnableConfig(config)) {\n    const configurable = Object.fromEntries(\n      Object.entries(config.configurable).filter(\n        ([key]) => !key.startsWith(\"__\")\n      )\n    );\n\n    const newConfig = { ...config, configurable };\n    delete newConfig.callbacks;\n    return newConfig;\n  }\n\n  return config;\n}\n\nfunction _serializeCheckpoint(payload: StreamCheckpointsOutput<unknown>) {\n  const result: Record<string, unknown> = {\n    ...payload,\n    checkpoint: _extractCheckpointFromConfig(payload.config),\n    parent_checkpoint: _extractCheckpointFromConfig(payload.parentConfig),\n\n    config: _serializeConfig(payload.config),\n    parent_config: _serializeConfig(payload.parentConfig),\n\n    tasks: payload.tasks.map((task) => {\n      if (_isRunnableConfig(task.state)) {\n        const checkpoint = _extractCheckpointFromConfig(task.state);\n        if (checkpoint != null) {\n          const cloneTask: Record<string, unknown> = { ...task, checkpoint };\n          delete cloneTask.state;\n          return cloneTask;\n        }\n      }\n\n      return task;\n    }),\n  };\n\n  delete result.parentConfig;\n  return result;\n}\n\nexport function toEventStream(stream: AsyncGenerator) {\n  const encoder = new TextEncoder();\n  return new ReadableStream<Uint8Array>({\n    async start(controller) {\n      const enqueueChunk = (sse: {\n        id?: string;\n        event: string;\n        data: unknown;\n      }) => {\n        controller.enqueue(\n          encoder.encode(\n            `event: ${sse.event}\\ndata: ${_stringifyAsDict(sse.data)}\\n\\n`\n          )\n        );\n      };\n\n      try {\n        for await (const payload of stream) {\n          const [ns, mode, chunk] = payload as AnyStreamOutput;\n\n          let data: unknown = chunk;\n          if (mode === \"debug\") {\n            const debugChunk = chunk;\n\n            if (debugChunk.type === \"checkpoint\") {\n              data = {\n                ...debugChunk,\n                payload: _serializeCheckpoint(debugChunk.payload),\n              };\n            }\n          }\n\n          if (mode === \"checkpoints\") {\n            data = _serializeCheckpoint(chunk);\n          }\n\n          const event = ns?.length ? `${mode}|${ns.join(\"|\")}` : mode;\n          enqueueChunk({ event, data });\n        }\n      } catch (error) {\n        enqueueChunk({ event: \"error\", data: _serializeError(error) });\n      }\n\n      controller.close();\n    },\n  });\n}\n"],"mappings":";;;;;;;;AA2CA,IAAa,wCAAb,cAEUA,6BAAAA,uBAA0B;CAClC;CAEA;;;;;CAMA,YACE,gBACA,iBACA;EACA,MAAM,SAAS,eAAe,WAAW;EACzC,MAAM,KAAK,mBAAmB,IAAI,iBAAiB;AACnD,QAAM,EACJ,MAAM,YAAgD;AACpD,UAAO,MAAM;GACb,SAAS,OAA+B;AACtC,WAAO,OAAO,MAAM,CAAC,MAAM,EAAE,MAAM,YAAY;AAE7C,SAAI,MAAM;AACR,iBAAW,OAAO;AAClB;;AAGF,gBAAW,QAAQ,MAAM;AACzB,YAAO,MAAM;MACb;;KAGP,CAAC;AACF,OAAK,mBAAmB;AACxB,OAAK,eAAe;;;;;;;CAQtB,MAAe,OAAO,QAAkB;AACtC,OAAK,iBAAiB,MAAM,OAAO;AACnC,OAAK,aAAa,aAAa;;;;;CAMjC,IAAI,SAAS;AACX,SAAO,KAAK,iBAAiB;;;AAIjC,IAAa,iCAAb,cAAoDA,6BAAAA,uBAAoC;CACtF;CAEA;CAEA;CAEA,UAA2B;CAE3B,IAAI,SAAS;AACX,SAAO,KAAK;;CAGd,YAAY,QAGT;EACD,IAAI;EAGJ,MAAM,0BACJ,IAAI,SAA0C,YAAY;AACxD,qCAAkC;IAClC;AAEJ,QAAM,EACJ,QAAQ,eAAe;AACrB,mCAAiC,WAAW;KAE/C,CAAC;AAIG,0BAAwB,MAAM,eAAe;AAChD,QAAK,aAAa;IAClB;AAEF,OAAK,gBAAgB,OAAO;AAC5B,OAAK,QAAQ,OAAO;;CAGtB,KAAK,OAAoB;AACvB,OAAK,gBAAgB,MAAM;AAC3B,OAAK,WAAW,QAAQ,MAAM;;CAGhC,QAAQ;AACN,MAAI;AACF,QAAK,WAAW,OAAO;UACjB,WAEE;AACR,QAAK,UAAU;;;CAKnB,MAAM,GAAQ;AACZ,OAAK,WAAW,MAAM,EAAE;;;;;;;AAQ5B,IAAa,qBAAb,cAAwCC,+BAAAA,oBAAoB;CAC1D,OAAO;CAEP;CAEA,OAAgD,EAAE;CAElD,YAAY,UAA8C;AACxD,SAAO;AACP,OAAK,WAAW;;CAGlB,gBACE,OACA,OACA,OACA,cACA,MACA,UACA,SACA,YACA;AACA,MAAI,CAAC,YAAa,QAAQ,KAAK,SAAA,mBAAoB,CAAG;EAEtD,MAAM,KAAM,SAAS,yBAAoC,MAAM,IAAI,IAAI,EAAE;EACzE,MAAM,OAAoB;GACxB;GACA;GACA,UAAU,WAAW;GACrB;GACD;AACD,OAAK,KAAK,SAAS;AAEnB,OAAK,SAAS;GACZ;GACA;GACA;IACE,OAAO;IACP,YAAY,KAAK;IACjB,MAAM,KAAK;IACX;IACD;GACF,CAAC;;CAGJ,gBAAgB,OAAgB,OAAe;EAC7C,MAAM,OAAO,KAAK,KAAK;AACvB,MAAI,CAAC,KAAM;AAEX,OAAK,SAAS;GACZ,KAAK;GACL;GACA;IACE,OAAO;IACP,YAAY,KAAK;IACjB,MAAM,KAAK;IACX,MAAM;IACP;GACF,CAAC;;CAGJ,cAAc,QAAiB,OAAe;EAC5C,MAAM,OAAO,KAAK,KAAK;AACvB,SAAO,KAAK,KAAK;AACjB,MAAI,CAAC,KAAM;AAEX,OAAK,SAAS;GACZ,KAAK;GACL;GACA;IACE,OAAO;IACP,YAAY,KAAK;IACjB,MAAM,KAAK;IACX;IACD;GACF,CAAC;;CAGJ,gBAAgB,KAAc,OAAe;EAC3C,MAAM,OAAO,KAAK,KAAK;AACvB,SAAO,KAAK,KAAK;AACjB,MAAI,CAAC,KAAM;AAEX,OAAK,SAAS;GACZ,KAAK;GACL;GACA;IACE,OAAO;IACP,YAAY,KAAK;IACjB,MAAM,KAAK;IACX,OAAO;IACR;GACF,CAAC;;;AAIN,SAAS,iBAAiB,KAAc;AACtC,QAAO,KAAK,UAAU,KAAK,SAAU,KAAsB,OAAgB;EACzE,MAAM,WAAW,KAAK;AACtB,MACE,YAAY,QACZ,OAAO,aAAa,YACpB,YAAY,YACZ,OAAO,SAAS,WAAW,YAC3B;GACA,MAAM,EAAE,MAAM,SAAS,SAAS,QAAQ;AACxC,UAAO;IAAE,GAAG;IAAM;IAAM;;AAG1B,SAAO;GACP;;AAGJ,SAAS,gBAAgB,OAAgB;AAEvC,KAAI,iBAAiB,MACnB,QAAO;EAAE,OAAO,MAAM;EAAM,SAAS,MAAM;EAAS;AAEtD,QAAO;EAAE,OAAO;EAAS,SAAS,KAAK,UAAU,MAAM;EAAE;;AAG3D,SAAS,kBACP,QACsE;AACtE,KAAI,OAAO,WAAW,YAAY,UAAU,KAAM,QAAO;AACzD,QACE,kBAAkB,UAClB,OAAO,OAAO,iBAAiB,YAC/B,OAAO,gBAAgB;;AAI3B,SAAS,6BACP,QACA;AACA,KAAI,CAAC,kBAAkB,OAAO,IAAI,CAAC,OAAO,aAAa,UACrD,QAAO;AAGT,QAAO;EACL,WAAW,OAAO,aAAa;EAC/B,eAAe,OAAO,aAAa,iBAAiB;EACpD,eAAe,OAAO,aAAa,iBAAiB;EACpD,gBAAgB,OAAO,aAAa,kBAAkB;EACvD;;AAGH,SAAS,iBAAiB,QAAiB;AACzC,KAAI,kBAAkB,OAAO,EAAE;EAC7B,MAAM,eAAe,OAAO,YAC1B,OAAO,QAAQ,OAAO,aAAa,CAAC,QACjC,CAAC,SAAS,CAAC,IAAI,WAAW,KAAK,CACjC,CACF;EAED,MAAM,YAAY;GAAE,GAAG;GAAQ;GAAc;AAC7C,SAAO,UAAU;AACjB,SAAO;;AAGT,QAAO;;AAGT,SAAS,qBAAqB,SAA2C;CACvE,MAAM,SAAkC;EACtC,GAAG;EACH,YAAY,6BAA6B,QAAQ,OAAO;EACxD,mBAAmB,6BAA6B,QAAQ,aAAa;EAErE,QAAQ,iBAAiB,QAAQ,OAAO;EACxC,eAAe,iBAAiB,QAAQ,aAAa;EAErD,OAAO,QAAQ,MAAM,KAAK,SAAS;AACjC,OAAI,kBAAkB,KAAK,MAAM,EAAE;IACjC,MAAM,aAAa,6BAA6B,KAAK,MAAM;AAC3D,QAAI,cAAc,MAAM;KACtB,MAAM,YAAqC;MAAE,GAAG;MAAM;MAAY;AAClE,YAAO,UAAU;AACjB,YAAO;;;AAIX,UAAO;IACP;EACH;AAED,QAAO,OAAO;AACd,QAAO;;AAGT,SAAgB,cAAc,QAAwB;CACpD,MAAM,UAAU,IAAI,aAAa;AACjC,QAAO,IAAI,eAA2B,EACpC,MAAM,MAAM,YAAY;EACtB,MAAM,gBAAgB,QAIhB;AACJ,cAAW,QACT,QAAQ,OACN,UAAU,IAAI,MAAM,UAAU,iBAAiB,IAAI,KAAK,CAAC,MAC1D,CACF;;AAGH,MAAI;AACF,cAAW,MAAM,WAAW,QAAQ;IAClC,MAAM,CAAC,IAAI,MAAM,SAAS;IAE1B,IAAI,OAAgB;AACpB,QAAI,SAAS,SAAS;KACpB,MAAM,aAAa;AAEnB,SAAI,WAAW,SAAS,aACtB,QAAO;MACL,GAAG;MACH,SAAS,qBAAqB,WAAW,QAAQ;MAClD;;AAIL,QAAI,SAAS,cACX,QAAO,qBAAqB,MAAM;AAIpC,iBAAa;KAAE,OADD,IAAI,SAAS,GAAG,KAAK,GAAG,GAAG,KAAK,IAAI,KAAK;KACjC;KAAM,CAAC;;WAExB,OAAO;AACd,gBAAa;IAAE,OAAO;IAAS,MAAM,gBAAgB,MAAM;IAAE,CAAC;;AAGhE,aAAW,OAAO;IAErB,CAAC"}